1、了解下MQTT协议

虽然上一篇用起来了MQTT,但是并不十分了解,基本就局限于,发布主题是发送数据,订阅主题是接收数据,今天就再好好了解一下吧。

分享下网页版的“MQTT协议中文版”链接:Introduction · MQTT协议中文版

1.1、基本的概念

MQTT是一个客户端/服务端架构的发布/订阅模式的消息传输协议;
  • 客户端:可以发布和订阅某主题的消息;
  • 服务器:接收消息、转发消息给订阅该主题的客户端;

1.2、主题的表示

1.2.1、主题名用UTF-8编码

UTF-8编码兼容ASCII码,所以自己定义主题名的时候就用字母、数字、下划线就行了;

1.2.2、层级分隔符"/"

主题名之间可以层级分隔符"/"隔开,这样主题名是层次化的,方便管理;

1.2.3、主题通配符

主题通配符,是为了可以一次性订阅多个主题,所以只使用在订阅主题,不能用在发布主题;

  • 多层通配符"#"

    • 在末尾分隔符后,匹配当前层级,以及所有子层级
    • "xxx/#"可匹配:"xxx","xxx/xx1","xxx/xx2","xxx/xx1/xx3";
  • 单层通配符"+"
    • 任意位置,匹配当前层级的下一子层级
    • "xxx/+"可匹配:"xxx/xx1","xxx/xx2";

1.2.4、以$开头的主题

$开头的主题名 不能匹配 通配符"#""+"开头的主题;

P.S. 比如百度云的主题名形式就为:"$iot/device/msg";关于主题名更细节或特别使用方式的说明见文档;


1.3、关于UTF-8编码

非常详细的字符编码讲解,ASCII、GB2312、GBK、Unicode、UTF-8等知识点都有_哔哩哔哩_bilibili

不知道UTF-8编码是啥,所以去瞅了瞅,这个视频主要内容:

  • 最初的ASCII编码,以及衍生出ASCII扩展码;
  • 各国家文字有自己的编码,比如中国的GB2312编码、GBK编码;
  • 为了同一各国字符编码,Unicode标准提出了USC-2、USC-4字符集;
  • UTF-8编码就是根据USC-4字符集而来的编码方式,将字符集划分为4个区间,根据实际字符选择对应的x字节格式,并以二进制填充到此格式中;

比如,王的Unicode编码:U+738B,属于3字节的UTF-8编码范围,将0x738B转换成2进制,填充到1110xxxx 10xxxxxx 10xxxxxx;

为了避免UP主糊弄我,啊不是我开玩笑的,狗头狗头,为了追本溯源我又找到了个查询Unicode的网站,网站很友好,除了Unicode编码还给出了HTML代码、CSS代码、处于哪个编码区域、UTF-8、UTF-16、UTF-32等;

王 - 中日韩象形文字: U+738B - Unicode 字符百科 (unicode-table.com)


1.4、服务质量等级QoS

QoS = 0:至多发送一次消息;

QoS = 1:至少发送一次消息;

QoS = 2:仅发送一次消息;

昨天还是前天刚接触到MQTT,随便搜索的时候看到的例子,链接不知道了,TA说明了共享单车在不同情境下的QoS设置。

  • 当骑行的时候,车辆定期上报数据,所以偶尔丢失也没关系,此时QoS = 0;
  • 当开锁的时候,因为要保证开锁成功,所以至少要有一次收到开锁指令,QoS  = 1;
  • 当骑行完成缴费的时候,就要求有且仅有一次支付,这时QoS = 2;

1.5、MQTT报文格式(固定报头、可变报头、有效载荷)

1.5.1、固定报头(所有报文都有)

分类下你可以发现,其实控制报文的类型就三大类,连接、订阅、发布

 除发布PUBLISH外,其余报文类型标志位都是与报文类型直接一一对应的;

剩余长度:可变报头和有效载荷的字节数;

采用变长编码方式,每个字节的低7位表示数据,最高位表示有无下一字节,最多4个字节;

也就是128进制,举个例子,比如剩余长度编码为两个字节,第一个字节1111 1111,第二个字节0111 1111;

可以看做1 127、0 127,也就是127*128^1 + 127*128^0 = 16383;


1.5.2、可变报头(部分报文有)

根据报文类型的不同,可变报头存在多个字段,这里只介绍其中的一个“报文标识符”,其余字段根据具体报文类型来看;


1.5.3、有效载荷(部分报文有)


1.6、MQTT协议中字符串的表示

前两个字节表示后面字符串的长度,后面是字符串的UTF-8格式的字符数据;


2、连接相关的报文

2.1、 CONNECT 连接服务端

2.1.1、思维导图

2.1.2、涉及到的概念——会话


2.1.2、涉及到的概念——遗嘱

客户端发送DISCONNECT断开连接,服务端会删除遗嘱


2.2、CONNACK 确认连接请求


2.3、DISCONNECT 断开连接

客户端发送完DISCONNECT,断开连接;

服务端发送完,如有则删除遗嘱,如客户端未关闭连接则关闭连接;

2.4、PINGREQ 心跳请求、PINGRESP 心跳响应


3、发布相关的报文

3.1、PUBLISH 发布消息

3.2、PUBACK 发布确认(对QoS=1的PUBLISH响应)


4、订阅相关的报文

4.1、SUBSCRIBE 订阅主题

4.2、SUBACK 订阅确认

4.3、UNSUBSCRIBE 取消订阅、UNSUBACK 取消订阅确认


5、MQTT的代码

5.1、config 加载或更新系统参数

config.h定义了一个系统参数的结构体,用来存放WiFi信息和MQTT服务端域名、端口、本机的用户名密码等;

配置这些系统参数的宏定义在mqtt_config.h;

这个config文件的作用就是,从flash中加载系统参数,代码中利用了两个扇区存放系统参数,一个扇区存放现在的系统参数,一个扇区存放之前的系统参数,算是做一个备份。每次修改cfg_holder的时候,就会读出不一致,然后把默认的参数及新cfg_holder写入另一个扇区

// 系统参数---------------------------------------------------------------
typedef struct{uint32_t cfg_holder;     // 持有人标识(只有更新此数值,系统参数才会更新)uint8_t device_id[64];     // 客户端ID[64]        【官方例程中是[32],将其改为[64]】uint8_t sta_ssid[64];       // WIFI名[64]uint8_t sta_pwd[64];        // WIFI密码[64]uint32_t sta_type;         // STA类型uint8_t mqtt_host[64];      // MQTT服务端域名[64]uint32_t mqtt_port;         // MQTT端口uint8_t mqtt_user[64];     // MQTT用户名[64]      【官方例程中是[32],将其改为[64]】uint8_t mqtt_pass[64];      // MQTT密码[64]       【官方例程中是[32],将其改为[64]】uint32_t mqtt_keepalive;    // 保持连接时长uint8_t security;          // 安全类型
} SYSCFG;
// 加载/更新系统参数【WIFI参数、MQTT参数】
void ICACHE_FLASH_ATTR CFG_Load()
{spi_flash_read,读扇区0x7C的标志位saveFlag.flagif (saveFlag.flag == 0){spi_flash_read,读出扇区0x79的系统参数}else{spi_flash_read,读出扇区0x7A的系统参数}if(sysCfg.cfg_holder != CFG_HOLDER){         // 持有人标识不同sysCfg.cfg_holder = CFG_HOLDER;              // 更新持有人标识写入默认参数;CFG_Save(); 将更新后的系统参数烧录到另一扇区,并更新saveFlag.flag}
}

5.2、wifi、sntp、开启或断开MQTT连接

WIFI_Connect函数中,设置STA模式、WiFi名和密码,并开启1s的定时器WiFiLinker;

WiFiLinker回调函数中检查连接状态、是否获取到IP

  • 如果获取到IP则定时器改为2s;
  • 如果未获取到IP则定时器改为0.5s;
  • 此外,如果连接状态改变,调用user_main.c中的wifiConnectCb

wifiConnectCb函数:

  • 如果更新的状态为STATION_GOT_IP,意味着WiFi连接成功,则开启SNTP服务,并启动1s的定时器sntp_timer;
  • 如果更新的状态为其他,则是断开了网络连接,调用MQTT_Disconnect(&mqttClient)断开MQTT连接;

sntp_timer回调函数:

  • 如果获取网络时间失败,则打印错误信息、继续保持这个1s的定时器;
  • 如果获取网络时间成功,则打印正确的时间、关闭这个定时器,调用MQTT_Connect(&mqttClient)开启MQTT连接;

前面大概的启动流程就是这样,下面看看有关MQTT的部分了;


5.3、MQTT_Client结构体、user_init中MQTT初始化

// MQTT客户端
typedef struct
{struct espconn *pCon;              // TCP连接结构体指针uint8_t security;                  // 安全类型uint8_t* host;                       // 服务端域名/地址uint32_t port;                       // 网络连接端口号ip_addr_t ip;                     // 32位IP地址mqtt_state_t  mqtt_state;         // MQTT状态mqtt_connect_info_t connect_info;  // MQTT【CONNECT】报文的连接参数MqttCallback connectedCb;            // MQTT连接成功_回调MqttCallback disconnectedCb;      // MQTT断开连接_回调MqttCallback publishedCb;         // MQTT发布成功_回调MqttCallback timeoutCb;               // MQTT超时_回调MqttDataCallback dataCb;            // MQTT接收数据_回调ETSTimer mqttTimer;                   // MQTT定时器uint32_t keepAliveTick;               // MQTT客户端(ESP8266)心跳计数uint32_t reconnectTick;              // 重连等待计时uint32_t sendTimeout;              // 报文发送超时时间tConnState connState;                // ESP8266运行状态QUEUE msgQueue;                       // 消息队列void* user_data;                 // 用户数据(预留给用户的指针)} MQTT_Client;
void user_init(void)
{.......// 网络连接参数赋值:服务端域名、端口【1883】、安全类型【0:NO_TLS】MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);// MQTT连接参数赋值:客户端标识符、MQTT用户名、MQTT密钥、保持连接时长【120s】、清除会话【1:clean_session】MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);// 设置遗嘱参数(如果云端没有对应的遗嘱主题,则MQTT连接会被拒绝)
//  MQTT_InitLWT(&mqttClient, "Will", "ESP8266_offline", 0, 0);// 设置MQTT相关函数MQTT_OnConnected(&mqttClient, mqttConnectedCb);         // 设置【MQTT成功连接】函数的另一种调用方式MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb);  // 设置【MQTT成功断开】函数的另一种调用方式MQTT_OnPublished(&mqttClient, mqttPublishedCb);            // 设置【MQTT成功发布】函数的另一种调用方式MQTT_OnData(&mqttClient, mqttDataCb);                  // 设置【接收MQTT数据】函数的另一种调用方式.......
}
  • MQTT_InitConnection

    • 为mqttClient申请内存;
    • 赋值服务器域名、端口、安全类型;
  • MQTT_InitClient
    • mqttClient赋值,除了参数列表中的还有mqtt_state、connect_info、msgQueue等信息;
    • 安排任务MQTT_Task;
  • MQTT_Onxxx,这部分是设置mqttClient中的回调函数;

5.4、MQTT_Connect、DNS域名解析、mqttTimer定时器

5.4.1、MQTT_Connect

MQTT_Connect 是在WiFi已连接、SNTP获取成功后调用的,下面的代码省略了很多,主要内容:

  • 设置TCP连接的参数,如服务器和ESP8266的端口号;
  • 注册TCP相关的回调函数:mqtt_tcpclient_connect_cb、mqtt_tcpclient_recon_cb;
  • 开启mqttTimer定时器;
  • 设置DNS域名解析,解析MQTT服务器的域名;
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient)
{// 开始MQTT连接前,判断是否存在MQTT的TCP连接。如果有,则清除之前的TCP连接-if (mqttClient->pCon){mqtt_tcpclient_delete(mqttClient);    // 删除TCP连接、释放pCon内存、清除TCP连接指针}// TCP连接设置---------------------------------------------------------mqttClient->pCon->proto.tcp->local_port = espconn_port();                    // 获取ESP8266可用端口mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;               // 设置端口号espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb);       // 注册TCP连接成功的回调函数espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb);          // 注册TCP异常中断的回调函数mqttClient->keepAliveTick = 0; // MQTT客户端(ESP8266)心跳计数mqttClient->reconnectTick = 0;   // 重连等待计时:当进入重连请求状态后,需等待5秒,之后进行重新连接// 设置MQTT定时(1秒重复)【功能:心跳计时、重连计时、TCP发送计时】os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);                                     // 解析域名----------------------------------------------------espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);mqttClient->connState = TCP_CONNECTING;      // TCP正在连接
}

5.4.2、DNS域名解析回调

  • 失败

    • 状态设为 TCP_RECONNECT_REQ,表示TCP重连请求;
  • 成功:
    • 调用espconn_connect 连接TCP;
    • 状态TCP_CONNECTING,表示TCP正在连接;
    • 安排任务MQTT_Task;

TCP_CONNECTING状态,任务里并没有执行什么操作,所以下一步5.5中是看TCP是否连接成功

LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
{if (ipaddr == NULL){     // 域名解析失败client->connState = TCP_RECONNECT_REQ; // TCP重连请求(等待5秒)return;}// 判断IP地址是否正确(?=0)if (client->ip.addr == 0 && ipaddr->addr != 0){espconn_connect(client->pCon);            // TCP连接(作为Client连接Server)client->connState = TCP_CONNECTING;       // TCP正在连接}system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);       // 安排任务MQTT_Task
}

5.4.3、mqttTimer定时器

现在看上面蓝色的,DNS解析失败的后续操作;

mqttTimer定时器回调函数中,如果状态为TCP_RECONNECT_REQ,每秒自增,到5s后切换状态为TCP_RECONNECT,运行任务;

任务中再次调用MQTT_Connect,重新设置TCP连接相关参数,设置DNS域名解析,见5.4.1;

    else if (client->connState == TCP_RECONNECT_REQ){           // TCP重连请求(等待5秒)client->reconnectTick ++;                              // 重连计时++if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT){    // 重连请求超过5秒client->reconnectTick = 0;                          // 重连计时 = 0client->connState = TCP_RECONNECT;                  // TCP重新连接system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);// 安排任务}}
     case TCP_RECONNECT_REQ:     break;case TCP_RECONNECT:mqtt_tcpclient_delete(client); // 删除TCP连接、释放pCon内存、清除TCP连接指针MQTT_Connect(client);          // MQTT连接准备:TCP连接、域名解析等INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);client->connState = TCP_CONNECTING;      // TCP正在连接break;

5.5、TCP连接的相关操作(CONNECT、CONNACK)

当前面DNS域名解析成功,得知MQTT服务器的IP地址后,就会开始TCP连接了,有关注册TCP连接成功、连接失败的回调函数是在MQTT_Connect中;

5.5.1、TCP连接失败

mqtt_tcpclient_recon_cb,切换状态为TCP_RECONNECT_REQ,进入TCP重连请求状态,上面刚说过,就是在mqttTimer定时器中自增,到5s后切换状态为TCP_RECONNECT,在任务中重新调用MQTT_Connect;

void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
{struct espconn *pCon = (struct espconn *)arg;     // 获取TCP连接指针MQTT_Client* client = (MQTT_Client *)pCon->reverse; // 获取mqttClient指针INFO("TCP: Reconnect to %s:%d\r\n", client->host, client->port);client->connState = TCP_RECONNECT_REQ;             // TCP重连请求(等待5秒)system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);  // 安排任务MQTT_Task
}

5.5.2、TCP连接成功、开始建立MQTT连接

  • 注册TCP正常断开、收发数据的回调函数;
  • 发送CONNECT报文;

发送CONNECT报文,就是按照MQTT协议,一点点配置固定报头、可变报头、有效载荷的部分,太细节了,这又涉及到了mqtt_msg,好家伙,又是个死长的代码,先不看了,等我写出bug再研究,哈哈哈哈~

void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
{struct espconn *pCon = (struct espconn *)arg;      // 获取TCP连接指针MQTT_Client* client = (MQTT_Client *)(pCon->reverse);// 获取mqttClient指针// 注册回调函数--------------------------------------------------------------espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);   // TCP断开成功_回调espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);           // TCP接收成功_回调espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);        // TCP发送成功_回调INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);// 【CONNECT】报文发送准备--------------------------------------------------------------// 初始化MQTT报文缓存区mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);// 配置【CONNECT】控制报文,并获取【CONNECT】报文[指针]、[长度]client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);// 获取待发送的报文类型(此处是【CONNECT】报文)client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);// 获取待发送报文中的【报文标识符】(【CONNECT】报文中没有)client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,client->mqtt_state.outbound_message->length);// TCP发送成功/报文发送5秒计时结束 => 报文发送结束(sendTimeout=0)client->sendTimeout = MQTT_SEND_TIMOUT;   // 发送MQTT报文时,sendTimeout=5INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);// TCP:发送【CONNECT】报文----------------------------------------------------------espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);// 报文发送完--------------------------------------------------------------client->mqtt_state.outbound_message = NULL;        //清除出站报文指针client->connState = MQTT_CONNECT_SENDING;     //更新状态system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);    // 安排任务MQTT_Task
}

5.5.3、TCP接收数据回调中有关CONNACK连接确认

上面已经发送了CONNECT报文,服务器要返回CONNACK报文,所以看下TCP接收数据的回调,同样这里只截取有关CONNACK的内容;

  • 打印接收数据的长度、前4字节的内容;(长度为4、前4字节为:32、2、0、0)
  • 判断是MQTT_CONNECT_SENDING状态,这一状态在发送CONNECT报文中被设置;
    • 判断当前报文是CONNACK;

      • 修改状态为MQTT_DATA;
      • 执行MQTT连接成功的回调函数:mqttConnectedCb
void ICACHE_FLASH_ATTR mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
{INFO("TCP: data received %d bytes\r\n",len);INFO("TCP: data received %d,%d,%d,%d \r\n", *pdata,*(pdata+1),*(pdata+2),*(pdata+3));if (len<MQTT_BUF_SIZE && len>0){             // 接收到的数据长度在允许范围内os_memcpy(client->mqtt_state.in_buffer, pdata, len);    // 获取接收数据,存入【入站报文缓存区】msg_type = mqtt_get_type(client->mqtt_state.in_buffer); // 获取【报文类型】// 根据ESP8266运行状态,执行相应操作---------------------------------------------switch (client->connState){case MQTT_CONNECT_SENDING:              // 【MQTT_CONNECT_SENDING】if (msg_type == MQTT_MSG_TYPE_CONNACK){  // 判断消息类型!=【CONNACK】// ESP8266发送 == 【CONNECT】报文---------------------------------------INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);client->connState = MQTT_DATA;    // ESP8266状态改变:【MQTT_DATA】if (client->connectedCb)        // 执行[mqttConnectedCb]函数(MQTT连接成功函数)client->connectedCb((uint32_t*)client);  // 参数 = mqttClient}break;.......}}system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);   // 安排任务MQTT_Task
}

5.6、建立MQTT连接之后

在建立MQTT连接之后会调用这个函数,可以看到:

  • 订阅了主题"$iot/esp8266/user/both";
  • 向主题"$iot/esp8266/user/both"发布"ESP8266_Online"的消息;
// MQTT已成功连接:ESP8266发送【CONNECT】,并接收到【CONNACK】========================
void mqttConnectedCb(uint32_t *args)
{MQTT_Client* client = (MQTT_Client*)args; // 获取mqttClient指针INFO("MQTT: Connected\r\n");// 订阅主题【参数2:主题过滤器 / 参数3:订阅Qos】MQTT_Subscribe(client, "$iot/esp8266/user/both", 0);// 发布主题【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】MQTT_Publish(client, "$iot/esp8266/user/both", "ESP8266_Online", strlen("ESP8266_Online"), 0, 0);
}

5.7、订阅主题、向主题发布、解析收到的PUBLISH数据

终于到这里了,其实使用的话,只知道怎么订阅、发布,解析就可以,只是我不想。

5.7.1、订阅主题,使用MQTT_Subscribe

  • 根据主题过滤器、订阅Qos,配置SUBSCRIBE报文内容,其实调用的是mqtt_msg_subscribe;
  • 然后将SUBSCRIBE报文写入队列;
  • 发送SUBSCRIBE报文在MQTT_Task的case MQTT_DATA中;
// ESP8266订阅主题【参数2:主题过滤器 / 参数3:订阅Qos】
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
{uint8_t dataBuffer[MQTT_BUF_SIZE];     // 解析后报文缓存(1204字节)uint16_t dataLen;                     // 解析后报文长度// 配置【SUBSCRIBE】报文,并获取【SUBSCRIBE】报文[指针]、[长度]client->mqtt_state.outbound_message=mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,topic, qos,&client->mqtt_state.pending_msg_id);INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);// 将报文写入队列,并返回写入字节数(包括特殊码)while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){INFO("MQTT: Queue full\r\n");// 解析队列中的报文if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1){INFO("MQTT: Serious buffer error\r\n");return FALSE;}}system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);   // 安排任务MQTT_Taskreturn TRUE;
}

5.7.2、发布主题,使用MQTT_Publish

  • 根据主题、有效载荷及长度、订阅Qos,retain,配置PUBLISH报文内容,其实调用的是mqtt_msg_publish;
  • 然后将PUBLISH报文写入队列;
  • 发送PUBLISH报文在MQTT_Task的case MQTT_DATA中;
// ESP8266向主题发布消息:【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
{uint8_t dataBuffer[MQTT_BUF_SIZE]; // 解析后报文缓存(1204字节)uint16_t dataLen;                 // 解析后报文长度// 配置【PUBLISH】报文,并获取【PUBLISH】报文[指针]、[长度]client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,topic, data, data_length,qos, retain,&client->mqtt_state.pending_msg_id);if (client->mqtt_state.outbound_message->length == 0){   // 判断报文是否正确INFO("MQTT: Queuing publish failed\r\n");return FALSE;}// 串口打印:【PUBLISH】报文长度,(队列装填数量/队列大小)INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);// 将报文写入队列,并返回写入字节数(包括特殊码)while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){INFO("MQTT: Queue full\r\n"); // 队列已满// 解析队列中的数据包if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1){INFO("MQTT: Serious buffer error\r\n");return FALSE;}}system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);    // 安排任务return TRUE;
}

5.7.3、解析收到的PUBLISH数据,调用user_main.c中的mqttDataCb

  • 首先,TCP接收数据的回调函数mqtt_tcpclient_recv中会收到这个报文;
  • 然后判断收到的是PUBLISH报文,执行有关应答的操作,PUBLISH数据内容处理调用deliver_publish
  • deliver_publish中调用的则是user_main.c中的mqttDataCb
             // ESP8266接收到【PUBLISH】报文:发布消息case MQTT_MSG_TYPE_PUBLISH:if (msg_qos == 1)      // 【服务端->客户端】发布消息 Qos=1client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);   // 配置【PUBACK】报文else if (msg_qos == 2) // 【服务端->客户端】发布消息 Qos=2client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);   // 配置【PUBREC】报文if (msg_qos == 1 || msg_qos == 2){INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);// 将ESP8266应答报文(【PUBACK】或【PUBREC】),写入队列if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){INFO("MQTT: Queue full\r\n");}}// 获取服务端【PUBLISH】报文的【主题】、【有效载荷】deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);break;
// ESP8266获取服务端【PUBLISH】报文的【主题】、【有效载荷】
LOCAL void ICACHE_FLASH_ATTR deliver_publish(MQTT_Client* client, uint8_t* message, int length)
{mqtt_event_data_t event_data;event_data.topic_length = length;        // 主题名长度初始化event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);// 获取【PUBLISH】报文的主题名(指针)、主题名长度event_data.data_length = length;        // 有效载荷长度初始化event_data.data = mqtt_get_publish_data(message, &event_data.data_length);  // 获取【PUBLISH】报文的载荷(指针)、载荷长度// 进入【接收MQTT的[PUBLISH]数据】函数if (client->dataCb)client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
}

在mqttDataCb这个函数里,就可以根据收到的主题名,有效载荷的内容执行你想要的操作了;

在INFO打印的信息里,有Topic_len和Data_len,就可以发现这个长度是不包括字符串末尾的'\0'的;

os_strcmp是比较两个字符串的内容是否一致,比较一致的结束是以'\0'为条件的,所以要进行缓存添加'\0'的相关操作;

// 【接收MQTT的[PUBLISH]数据】函数        【参数1:主题 / 参数2:主题长度 / 参数3:有效载荷 / 参数4:有效载荷长度】
void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len)
{char *topicBuf = (char*)os_zalloc(topic_len+1);      // 申请【主题】空间char *dataBuf  = (char*)os_zalloc(data_len+1);     // 申请【有效载荷】空间MQTT_Client* client = (MQTT_Client*)args;         // 获取MQTT_Client指针os_memcpy(topicBuf, topic, topic_len);                // 缓存主题topicBuf[topic_len] = 0;                            // 最后添'\0'os_memcpy(dataBuf, data, data_len);                 // 缓存有效载荷dataBuf[data_len] = 0;                                // 最后添'\0'INFO("Receive topic: %s, data: %s \r\n", topicBuf, dataBuf);  // 串口打印【主题】【有效载荷】INFO("Topic_len = %d, Data_len = %d\r\n", topic_len, data_len);    // 串口打印【主题长度】【有效载荷长度】// 根据接收到的主题名/有效载荷,控制LED的亮/灭if( os_strcmp(topicBuf,"$iot/esp8266/user/both") == 0){          // 主题 == "SW_LED"if( os_strcmp(dataBuf,"LED_ON") == 0){         // 有效载荷 == "LED_ON"GPIO_OUTPUT_SET(GPIO_ID_PIN(4),0);           // LED亮}else if( os_strcmp(dataBuf,"LED_OFF") == 0 ){   // 有效载荷 == "LED_OFF"GPIO_OUTPUT_SET(GPIO_ID_PIN(4),1);          // LED灭}}os_free(topicBuf); // 释放【主题】空间os_free(dataBuf);    // 释放【有效载荷】空间
}

写完了,真开心,虽然才下午三点,我也要去床上躺平!!!我要看电影,玩游戏,吃辣条,吃冰棍,睡大觉!!!溜了溜了。

ESP8266_MQTT协议相关推荐

  1. 常用开源协议介绍以及开源软件规范列表

    1. 开源协议介绍 GPL: General Public License,开源项目最常用的许可证,衍生代码的分发需开源并且也要遵守此协议.该协议也有很多变种,不同变种要求会略微不同. MPL: MP ...

  2. Redis 笔记(11)— 文本协议 RESP(单行、多行字符串、整数、错误、数组、空值、空串格式、telnet 登录 redis)

    RESP 是 Redis 序列化协议Redis Serialization Protocol 的简写.它是一种直观的文本协议,优势在于实现异常简单,解析性能极好. ​ Redis 协议将传输的结构数据 ...

  3. HTTP 协议入门 — (TCP/IP协议族、通信传输流、URI 与 URL 的区别、Cookie 状态管理、HTTP 支持的方法、状态码类别、HTTP 首部字段)

    TCP/IP协议族 在介绍 HTTP 协议之前,我们先对 TCP/IP 协议族有个大概的了解,TCP/IP 协议从上到下主要分为应用层.传输层.网络层和数据链路层,各层的主要功能如下表所示: 协议层 ...

  4. 【JavaWeb】servlet与http请求协议

    Servlet: 概念: server applet (服务端小程序)运行在服务器端的小程序 Servlet就是一个接口,定义了Java类被浏览器访问到(Tomcat识别)的规则. 将我我们自定义一个 ...

  5. synopsys PCIE IP协议解析

    synopsys PCIE IP协议解析 1.Overview Core支持单个Pcie内核的Loopback功能,该功能主要为了做芯片验证,以及在没有远程接收器件的情况下完成自己的回环.同时,Cor ...

  6. 用户自定义协议client/server代码示例

    用户自定义协议client/server代码示例 代码参考链接:https://github.com/sogou/workflow message.h message.cc server.cc cli ...

  7. Thrift协议与传输选择

    1 协议 Thrift 可以让用户选择客户端与服务端之间传输通信的消息协议类别,如我们前面所讲总体划分为文本 (text) 和二进制 (binary) ,为节约带宽,提高传输效率,一般情况下使用二进制 ...

  8. TCP/UDP协议基本概念

    TCP和UDP协议是TCP/IP协议的核心. TCP 传输协议:TCP 协议是一TCP (Transmission Control Protocol)和UDP(User Datagram Protoc ...

  9. 【网站汇总】单片机常用通讯协议

    1.UART UART协议快速扫盲(图文并茂+超详细)_GREYWALL-CSDN博客 UART串口协议详解 - 知乎 基于STM32之UART串口通信协议(一)详解 - LLLIN000 - 博客园 ...

最新文章

  1. php 进程管理,php如何管理进程
  2. 如何在Tensorflow.js中处理MNIST图像数据
  3. 【HAOI2015】树上染色
  4. java读写文件大全
  5. 关于eclipse中文注释乱码的问题
  6. C++ windows 平台的 Hook
  7. js的正则自定义金额输入验证函数
  8. 正确注释@return让PHPstorm动态返回类
  9. python 密度聚类 使用_使用python+sklearn实现硬币图像上的结构化Ward层次聚类演示...
  10. python数字转换成字符串比较大小_Python 字符串操作(string替换、删除、截取、复制、连接、比较、查找、包含、大小写转换、...
  11. HDFS进阶应用 配置 NFS 网关
  12. Sturts2 与android的图片上传交互
  13. VASP服务器第一次安装各种软件(中)
  14. 微信小程序及其兼容性
  15. 挥别2019,喜迎2020
  16. 理解本真的REST架构风格
  17. 解9*9数独算法(C++)
  18. 数值计算笔记之数值积分(一)
  19. [转]阿里云的这群疯子
  20. codeforces round#517

热门文章

  1. 2021年这些高频面试知识点最后再发一次,跳槽薪资翻倍
  2. HDOJ--2022
  3. 利用 tesseract 自动识别图片中的文字
  4. ACM-ICPC 2018 徐州赛区网络预赛 I. Characters with Hash
  5. Java依赖于抽象不依赖于具体,依赖倒置原则(Dependecy-Inversion Principle)
  6. k-nearest neighbor,k近邻法
  7. 我对计算机最感兴趣作文300,电脑让我欢喜让我忧作文300字
  8. 手拉手微商俱乐部 微信营销实战课程开讲啦
  9. 对 matplotlib.cm.RdYlBu() 的理解
  10. 新一代iPhoneSE支持5G,有望搭载A15仿生芯片