潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记)
本文代码参考 RT-Thread 官方 BSP
文章目录
- 实验功能
- 代码剖析
- rt_wlan_register_event_handler()
- mq_start()
- mqtt_sub_callback()
- mqtt_sub_default_callback()
- mqtt_connect_callback()
- mqtt_online_callback()
- mqtt_offline_callback()
- LOG_D()
实验功能
例程源码:(main函数)
该实验实现的功能:WiFi 初始化后创建 mqtt 客户端,然后开启 WiFi 自动连接(WiFi 底层代码本文不研究)。在网络连接上的情况下,mqtt 客户端会订阅一个主题,同时向主题发送和接收数据。
int main(void)
{/* 注册 wlan 回调函数 */rt_wlan_register_event_handler(RT_WLAN_EVT_READY, (void (*)(int, struct rt_wlan_buff *, void *))mq_start, RT_NULL);/* 初始化自动连接功能 */wlan_autoconnect_init();/* 使能 wlan 自动连接 */rt_wlan_config_autoreconnect(RT_TRUE);
}
代码剖析
rt_wlan_register_event_handler()
Wlan 事件回调注册函数,其实就是给 event_tabe[event]
的 handler() 和 parameter 成员赋值。
rt_err_t rt_wlan_register_event_handler(rt_wlan_event_t event, rt_wlan_event_handler handler, void *parameter)
{rt_base_t level;if (event >= RT_WLAN_EVT_MAX){return RT_EINVAL;}RT_WLAN_LOG_D("%s is run event:%d", __FUNCTION__, event);MGNT_LOCK();/* Registering Callbacks */level = rt_hw_interrupt_disable();event_tab[event].handler = handler;event_tab[event].parameter = parameter;rt_hw_interrupt_enable(level);MGNT_UNLOCK();return RT_EOK;
}
mq_start()
mqtt 初始化函数,主要配置了一些参数和回调函数,最后调用 paho_mqtt_start() 启动 mqtt 客户端。
/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{/* 初始 condata 参数 */MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;static char cid[20] = {0};static int is_started = 0;if (is_started){return;}/* 配置 MQTT 文本参数 */{client.isconnected = 0;client.uri = MQTT_URI;/* 生成随机客户端 ID */rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);/* 配置连接参数 */memcpy(&client.condata, &condata, sizeof(condata));client.condata.clientID.cstring = cid;client.condata.keepAliveInterval = 60;client.condata.cleansession = 1;client.condata.username.cstring = MQTT_USERNAME;client.condata.password.cstring = MQTT_PASSWORD;/* 配置 mqtt 参数 */client.condata.willFlag = 0;client.condata.will.qos = 1;client.condata.will.retained = 0;client.condata.will.topicName.cstring = sup_pub_topic;client.buf_size = client.readbuf_size = 1024;client.buf = malloc(client.buf_size);client.readbuf = malloc(client.readbuf_size);if (!(client.buf && client.readbuf)){LOG_E("no memory for MQTT client buffer!");goto _exit;}/* 设置事件回调 */client.connect_callback = mqtt_connect_callback;client.online_callback = mqtt_online_callback;client.offline_callback = mqtt_offline_callback;/* 设置要订阅的 topic 和 topic 对应的回调函数 */client.messageHandlers[0].topicFilter = sup_pub_topic;client.messageHandlers[0].callback = mqtt_sub_callback;client.messageHandlers[0].qos = QOS1;/* 设置默认订阅回调函数 */client.defaultMessageHandler = mqtt_sub_default_callback;}/* 启动 MQTT 客户端 */LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);paho_mqtt_start(&client);is_started = 1;_exit:return;
}
paho_mqtt_start()
paho_mqtt 客户端启动函数,该函数创建了一个静态线程,线程启动后,会执行 paho_mqtt_thread() 函数,
int paho_mqtt_start(MQTTClient *client)
{rt_err_t result;rt_thread_t tid;int stack_size = RT_PKG_MQTT_THREAD_STACK_SIZE;int priority = RT_THREAD_PRIORITY_MAX / 3;char *stack;static int is_started = 0;if (is_started){LOG_D("paho mqtt has already started!");return 0;} tid = rt_malloc(RT_ALIGN(sizeof(struct rt_thread), 8) + stack_size);if (!tid){LOG_E("no memory for thread: MQTT");return -1;}stack = (char *)tid + RT_ALIGN(sizeof(struct rt_thread), 8);result = rt_thread_init(tid,"MQTT",paho_mqtt_thread, client, // fun, parameterstack, stack_size, // stack, sizepriority, 2 //priority, tick);if (result == RT_EOK){rt_thread_startup(tid);is_started = 1;}return 0;
}
paho_mqtt_thread()
paho_mqtt 线程入口函数,里面会执行 mqtt 连接回调函数 connect_callback()、连接成功回调函数 online_callback() 和 连接断开回调函数 offline_callback(),这些函数都已经在 main.c 中定义。
static void paho_mqtt_thread(void *param)
{MQTTClient *c = (MQTTClient *)param;int i, rc, len;int rc_t = 0;/* create publish pipe. */if (pipe(c->pub_pipe) != 0){LOG_E("creat pipe err");goto _mqtt_exit;}_mqtt_start:if (c->connect_callback){c->connect_callback(c);}rc = net_connect(c);if (rc != 0){LOG_E("Net connect error(%d)", rc);goto _mqtt_restart;}rc = MQTTConnect(c);if (rc != 0){LOG_E("MQTT connect error(%d): %s", rc, MQTTSerialize_connack_string(rc));goto _mqtt_restart;}LOG_I("MQTT server connect success");for (i = 0; i < MAX_MESSAGE_HANDLERS; i++){const char *topic = c->messageHandlers[i].topicFilter;enum QoS qos = c->messageHandlers[i].qos;if (topic == RT_NULL)continue;rc = MQTTSubscribe(c, topic, qos);LOG_I("Subscribe #%d %s %s!", i, topic, (rc < 0) || (rc == 0x80) ? ("fail") : ("OK"));if (rc != 0){if (rc == 0x80){LOG_E("QoS config err!");}goto _mqtt_disconnect;}}if (c->online_callback){c->online_callback(c);}c->tick_ping = rt_tick_get();while (1){int res;rt_tick_t tick_now;fd_set readset;struct timeval timeout;tick_now = rt_tick_get();if (((tick_now - c->tick_ping) / RT_TICK_PER_SECOND) > (c->keepAliveInterval - 5)){timeout.tv_sec = 1;//LOG_D("tick close to ping.");}else{timeout.tv_sec = c->keepAliveInterval - 10 - (tick_now - c->tick_ping) / RT_TICK_PER_SECOND;//LOG_D("timeount for ping: %d", timeout.tv_sec);}timeout.tv_usec = 0;FD_ZERO(&readset);FD_SET(c->sock, &readset);FD_SET(c->pub_pipe[0], &readset);/* int select(maxfdp1, readset, writeset, exceptset, timeout); */res = select(((c->pub_pipe[0] > c->sock) ? c->pub_pipe[0] : c->sock) + 1,&readset, RT_NULL, RT_NULL, &timeout);if (res == 0){len = MQTTSerialize_pingreq(c->buf, c->buf_size);rc = sendPacket(c, len);if (rc != 0){LOG_E("[%d] send ping rc: %d ", rt_tick_get(), rc);goto _mqtt_disconnect;}/* wait Ping Response. */timeout.tv_sec = 5;timeout.tv_usec = 0;FD_ZERO(&readset);FD_SET(c->sock, &readset);res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout);if (res <= 0){LOG_E("[%d] wait Ping Response res: %d", rt_tick_get(), res);goto _mqtt_disconnect;}} /* res == 0: timeount for ping. */if (res < 0){LOG_E("select res: %d", res);goto _mqtt_disconnect;}if (FD_ISSET(c->sock, &readset)){//LOG_D("sock FD_ISSET");rc_t = MQTT_cycle(c);//LOG_D("sock FD_ISSET rc_t : %d", rc_t);if (rc_t < 0) goto _mqtt_disconnect;continue;}if (FD_ISSET(c->pub_pipe[0], &readset)){MQTTMessage *message;MQTTString topic = MQTTString_initializer;//LOG_D("pub_sock FD_ISSET");len = read(c->pub_pipe[0], c->readbuf, c->readbuf_size);if (len < sizeof(MQTTMessage)){c->readbuf[len] = '\0';LOG_D("pub_sock recv %d byte: %s", len, c->readbuf);if (strcmp((const char *)c->readbuf, "DISCONNECT") == 0){LOG_D("DISCONNECT");goto _mqtt_disconnect_exit;}continue;}message = (MQTTMessage *)c->readbuf;message->payload = c->readbuf + sizeof(MQTTMessage);topic.cstring = (char *)c->readbuf + sizeof(MQTTMessage) + message->payloadlen;//LOG_D("pub_sock topic:%s, payloadlen:%d", topic.cstring, message->payloadlen);len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,topic, (unsigned char *)message->payload, message->payloadlen);if (len <= 0){LOG_D("MQTTSerialize_publish len: %d", len);goto _mqtt_disconnect;}if ((rc = sendPacket(c, len)) != PAHO_SUCCESS) // send the subscribe packet{LOG_D("MQTTSerialize_publish sendPacket rc: %d", rc);goto _mqtt_disconnect;}} /* pbulish sock handler. */} /* while (1) */_mqtt_disconnect:MQTTDisconnect(c);
_mqtt_restart:if (c->offline_callback){c->offline_callback(c);}net_disconnect(c);rt_thread_delay(RT_TICK_PER_SECOND * 5);LOG_D("restart!");goto _mqtt_start;_mqtt_disconnect_exit:MQTTDisconnect(c);net_disconnect(c);_mqtt_exit:LOG_D("thread exit");return;
}
mqtt_sub_callback()
mqtt 订阅回调函数,订阅后进行信息打印。
static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';LOG_D("mqtt sub default callback: %.*s %.*s",msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data,msg_data->message->payloadlen,(char *)msg_data->message->payload);return;
}
mqtt_sub_default_callback()
订阅默认回调函数,上面的订阅函数没有订阅成功时,会使用默认的订阅,同时调用该回调函数。
static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';LOG_D("mqtt sub default callback: %.*s %.*s",msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data,msg_data->message->payloadlen,(char *)msg_data->message->payload);return;
}
mqtt_connect_callback()
这是 mqtt 线程会执行到的一个回调函数,当 mqtt 开始尝试连接时会自动调用。
static void mqtt_connect_callback(MQTTClient *c)
{LOG_I("Start to connect mqtt server");
}
mqtt_online_callback()
这是 mqtt 线程会执行到的一个回调函数,当 mqtt 成功连接时会自动调用。
static void mqtt_online_callback(MQTTClient *c)
{LOG_D("Connect mqtt server success");LOG_D("Publish message: Hello,RT-Thread! to topic: %s", sup_pub_topic);mq_publish("Hello,RT-Thread!");
}
mq_publish()
在 mqtt_online_callback() 中,调用了 mq_publish() 函数,里面完成了信息的包装和发送,发送到订阅的主题。
static void mq_publish(const char *send_str)
{MQTTMessage message;const char *msg_str = send_str;const char *topic = sup_pub_topic;message.qos = QOS1;message.retained = 0;message.payload = (void *)msg_str;message.payloadlen = strlen(message.payload);MQTTPublish(&client, topic, &message);return;
}
MQTTPublish()
更底层的 mqtt 信息发送函数,对 message 的成员进行封包,然后调用最底层的发送函数(write())发送数据。
/*** This function publish message to specified mqtt topic.* [MQTTMessage] + [payload] + [topic] + '\0'** @param c the pointer of MQTT context structure* @param topicFilter topic filter name* @param message the pointer of MQTTMessage structure** @return the error code, 0 on subscribe successfully.*/
int MQTTPublish(MQTTClient *c, const char *topicName, MQTTMessage *message)
{int rc = PAHO_FAILURE;int len, msg_len;char *data = 0;if (!c->isconnected)goto exit;msg_len = sizeof(MQTTMessage) + message->payloadlen + strlen(topicName) + 1;data = rt_malloc(msg_len);if (!data)goto exit;memcpy(data, message, sizeof(MQTTMessage));memcpy(data + sizeof(MQTTMessage), message->payload, message->payloadlen);strcpy(data + sizeof(MQTTMessage) + message->payloadlen, topicName);len = MQTT_local_send(c, data, msg_len);if (len == msg_len){rc = 0;}//LOG_D("MQTTPublish sendto %d", len);exit:if (data)rt_free(data);return rc;
}
mqtt_offline_callback()
这是 mqtt 线程会执行到的一个回调函数,当 mqtt 连接断开时会自动调用。
static void mqtt_offline_callback(MQTTClient *c)
{LOG_I("Disconnect from mqtt server");
}
LOG_D()
本实验中,我们可以将 LOG_D()
视为 rt_kprintf()
,
#define dbg_log_line(lvl, color_n, fmt, ...) \do \{ \_DBG_LOG_HDR(lvl, color_n); \rt_kprintf(fmt, ##__VA_ARGS__); \_DBG_LOG_X_END; \} \while (0)
LOG_D 是 RT-Thread 内核里的一个日志打印函数,详情可见:《RT-Thread 文档中心——ulog 日志》
RT-Thread 的日志 API 包括:
潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记)相关推荐
- 潘多拉 IOT 开发板学习(RT-Thread)—— 实验16 WiFi 模块实验(学习笔记)
本文代码参考 RT-Thread 官方 BSP 文章目录 实验功能 代码剖析 rt_hw_wlan_wait_init_done() LOG_D() rt_wlan_scan_sync() rt_wl ...
- 小熊派IoT开发板系列教程正式发布——免费学习
小熊派介绍 小熊派IoT开发板一款由南京厚德物联网有限公司联合华为技术有限公司基于STM32L431RCT6设计的高性能物联网开发板.开发板充分考虑物联网感知层设备的多样性,具有强大的可扩展性,用于提 ...
- 2021物联网开发学习——基于小熊派IoT开发板Bear-Pi-IOT、E53_IA1_智慧农业拓展板与小熊派-鸿蒙·季Bear-Pi-HM Nano并接入Hi-Link
2021物联网开发学习--基于小熊派IoT开发板Bear-Pi-IOT.E53_IA1_智慧农业拓展板与小熊派-鸿蒙·季Bear-Pi-HM Nano并接入Hi-Link 目录 作者介绍 目的 用到的 ...
- 华为认证物联网开发利器:小熊派IoT开发板
今年8月份我和小伙伴们协同研发的基于NB-IoT的智慧路灯监控系统有幸入选华为开发者大赛IoT赛道决赛,决赛期间留意到70%以上的个人/学生开发者团队都使用到华为认证(匹配HCIP:华为认证ICT高级 ...
- 鸿蒙IOT开发板 小熊派上手体验
鸿蒙IOT开发板 小熊派上手体验 一.简介 二. 上手搭建开发环境 1. 准备开发工具 2. 使用VMWare 创建虚拟机 4. 设置磁盘映射 5. 在ubuntu里获取源码 6. 编译代码 三.连接 ...
- 树莓派Pico W无线开发板MQTT协议通信MicroPython编程实践
本博文介绍采用Thonny+MicroPython和umqtt.simple库MQTTClient类的对象方法编制树莓派Pico W无线开发板MQTT协议通信程序,将Pico W无线开发板.电脑或An ...
- 【鸿蒙】HiSpark Wifi IOT开发板资料汇总
开发环境搭建 https://device.harmonyos.com/cn/docs/start/introduce/oem_start_guide-0000001054913231 HUAWEI ...
- 【华为云技术分享】小熊派IoT开发板华为物联网操作系统LiteOS内核实战教程01-IoT-Studio介绍及安装
1. 物联网一站式开发工具 -- IoT Studio IoT Studio 是支持 LiteOS 嵌入式系统软件开发的工具,提供了代码编辑.编译.烧录 及调试等一站式开发体验,支持 C.C++.汇编 ...
- stm32l0的停止模式怎么唤醒_探索者 STM32F407 开发板资料连载第二十二章 待机唤醒实验
1)实验平台:alientek 阿波罗 STM32F767 开发板 2)摘自<STM32F7 开发指南(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 第二十二章 待机唤醒实 ...
最新文章
- 玩不转大数据就别勉强了,或许“小数据”才是真正的终南捷径
- 15个超实用的php正则表达式
- (SpringMVC)拦截器
- HTTP 知识点之一:头部解释(转)
- Android魔法(第二弹)——一步步实现淹没、展开效果
- 一旦有辞职念头就干不长了吗_每天都有辞职不想上班的冲动,你有吗?
- C语言实现任意两种进制之间互相转换
- 微信支付(PC扫码支付和H5公众号支付)
- 将OSM地图转化成OpenDRIVE
- Adobe Premiere(pr)2021 安装教程【64位】
- xp oracle10闪退,cad2010win10闪退怎么办_win10cad2010打开就闪退的解决方法
- 读论文,第十五天:FingerPing: Recognizing Fine-grained Hand Poses using Active Acoustic On-body Sensing
- 微信小程序列表首字母排序并根据字母定位
- c语言大学程序设计题库,大连理工大学c语言...程序设计题库.doc
- 软件测试和硬件测试的区别及概念
- 【低功耗蓝牙BLE】连接事件和相关参数
- Ubuntu18.04设置root密码(初始密码)
- golang及beego框架单元测试小结
- Cocos Creator生成方形
- map,hash_map和unordered_map效率比较
热门文章
- idea页面只能显示一个项目,idea使用教程——一个窗口中同时打开多个项目
- SpringBoot中使用Mybatis逆向工程(实体类含数据库注释)
- 阿里云2018开启双11百团大战 1折购买服务器瓜分红包
- 嵌入式项目烂尾的真实原因
- JS 利用vue过滤器将阿拉伯数字转化为汉字
- windows子系统(WSL)与本地互相访问;挂载U盘
- 招聘直播方案应该如何策划?企业应该如何管理直播招聘?
- 计算机毕业设计成品基于Uniapp+SSM实现的设备预约管理
- Cadence OrCAD Capture复用参考设计时保持参考设计编号不变的情况下自动编号的方法
- 写一段51单片机的避障小车代码