扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章运用简单易懂的例子给大家介绍golang实现WebSocket推送服务的方法,代码非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
坚守“ 做人真诚 · 做事靠谱 · 口碑至上 · 高效敬业 ”的价值观,专业网站建设服务10余年为成都成都OPP胶袋小微创业公司专业提供成都定制网页设计营销网站建设商城网站建设手机网站建设小程序网站建设网站改版,从内容策划、视觉设计、底层架构、网页布局、功能开发迭代于一体的高端网站建设服务。
推送服务实现
基本原理
server 启动以后会注册两个 Handler。
websocketHandler 用于提供浏览器端发送 Upgrade 请求并升级为 WebSocket 连接。
pushHandler 用于提供外部推送端发送推送数据的请求。
浏览器首先连接 websocketHandler (默认地址为 ws://ip:port/ws)升级请求为 WebSocket 连接,当连接建立之后需要发送注册信息进行注册。这里注册信息中包含一个 token 信息。
server 会对提供的 token 进行验证并获取到相应的 userId(通常来说,一个 userId 可能同时关联许多 token),并保存维护好 token, userId 和 conn(连接)之间的关系。
推送端发送推送数据的请求到 pushHandler(默认地址为 ws://ip:port/push),请求中包含了 userId 字段和 message 字段。server 会根据 userId 获取到所有此时连接到该 server 的 conn,然后将 message 一一进行推送。
由于推送服务的实时性,推送的数据并没有也不需要进行缓存。
代码详解
我在此处会稍微讲述一下代码的基本构成,也顺便说说 Go 语言中一些常用的写法和模式(本人也是从其他语言转向 Go 语言,毕竟 Go 语言也相当年轻。所以有建议的话,敬请提出。)。
由于Go语言的发明人和一些主要维护者大都来自于 C/C++ 语言,所以 Go 语言的代码也更偏向于 C/C++ 系。
首先先看一下 Server 的结构:
// Server defines parameters for running websocket server. type Server struct { // Address for server to listen on Addr string // Path for websocket request, default "/ws". WSPath string // Path for push message, default "/push". PushPath string // Upgrader is for upgrade connection to websocket connection using // "github.com/gorilla/websocket". // // If Upgrader is nil, default upgrader will be used. Default upgrader is // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always // returns true. Upgrader *websocket.Upgrader // Check token if it's valid and return userID. If token is valid, userID // must be returned and ok should be true. Otherwise ok should be false. AuthToken func(token string) (userID string, ok bool) // Authorize push request. Message will be sent if it returns true, // otherwise the request will be discarded. Default nil and push request // will always be accepted. PushAuth func(r *http.Request) bool wh *websocketHandler ph *pushHandler }
这里说一下 Upgrader *websocket.Upgrader,这是 gorilla/websocket 包的对象,它用来升级 HTTP 请求。
如果一个结构体参数过多,通常不建议直接初始化,而是使用它提供的 New 方法。这里是:
// NewServer creates a new Server.func NewServer(addr string) *Server { return &Server{ Addr: addr, WSPath: serverDefaultWSPath, PushPath: serverDefaultPushPath, } }
这也是 Go 语言对外提供初始化方法的一种常见用法。
然后 Server 使用 ListenAndServe 方法启动并监听端口,与 http 包的使用类似:
// ListenAndServe listens on the TCP network address and handle websocket // request. func (s *Server) ListenAndServe() error { b := &binder{ userID2EventConnMap: make(map[string]*[]eventConn), connID2UserIDMap: make(map[string]string), } // websocket request handler wh := websocketHandler{ upgrader: defaultUpgrader, binder: b, } if s.Upgrader != nil { wh.upgrader = s.Upgrader } if s.AuthToken != nil { wh.calcUserIDFunc = s.AuthToken } s.wh = &wh http.Handle(s.WSPath, s.wh) // push request handler ph := pushHandler{ binder: b, } if s.PushAuth != nil { ph.authFunc = s.PushAuth } s.ph = &ph http.Handle(s.PushPath, s.ph) return http.ListenAndServe(s.Addr, nil) }
这里我们生成了两个 Handler,分别为 websocketHandler 和 pushHandler。websocketHandler 负责与浏览器建立连接并传输数据,而 pushHandler 则处理推送端的请求。
可以看到,这里两个 Handler 都封装了一个 binder 对象。这个 binder 用于维护 token <-> userID <-> Conn 的关系:
// binder is defined to store the relation of userID and eventConn type binder struct { mu sync.RWMutex // map stores key: userID and value of related slice of eventConn userID2EventConnMap map[string]*[]eventConn // map stores key: connID and value: userID connID2UserIDMap map[string]string }
websocketHandler
具体看一下 websocketHandler 的实现。
// websocketHandler defines to handle websocket upgrade request. type websocketHandler struct { // upgrader is used to upgrade request. upgrader *websocket.Upgrader // binder stores relations about websocket connection and userID. binder *binder // calcUserIDFunc defines to calculate userID by token. The userID will // be equal to token if this function is nil. calcUserIDFunc func(token string) (userID string, ok bool) }
很简单的结构。websocketHandler 实现了 http.Handler 接口:
// First try to upgrade connection to websocket. If success, connection will // be kept until client send close message or server drop them. func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { wsConn, err := wh.upgrader.Upgrade(w, r, nil) if err != nil { return } defer wsConn.Close() // handle Websocket request conn := NewConn(wsConn) conn.AfterReadFunc = func(messageType int, r io.Reader) { var rm RegisterMessage decoder := json.NewDecoder(r) if err := decoder.Decode(&rm); err != nil { return } // calculate userID by token userID := rm.Token if wh.calcUserIDFunc != nil { uID, ok := wh.calcUserIDFunc(rm.Token) if !ok { return } userID = uID } // bind wh.binder.Bind(userID, rm.Event, conn) } conn.BeforeCloseFunc = func() { // unbind wh.binder.Unbind(conn) } conn.Listen() }
首先将传入的 http.Request 转换为 websocket.Conn,再将其分装为我们自定义的一个 wserver.Conn(封装,或者说是组合,是 Go 语言的典型用法。记住,Go 语言没有继承,只有组合)。
然后设置了 Conn 的 AfterReadFunc 和 BeforeCloseFunc 方法,接着启动了 conn.Listen()。AfterReadFunc 意思是当 Conn 读取到数据后,尝试验证并根据 token 计算 userID,然乎 bind 注册绑定。BeforeCloseFunc 则为 Conn 关闭前进行解绑操作。
pushHandler
pushHandler 则容易理解。它解析请求然后推送数据:
// Authorize if needed. Then decode the request and push message to each // realted websocket connection. func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } // authorize if s.authFunc != nil { if ok := s.authFunc(r); !ok { w.WriteHeader(http.StatusUnauthorized) return } } // read request var pm PushMessage decoder := json.NewDecoder(r.Body) if err := decoder.Decode(&pm); err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } // validate the data if pm.UserID == "" || pm.Event == "" || pm.Message == "" { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } cnt, err := s.push(pm.UserID, pm.Event, pm.Message) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt)) io.Copy(w, result) } Conn Conn (此处指 wserver.Conn) 为 websocket.Conn 的包装。 // Conn wraps websocket.Conn with Conn. It defines to listen and read // data from Conn. type Conn struct { Conn *websocket.Conn AfterReadFunc func(messageType int, r io.Reader) BeforeCloseFunc func() once sync.Once id string stopCh chan struct{} }
最主要的方法为 Listen():
// Listen listens for receive data from websocket connection. It blocks // until websocket connection is closed. func (c *Conn) Listen() { c.Conn.SetCloseHandler(func(code int, text string) error { if c.BeforeCloseFunc != nil { c.BeforeCloseFunc() } if err := c.Close(); err != nil { log.Println(err) } message := websocket.FormatCloseMessage(code, "") c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second)) return nil }) // Keeps reading from Conn util get error. ReadLoop: for { select { case <-c.stopCh: break ReadLoop default: messageType, r, err := c.Conn.NextReader() if err != nil { // TODO: handle read error maybe break ReadLoop } if c.AfterReadFunc != nil { c.AfterReadFunc(messageType, r) } } } }
主要设置了当 websocket 连接关闭时的处理和不停地读取数据。
关于golang实现WebSocket推送服务的方法就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流