1. MQTT的发布

MQTT发布中最重要的是PUBLISH数据包,PUBLISH数据包是用于sender和receiver之间传输消息数据的。当Publisher要向某个Topic发布一条消息的时候,Publisher会向Broker发送一个PUBLISH数据包;当Broker要将一条消息转发给订阅了某条主题的Subscriber时,Broker也会向该Subscriber发送一个PUBLISH数据包。因为PUBLISH传输过程中涉及到了QoS,Recevier收到sender的PUBLISH数据包之后会根据QoS的不同,还有后续不同的应答流程(只有当QoS为0时,Receiver不做任何应答),所以关于这个具体的流程,在QoS那一章节进行讲述。下面对PUBLISH数据包进行讲解:

1.1. PUBLISH数据包

1.1.1. 固定头

PUBLISH的固定头包含了一下内容:

  • 消息重复标识(DUP flag):1bit,0 或者 1,当 DUP flag = 1 的时候,代表该消息是一条重发消息,因 Receiver 没有确认收到之前的消息而重新发送的。这个标识只在 QoS 大于 0 的消息中使用。
  • QoS:2bit,0、1 或者 2,代表 PUBLISH 消息的 QoS level。
  • Retain 标识(Retain flag):1bit,0 或者 1。在从 Client 发送到 Broker 的 PUBLISH 消息中被设为 1 的时候,Broker 应该保存该条消息,当之后有任何新的 Subscriber 订阅 PUBLISH 消息中指定的主题时,都会先收到该条消息,这种消息也叫 Retained 消息。在从 Broker 发送到 Client 的 PUBLISH 消息中被设为 1 的时候,代表该条消息是一条 Retained 消息。

1.1.2. 可变头

  • 数据包标识( Packet Identifier):2字节,用来标识一个唯一数据包。数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互(比如发送、应答为一次交互)中保持唯一就好,只在QoS大于1的消息中使用,因为QoS大于1的消息有应答流程。
  • 主题名称(Topic Name):主题名称是一个 UTF-8 编码的字符串,用来命名该消息发布到哪一个主题,Topic Name 可以是长度大于等于 1 任何一个字符串(可包含空格)。但是在实际项目中,我们最好还是遵循以下一些最优方法。
    • 主题名称应该包含层级,不同的层级用 / 划分。
    • 主题名称开头不要使用/
    • 不要在主题中使用空格
    • 只使用ASCII字符
    • 主题名称在可读的前提下尽量短
    • 主题是大小写敏感的,“data”和“Data”是两个不同的主题
    • $为开头的主题属于Broker预留的系统主题,通常用于发布Broker的内部统计信息,所以在自己定义时不要使用$开头的主题手法数据。

1.1.3. 消息体

PUBLISH数据包的消息体中包含的是该消息要发送的具体数据,数据可以是任何格式的:二进制数据、文本、JSON等都可以。

2. MQTT的订阅

订阅主题的流程如下图所示:

  1. Client向Broker发送一个SUBSCRIBE数据包,该数据包中含有Client想要订阅的主题和其他一些参数;
  2. Broker收到SUBSCRIBE数据包后,向Client发送一个SUBACK数据包作为应答。

2.1. SUBSCRIBE数据包

2.1.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

2.1.2. 消息体

  • 订阅列表(List of Subscriptions):SUBSCRIBE 的消息体中包含 Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的 QoS 组成。主题名说明
    主题名中可以包含通配符,单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和 PUBLISH 中的主题区分,我们叫 SUBSCRIBE 中的主题名为主题过滤器(Topic Filter)。

    • 单层通配符“+”:“+”可以用来指代任意一个层级。
      举例:
      如“sensor/+/tem”,可以匹配:

      • sensor/data/tem
      • sensor/cmd/tem

不可以匹配:

  • sensor/data/01/tem
  • 多层通配符“#”:“#”和“+”的区别在于,“#”可以用来指代任意多个层。但是"#"必须是Topic Filter的最后一个字符,同时必须跟在“/“后面,除非Topic Filter只包含一个”#“这一个字符。如“#”是一个合法的Topic Filter,而“sensor#”不是一个合法的Topic Filter。
    举例:
    如“sensor/data/#”,可匹配:

    • sensor/data
    • sensor/data/tem
    • sensor/data/tem/01
    • sensor/data/tem/01/02

不可以匹配:

  • sensor/cmd/tem

QoS说明
SUBSCRIBE数据包中QoS代表针对某一个或着某一组的主题,Client希望Broker在转发来自这些主题的消息给它时,消息使用的QoS级别。

2.2. SUBACK数据包

为确认每一次的订阅,Broker收到SUBSCRIBE之后会回复一个SUBACK数据包作为应答。SUBACK数据包包含以下内容:

2.2.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

2.2.2. 消息体

返回码(return codes):SUBBACK 数据包包含了一组返回码,返回码的数量和顺序和 SUBSCRIBE 数据包的订阅列表对应,用于标识订阅类别中的每一个订阅项的订阅结果。

返回码含义0订阅成功, 最大可用QoS为01订阅成功,最大可用QoS为12订阅成功, 最大可用QoS为2128订阅失败

返回码0~2表示订阅成功,并且Broker授予Subscriber不同等级的QoS,这个等级可能会和SubScriber在SUBSCRIBE数据包中要求的不一样。128表示订阅失败,可能是没有权限订阅这个主题,或者订阅主题的格式不对。

QoS返回和订阅时不一样的原因可以参见QoS中的QoS降级的相关知识。

3. MQTT的取消订阅

subscriber也可以取消对某些主题的订阅,取消订阅的流程如下图所示:

  1. subscriber向Broker发送一个UNSUBSCRIBE数据包,该数据包包含想要取消订阅的主题;
  2. Broker收到UNSUBSCRIBE数据包之后,向subscriber发送一个UNSUBACK数据包作为应答。

3.1. UNSUBSCRIBE数据包

3.1.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

3.1.2. 消息体

  • 主题列表(List of Topics):UNSUBSCRIBE 的消息体中包含 Client 想要取消订阅的主题过滤器列表,这些主题过滤器和 SUBSCRIBE 数据包中一样,可以包含通配符。UNSUBSCRIBE 消息体里面不再包含主题过滤器对应的 QoS 了。

3.2. UNSUBACK数据包

Broker收到UNSUBSCRIBE数据包之后会回复一个UNSUBACK数据包作为应答。UNSUBACK数据包内容如下:

3.2.1. 可变头

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从 Sender 到 Receiver 的一次消息交互中保持唯一。

3.2.2. 消息体

UNSUBACK 数据包没有消息体。

4. 代码实践

4.1. 发布消息

向一个主题发布一条QoS为1的数据包,发送成功之后断开连接:

import paho.mqtt.client as mqtt
​
def on_publish(client, userdata, mid):print("message ID ", mid)client.disconnect()
​
def on_connect(client, userdata, flags, rc):if rc == 0:client.publish("test", payload="hello world", qos=1)else:print("connection failed ", rc)
​
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_publish = on_publish
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

运行上述代码,输出如下:

message ID  1

相应订阅了test主题的订阅方输出如下:

4.2. 订阅消息

通常我们在建立和 Broker 的连接之后就可以开始订阅了,如果你建立的是持久会话的连接,那么有可能 Broker 已经保存你在之前的连接时订阅的主题,你就没有必要再发起 SUBSCRIBE 请求了,这个小优化在网络带宽或者设备处理能力较差的情况尤为重要。相应的代码如下:

import paho.mqtt.client as mqtt
​
'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):print("granted_qos:", granted_qos)
​
'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):print(message.topic, message.payload)
​
def on_connect(client, userdata, flags, rc):if rc == 0 :if flags["session present"] == 0:print("subscribing")client.subscribe("test", 1)else:print("connection failed ", rc)
​
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_message
​
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
​

运行上述代码得到如下输出结果:

subscribing
granted_qos: (1,)

当运行的发布消息中的代码之后,将输出:

test b'hello world'

当终止掉订阅消息中的代码运行之后,再次运行该代码,会发现什么都不输出。因为第一次运行的时候,Broker 上面没有保存这个 Client 的会话,所以需要进行订阅,当重新运行之后,因为 Broker 上面已经保存了这个 Client 的会话,所以就不需要再订阅了,你就不会看到订阅相关的输出了。

4.3. 取消订阅

在上述订阅消息中建立连接并订阅了相应主题的基础上,我们取消对之前订阅的主题

import paho.mqtt.client as mqtt
​
'''
当代理响应取消订阅请求时调用
'''
def on_unsubscribe(client, userdata, mid):print("message id:", mid)client.disconnect()
​
def on_connect(client, userdata, flags, rc):if rc == 0 :print("unsubscribing")client.unsubscribe("test")else:print("connection failed ", rc)
​
mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_unsubscribe = on_unsubscribe
​
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()

相应的输出如下:

unsubscribing
message id: 1

之后再运行订阅消息中的代码和发布消息中的代码,此时运行订阅消息的终端不再有输出。


关注微信公众号【一口程序锅】,一口想煮点技术的锅。

mqtt session保持 订阅消息_MQTT系列 | MQTT消息的发布和订阅相关推荐

  1. ROS发布/订阅Float64MultiArray数组类消息(C++和Python相互发布和订阅)

    在terminal用指令发布一个Float64MultiArray消息,这个在调试时特别好用,注意格式,最后一行的data: [1,2,3]中冒号和后面的中括号间要有空格 rostopic pub / ...

  2. Redis中消息队列/发布和订阅的介绍及代码实现

    一.概念 发布订阅是一种应用程序(系统)之间的通讯 传递数据的技术手段 特别是在异构(不同语言)的系统之间作用非常明显 发布订阅可以实现应用(系统)之间的解耦合 类似于微信中关注公众号/订阅号 那么订 ...

  3. StackExchange.Redis学习笔记(五) 发布和订阅

    StackExchange.Redis学习笔记(五) 发布和订阅 原文:StackExchange.Redis学习笔记(五) 发布和订阅 Redis命令中的Pub/Sub Redis在 2.0之后的版 ...

  4. Redis发布与订阅——PUBLISH SUBSCRIBE

    2019独角兽企业重金招聘Python工程师标准>>> Redis发布与订阅--PUBLISH  & SUBSCRIBE 一般来说,发布与订阅(又称pub/sub)的特点是 ...

  5. Redis 进阶 -- 发布与订阅

    文章目录 1. 发布与订阅 1.1 PUBLISH:向频道发送消息 1.2 SUBSCRIBE:订阅频道 1.2.1 接收频道消息 1.3 UNSUBSCRIBE:退订频道 1.4 PSUBSCRIB ...

  6. mqtt session保持 订阅消息_iOS MQTT 3 - 发送订阅消息以及发送过程

    目录: 该系列文章预计包括: 前言: 这里的代码是从MQTTSessionManager为切入点进入的,所以下面的方法如果没有特殊描述都是从这个类然后进入内部. 主题 == topic文章中可能出现多 ...

  7. mqtt session保持 订阅消息_如何使用 MQTT 报文实现发布订阅功能

    MQTT 协议通过交换预定义的MQTT控制报文来通信.下面以 Connect 连接 MQTT 协议基于 TCP/IP 协议,MQTT Broker 和 Client 都有需要有 TCP/IP 地址. ...

  8. 用MQTT.fx检查发布和订阅的mqtt消息

    简介 使用方法 简介 MQTT.fx是一个简单的测试mqtt通信的软件,支持订阅消息.发布消息.记录日志,有美观的图形化操作界面. 实际上MQTT.fx并不是一个抓包工具,它也是通过一对账户名和密码连 ...

  9. 模块学习4:(2)MQTT协议连接、发布、订阅、心跳、断链等分析和代码实现,并且通过mqtt.fx连接服务器,使用wireshark抓包分析mqtt实现过程

    文章目录 一.MQTT控制报文的结构 (1)固定报头(类型/标志 + 剩余长度) 剩余长度(这个要注意下,要注意它的计算方法,有一点特殊) 可变报头 有效载荷 二.下面直接开整各个具体的报文(MQTT ...

最新文章

  1. CYQ.Data 轻量数据层之路 使用篇-辅助工具枚举生成器 视频 C (二十)
  2. 程序进入后台继续执行
  3. SpringBoot中使用POI实现Excel导入到数据库(图文教程已实践)
  4. Vision Transformer 论文
  5. LINUX设置固定IP上网方法
  6. MySQL高级 - 案例 - 系统性能优化 - 读写分离概述
  7. jzoj3832-在哪里建酿酒厂【指针】
  8. 个人管理:如何发现自己的兴趣?
  9. (收集)vim72 .vimrc的一个样本
  10. 2012CSDN网站六大类职位火热招聘:社区编辑、产品交互设计、信息安全主管、前端工程师、Ruby工程师、搜索工程师...
  11. java文件中获取创建日期_如何在Java中获取文件的上次修改日期
  12. 5款好用的开源JS图片裁剪插件(3个jQuery插件,2个AngularJS插件)
  13. vue element 实现树形菜单栏n层级分类,NavMenu menu
  14. 前后端分离的跨域问题
  15. 计算机资源管理窗口,资源管理器怎么打开,教您打开电脑资源管理器
  16. Nevron 3DChart创建有吸引力的3D和2D图表
  17. html网页右侧悬浮代码,html悬浮窗口代码
  18. html链接外部样式表、链接网站图标
  19. 《博弈心理学》-占据主动的策略思维
  20. 化纤厂废气除臭剂净化废气中的H2S与CS2

热门文章

  1. Android字体占有内存,android随意创建字体对象引发的应用程序运行时占用内存过大...
  2. Spring官宣网传大漏洞,附解决方案!网传方案有隐患,建议加固!
  3. 成为最差开发者的10条建议
  4. 别在用U盘拷贝源码带回家了,有童鞋被判刑啦!
  5. OpenJDK 14 性能保持提升,但 OpenJDK 8 仍是最强王者
  6. RabbitMQ 延迟消息的极限是多少?
  7. linux上logbok实时日志_日志lombok插件安装及配置
  8. 计算机基础16秋在线作业,北大16秋《计算机基础与应用-第二组》在线作业.doc
  9. mysql backup_Mysqlbackup 备份详解(mysql官方备份工具)
  10. matlab里performance,关于神经网络performance图的问题