MQTT协议笔记之头部信息
前言
MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
- 对负载内容屏蔽的消息传输。
- 使用 TCP/IP 提供网络连接。
- 有三种消息发布服务质量: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。“至少一次”,确保消息到达,但消息重复可能会发生。“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
- MQTT 3.1协议在线版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
- 官方下载地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf
另外,目前MQTT大家都用在了手机推送,可能还有很多的使用方式,有待进一步的探索。
协议方面,以前曾简单实现过一点HTTP协议,基于HTTP上构建若干种通信管道的socket.io协议,不过socket.io 0.9版本的协议才两三页而已。面对领域不同,自然解决的方式也不一样。
阅读完毕MQTT协议,有一个想法,其实可以基于MQTT协议,打造更加私有、精简(协议一些地方,略显多余)的传输协议,比如一个字节的传输开销。
一、固定头部
固定头部,使用两个字节,共16位:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Type | DUP flag | QoS level | RETAIN | ||||
byte 2 | Remaining Length |
1.1 第一个字节(byte 1)
消息类型(4-7),使用4位二进制表示,可代表16种消息类型:
Mnemonic | Enumeration | Description |
---|---|---|
Reserved | 0 | Reserved |
CONNECT | 1 | Client request to connect to Server |
CONNACK | 2 | Connect Acknowledgment |
PUBLISH | 3 | Publish message |
PUBACK | 4 | Publish Acknowledgment |
PUBREC | 5 | Publish Received (assured delivery part 1) |
PUBREL | 6 | Publish Release (assured delivery part 2) |
PUBCOMP | 7 | Publish Complete (assured delivery part 3) |
SUBSCRIBE | 8 | Client Subscribe request |
SUBACK | 9 | Subscribe Acknowledgment |
UNSUBSCRIBE | 10 | Client Unsubscribe request |
UNSUBACK | 11 | Unsubscribe Acknowledgment |
PINGREQ | 12 | PING Request |
PINGRESP | 13 | PING Response |
DISCONNECT | 14 | Client is Disconnecting |
Reserved | 15 | Reserved |
除去0和15位置属于保留待用,共14种消息事件类型。
1.2 DUP flag(打开标志)
保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:
当QoS > 0消息需要回复确认
此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。
1.3 QoS(Quality of Service,服务质量)
使用两个二进制表示PUBLISH类型消息:
QoS value | bit 2 | bit 1 | Description | ||
---|---|---|---|---|---|
0 | 0 | 0 | 至多一次 | 发完即丢弃 | <=1 |
1 | 0 | 1 | 至少一次 | 需要确认回复 | >=1 |
2 | 1 | 0 | 只有一次 | 需要确认回复 | =1 |
3 | 1 | 1 | 待用,保留位置 |
1. 4 RETAIN(保持)
仅针对PUBLISH消息。不同值,不同含义:
1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。
0:仅仅为当前订阅者推送此消息。
假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。
1.5 如何解析
因为java使用有符号(最高位为符号位)数据表示,byte范围:-128-127。该字节的最高位(左边第一位),可能为1。若直接转换为byte类型,会出现负数,这是一个雷区。DataInputStream提供了int readUnsignedByte()读取方式,请注意。下面演示了,如何从一个字节中,获取到所有定义的信息,同时绕过雷区:
public static void main(String[] args) {byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0doGetBit(publishFixHeader);int ori = 224;//1110000,DISCONNECT ,Message Type (14)byte flag = (byte) ori; //有符号byte doGetBit(flag);doGetBit_v2(ori);
}public static void doGetBit(byte flags) {boolean retain = (flags & 1) > 0;int qosLevel = (flags & 0x06) >> 1;boolean dupFlag = (flags & 8) > 0;int messageType = (flags >> 4) & 0x0f;System.out.format("Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",messageType, dupFlag, qosLevel, retain);
}public static void doGetBit_v2(int flags) {boolean retain = (flags & 1) > 0;int qosLevel = (flags & 0x06) >> 1;boolean dupFlag = (flags & 8) > 0;int messageType = flags >> 4;System.out.format("Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",messageType, dupFlag, qosLevel, retain);
}
1.6 处理Remaining Length(剩余长度)
在当前消息中剩余的byte(字节)数,包含可变头部和负荷(称之为内容/body,更为合适)。单个字节最大值:01111111,16进制:0x7F,10进制为127。单个字节为什么不能是11111111(0xFF)呢?因为MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。同时MQTT协议最多允许4个字节表示剩余长度。那么最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:
Digits | From | To |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
如何换算成十进制呢 ? 使用java语言表示如下:
public static void main(String[] args) throws IOException {// 模拟客户端写入ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);dataOutputStream.write(0xff);dataOutputStream.write(0xff);dataOutputStream.write(0xff);dataOutputStream.write(0x7f);InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());// 模拟服务器/客户端解析System. out.println( "result is " + bytes2Length(arrayInputStream));
}/**
* 转化字节为 int类型长度
* @param in
* @return
* @throws IOException
*/
private static int bytes2Length(InputStream in) throws IOException {int multiplier = 1;int length = 0;int digit = 0;do {digit = in.read(); //一个字节的有符号或者无符号,转换转换为四个字节有符号 int类型length += (digit & 0x7f) * multiplier;multiplier *= 128;} while ((digit & 0x80) != 0);return length;
}
一般最后一个字节小于127(01111111),和0x80(10000000)进行&操作,最终结果都为0,因此计算会终止。代理中间件和请求者,中间传递的是字节流Stream,自然要从流中读取,逐一解析出来。
那么如何将int类型长度解析为不确定的字节值呢?
public static void main(String[] args) throws IOException {// 模拟服务器/客户端写入ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);// 模拟服务器/客户端解析length2Bytes(dataOutputStream, 128);
}/**
* int类型长度解析为1-4个字节
* @param out
* @param length
* @throws IOException
*/
private static void length2Bytes(OutputStream out, int length)throws IOException {int val = length;do {int digit = val % 128;val = val / 128;if (val > 0)digit = digit | 0x80;out.write(digit);} while (val > 0);
}
digit对val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 请注意:剩余长度,只在固定头部中,无论是一个字节,还是四个字节,不能被算作可变头部中。
二、可变头部
固定头部仅定义了消息类型和一些标志位,一些消息的元数据,需要放入可变头部中。可变头部内容字节长度 + Playload/负荷字节长度 = 剩余长度,这个是需要牢记的。可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容,这部分和后面要讲到的CONNECT消息类型,有重复,暂时略过。
2.1 Playload/消息体/负荷
消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1则需要在消息体中附加用户名称字符串)而存在。CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。
请记住,MQTT协议只允许在PUBLISH类型消息体中使用自定义特性,在固定/可变头部想加入自定义私有特性,就免了吧。这也是为了协议免于流于形式,变得很分裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Playload中定义数据支持,在应用中进行读取处理。
2.2 消息标识符/消息ID
固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。
一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。
可变头部中,需要两个字节的顺序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻译成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,符合人的阅读习惯,高位在最左边。
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
Message Identifier MSB | ||||||||
Message Identifier LSB |
但凡如此表示的,都可以视为一个16位无符号short类型整数,两个字节表示。在JAVA中处理比较简单:
DataInputStream.readUnsignedShort
或者
in.read() * 0xFF + in.read();
最大长度可为: 65535
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | String Length MSB | |||||||
byte 2 | String Length LSB | |||||||
bytes 3 ... | Encoded Character Data |
比如JAVA,使用writeUTF()方法写入一串文字“OTWP”,头两个字节为一个完整的无符号数字,代表字符串字节长度,后面四个字节才是字符串真正的长度,共六个字节:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Length MSB (0x00) | |||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Message Length LSB (0x04) | |||||||
0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | |
byte 3 | 'O' (0x4F) | |||||||
0 | 1 | 0 | 0 | 1 | 1 | 1 | 1 | |
byte 4 | 'T' (0x54) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 | |
byte 5 | 'W' (0x57) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 1 | 1 | |
byte 6 | 'P' (0x50) | |||||||
0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 |
这点,在程序中,可不用单独处理默认,直接使用readUTF()方法,可自动省去了处理字符串长度的麻烦。当然,可以手动读取字符串:
// 模拟写入
dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
......
// 模拟读取
int decodedLength = dataInputStream.readUnsignedShort();//2 byte
byte[] decodedString = new byte[decodedLength]; // 4 bytes
dataInputStream.read(decodedString);
String target = new String(decodedString, "UTF-8");
等同于:
String target = dataInputStream.readUTF();
MQTT无论是可变头部还是消息体中,只要是字符串部分,都是采用了修改版的UTF-8编码,读取和写入,借助DataInputStream/DataOutputStream的帮助,一行语句,略去了手动处理的麻烦。
MQTT协议笔记之头部信息相关推荐
- mqtt android简书,iOS MQTT协议笔记
前言 接到任务项目需要用MQTT来写消息推送,经过一段时间在网上查看资料后写下这篇文章,文章内容大都来自互联网,在文章最后也会贴出相关网址和Demo.写这文章主要目的是自己总结下经验做下笔记,以便日后 ...
- php 协议头,入门PHP实现MQTT协议的固定头部(Fix header)
前面的话 直接进入主题,具体的mqtt协议请参考mqtt协议 固定头的结构 简单描述下固定头 第一个字节总共8位, 前四位表示消息类型,最大的十进制值是15(二进制是1111). 第三位表示消息是否重 ...
- MQTT协议笔记之mqtt.io项目TCP协议支持
前言 MQTT定义了物联网传输协议,其标准倾向于原始TCP实现.构建于TCP的上层协议堆栈,诸如HTTP等,在空间上多了一些处理路径,稍微耗费了CPU和内存,虽看似微乎其微,但对很多处理能力不足的嵌入 ...
- MQTT协议-报文分析及网络客户端报文测试(MQTT报文连接阿里云上传数据+订阅数据)
文章目录 一.本文章所涉及到的内容 二.感性认识MQTT协议 三.准备信息 (一)工具获取 (二)获取信息 1.获取三元组信息 2.获取发布topic和订阅topic 3.客户端ID,用户名,哈希加密 ...
- 3.2.3 使用tcpdump观察TCP头部信息(补充TCP协议的常用知识)
使用tcpdump观察TCP头部信息和三次握手四次挥手 前言 实验开始 1. 延迟确认 2. 序号(seq)和确认号(ack)之间的关系 3.TS val和ecr的关系 4. TCP状态转移(书上p4 ...
- mqtt协议调用示例(包括MQTT一键启动服务+测试工具 MQTTFX云盘下载),对捷顺门禁温感一体机进行人员信息下发
hello, 大家好 我是一只不是在戏精,就是在戏精路上的极品二哈 新年上班第一天,给大家贡献一篇 MQTT 协议使用示例文章 也是本汪自己的一篇实用笔记 本汪先总的说下: MQTT协议进行数据交互, ...
- MQTT学习笔记——MQTT协议体验 Mosquitto安装和使用
原版地址:http://blog.csdn.net/xukai871105/article/details/39252653 0 前言 MQTT是IBM开发的一个即时通讯协议.MQTT是面向M2M和物 ...
- websocket 带头部信息请求 header_BeetleX之Websocket协议分析详解
Websocket应用协议已经普及多年了,它是HTTP1.1的内部升级协议,主要作用是补充HTTP1.1无法灵活地主动推送消息给客户端的缺陷问题.在这里主要介绍一下使用组件如何扩展一个完整的Webso ...
- MQTT学习笔记——MQTT协议使用
http://mosquitto.org/files/source/mosquitto-1.4.5.tar.gz 安装出错时openssl等 需要更改 cd mosquitto-1.4 vi con ...
最新文章
- win8计算机上工具选项在哪,Win8.1在开始菜单中找不到“便笺”工具如何恢复
- 手动修改Outlook 2007 邮件签名
- java引用类型使用场景_下面有关java的引用类型,说法正确的有?
- WebSphere通过corba调Tuxedo问题(2)
- python 字符串数组_python用法笔记(数组(list、touple、dict)、字符串)
- pip下载安装与环境配置
- 计算机无法安装小丸工具箱,小丸工具箱
- 关于方法A调用方法B的事务控制问题
- Elasticsearch-7(全文搜索应用分享)
- github上成员贡献量_精确统计github贡献者的代码行数
- 《商务与经济统计》(四)
- 中国农业机械融资租赁市场预测与投资战略报告(2023版)
- Kubernetes容器平台架构之道
- RISCV 向量指令集和NICE接口学习笔记
- forwardRef的使用
- CTFshow入门命令执行29
- mysql多列索引(组合索引)特点和使用场景
- R语言ggsurv生存曲线一页多图的实现
- 手机和电脑如何制作gif动画
- 数据分析的重要一环之数据统计