MQTT系列 | Retained消息和LWT和Keep Alive
1. Retained消息
Retained 消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。有以下这些特点:
- 一个Topic只能有一条Retained消息,发布新的Retained 消息将覆盖老的 Retained 消息(所以想删除一个 Retained 消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 Retained 消息就可以了);
- 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的 Retained 消息;
- 只有新的订阅者才会收到 Retained 消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到 Retained 消息;
- Broker 收到 Retained 消息后,会单独保存一份,再向当前的订阅者发送一份普通的消息(Retained 标识为 0)。当有新订阅者的时候, Broker 会把保存的这条消息发给新订阅者(Retained 标识为 1)。
Retained消息和持久性会话的区别:
Retained消息是Broker为每一个Topic单独存储的;
持久性会话是Broker为每一个Client单独存储的
1.1. 代码实践
下面是publisher的代码,在发送消息时指定retain为true
import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc):if rc == 0:client.publish("test", payload="hello world", qos=0, retain=True)else:print("connection failed ", rc)mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connectmqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
下面是subscriber的代码
import paho.mqtt.client as mqtt'''
当代理响应订阅请求时被调用
'''
def on_subscribe(client, userdata, mid, granted_qos):print("granted_qos:", granted_qos)'''
当收到关于客户订阅的主题的消息时调用
'''
def on_message(client, userdata, message):print("message retain", message.retain)print("message topic", message.topic)print("message payload", message.payload)def on_connect(client, userdata, flags, rc):if rc == 0 :print("subscribing")client.subscribe("test", 0)else:print("connection failed ", rc)mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_subscribe = on_subscribe
mqtt_client.on_message = on_messagemqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
在指定retain
为True
的情况下,先运行publisher的代码,之后再运行subscriber的代码,在subscriber运行的终端界面输出如下信息:
subscribing
granted_qos: (0,)
message retain 1
message topic test
message payload b'hello world'
输出的信息中message retain
的值为1,表示收到的消息为retained消息。
当再次运行publisher的代码,运行subscriber的控制台会输出如下内容:
message retain 0
message topic test
message payload b'hello world'
上述的输出结果同Retained消息特点中的第四点“Broker 收到 Retained 消息后,会单独保存一份,再向当前的订阅者发送一份普通的消息”所述一致,因为当前订阅者已经订阅了相应的话题,当Broker收到Retained消息之后,先保存下来,然而因为这个消息对于当前已经订阅了相应话题的订阅者来说是一个普通的消息所以message retain
的值为0。
2. LWT(Last Will and Testament)
LWT是之前讲过的Client连接到Broker时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等。当Broker检测到Client非正常地断开连接的时候,就会向遗愿主题发布一条消息。遗愿相关的设置是在建立连接的时候,在CONNECT数据包里面指定的。包括以下这些设置:
- Will Flag:是否使用LWT
- Will QoS:发布遗愿消息时使用的QoS
- Will Retain:遗愿消息的Retain标识
- Will Topic:遗愿主题名,不可使用通配符
- Will Message:遗愿消息内容
Broker 在以下情况下认为 Client 是非正常断开连接的:
- Broker 检测到底层的 I/O 异常;
- Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
- Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
- Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。
如果Client通过发布DISCONNECT数据包断开连接,是属于正常断开连接,不会触发LWT的机制,同时Broker会丢掉这个Client在连接时指定的LWT参数。
2.1. 代码实践:监控Client的状态
Client在连接的时候,指定Will Topic为will_test,Will Message为"client is offline",并设置该消息的QoS为1,retain也置为True(设置为True表示会被Broker保留,同Retained消息)。同时在连接成功之后,向主题will_test发布一个内容为"client is online"的Retained消息。这样订阅者,无论在任何时候订阅"will_test",都会获取Client当前的连接状态。client_will.py代码如下:
import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc):if rc == 0:client.publish("will_test", payload="client is online", qos=1, retain=True)else:print("connection failed ", rc)mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
mqtt_client.on_connect = on_connect
mqtt_client.will_set("will_test", payload="client is offline", qos=1, retain=True)
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
而负责监控的代码,则订阅will_test,订阅的QoS为1,client_monitor_will.py代码如下:
import paho.mqtt.client as mqttdef on_message(client, userdata, message):print("message retain", message.retain)print("message payload", message.payload)def on_connect(client, userdata, flags, rc):if rc == 0 :client.subscribe("will_test", 1)else:print("connection failed ", rc)mqtt_client = mqtt.Client(client_id="demo_mqtt_sub", clean_session=False)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_messagemqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
首先运行client_will.py,之后再运行client_monitor_will.py,终端输出如下信息:
message retain 1
message payload b'client is online'
因为client_will.py运行之后,发布了一个Retained消息,当运行client_monitor_will.py之后,因为订阅了相应的话题,所以会收到该消息。这时候终止掉client_will.py的运行,输出如下信息:
message retain 0
message payload b'client is offline'
因为在终止的时候已经订阅了相应的话题,所以当终止之后,虽然遗愿消息中的retain被设为1了,但是对当前的订阅者来说是普通消息,所以message retain为0。当这个时候终止掉client_monitor_will.py的运行,再次重新运行client_monitor_will.py,输出如下信息:
message retain 1
message payload b'client is offline'
因为终止掉client_will.py的时候,发送的遗愿消息的retain被设为了1,Broker会保证发送的遗愿消息,当新的订阅者出现的时候,会把这个Retained消息发送给订阅者。
3. Keep Alive(连接保活)
Broker需要知道Client是否正常地断开了和它的连接,以发送遗愿消息。实际上Client也需要能够很快地检测它失去了和Broker的连接,以便重新连接,虽然TCP 协议在丢失连接时会通知上层应用,但是 TCP 有一个半打开连接的问题(half-open connection),在这种状态下,一端的 TCP 连接已经失效,但是另外一端并不知情,它认为连接依然是打开的,它需要很长的时间才能感知到对端连接已经断开了,这种情况在使用移动或者卫星网络的时候尤为常见。所以仅仅依赖TCP的连接状态检测是不够的,于是MQTT协议设计了一套Keep Alive机制。
MQTT 协议是基于 TCP 的一个应用层协议
在建立连接的时候,我们可以传递一个Keep Alive参数,它的单位为秒,MQTT协议中规定:**在1.5倍的Keep Alive(1.5*Keep Alive)的时间间隔内,如果Broker没有收到来自Client的任何数据包,那么Broker认为它和Client之间的连接已经断开;同样如果Client没有收到来自Broker的任何数据包,那么Client认为它和Broker之间的连接已经断开。**在Broker和Client之间没有任何数据包传输的时候,MQTT中通过PINGREQ/PINGRESP来满足Keep Alive的约定和侦测连接状态。
- PINGREQ
PINGREQ数据包中没有可变头和消息体,当Client在一个Keep Alive时间间隔内没有向Broker发送任何数据包,比如PUBLISH和SUBSCRIBE的时候,它应该向Broker发送PINGREQ数据包。
- PINGRESP
PINGRESP数据包中没有可变头和消息体,当Broker收到来自Client的PINGREQ数据包之后,它会回复Client一个PINGRESP数据包。
对于Keep ALive 机制,还需要注意以下几点:
- 如果在一个Keep Alive时间间隔内,Client和Broker有过数据包传输,比如PUBLISH数据包,Client就没有必要再使用PINGREQ了;
- Keep Alive值是由Client指定,不同的Client可以指定不同的值;
- Keep Alive的最大值为18小时12分15秒即65535秒;
- Keep Alive的值设为0的话,代表不使用Keep Alive机制
3.1. 实验实践
启动mosquitto Broker之后,我们通过mosquitto_sub
订阅了一个hello
的话题,订阅完之后。subscriber和Broker之间再也没有任何数据包传输,但是通过运行mosquitto的控制台可以看到,他们之间有PINGREQ和PINGRESP数据包的传输。
欢迎关注微信公众号【一口程序锅】,一口想煮点技术的锅。
MQTT系列 | Retained消息和LWT和Keep Alive相关推荐
- Cris 玩转大数据系列之消息队列神器 Kafka
Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 文章目录 Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 1. Kafka 概述 1.1 消息队 ...
- HP ProLiant 服务器 - POST 错误消息和蜂鸣代码(8)1700 系列错误消息
HP ProLiant 服务器 - POST 错误消息和蜂鸣代码(8)1700 系列错误消息 2011年05月25日 1700 系列错误消息1711 错误消息 #1[b]错误消息:[/b] [b]17 ...
- SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)
场景 Windows上Mqtt服务器搭建与使用客户端工具MqttBox进行测试: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/1 ...
- 基于mqtt协议的消息推送服务器,基于 MQTT 协议的推送服务
一.简述 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级& ...
- 如何订阅MQTT服务器历史消息,mqtt集群订阅如何只消费一个(一次)消息?
共享订阅 在实践中我们的业务系统集成是集群存在,对于mqtt消息,如果没有做特殊处理,那么集群中每个服务只要订阅了mqtt中的一个主题,那么每台服务器都会进行消费.不仅浪费资源,还限制了服务的处理上限 ...
- java mqtt客户端_MQTT消息队列遥测传输
说实话这个折腾了我好久好久,我才知道,原来学习不是最痛苦的,学习却学不会才是最痛苦的事.生产者将消息发布到一个主题,消费者从该协议里读取数据,MQTT是为IoT物联网通信设计的协议,MQTT使物联网低 ...
- .NET Core加解密实战系列之——消息摘要与数字签名算法
简介 加解密现状,编写此系列文章的背景: 需要考虑系统环境兼容性问题(Linux.Windows) 语言互通问题(如C#.Java等)(加解密本质上没有语言之分,所以原则上不存在互通性问题) 网上资料 ...
- 如何订阅MQTT服务器历史消息,MQTT协议之消息订阅
序 在MQTT协议中,最重要的就是发布/订阅,下面重点分析下消息订阅. SUBSCRIBE 一般来讲,客户端在成功建立TCP连接之后,发送CONNECT消息,在得到服务器端授权允许建立彼此连接的CON ...
- MQTT+ActiveMQ实现消息推送(移动端)
这个小程序是我导师给我布置的一个任务,网上教程不是很多,遇到的一些困难都是自己解决的,所以写出来分享一下,有什么问题大家可以留言,尽力帮大家解决. 首先,我们需要先下载activeMQ (官网:htt ...
- springboot集成MQTT协议实现消息实时推送(未实现版)
<!--mqtt依赖包--><dependency><groupId>org.springframework.integration</groupId> ...
最新文章
- 最全19000+国外AE模板合集包
- 一文详解CMake编译工具与项目构建
- 程序员初试和复试_程序员的软微mem经验贴
- jupyter提示信息安装后正常OK的
- 《Linux内核设计与实现》读书笔记(十二)- 内存管理
- php旧版本windows_Windows的旧版本中如何进行多任务处理?
- Linux驱动(7)--最简单的驱动HelloWorld
- python自动化办公都能做什么-用 Python 自动化办公,我与大神之间的差距一下就...
- apache和IIS共存,服务器对外统一使用80端口
- nginx 安全加固心得
- 最强面试题整理第二弹:Python 进阶面试题(附答案)
- 海康网络摄像头添加到萤石云
- 帝国败局:一代首富,因何退隐江湖?
- eNSP华为路由器与交换机连接
- 【航线运输驾驶员理论考试】飞行原理
- win7计算机内存占用高,win7降低电脑内存占用过高的方法
- 【智慧农业科普】什么是无人农场
- No signature of method: build_xxx.android() is applicable for argument types
- 纯靠成绩毫无科研的保研历程(电子信息工程专业)
- 开发游戏需要什么知识
热门文章
- MySQL中emoji表情包的存储问题
- 如何系统磁盘和raid卡的槽位对应起来
- 关注虚拟财富“.ME”域名的投资价值
- FineBI01:FineBI介绍
- 考研:研究生考试(五天学完)之《线性代数与空间解析几何》研究生学霸重点知识点总结之第四课欧氏空间
- python语言折半查找_c# 折半查找法实现代码
- cap 2 加州房价预测
- java阿里天气接口_天气预报接口
- 炼数成金数据分析课程---18、降维技术(后面要重点看)
- java flightrecorder_java 11 Java Flight Recorder