使用STM32 W5500做MQTT Client,使得数据上传broker,并接收broker传来的消息,并支持断网/拔网线再插入网线能够重新连接broker这样的功能,需要具备以下条件:

1、STM32 W5500基础入网配置,使能PC电脑端可以PING通W5500。

2、STM32 W5500的TCP Client收发数据的回环测试没有问题。

3、了解MQTT协议。

关于MQTT的介绍,本文不做重点。需要了解的是MQTT协议是基于TCP协议之上封装的协议。

关于MQTT Client依赖的MQTT支持库函数,下载地址 《MQTT C语言库函数》

这些库函数是干嘛的?

MQTT协议在STM32 W5500中使用的前提,首先通过TCP连接到broker指定的IP和端口。

然后需要发送MQTT连接的指令,这个指令内容是通过 "MQTTConnectClient.c"文件中的

int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options)

这个方法来实现组装的,返回值大于0表示组装后的数组有效内容长度,在通过W5500的send方法,发送给broker。

broker接收到Client端发来的MQTT连接请求后,会返回一组数据,判断是否连接成功,或者各种失败(协议版本错误,用户名密码错误等)。

MQTT Client如果想要接收到broker发来的消息,需要先订阅主题,订阅主题的指令内容是通过"MQTTSubscribeClient.c"文件中的

int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count,MQTTString topicFilters[], int requestedQoSs[])

这个方法来实现组装的,同样返回值大于0表示组装后的数组有效内容长度,在通过W5500的send方法,发送给broker。

以上两个举例都是组装指令内容。

那么接收到broker发来消息,如何解析?

"MQTTDeserializePublish.c"这个文件的

int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName,unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen)

这个方法可以实现消息内容的解析。

总之,依赖的MQTT支持库函数几乎可以使我们不用在乎协议的具体内容,就可以实现MQTT Client的功能。

STM32 W5500 MQTT Client端,我通过枚举类型给它定义三种状态

enum MQTT_STATE {MQTT_INIT, MQTT_CONNOK, MQTT_SUBOK};

MQTT_INIT - 初始状态(MQTT未连接,未订阅,注意是MQTT的,而不是TCP连接了没有)

MQTT_CONNOK - MQTT连接成功(MQTT Client端发起MQTT连接,并接收到了broker返回连接成功)

MQTT_SUBOK - MQTT订阅成功(MQTT Client端想broker订阅消息,并受到了broker返回订阅成功)

这几种状态的关连,在程序开始执行时,MQTT Client端处于MQTT_INIT状态,或者程序执行一段时间后,MQTT PING指令发几次broker没有回复,认为MQTT Client端处于MQTT_INIT状态。

MQTT Client端处于MQTT_CONNOK 状态时可以发布数据到broker,但是无法接收来自broker的消息。

MQTT Client端处于MQTT_SUBOK 状态时可以发布数据到broker,也可以接收来自broker的消息。

如果TCP Client处于CLOSE的状态,那么MQTT Client端将处于MQTT_INIT 状态。

做好MQTT Client端的难点在于维系 TCP socket的状态与MQTT Client的状态的关系。

贴出我实现MQTT Client的c代码:

impl_mqtt.c

#ifndef __IMPL_MQTT_H
#define __IMPL_MQTT_H
#include "impl_mqtt.h"
#endifint mqttstate = MQTT_INIT;
int cnt_ping_not_response = 0;
int cnt_sock_init = 0;
u8 buf_pub[1024];
u32 ping_timestamp, now_timestamp;int func_tcp_sock_send(u8 sockno, u8 *buf_mqsend, u16 len_mqsend)
{if(getSn_SR(sockno) == SOCK_ESTABLISHED){return send(sockno, buf_mqsend, len_mqsend);}return -1;
}int func_tcp_sock_read(u8 sockno, u8 *buf_mqrecv, u16 len_mqrecv)
{if((getSn_SR(sockno) == SOCK_ESTABLISHED)){len_mqrecv = getSn_RX_RSR(sockno);if(len_mqrecv > 0){return recv(sockno, buf_mqrecv, len_mqrecv);}}return -1;
}void func_judge_timeout_ms(u32 *timespan)
{delay_ms(1);*timespan = *timespan + 1;
}u8 func_judge_mqtt_recvmsg_package_type(u8 *buf_mqrecv, u16 len_mqrecv)
{MQTTHeader header = {0};if(len_mqrecv > 0){header.byte = buf_mqrecv[0];return header.bits.type;}return 0;
}void func_mqtt_client_dealwith_recvmsg(u8 sockno, u8 *buf_mqrecv, u16 len_mqbuf, u16 len_mqrecv)
{if(len_mqrecv > 0){ping_timestamp = get_systick_timestamp();// package type to dealswitch(func_judge_mqtt_recvmsg_package_type(buf_mqrecv, len_mqrecv)){case CONNACK:break;case PUBLISH://analysis msg{int rc;u8 buf_recv[1024];u8* payload;int len_payload;unsigned char retained, dup;int qos;unsigned short packetid; MQTTString topicrecv;MQTTString topicpub;payload = buf_recv;rc = MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicrecv, &payload, &len_payload, buf_mqrecv, len_mqrecv);if(rc == 1){//TODO ...//TEST codetopicpub.cstring = (char*)"mytopic";memset(buf_pub, 0, sizeof(buf_pub));func_run_mqtt_publish(sockno, buf_pub, sizeof(buf_pub), topicpub, payload, len_payload);}}         break;case PUBACK:break;case PUBREC://Qos2 msg receiptcase PUBREL://Qos2 msg receiptcase PUBCOMP://Qos2 msg receipt{unsigned char packettype, dup;unsigned short packetid; if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf_mqrecv, len_mqbuf) == 1){memset(buf_mqrecv, 0, len_mqbuf);len_mqrecv = MQTTSerialize_ack(buf_mqrecv, len_mqbuf, packettype, dup, packetid);if(len_mqrecv > 0){func_tcp_sock_send(sockno, buf_mqrecv, len_mqrecv);}}}break;case SUBACK:case UNSUBACK:case PINGREQ:case PINGRESP:case DISCONNECT:break;default:break;}}
}void func_mqtt_client_connect_broker(int *state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt)
{u32 timespan;int len_cont;int res;len_cont = MQTTSerialize_connect(buf_mqsend, len_mqsend, conn_mqtt);if(len_cont > 0){res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);if(res > 0){timespan = 0;memset(buf_mqsend, 0, len_mqsend);//reuse bufferfor(;;){if((len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend)) > 3){if(func_judge_mqtt_recvmsg_package_type(buf_mqsend, len_cont) == CONNACK) {*state = MQTT_CONNOK;ping_timestamp = get_systick_timestamp();}break;}func_judge_timeout_ms(&timespan);if(timespan > 500){break;}}}}
}void func_mqtt_client_ping_broker(int *state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend)
{u32 timespan;int len_cont;int res;if(get_systick_timestamp() - ping_timestamp > 5*1000){        len_cont = MQTTSerialize_pingreq(buf_mqsend, len_mqsend);if(len_cont > 0){res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);if(res > 0){timespan = 0;memset(buf_mqsend, 0, len_mqsend);//reuse bufferfor(;;){len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend);// pingrsp or others' published msg if(len_cont > 0){//recv pingrspping_timestamp = get_systick_timestamp();cnt_ping_not_response = 0;// other type msg to deal withfunc_mqtt_client_dealwith_recvmsg(sockno, buf_mqsend, len_mqsend, len_cont);break;}func_judge_timeout_ms(&timespan);if(timespan > 10){cnt_ping_not_response ++;if(cnt_ping_not_response > 1){*state = MQTT_INIT;close(sockno);cnt_ping_not_response = 0;}break;}}}else{cnt_ping_not_response ++;if(cnt_ping_not_response > 2){*state = MQTT_INIT;close(sockno);cnt_ping_not_response = 0;}}}}
}int func_run_mqtt_publish(u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTString topicName, u8* payload, int payloadlen)
{int len;int rc;if(mqttstate >= MQTT_CONNOK){len = MQTTSerialize_publish(buf_mqsend, len_mqsend, 0, 0, 0, 0, topicName, payload, payloadlen);if(len > 0){memcpy(buf_pub, buf_mqsend, len);rc = func_tcp_sock_send(sockno, buf_pub, len);if(rc > 0){ping_timestamp = get_systick_timestamp();}return rc;}}  return 0;
}int func_mqtt_client_subtopic_from_broker(u8 sockno, u8 *buf_mqsend, u16 len_mqsend, int count,\MQTTString topicFilters[], int requestedQoSs[])
{u32 timespan;int len_cont;int res;if(mqttstate != MQTT_SUBOK){len_cont = MQTTSerialize_subscribe(buf_mqsend, len_mqsend, 0, SUBSCRIBE, count, topicFilters, requestedQoSs);if(len_cont > 0){res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);if(res > 0){timespan = 0;memset(buf_mqsend, 0, len_mqsend);//reuse bufferfor(;;){if((len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend)) > 0 && func_judge_mqtt_recvmsg_package_type(buf_mqsend, len_cont) == SUBACK)//nowtime, ignore other type msg {mqttstate = MQTT_SUBOK;ping_timestamp = get_systick_timestamp();return 0;}func_judge_timeout_ms(&timespan);if(timespan > 500){return -2;}}}}}return -1;
}void func_mqtt_client_recvmsg_from_broker(u8 sockno, u8 *buf_mqsend, u16 len_mqsend)
{int len_cont;memset(buf_mqsend, 0, len_mqsend);len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend);if(len_cont > 0){func_mqtt_client_dealwith_recvmsg(sockno, buf_mqsend, len_mqsend, len_cont);}
}u8 func_run_mqtt_tcpsock(u8 sockno, u8 *broker_ip, u16 broker_port, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt)
{static u16 any_port = 50000;u8 res;switch(getSn_SR(sockno)){case SOCK_CLOSED:{close(sockno);socket(sockno, Sn_MR_TCP, any_port++, 0x00);cnt_sock_init++;if(cnt_sock_init > 30){cnt_sock_init = 0;close(sockno);mqttstate = MQTT_INIT;}if(any_port > 64000){any_port =50000;}}            break;case SOCK_INIT:{res = connect(sockno, broker_ip, broker_port);if(res){//mqtt connect requestmqttstate = func_run_mqtt_progress(mqttstate, sockno, buf_mqsend, len_mqsend, conn_mqtt);}else{if(cnt_ping_not_response > 0){mqttstate = MQTT_INIT;}}}break;case SOCK_ESTABLISHED:{//run mqtt progressmqttstate = func_run_mqtt_progress(mqttstate, sockno, buf_mqsend, len_mqsend, conn_mqtt);}break;case SOCK_CLOSE_WAIT: {mqttstate = MQTT_INIT;close(sockno);}break;default:break;}return mqttstate;
}u8 func_run_mqtt_progress(int state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt)
{switch(state){case MQTT_INIT:{func_mqtt_client_connect_broker(&state, sockno, buf_mqsend, len_mqsend, conn_mqtt);}break;case MQTT_CONNOK:{func_mqtt_client_ping_broker(&state, sockno, buf_mqsend, len_mqsend);if(state > MQTT_INIT){
//              func_mqtt_client_subtopic_from_broker(&state, sockno, buf_mqsend, len_mqsend);}}break;case MQTT_SUBOK:{func_mqtt_client_ping_broker(&state, sockno, buf_mqsend, len_mqsend);if(state != MQTT_INIT){func_mqtt_client_recvmsg_from_broker(sockno, buf_mqsend, len_mqsend);}}break;default:break;}return state;
}

可能不是十分完美,但是一般工程上使用应该问题不大,我也测试了好久。

测试的主函数,是做了MQTT的回环测试,MQTT Client端连接到broker后,发起订阅主题,并一次性订阅多个主题,分别是字符串subtopic、subtopic2、subtopic3、subtopic4。当其他客户端连接到broker后,向这四个主题发布消息,STM32 W5500 MQTT Client端接收后,会向 mytopic发布一条消息。

#ifndef __STM32F10X_H
#define __STM32F10X_H
#include "stm32f10x.h"
#endif#ifndef __Z_UTIL_TIME_H
#define __Z_UTIL_TIME_H
#include "z_util_time.h"
#endif#ifndef __Z_HARDWARE_LED_H
#define __Z_HARDWARE_LED_H
#include "z_hardware_led.h"
#endif#ifndef __Z_HARDWARE_SPI_H
#define __Z_HARDWARE_SPI_H
#include "z_hardware_spi.h"
#endif#ifndef __W5500_H
#define __W5500_H
#include "w5500.h"
#endif#ifndef __SOCKET_H
#define __SOCKET_H
#include "socket.h"
#endif#ifndef __W5500_CONF_H
#define __W5500_CONF_H
#include "w5500_conf.h"
#endif#ifndef __DHCP_H
#define __DHCP_H
#include "dhcp.h"
#endif#ifndef __Z_HARDWARE_USART2_H
#define __Z_HARDWARE_USART2_H
#include "z_hardware_usart2.h"
#endif#include "MQTTPacket.h"#ifndef __IMPL_MQTT_H
#define __IMPL_MQTT_H
#include "impl_mqtt.h"
#endifint main(void)
{u32 dhcp_timestamp;u8 ip_broker[] = {192, 168, 1, 127};u16 port_broker = 1883;u8 buf_mqtt_send[1024];u8 mac[6]={0, };DHCP_Get dhcp_get;int mqtt_stat;MQTTString sub_topic = MQTTString_initializer;MQTTString sub_topic2 = MQTTString_initializer;MQTTString sub_topic3 = MQTTString_initializer;MQTTString sub_topic4 = MQTTString_initializer;MQTTString sub_topics[4];int nums_sub_topic_qoss[4] = {0, };char stpc_str[64] = {'t', 'c'};MQTTPacket_connectData conn_mqtt = MQTTPacket_connectData_initializer;conn_mqtt.willFlag = 0;conn_mqtt.MQTTVersion = 3;conn_mqtt.clientID.cstring = (char*)"dev_abcdef";conn_mqtt.username.cstring = (char*)"abcdef";conn_mqtt.password.cstring = (char*)"123456";conn_mqtt.keepAliveInterval = 60;conn_mqtt.cleansession = 1;   systick_configuration();init_led();init_system_spi();func_w5500_reset();init_hardware_usart2_dma(9600);getMacByLockCode(mac);setSHAR(mac);sysinit(txsize, rxsize);setRTR(2000);setRCR(3);//DHCPfor(;func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) != 0;);    if(func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) == 0){setSUBR(dhcp_get.sub);setGAR(dhcp_get.gw);setSIPR(dhcp_get.lip);close(1);}dhcp_timestamp = get_systick_timestamp();memcpy(stpc_str, (char*)"subtopic", strlen("subtopic"));sub_topic.cstring = stpc_str;sub_topics[0] = sub_topic;sub_topic2.cstring = "subtopic2";sub_topics[1] = sub_topic2;sub_topic3.cstring = "subtopic3";sub_topics[2] = sub_topic3;sub_topic4.cstring = "subtopic4";sub_topics[3] = sub_topic4;for(;;){if(get_systick_timestamp() - dhcp_timestamp > 59*1000)// 1 min dhcp{dhcp_timestamp = get_systick_timestamp();if(func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) == 0){setSUBR(dhcp_get.sub);setGAR(dhcp_get.gw);setSIPR(dhcp_get.lip);close(1);}}mqtt_stat = func_run_mqtt_tcpsock(2, ip_broker, port_broker, buf_mqtt_send, sizeof(buf_mqtt_send), &conn_mqtt);if(mqtt_stat >= MQTT_CONNOK){if(mqtt_stat == MQTT_CONNOK){memset(buf_mqtt_send, 0, sizeof(buf_mqtt_send));func_mqtt_client_subtopic_from_broker(2, buf_mqtt_send, sizeof(buf_mqtt_send), 4,    sub_topics, nums_sub_topic_qoss);}func_led1_toggle();}      delay_ms(500);}
}

电脑端使用MQTT.fx工具进行测试,测试效果

目前测试还比较稳定,支持热插拔网线,以及路由器断网后再次联网,MQTT Client仍可继续连接broker。

STM32 W5500 MQTT Client 发布订阅及断线重连相关推荐

  1. MQTT协议 发布/订阅 机制初探 - (模拟物联网传感器设备和控制模块间的通信)

    MQTT协议 发布/订阅 机制初探 - (模拟物联网传感器设备和控制模块间的通信) 1. 实验环境介绍 Windows 2. 安装MQTT服务器并运行 管理员身份运行安装工具 选择安装路径 点击安装 ...

  2. STM32 W5500 OTA功能 - bootloader及app的设计和实现

    通过W5500的网络功能,到文件服务器下载STM32要更新的固件(可执行bin文件),存储到STM32片内FLASH的APP备份区中,以待bootloader拷贝到APP代码执行区,以实现OTA在线升 ...

  3. websocket连接mqtt实现发布及订阅主题

    2019独角兽企业重金招聘Python工程师标准>>> 环境:linux(ubuntu.Centos7),websocket,mosquitto-1.4.10,libwebsocke ...

  4. mqtt session保持 订阅消息_MQTT系列 | MQTT消息的发布和订阅

    1. MQTT的发布 MQTT发布中最重要的是PUBLISH数据包,PUBLISH数据包是用于sender和receiver之间传输消息数据的.当Publisher要向某个Topic发布一条消息的时候 ...

  5. mqtt server python_使用python实现mqtt的发布和订阅

    需要安装的python库 使用python编写程序进行测试MQTT的发布和订阅功能.首先要安装:pip install paho-mqtt 测试发布(pub) 我的MQTT部署在阿里云的服务器上面,所 ...

  6. MQTT协议之订阅及发布(使用paho-mqtt-client或mqttv3实现)

    另外一个MQTT发布订阅客户端paho-mqtt-client或mqttv3采用回调的方式实现消息的接收,下面看一下实现: 1.消息接收回调类 package cn.smartslim.mqtt.de ...

  7. 基于MQTT的消息发布订阅python实现

    简介: MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的"轻量级"消息协议.该协议构建于TCP ...

  8. paho架构_基于paho springboot集成mqtt实现消息的发布订阅 - 人人都是架构师

    1.首先添加pom依赖 org.springframework.boot spring-boot-starter-integration org.springframework.integration ...

  9. mosquitto c语言编程,MQTT通讯协议(mosquitto)发布订阅例子C语言实现

    1.前言 前面对MQTT进行了简单的介绍,并了解了如何在Linux上搭建MQTT 的运行 环境,参考连接:MQTT通讯协议(mosquitto)在Linux上的环境构建与测试,那些仅仅是经过命令去测试 ...

  10. Linux编程MQTT实现主题发布订阅

    [物联网阿里云平台开发项目实战|附课件资料]智能硬件开发-数据上云,零基础入门 4G模块连接阿里云教程 MQTT通信协议(mosquitto)在Linux上的环境构建与测试 MQTT通信协议(mosq ...

最新文章

  1. OpenCV4 C++学习 必备基础语法知识二
  2. Metadata Lock原理2
  3. 向sdcard中添加文件遇到的一些问题
  4. sphinx索引分析——文件格式和字典是double array trie 检索树,索引存储 – 多路归并排序,文档id压缩 – Variable Byte Coding...
  5. 《剑指offer》c++版本 18.删除链表的结点
  6. 【Python】查找目标值在列表中的索引序号
  7. docker开启mysql的binlog日志
  8. zip、rar文件格式
  9. oracle时间去掉时分秒的时间_超详细的oracle修改AWR采样时间间隔和快照保留时间教程...
  10. Spring Data + Thymeleaf 3 + Bootstrap 4 实现分页器
  11. 【基本办公软件】万彩办公大师教程丨二维条码制作工具
  12. powerdesigner 显示窗口小工具栏
  13. VirtualBox安装教程及使用(Windows)
  14. JavaScript设计模式-观察者模式
  15. 腾讯手机管家android版,腾讯手机管家上线Android8.11.0版本
  16. 74cms v6.0.48模版注入+文件包含getshell复现
  17. Nginx-反向代理
  18. 办公大师系列经典丛书 诚聘译者
  19. Baxter实战——baxter摇摆起来(打开gazebo进入baxter仿真)
  20. Express全系列教程之(十五):文件下载

热门文章

  1. linux系统查看网卡对应PCI地址
  2. 计算机键盘打字基础知识,电脑打字入门基础知识
  3. SqlServer 备份还原
  4. CentOS 7超详细安装与网络配置
  5. java 帕斯卡_Java编程实现帕斯卡三角形代码示例
  6. 色彩知识的运用 和 色彩意境解析
  7. 仅15%的L2智能驾驶搭载DMS,「安全」背后的市场爆发在即
  8. java中依赖_java中依赖、关联、聚合
  9. Z变换零极点与收敛域的关系
  10. 康托尔悖论:大全集不存在,即包含一切集合的集合是否存在