go语言mqtt心跳包 go mqtt服务器

Golang kafka简述和操作(sarama同步异步和消费组)

一、Kafka简述

成都创新互联企业建站,10余年网站建设经验,专注于网站建设技术,精于网页设计,有多年建站和网站代运营经验,设计师为客户打造网络企业风格,提供周到的建站售前咨询和贴心的售后服务。对于成都网站设计、成都做网站中不同领域进行深入了解和探索,创新互联在网站建设中充分了解客户行业的需求,以灵动的思维在网页中充分展现,通过对客户行业精准市场调研,为客户提供的解决方案。

1. 为什么需要用到消息队列

异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

2.为什么选择kafka呢?

这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

kafka的优点:

1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群

kafka的缺点:

1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

3. Golang 操作kafka

3.1. kafka的环境

网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

3.2. 第三方库

github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组

3.3. 消费者

单个消费者

funcconsumer(){varwg sync.WaitGroup  consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{      fmt.Println("Failed to start consumer: %s", err)return}  partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{      fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {      pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{        fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}      wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))        }deferpc.AsyncClose()        wg.Done()      }(pc)  }  wg.Wait()}funcmain(){  consumer()}

消费组

funcconsumerCluster(){  groupID :="group-1"config := cluster.NewConfig()  config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second  config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{      glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){      errors := c.Errors()      noti := c.Notifications()for{select{caseerr := -errors:            glog.Errorln(err)case-noti:        }      }  }(c)formsg :=rangec.Messages() {      fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))      c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生产者

同步生产者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){  config := sarama.NewConfig()  config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}  msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")  client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{      fmt.Println("producer close err, ", err)return}deferclient.Close()  pid, offset, err := client.SendMessage(msg)iferr !=nil{      fmt.Println("send message failed, ", err)return}  fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

异步生产者

funcasyncProducer(){  config := sarama.NewConfig()  config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second  p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){      errors := p.Errors()      success := p.Successes()for{select{caseerr := -errors:iferr !=nil{              glog.Errorln(err)            }case-success:        }      }  }(p)for{      v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))      fmt.Fprintln(os.Stdout, v)      msg := sarama.ProducerMessage{        Topic: topics,        Value: sarama.ByteEncoder(v),      }      p.Input() - msg      time.Sleep(time.Second *1)  }}funcmain(){goasyncProducer()select{      }}

3.5. 结果展示-

同步生产打印:

分区ID:0,offset:90

消费打印:

Partition:0,Offset:90,key:,value:Hello World!

异步生产打印:

async:7272async:7616async:998

消费打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998

【内部分享】MQTT协议解读及使用经验

时间:2018-07-26

Q: 什么是网络连接?

A: 网络连接是传输层定义的概念,在传输层以下只存在网络数据包的相互交换。

所谓连接,其实也不是在网络上有一条真实存在的数据通道。只要通信双方在一段时间内持续保持数据包交换,就可以视为双方建立的连接并没有断开。

连接的建立是依托于TCP协议的三次握手,一旦连接已经建立完毕,通信双方就可以复用这条虚拟通道进行数据交换。如果连接保持长时间工作一直没有被中断,那么这样的TCP连接就俗称为长连接。

Message Queue Telemetry Transport ,中文直译: 消息队列遥测传输协议 。

在MQTT协议被设计出来的年代,还没有物联网这么时髦的词汇,当年叫做 遥测设备 。

MQTT协议真正开始声名鹊起的原因,是其正好恰恰踩中移动互联网发展的节拍,为消息推送场景提供了一个既简便又具有良好扩展性的现成解决方案。

可以看出,MQTT对消息头的规定十分精简, 固定头部占用空间大小仅为1个字节 ,一个最小的报文占用的空间也 只有两个字节 (带一字节的长度标识位)。

这也是MQTT协议针对不稳定及带宽低下的网络环境做出的特定设计 - - - - 尽可能地节省一切不必要的网络开销 。

Q:为什么MQTT协议需要心跳报文(PINGREQ, PINGRESP)来维护连接状态,只监控该TCP的连接状态是否可以实现目的?

A: TCP数据传输默认的超时时间过长,不符合应用层上细粒度的要求。

TCP数据传输超时的情况可分成三种: 服务端断开 、 客户端断开 、 中间网络断开 。

在前两种场景下,若断开操作是一方主动发起的,即表示为TCP连接正常结束,双方走四次挥手流程;若程序异常结束,则会触发被动断开事件,通信另一方也能立刻感知到本次连接所打开的 Socket 出现中断异常。

唯独中间网络的状态是通信双方不能掌握的。 在Linux系统下 ,TCP的连接超时由内核参数来控制,如果通信中的一方没有得到及时回复,默认会主动再尝试 6次 。如果还没有得到及时回应,那么其才会认定本次数据超时。

连带首次发包与六次重试,Linux系统下这7次发包的超时时间分别为 2的0次方 至 2的6次方 ,即1秒、2秒、4秒、8秒、16秒、32秒、64秒,一共127秒。MQTT协议认为如此长的超时时间对应用层而言粒度太大,因此其在应用层上还单独设计属于自身的心跳响应控制。常见的MQTT连接超时多被设定为 60秒 。

扩展知识 - TCP的KeepAlive机制:

由通信中的 报文标识符 ( Packet Identifier )传达。

Q:仅Publish与Pubrec能保证消息只被投递一次吗?

A: 业务上可以实现,但MQTT协议并没有如此设计,原因如下:

每个消息都会拥有属于自己的报文标识符,但如果需要两次数据交换就实现消息仅只收到一次,就需要通信双方记录下每次使用的报文标识符,并且在处理每一条消息时都需要去重处理,以防消息被重复消费。

但MQTT协议最初被设计的工作对象是轻量级物联设备,为此在协议的设计中报文标识符被约定为 可重用 ,以减少对设备性能的消耗,换回的代价不得不使用四次网络数据交换,才能确保消息正好被消费一次。

Q:两个不同客户端在发布与订阅同一Topic下的消息时,都可以提出通信Qos要求,此时以哪项为基准?

A: 伪命题,故意在分享时埋下坑,等人来踩。

两个不同客户端的通信是需要 Broker 进行中转,而不是直连。因此,通信中存在两个不同的会话,双方的Qos要求仅仅作用于它们与 Broker 之间的会话,最终的Qos基准只会向最低要求方看齐。

例:遗嘱消息的正确使用方式可参考此篇文章:

虽然可以借助 Retain Message 实现绑定一条消息至某个Topic,以达到消息的暂时保留目的。

但首先 Retain Message 并不是为存储场景而设计的,再次MQTT协议并没有对消息的持久化作出规定,也就是说Broker重启后,现有保留消息也将丢失。

Q:两种特殊消息的使用场景?

A: 遗嘱消息,多用于客户端间获取互相之间异常断线的消息通知;

保留消息,可保存 最近一条 广播通知,多用于公告栏信息的发布。

Eclipse Mosquitto :MQTT协议的最小集实现

有 EMQ , HiveMQ , RabbitMQ MQTT Adapter 等。

Qos=2 消息保障的网络I/O次数过多,如果不是必需,尽少在程序里使用此类消息。

毕竟当初其设计的目的是为了减少设备的性能占用,但若应用场景并不是物联网,而是用于手机、电脑或浏览器端等现在已不缺性能的设备上,最好在报文体中,使用UUID生成全局唯一的消息ID,然后自行在业务解析中判断此报文是否被消费过。

或者,业务方在处理消息时保证其被消费的幂等性,也可消除重复消息对系统带来的影响。

正如MQTT协议并没有依赖TCP连接状态,自己在应用层协议上实现心跳报文来控制连接状态,业务方作为MQTT协议的使用者,也不要完全依赖协议的工作状态,而是依托MQTT协议建立属于业务本身的信息汇报机制,以加强系统的稳健性。

Retain Message 可视为客户端主动拉取的行为。如果业务系统采用 HTTP+MQTT 双协议描述业务过程,主动拉取的操作也可使用 HTTP 请求替代。

作为一个长连接型的应用,上线前需要根据业务量级,评估对操作系统 端口数 与 文件描述符 的占用要求,以防服务器资源被打满。

在服务端的配置文件和客户端的连接参数中,都拥有 max_inflight_messages 此项配置,来维护 Qos=1 or 2 消息是否被成功消费的状态。

MQTT 最初被设计为物联网级的通信协议,因此此参数的默认配额较小(大多数情况下被限制到10至20)。

但如果将MQTT协议应用至手机、PC或Web端的推送场景时,硬件性能已不在是瓶颈,在实际使用中推荐把此参数调大。

Mosquitto提供Bridge功能,需要我们自己配置。

Bridge 意为桥接,当我们把两台Broker桥接在一起时,只需要修改一台Broker的配置,填上另一台Broker的运行地址。前一台Broker将作为客户端发布与订阅后一台Broker的所有Topic,实现消息互通的目的。

桥接带来的问题有以下几点:

我的建议:

Websockets协议被设计的目的是为浏览器提供一个全双工的通信协议,方便实现消息推送功能。

在Websockets协议被设计出来前,受限于HTTP协议的一问一答模型,消息的推送只能靠轮询来实现,在资源消耗与时效性保障上,均难以达到令人满意的效果。

Websockets协议复用了HTTP协议的头部信息,告知浏览器接下来的操作将触发协议升级,然后通信双方继续复用HTTP的Header,但报文内容已转变为双方均接受的新协议的格式。

Websockets协议改进了网页浏览中的消息推送的方式,因此被广泛应用在聊天、支付通知等实时性要求比较高的场合下。

MQTT协议重点在于 消息队列的实现,其对消息投递的方式作出约定,并提供一些额外的通信保障 。

MQTT可采取原生的TCP实现,也有基于Websockets的实现版本。当然后者在网络字节的利用率上,不如前者那么精简。但浏览器端无法直接使用TCP协议,所以就只能基于Websockets协议开发。

不过基于Websockets的应用也有方便之处:一是证书不需要额外配置,直接与网站共用一套基础设施;二是可使用 Nginx 等工具管理流量,与普通HTTP流量可共用一套配置方法。

MQTT非常适合入门,原因如下:

实际的应用场景远比理想中的复杂,无法一招走遍天下,必须做好取舍。

MQTT协议在这方面做得很优秀,以后工作中可以作为参考,设计好自己负责的业务系统。

Android MQTT 通信

  MQTT 协议 是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。

  常用于 IOT 物联网和一些需要服务端主动通知客户端的场景。

1. 导入依赖

2. 创建 MqttHelper 辅助类,设置回调监听

3. 连接 MQTT

  连接成功或失败,以及中途的连接掉线,会触发 OnMqttStatusChangeListener 回调

4. MQTT 连接状态监听

5. MQTT 收发消息监听

  onSubMessage 订阅的消息回调,因为存在订阅多个 topic 的情况,所以回调能知道是来自哪个 Topic 的消息;

  onPubMessage 发布的消息回调,用于确认发布的消息是否发送成功。

6. MQTT 订阅 Topic

  需要在 MQTT 连接成功后才能订阅 topic,否则订阅 Topic 不成功,收不到对应消息

7. MQTT 取消订阅 Topic

8. MQTT 发布消息

9. MQTT 断开连接

10. 通知设置

  由于 MQTT 启动了一个 Service,而 Android 8.0 以上对于后台 Service 限制时长 5 秒;所以将 MqttService 绑定到 Notification 上成为了一个前台通知;通知的标题和内容显示可以在 strings.xml 中设置,对应属性如下:

  Android 8.0 及以上开启前台服务绑定到通知,8.0 以下默认不启用,可将 mqtt_foreground_notification_low_26 设为 true,将 8.0 以下设备也开启前台通知服务

  创建 MQTT 实例时需要传送参数 MqttOptions,下面将介绍下部分参数;

1. Topic

  MQTT 是一种发布/订阅的消息协议, 通过设定的主题 Topic,

发布者向 Topic 发送的 payload 负载消息会经过服务器, 转发到所有订阅

该 Topic 的订阅者

   通配符 : 假想移动端消息推送场景,有的系统消息是全体用户接收,有的消息是 Android 或 iOS 设备接收, 又或者是某些消息具体推送到用户,当然, 对应的多种类型消息可以通过多订阅几个对应的 Topic 解决,也可以使用通配符;

  通配符有两个, " + " 和 " # ", 与正斜杠 " / " 组合使用;加号只能表示一级Topic, 井号可以表示任意层级 Topic; 例如: 订阅 Topic为 " System/+ ", 发布者发布的 Topic 可以是 System、System/Android、System/iOS; 但是不能是 System/iOS/123, 而订阅的 Topic 如果是" System/# " 则可以收到;

   注意,只有订阅的 Topic 才可以使用 通配符, 发布和遗嘱的 Topic 不能包含通配符.

2. ClientID

  发布者和订阅者都是属于客户端, 客户端与服务端建立连接之后,发送的第一个报文消息必须是 Connect 消息,而 Connect 的消息载荷中必须包含 clientID 客户端唯一标识;

  如果两个客户端的 clientID 一样, 则服务端记录第一个客户端连接之后再收到第二个客户端连接请求,则会向一个客户端发送 Disconnect 报文断开连接, 并连接第二个客户端, 而如果此时设置了自动重连, 第一个客户端再次连接,服务端又断开与第二个的连接, 连上第一个客户端, 如此将导致两个客户端不断的被挤掉重连.

  注意: clientID 使用的字符最好是 大小写字母和数字, 长度最好限制在[1, 23] 之间;

3. 遗嘱消息

  可选参数, 客户端没有主动向服务端发起 disconnect 断开连接消息,然而服务端检测到和客户端之间的连接已断开, 此时服务端将该客户端设置的遗嘱消息发送出去

  应用场景: 客户端因网络等情况掉线之后, 可以及时通知到所有订阅该遗嘱 Topic 的客户端;

  遗嘱 Topic 中不能存在通配符.

4. Session

  客户端和服务端之间建立的会话状态, 一般用于消息保存, 如果设置清除 Session,则每次客户端和服务端建立连接会创建一个新的会话,之前连接中的消息不能恢复,

  而设置不清除会话, 对应发布者发送的 qos 为 1和2 的消息,还未被订阅者接收确认,则需要保存在会话中, 以便订阅者下次连接可以恢复这些消息;

  注意: Session 存储的消息是保存在内容中的, 所以如果不是重要的消息,最好是设置清除 Session, 或者设置 qos = 0;

5. 心跳包

  标识客户端传输一次控制报文到下一次传输之间允许的空闲时间;在这段时间内,如果客户端没有其他任何报文发送,必须发送一个 PINGREQ 报文到服务器,而如果服务端在 1.5 倍心跳时间内没有收到客户端消息,则会主动断开客户端的连接,发送其遗嘱消息给所有订阅者。而服务端收到 PINGREQ 报文之后,立即返回 PINGRESP 报文给客户端

  心跳时间单位为秒,占用2个字节,最大 2^16 - 1 = 65535秒(18小时12分钟15秒),设置为 0 表示不使用心跳机制; 心跳时间一般设置为几分钟或几十秒即可,时间短点可以更快的发出遗嘱消息通知掉线,但是时间短会增加消息频率,影响服务端并发; 微信长连接为 300 秒,而三大运营商貌似也有个连接时间最小的为 5 分钟。

6. qos

  服务质量等级 qos 对应两部分,一是客户端到服务端发送的消息, 一是服务端到客户端订阅的消息; 从发布者到订阅者实际 qos 为两段路中 qos 最小的。

  qos 可选值 0(最多交付一次)、1(最少交付一次)、2(正好交付一次);

   qos = 0 :接收方不发送响应,发送方不进行重试;发送方只管发一次,不管是否发成功,也不管接收方是否成功接收,适用于不重要的数据传输;

   qos = 1 :确保消息至少有一次到达接收方,发送方向接收方发送消息,需要等待接收方返回应答消息,如果发送方在一定时间之内没有收到应答,发送方继续下一次消息发送,直到收到应答消息,删除本地消息缓存,不再发送;所以接收方可能收到1-n次消息;适用于需要收到所有消息,客户端可以处理重复消息。

   qos = 2 :确保消息只一次到达接收方,发送方和接收方之间消息处理流程最复杂;

   Mqtt Qos 深度解读 和 MQTT协议QoS2 准确一次送达的实现

7. payload 负载消息

  字节流类型, 是 MQTT 通信传输的真实数据

8. 保留消息

  发布消息时设置, 对应参数 retain, 服务端将保留对应 Topic 最新的一条消息记录; 保留消息的作用是每次客户端连接上线都会收到其 Topic 的最后一条保留消息, 所以可能存在网络不稳定,频繁掉线重连,每次重连重复收到保留消息;

   可以向对应的 Topic 发送一条 空消息,用于清除保留消息。

MQTT 服务搭建 Apache Apollo 服务器 搭建 MQTT 服务

Github 仓库

mqtt 协议

paho mqtt c 源码分析-2 (心跳机制)

MQTT是基于TCP的,因此需要考虑连接心跳,paho mqtt c的心跳处理函数

概括如下:

ping_outstanding 在下述函数中处理,该函数的调用是在接收线程中,如果收到云端的 PINGRESP 数据包,会调用该接口

lastSent 是MQTT消息包发送完成,调用的场景如下2个:

MQTTPacket_Factory(...)

如果收到一个MQTT packet,则设置 lastReceived

面试笔记-Socket MQTT Websocket

1.Socket是对TCP/IP协议的封装,Socket本身并不是协议,而是一个调用接口(API),通过Socket,我们才能使用TCP/IP协议。

2.MQTT协议是应用层协议不依赖长连接,适合弱网络。通过topic缓存信息。符合物联网设备的使用场景。因为通过topic缓存信息,因此可以实现通过topic与多个端的一对多连接,而不是设备与设备的多对多连接,节省了能耗及带宽。

MQTT的心跳,及非信息的报文,较Websocket更少,更节省带宽及能耗。更适用于物理网的多种网络协议。

3.WebSocket和Http一样在应用层,提供使用一个TCP连接进行双向通讯的机制,包括网络协议和API,以取代网页和服务器采用HTTP轮询进行双向通讯的机制。 本质上来说,WebSocket是不限于HTTP协议的,但是由于现存大量的HTTP基础设施,代理,过滤,身份认证等等,WebSocket借用HTTP和HTTPS的端口。由于使用HTTP的端口,因此TCP连接建立后的握手消息是基于HTTP的,由服务器判断这是一个HTTP协议,还是WebSocket协议。 WebSocket连接除了建立和关闭时的握手,数据传输和HTTP没丁点关系了。

Socket 连接,至少需要一对套接字,分为 clientSocket,serverSocket 连接分为3个步骤:

(1) 服务器监听:服务器并不定位具体客户端的套接字,而是时刻处于监听状态;

(2) 客户端请求:客户端的套接字要描述它要连接的服务器的套接字,提供地址和端口号,然后向服务器套接字提出连接请求;

(3) 连接确认:当服务器套接字收到客户端套接字发来的请求后,就响应客户端套接字的请求,并建立一个新的线程,把服务器端的套接字的描述发给客户端。一旦客户端确认了此描述,就正式建立连接。而服务器套接字继续处于监听状态,继续接收其他客户端套接字的连接请求.

Socket为长连接:通常情况下Socket 连接就是 TCP 连接,因此 Socket 连接一旦建立,通讯双方开始互发数据内容,直到双方断开连接。在实际应用中,由于网络节点过多,在传输过程中,会被节点断开连接,因此要通过轮询高速网络,该节点处于活跃状态。

很多情况下,都是需要服务器端向客户端主动推送数据,保持客户端与服务端的实时同步。

若双方是 Socket 连接,可以由服务器直接向客户端发送数据。

若双方是 HTTP 连接,则服务器需要等客户端发送请求后,才能将数据回传给客户端。

因此,客户端定时向服务器端发送请求,不仅可以保持在线,同时也询问服务器是否有新数据,如果有就将数据传给客户端。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。

例如:

①网络代价昂贵,带宽低、不可靠。

②在嵌入设备中运行,处理器和内存资源有限。

该协议的特点有:

①使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。 ②对负载内容屏蔽的消息传输。

③使用 TCP/IP 提供网络连接。

④有三种消息发布服务质量:

⑤"至多一次",消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

⑥"至少一次",确保消息到达,但消息重复可能会发生。

⑦"只有一次",确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

⑧小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。

⑨使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

有三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。qos=0

“至少一次”,确保消息到达,但消息重复可能会发生。qos=1

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。qos=2

Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。

可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。

消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

WebSocket则提供使用一个TCP连接进行双向通讯的机制,包括网络协议和API,以取代网页和服务器采用HTTP轮询进行双向通讯的机制。 本质上来说,WebSocket是不限于HTTP协议的,但是由于现存大量的HTTP基础设施,代理,过滤,身份认证等等,WebSocket借用HTTP和HTTPS的端口。由于使用HTTP的端口,因此TCP连接建立后的握手消息是基于HTTP的,由服务器判断这是一个HTTP协议,还是WebSocket协议。 WebSocket连接除了建立和关闭时的握手,数据传输和HTTP没丁点关系了。 由此可知两者的应用场景不一样: MQTT是为了物联网场景设计的基于TCP的Pub/Sub协议,有许多为物联网优化的特性,比如适应不同网络的QoS、层级主题、遗言等等。 WebSocket是为了HTML5应用方便与服务器双向通讯而设计的协议,HTTP握手然后转TCP协议,用于取代之前的Server Push、Comet、长轮询等老旧实现。 两者之所有有交集,是因为一个应用场景:如何通过HTML5应用来作为MQTT的客户端,以便接受设备消息或者向设备发送信息,那么MQTT over WebSocket自然成了最合理的途径了。

MQTT 系列之 MQTT broker 的连接

client 在可以发布和订阅消息之前,必须先连接到 broker,下面我们来看一下连接到 broker 的流程。

连接的建立由 Client 端发起,Client 端首先向 broker 发送一个 CONNECT 数据包,CONNECT 数据包包含以下内容(这里我们略过 fixed header)。

在 CONNECT 数据包可变头中,含有以下信息。

协议名称(Protocol name) :值固定为 MQTT 字符。

协议版本 :对于 MQTT3.1.1 来说,值为 4.

用户名标识 :消息体中是否有用户名字段, 1bit , 0 或 1。

密码标识 :消息体中是否有密码字段,1bit,0 或 1。

遗愿消息 Retain 标志(will retain) :标识遗愿消息是否是 retain 消息,1 bit,0 或 1。

遗愿消息 QoS 标志 (will QoS) :标志遗愿消息的 QoS 是否存在,1 bit,0 或 1。

会话清除标志(clean session) :标识 Client 是否建立一个持久化的会话,1bit,0或1,当 clean session 的标识为 0 时,代表 client 希望建立一个之久的会话连接,broker 间存储该 client 订阅的主题和未接受的消息,否则broker不会存储这些数据,同时在建立连接时清除这个 client 之前的持久化会话保存的数据。

心跳保持(keep alive) :设置一个单位为秒的时间间隔,client 和 broker 之间在这个时间内至少需要进行一次消息交互,否则 client 和 broker 会认为它们之间的连接已断开。

CONNECT 数据包的消息体中包含了以下数据。

客户端标识符(client identifier) :Client Identifier 是用来标识 Client 身份的字段,在 MQTT3.1.1 的版本中,这个字段的长度是 1 到 23个字节,而且只能包含数字和26个字母(包括大小写),broker 通过这个字段来区分不同的 client。所有在连接的时候,client应该保证它的 identifier 是唯一的,通常我们可以使用比如 UUID,唯一的设备硬件标识,或者 Android 设备的 DEVICE_ID 等作为 Client identifier 的取值来源。MQTT 协议中要求 Client 连接时必须带上 Client Identifier,但是也允许 broker 在实现时 Client Identifier 为空,这时 Broker 会为 Client 分配一个内部唯一的 Identifier。如果你需要使用持久化会话,那就必须自己为 Client 设定一个唯一的 Identifier。

用户名(Username) :如果可变头中的用户标识设为 1,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的 Client 进行验证,只允许已经授权的 Client 接入。注意不同的 Client 需要使用不同的 Client Identifier,但它们可以使用同样的用户名和密码进行连接。

密码(password) :如果可变头中的密码标识为1,那么消息体中将包含密码字段。

遗愿主题(will topic) :如果可变头中遗愿标识设为1,那么消息体中将包遗愿主题,当 Client 非正常地中断连接的时候,Broker将向指定的遗愿主题中发布遗愿消息。

遗愿消息(will message) :如果可变头中的遗愿标志为1,那么消息体中将包含遗愿消息,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。

当 broker 收到 client 的 CONNECT 数据包之后,将检查并校验 CONNECT 数据包的内容,之后回复 Client 一个 CONNACK 数据包。

CONNACK 数据包包含以下内容(这里略过 Fixed header)。

CONNACK 数据包的可变头中,含有如下信息:

会话存在标识(Session Present Flag) :用于标识在 Broker 上,是否存在该 client(用 client identifier 区分)的持久性会话,1bit,0或1。当 Client 在连接时设置 clean session = 1 ,则 CONNACK 中的 Session Present Flag 始终为 0;当 client 在连接时设置 clean session = 0 时,那么分为两种情况:如果 broker 上面保存了这个 Client 之前留下的持久性会话,那么 CONNACK 中的 session present flag 值为 1,如果 broker 没有保存该 client 的任何会话数据,那么 CONNACK 中 session present flag 值为 0.

连接返回码: 用于标识 client 是否连接建立成功,连接返回码如下:

在这里强调一下 code 4 和 5。Return Code 4 在 MQTT 协议中的含义是 Username 和 Password 的格式不正确,但是在大部分的 Broker 实现中,在使用错误的用户名密码时,得到的返回码也是 4。所以这里我们认为 4 就是代表错误的用户名或密码。Return Code 5 一般在 Broker 不使用用户名和密码而使用 IP 地址或者 Client Identifier 进行验证的时候使用,来标识 Client 没有通过验证。

CONNACK 没有 payload。

Client 主动关闭连接的流程很简单,只需要向 broker 发送一个 DISCONNECT 数据包就可以了。DISCONNECT 数据包没有可变头和消息体。在 Client 发送完 DISCONNECT 后,无需等待 broker 的回复(broker 也不会有回复),直接关闭底层的 tcp 连接即可。

MQTT 协议规定 Broker 在没有收到 Client 的 DISCONNECT 数据包之前都应该保持和 Client 连接,只有 Broker 在 Keep Alive 的时间间隔里,没有收到 Client 的任何 MQTT 数据包的时候会主动关闭连接。一些 Broker 的实现在 MQTT 协议上做了一些拓展,支持 Client 的连接管理,可以主动地断开和某个 Client 的连接。

Broker 主动关闭连接之前不会向 Client 发送任何 MQTT 数据包,直接关闭底层的 TCP 连接就完事了。


本文标题:go语言mqtt心跳包 go mqtt服务器
标题路径:http://csdahua.cn/article/doddgjh.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流