F#中的异步及并行模式:代理的高级使用

本文我们会来探索F#函数式编程语言的异步及并行模式、交互式的代理,以及与代理有关的一些模式,包括隔离的内部状态。

创新互联公司长期为上1000+客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为沙湾企业提供专业的成都做网站、网站建设、外贸营销网站建设沙湾网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。

消息与联合类型

很多时候我们会使用联合类型(Union Type)作为消息的类型。例如,我将要展示一个基于代理的DirectX示例,我们要在模拟引擎中使用如下的消息:

 
 
 
  1. type Message =
  2.     | PleaseTakeOneStep
  3.     | PleaseAddOneBall of Ball 模拟引擎中的代理:
  4. let simulationEngine =
  5.     Agent.Start(fun inbox ->
  6.         async { while true do
  7.                     // Wait for a message
  8.                     let! msg = inbox.Receive()
  9.                     // Process a message
  10.                     match msg with
  11.                     | PleaseTakeOneStep -> state.Transform moveBalls
  12.                     | PleaseAddOneBall ball -> state.AddObject ball  })

在很多情况下使用强类型消息是个不错的做法。不过,在某些您需要和其他消息机制协作的时候,也无需担心使用如“obj”和“string”等泛化的消息类型,此时代理只需要在运行时进行类型判断或转化即可。

参数化代理及抽象代理

代理只是F#编码中的一种设计模式。这意味着您可以将F#中各种常用的技巧,如参数化,抽象或是代码片段重用与代理一起使用。例如,您可以把之前的serveQuoteStream函数参数化,指定每条股票消息传输中的间隔时间:

 
 
 
  1. open System.Net.Sockets
  2. /// serve up a stream of quotes
  3. let serveQuoteStream (client: TcpClient, periodMilliseconds: int) = async {
  4.     let stream = client.GetStream()
  5.     while true do
  6.         do! stream.AsyncWrite( "AAPL 439.2"B )
  7.         do! Async.Sleep periodMilliseconds
  8. }
  9. 这意味着您的股票服务器中不同的请求可以拥有不同长度的间隔。与此类似,您可以使用函数参数,将整个代理类的功能进行抽象:
  10. let iteratingAgent job =
  11.    Agent.Start(fun inbox ->
  12.      async { while true do
  13.                let! msg = inbox.Receive()
  14.                do! job msg })
  15. let foldingAgent job initialState =
  16.    Agent.Start(fun inbox ->
  17.      let rec loop state = async {
  18.          let! msg = inbox.Receive()
  19.          let! state = job state msg
  20.          return! loop state
  21.        }
  22.      loop initialState)您可以这样使用***个函数:
  23. let agent1 = iteratingAgent (fun msg -> async { do printfn "got message '%s'"  msg }) 及第二个:
  24. let agent2 =
  25.     foldingAgent (fun state msg ->
  26.         async { if state % 1000 = 0 then printfn "count = '%d'" msg;
  27.                 return state + 1 }) 0 从代理返回结果

在以后的文章中,我们会讨论一些访问执行中的代理的部分结果的技巧,例如,我们可以使用每个MailboxProcessor代理的PostAndAsyncReply方法。这样的技巧在创建网络通信代理时显得尤其重要。

然而,这种做法很多时候有些过了,我们可能只是需要将结果汇报给一些如GUI般的监视环境。汇报部分结果的简单方法之一,便是之前在第二篇文章中讨论过的设计模式。下面便是这样一个例子,它创建了一个代理,对每1000条消息进行采样,并将得到的事件分发给GUI或其他管理线程(请注意,其中用到了第二篇文章中SynchronizationContext的两个扩展方法CaptureCurrent和RaiseEvent)。

 
 
 
  1. // Receive messages and raise an event on each 1000th message 
  2. type SamplingAgent() = 
  3.     // The event that is raised 
  4.     // Capture the synchronization context to allow us to raise events 
  5.     // back on the GUI thread 
  6.     let syncContext = SynchronizationContext.CaptureCurrent()
  7.     // The internal mailbox processor agent 
  8.     let agent = 
  9.         new MailboxProcessor<_>(fun inbox -> 
  10.             async { let count = ref 0 
  11.                     while true do 
  12.                         let! msg = inbox.Receive() 
  13.                         incr count 
  14.                         if !count % 1000 = 0 then 
  15.                             syncContext.RaiseEvent sample msg })
  16.     /// Post a message to the agent 
  17.     member x.Post msg = agent.Post msg
  18.     /// Start the agent 
  19.     member x.Start () = agent.Start()
  20.     /// Raised every 1000'th message 
  21.     member x.Sample = sample.Publish您可以这样使用代理:
  22. let agent = SamplingAgent()
  23. agent.Sample.Add (fun s -> printfn "sample: %s" s) 
  24. agent.Start()
  25. for i = 0 to 10000 do 
  26.    agent.Post (sprintf "message %d" i) 与预料一致,这会报告agent的消息采样:
  27. sample: message 999 
  28. sample: message 1999 
  29. sample: message 2999 
  30. sample: message 3999 
  31. sample: message 4999 
  32. sample: message 5999 
  33. sample: message 6999 
  34. sample: message 7999 
  35. sample: message 8999 
  36. sample: message 9999

#p#
代理及错误

我们都无法避免错误和异常。良好的错误检测,报告及记录的措施是基于代理编程的基本要素。我们来看一下如何在F#的内存代理(MailboxProcessor)中检测和转发错误。

首先,F#异步代理的神奇之处在于异常可以由async { ... }自动捕获及分发,即使跨过多个异步等待及I/O操作。您也可以在async { ... }中使用try/with,try/finally及use关键字来捕获异常或释放资源。这意味着我们只需要在代理中处理那些未捕获的错误即可。当MailboxProcessor代理中出现未捕获的异常时便会触发Error事件。一个常见的模式是将所有的错误转发给一个监视进程,例如:

 
 
 
  1. type Agent<'T> = MailboxProcessor<'T>
  2. let supervisor = 
  3.    Agent.Start(fun inbox -> 
  4.      async { while true do 
  5.                let! err = inbox.Receive() 
  6.                printfn "an error occurred in an agent: %A" err })
  7. let agent = 
  8.    new Agent(fun inbox -> 
  9.      async { while true do 
  10.                let! msg = inbox.Receive() 
  11.                if msg % 1000 = 0 then 
  12.                    failwith "I don't like that cookie!" })
  13. agent.Error.Add(fun error -> supervisor.Post error) 
  14. agent.Start() 我们也可以很方便地并行这些配置操作:
  15. let agent = 
  16.    new Agent(fun inbox -> 
  17.      async { while true do 
  18.                let! msg = inbox.Receive() 
  19.                if msg % 1000 = 0 then 
  20.                    failwith "I don't like that cookie!" }) 
  21.    |> Agent.reportErrorsTo supervisor 
  22.    |> Agent.start 或使用辅助模块:
  23. module Agent = 
  24.    let reportErrorsTo (supervisor: Agent) (agent: Agent<_>) = 
  25.        agent.Error.Add(fun error -> supervisor.Post error); agent
  26.    let start (agent: Agent<_>) = agent.Start(); agent 

下面是一个例子,我们创建了10000个代理,其中某些会报告错误:

 
 
 
  1. let supervisor = 
  2.    Agent.Start(fun inbox -> 
  3.      async { while true do 
  4.                let! (agentId, err) = inbox.Receive() 
  5.                printfn "an error '%s' occurred in agent %d" err.Message agentId })
  6. let agents = 
  7.    [ for agentId in 0 .. 10000 -> 
  8.         let agent = 
  9.             new Agent(fun inbox -> 
  10.                async { while true do 
  11.                          let! msg = inbox.Receive() 
  12.                          if msg.Contains("agent 99") then 
  13.                              failwith "I don't like that cookie!" }) 
  14.         agent.Error.Add(fun error -> supervisor.Post (agentId,error)) 
  15.         agent.Start() 
  16.         (agentId, agent) ]我们发送消息:
  17. for (agentId, agent) in agents do 
  18.    agent.Post (sprintf "message to agent %d" agentId ) 便可看到:
  19. an error 'I don't like that cookie!' occurred in agent 99 
  20. an error 'I don't like that cookie!' occurred in agent 991 
  21. an error 'I don't like that cookie!' occurred in agent 992 
  22. an error 'I don't like that cookie!' occurred in agent 993 
  23. ...
  24. an error 'I don't like that cookie!' occurred in agent 999

这一节我们处理了F#内存中的MailboxProcessor代理发生的错误。其他一些代理(例如,表示服务器端请求的代理)也可以这样进行设计与架构,以便进行优雅的错误转发及重试。

总结

隔离的代理是一种常用的编程模式,它不断运用在各种编程领域中,从设备驱动编程到用户界面,还包括分布式编程及高度伸缩的通信服务器。每次您编写了一个对象,线程或是异步工作程序,用于处理一个长时间的通信(如向声卡发送数据,从网络读取数据,或是响应一个输入的事件流),您其实就是在编写一种代理。每次您在写一个ASP.NET网页处理程序时,其实您也在使用一种形式的代理(每次调用时都重置状态)。在各种情况下,隔离与通信有关的状态是很常见的需求。

隔离的代理是一种最终的实现方式──例如,实现可伸缩的编程算法,包括可伸缩的请求服务器及分布式编程算法。与其他各种异步及并发编程模式一样,它们也不能被滥用。然而,他们是一种优雅、强大且高效的技术,使用非常广泛。

F#是一个独特的,随Visual Studio 2010一同出现的托管语言,完整支持轻量级的异步计算及内存种的代理。在F#中,异步代理可以通过组合的形式编写,而不用使用回调函数或控制反转等方式。这里有些权衡的地方──例如:在以后的文章中,我们会观察如何使用.NET类库中标准的APM模式来释放您的代理。然而,优势也是很明显的:易于控制,伸缩性强,并且在需要的时候,便可以在组织起CPU和I/O并行操作的同时,保持CPU密集型代码在.NET中的完整性能。

当然,也有其他一些.NET或基于JVM的语言支持轻量级的交互式代理──早前,有人认为这在.NET是“不可能”的事情,因为线程的代价十分昂贵。而如今,F#在2007年引入了“async { ... }”,这被视为语言设计上的一个突破──它让程序员可以在一个被业界广泛认可的编程平台上构建轻量级、组合式的异步编程及交互式的代理。除了Axum语言原型(它也受了F#的影响)之外,F#还证明了一个异步语言特性是一个完全可行的方法,这也解放了如今业界运行时系统设计领域的一个争论话题:我们是否要将线程做得轻量?

文章转自老赵的博客,

原文地址:http://blog.zhaojie.me/2010/03/async-and-parallel-design-patterns-in-fsharp-3-more-agents.html

本文名称:F#中的异步及并行模式:代理的高级使用
网址分享:http://www.csdahua.cn/qtweb/news20/510270.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网