浅析F#简易Comet聊天服务实例

Visual Studio 2010中关于F#的部分已经众人皆知,那么具体该怎么开发呢?这里作者将本来可以用C#开发的实例,改用F#来进行,也是为大家开阔眼界。

#T#

普通的Web应用程序,都是靠大量HTTP短连接维持的。如实现一个聊天服务时,客户端会不断轮询服务器端索要新消息。这种做法的优势在于简单有效,因此广为目前的聊天服务所采用。不过Comet技术与之不同,简单地说,Comet便是指服务器推(Server-Push)技术。它的实现方式是(这里只讨论基于浏览器的Web平台)在浏览器与服务器之间建立一个长连接,待获得消息之后立即返回。否则持续等待,直至超时。客户端得到消息或超时之后,又会立即建立另一个长连接。Comet技术的***优势,自然就是很高的即使性。

如果要在ASP.NET平台上实现Comet技术,那么自然需要在服务器端使用异步请求处理。如果是普通处理方式的话,每个请求都会占用一个工作线程,要知道Comet是“长连接”,因此不需要多少客户端便会占用大量的线程,这对资源消耗是巨大的。如果是异步请求的话,虽然客户端和服务器端之间一直保持着连接,但是客户端在等待消息的时候是不占用线程的,直到“超时”或“消息到达”时才继续执行。

以前也有人实现过基于ASP.NET的Comet服务原型,不过是使用C#的。而现在我们用F#来实现这个功能。您会发现F#对于此类异步场景有其独特的优势。

F#常用的工作单元是“模块”,其中定义了大量函数或字段。例如我们要打造一个聊天服务的话,我便定义了一个Chat模块:

 
 
  1. #light  
  2. module internal Comet.Chating.Chat  
  3. open System  
  4. open System.Collections.Concurrent  
  5.  
  6. type ChatMsg = {  
  7.     From: string;  
  8.     Text: string;  
  9. }  
  10.  
  11. let private agentCache = new ConcurrentDictionary>()  
  12.  
  13. let private agentFactory = new Func>(fun _ ->   
  14.     MailboxProcessor.Start(fun o -> async { o |> ignore }))  
  15.  
  16. let private GetAgent name = agentCache.GetOrAdd(name, agentFactory) 

在这里我构建了一个名为ChatMsg的Record类型,一个ChatMsg对象便是一条消息。然后,我使用一个名为agentCache的ConcurrentDictionary对象来保存每个用户所对应的聊天队列——MailboxProcessor。它是F#核心库中内置的,用于实现消息传递式并发的组件,非常轻量级,因此我为每个用户分配一个也只使用很少的资源。GetAgent函数的作用是根据用户的名称获取对应的MailboxProcessor对象,自不必多说。

Chat模块中还定义了send和receive两个公开方法,如下:

 
 
  1. let send fromName toName msg =   
  2.     let agent = GetAgent toName  
  3.     { From = fromName; Text = msg; } |> agent.Post  
  4.  
  5. let receive name =   
  6.     let rec receive' (agent: MailboxProcessor) messages =   
  7.         async {  
  8.             let! msg = agent.TryReceive 0  
  9.             match msg with  
  10.             | None -> return messages  
  11.             | Some s -> return! receive' agent (s :: messages)  
  12.         }  
  13.  
  14.     let agent = GetAgent name  
  15.  
  16.     async {  
  17.         let! messages = receive' agent List.empty  
  18.         if (not messages.IsEmpty) then return messages  
  19.         else 
  20.             let! msg = agent.TryReceive 3000  
  21.             match msg with  
  22.             | None -> return []  
  23.             | Some s -> return [s]  
  24.     } 

send方法接受3个参数,没有返回值,它的实现只是简单地构造一个ChatMsg对象,并塞入对应的MailboxProcessor。不过receive方法是这里最关键的部分(没有之一)。receive函数的作用是接受并返回MailboxProcessor中已有的对象,或者等待3秒钟后超时——这么说其实不太妥当,因为receive方法其实只是构造了一个“做这件事情”的Async Workflow,而还没有真正执行它。至于它是如何执行的,我们稍候再谈。

receive函数的逻辑是这样的:首先我们构造一个辅助函数receive’来“尝试获取”队列中已有的所有消息。receive’是一个递归函数,每次获取一个,并递归获取剩余的消息。agent.TryReceive函数接受0,表示查询队列,并立即返回一个Option 结果,如果这个结果为None,则表示队列已为空。于是在receive这个主函数中,便先使用receive’函数获取已有消息,如果存在则立即返回,否则便接收3秒钟内获得的***个消息,如果3秒结束还没有收到则返回None。

在receive和receive’函数中都使用了let!获取agent.TryReceive函数的结果。let!是F#中构造Workflow的关键字,它起到了“语法糖”的作用。例如,以下的Async Workflow:

 
 
  1. async {  
  2.     let req = WebRequest.Create("http://moma.org/")  
  3.     let! resp = req.GetResponseAsync()  
  4.     let stream = resp.GetResponseStream()  
  5.     let reader = new StreamReader(stream)  
  6.     let! html = reader.ReadToEndAsync()  
  7.     html  

事实上在“解糖”后就变成了:

 
 
  1. async.Delay(fun () ->  
  2.     async.Let(WebRequest.Create("http://moma.org/"), (fun req ->  
  3.         async.Bind(req.GetResponseAsync(), (fun resp ->  
  4.             async.Let(resp.GetResponseStream(), (fun stream ->  
  5.                 async.Let(new StreamReader(stream), (fun reader ->  
  6.                     async.Bind(reader.ReadToEndAsync(), (fun html ->  
  7.                         async.Return(html)))))))))) 

let!关键字则会转化为Bind函数调用,Bind调用有两个参数,***个参数为Async<’a>类型,它便负责一个“回调”,待回调后才执行一个匿名函数——也就是Bind函数的第二个参数。可见,let!关键字的一个重要作用,便是将流程的“控制权”转交给“系统”,待合适的时候再继续执行下去。这便是关键,因为这样的话,在接受一个消息的时候,这等待的3秒钟是不占用任何线程的,也就是真正的纯异步。但是如果观察代码——难道不是纯粹的顺序型写法吗?

这就是F#的神奇之处。

在ASP.NET处理时需要Handler,于是在Send阶段便是简单的IHttpHandler:

 
 
  1. #light  
  2.  
  3. namespace Comet.Chating  
  4.  
  5. open Comet  
  6. open System  
  7. open System.Web  
  8.  
  9. type SendHandler() =  
  10.  
  11.     interface IHttpHandler with  
  12.         member h.IsReusable = false 
  13.         member h.ProcessRequest(context) =   
  14.             let fromName = context.Request.Form.Item("from");  
  15.             let toName = context.Request.Form.Item("to")  
  16.             let msg = context.Request.Form.Item("msg")  
  17.             Chat.send fromName toName msg  
  18.             context.Response.Write "sent" 

而Receive阶段则是个异步的IHttpAsyncHandler:

 
 
  1. #light  
  2.  
  3. namespace Comet.Chating  
  4.  
  5. open Comet  
  6. open System  
  7. open System.Collections.Generic  
  8. open System.Web  
  9. open System.Web.Script.Serialization  
  10.  
  11. type ReceiveHandler() =  
  12.  
  13.     let mutable m_context = null 
  14.     let mutable m_endReceive = null 
  15.  
  16.     interface IHttpAsyncHandler with  
  17.         member h.IsReusable = false 
  18.         member h.ProcessRequest(context) = failwith "not supported" 
  19.  
  20.         member h.BeginProcessRequest(c, cb, state) =  
  21.             m_context <- c  
  22.  
  23.             let name = c.Request.QueryString.Item("name")  
  24.             let receive = Chat.receive name  
  25.             let beginReceive, e, _ = Async.AsBeginEnd receive  
  26.             m_endReceive <- new Func<_, _>(e)  
  27.  
  28.             beginWork (cb, state)  
  29.  
  30.         member h.EndProcessRequest(ar) =  
  31.             let convert (m: Chat.ChatMsg) =  
  32.                 let o = new Dictionary<_, _>();  
  33.                 o.Add("from", m.From)  
  34.                 o.Add("text", m.Text)  
  35.                 o  
  36.  
  37.             let result = m_endReceive.Invoke ar  
  38.             let serializer = new JavaScriptSerializer()  
  39.             result  
  40.             |> List.map convert  
  41.             |> serializer.Serialize  
  42.             |> m_context.Response.Write 

这里的关键是Async.AsBeginEnd函数,它将Chat.receive函数生成的Async Workflow转化成一组标准APM形式的begin/end对,然后我们只要把BeginProcessRequest和EndProcessReqeust的职责直接交给即可。剩下的,便是一些序列化成JSON的工作了。

于是我们可以新建一个Web项目,引用F#工程,在Web.config里配置两个Handler,再准备一个Chat.aspx页面即可。您可以在文末的链接中查看该页面的代码,也可以在这里试用其效果。作为演示页面,您其实只能“自己给自己”发送消息,其主要目的是查看其响应时间而已。例如,以下便是使用效果一例:

 
 
  1. 2 - receiving...  
  2. 3026 - received nothing (3024ms)  
  3. 3026 - receiving...  
  4. 6055 - received nothing (3028ms)  
  5. 6055 - receiving...  
  6. 7256 - sending 123654...  
  7. 7268 - received: 123654 (1213ms)  
  8. 7268 - receiving...  
  9. 10281 - received nothing (3013ms)  
  10. 10281 - receiving...  
  11. 13298 - received nothing (3017ms)  
  12. 13298 - receiving...  
  13. 13679 - sending 123456...  
  14. 13698 - received: 123456 (400ms)  
  15. 13698 - receiving...  
  16. 16716 - received nothing (3018ms)  
  17. 16716 - receiving...  
  18. 18256 - sending hello world...  
  19. 18265 - received: hello world (1549ms)  
  20. 18266 - receiving...  
  21. 21281 - received nothing (3015ms)  
  22. 21281 - receiving... 

可见,如果没有收到消息,那么receive操作会在3秒钟后返回。当send一条消息后,先前的receive操作便会立即获得消息了,即无需等待3秒便可提前返回。这便是Comet的效果。

至于性能,我写了一个客户端小程序,用于模拟大量用户同时聊天,每个用户每隔1秒便给另外5个用户发送一条消息,然后查看这条消息收到时产生多少的延迟。经过本机测试(2.4GHz双核,2G内存),当超过2K个在线用户时(即2000个长连接)延迟便超过了1秒——到20K还差不多。这个性能其实并不理想。不过,我这个测试也很一般。因为测试环境相当马虎,大量程序(如N个VS)基本上已经完全用满了所有的物理内存,测试客户端和服务器也是同一台机器,甚至代码也是Debug编译的……而根据监视,测试用的客户端小程序CPU占用超过50%,而服务器进程对应的w3wp.exe的CPU占用却小于10%。因此,我们可以这样推断,其实服务器端的性能并没有用足,也有可能是MailboxProcessor的调度方式不甚理想。至于具体是什么原因,我还在调查之中。

***我想说的是,这个Comet实现只是一个原型,我最想说明的问题其实是F#在异步编程中的优势。目前我写的一些程序,例如一些网络爬虫,都已经使用F#进行开发了,因为它的Async Workflow实在是过于好用,为我省了太多力气。同时我还想证明,“语言特性”并非不重要,它对于编程的简化也是至关重要的。在我看来,“类库”也好,“框架”也罢都是可以补充的,但是语言特性是个无法突破的“限制”。例如,异步编程对于F#来说简化了不少,这是因为我们可以使用顺序的方式编写异步程序。在C#中略有不足,但还有yield可以起到相当作用,因此我们可以使用CCR和AsyncEnumerator简化异步操作。但如果您使用的是Java这种劣质语言……因此,放弃Java,使用Scala吧。

值得一提的是,Async Workflow并不是F#的语言特性,F#的语言特性是Workflow,而Async Workflow其实只是实现了一个Workflow Builder,也就是那个async { ... },以此来简化异步编程而已。PDC 09上关于F#对异步编程的支持也有相应的介绍。

当前文章:浅析F#简易Comet聊天服务实例
文章起源:http://www.csdahua.cn/qtweb/news15/412015.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网