本文代码参考 RT-Thread 官方 BSP

文章目录

  • 实验功能
  • 代码剖析
    • rt_wlan_register_event_handler()
    • mq_start()
    • mqtt_sub_callback()
    • mqtt_sub_default_callback()
    • mqtt_connect_callback()
    • mqtt_online_callback()
    • mqtt_offline_callback()
    • LOG_D()

实验功能

例程源码:(main函数)

该实验实现的功能:WiFi 初始化后创建 mqtt 客户端,然后开启 WiFi 自动连接(WiFi 底层代码本文不研究)。在网络连接上的情况下,mqtt 客户端会订阅一个主题,同时向主题发送和接收数据。

int main(void)
{/* 注册 wlan 回调函数 */rt_wlan_register_event_handler(RT_WLAN_EVT_READY, (void (*)(int, struct rt_wlan_buff *, void *))mq_start, RT_NULL);/* 初始化自动连接功能 */wlan_autoconnect_init();/* 使能 wlan 自动连接 */rt_wlan_config_autoreconnect(RT_TRUE);
}

代码剖析

rt_wlan_register_event_handler()

Wlan 事件回调注册函数,其实就是给 event_tabe[event] 的 handler() 和 parameter 成员赋值。

rt_err_t rt_wlan_register_event_handler(rt_wlan_event_t event, rt_wlan_event_handler handler, void *parameter)
{rt_base_t level;if (event >= RT_WLAN_EVT_MAX){return RT_EINVAL;}RT_WLAN_LOG_D("%s is run event:%d", __FUNCTION__, event);MGNT_LOCK();/* Registering Callbacks */level = rt_hw_interrupt_disable();event_tab[event].handler = handler;event_tab[event].parameter = parameter;rt_hw_interrupt_enable(level);MGNT_UNLOCK();return RT_EOK;
}

mq_start()

mqtt 初始化函数,主要配置了一些参数和回调函数,最后调用 paho_mqtt_start() 启动 mqtt 客户端。

/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{/* 初始 condata 参数 */MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;static char cid[20] = {0};static int is_started = 0;if (is_started){return;}/* 配置 MQTT 文本参数 */{client.isconnected = 0;client.uri = MQTT_URI;/* 生成随机客户端 ID */rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);/* 配置连接参数 */memcpy(&client.condata, &condata, sizeof(condata));client.condata.clientID.cstring = cid;client.condata.keepAliveInterval = 60;client.condata.cleansession = 1;client.condata.username.cstring = MQTT_USERNAME;client.condata.password.cstring = MQTT_PASSWORD;/* 配置 mqtt 参数 */client.condata.willFlag = 0;client.condata.will.qos = 1;client.condata.will.retained = 0;client.condata.will.topicName.cstring = sup_pub_topic;client.buf_size = client.readbuf_size = 1024;client.buf = malloc(client.buf_size);client.readbuf = malloc(client.readbuf_size);if (!(client.buf && client.readbuf)){LOG_E("no memory for MQTT client buffer!");goto _exit;}/* 设置事件回调 */client.connect_callback = mqtt_connect_callback;client.online_callback = mqtt_online_callback;client.offline_callback = mqtt_offline_callback;/* 设置要订阅的 topic 和 topic 对应的回调函数 */client.messageHandlers[0].topicFilter = sup_pub_topic;client.messageHandlers[0].callback = mqtt_sub_callback;client.messageHandlers[0].qos = QOS1;/* 设置默认订阅回调函数 */client.defaultMessageHandler = mqtt_sub_default_callback;}/* 启动 MQTT 客户端 */LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);paho_mqtt_start(&client);is_started = 1;_exit:return;
}

paho_mqtt_start()

paho_mqtt 客户端启动函数,该函数创建了一个静态线程,线程启动后,会执行 paho_mqtt_thread() 函数,

int paho_mqtt_start(MQTTClient *client)
{rt_err_t result;rt_thread_t tid;int stack_size = RT_PKG_MQTT_THREAD_STACK_SIZE;int priority = RT_THREAD_PRIORITY_MAX / 3;char *stack;static int is_started = 0;if (is_started){LOG_D("paho mqtt has already started!");return 0;}    tid = rt_malloc(RT_ALIGN(sizeof(struct rt_thread), 8) + stack_size);if (!tid){LOG_E("no memory for thread: MQTT");return -1;}stack = (char *)tid + RT_ALIGN(sizeof(struct rt_thread), 8);result = rt_thread_init(tid,"MQTT",paho_mqtt_thread, client, // fun, parameterstack, stack_size,        // stack, sizepriority, 2               //priority, tick);if (result == RT_EOK){rt_thread_startup(tid);is_started = 1;}return 0;
}

paho_mqtt_thread()

paho_mqtt 线程入口函数,里面会执行 mqtt 连接回调函数 connect_callback()、连接成功回调函数 online_callback() 和 连接断开回调函数 offline_callback(),这些函数都已经在 main.c 中定义。

static void paho_mqtt_thread(void *param)
{MQTTClient *c = (MQTTClient *)param;int i, rc, len;int rc_t = 0;/* create publish pipe. */if (pipe(c->pub_pipe) != 0){LOG_E("creat pipe err");goto _mqtt_exit;}_mqtt_start:if (c->connect_callback){c->connect_callback(c);}rc = net_connect(c);if (rc != 0){LOG_E("Net connect error(%d)", rc);goto _mqtt_restart;}rc = MQTTConnect(c);if (rc != 0){LOG_E("MQTT connect error(%d): %s", rc, MQTTSerialize_connack_string(rc));goto _mqtt_restart;}LOG_I("MQTT server connect success");for (i = 0; i < MAX_MESSAGE_HANDLERS; i++){const char *topic = c->messageHandlers[i].topicFilter;enum QoS qos = c->messageHandlers[i].qos;if (topic == RT_NULL)continue;rc = MQTTSubscribe(c, topic, qos);LOG_I("Subscribe #%d %s %s!", i, topic, (rc < 0) || (rc == 0x80) ? ("fail") : ("OK"));if (rc != 0){if (rc == 0x80){LOG_E("QoS config err!");}goto _mqtt_disconnect;}}if (c->online_callback){c->online_callback(c);}c->tick_ping = rt_tick_get();while (1){int res;rt_tick_t tick_now;fd_set readset;struct timeval timeout;tick_now = rt_tick_get();if (((tick_now - c->tick_ping) / RT_TICK_PER_SECOND) > (c->keepAliveInterval - 5)){timeout.tv_sec = 1;//LOG_D("tick close to ping.");}else{timeout.tv_sec = c->keepAliveInterval - 10 - (tick_now - c->tick_ping) / RT_TICK_PER_SECOND;//LOG_D("timeount for ping: %d", timeout.tv_sec);}timeout.tv_usec = 0;FD_ZERO(&readset);FD_SET(c->sock, &readset);FD_SET(c->pub_pipe[0], &readset);/* int select(maxfdp1, readset, writeset, exceptset, timeout); */res = select(((c->pub_pipe[0] > c->sock) ? c->pub_pipe[0] : c->sock) + 1,&readset, RT_NULL, RT_NULL, &timeout);if (res == 0){len = MQTTSerialize_pingreq(c->buf, c->buf_size);rc = sendPacket(c, len);if (rc != 0){LOG_E("[%d] send ping rc: %d ", rt_tick_get(), rc);goto _mqtt_disconnect;}/* wait Ping Response. */timeout.tv_sec = 5;timeout.tv_usec = 0;FD_ZERO(&readset);FD_SET(c->sock, &readset);res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout);if (res <= 0){LOG_E("[%d] wait Ping Response res: %d", rt_tick_get(), res);goto _mqtt_disconnect;}} /* res == 0: timeount for ping. */if (res < 0){LOG_E("select res: %d", res);goto _mqtt_disconnect;}if (FD_ISSET(c->sock, &readset)){//LOG_D("sock FD_ISSET");rc_t = MQTT_cycle(c);//LOG_D("sock FD_ISSET rc_t : %d", rc_t);if (rc_t < 0)    goto _mqtt_disconnect;continue;}if (FD_ISSET(c->pub_pipe[0], &readset)){MQTTMessage *message;MQTTString topic = MQTTString_initializer;//LOG_D("pub_sock FD_ISSET");len = read(c->pub_pipe[0], c->readbuf, c->readbuf_size);if (len < sizeof(MQTTMessage)){c->readbuf[len] = '\0';LOG_D("pub_sock recv %d byte: %s", len, c->readbuf);if (strcmp((const char *)c->readbuf, "DISCONNECT") == 0){LOG_D("DISCONNECT");goto _mqtt_disconnect_exit;}continue;}message = (MQTTMessage *)c->readbuf;message->payload = c->readbuf + sizeof(MQTTMessage);topic.cstring = (char *)c->readbuf + sizeof(MQTTMessage) + message->payloadlen;//LOG_D("pub_sock topic:%s, payloadlen:%d", topic.cstring, message->payloadlen);len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,topic, (unsigned char *)message->payload, message->payloadlen);if (len <= 0){LOG_D("MQTTSerialize_publish len: %d", len);goto _mqtt_disconnect;}if ((rc = sendPacket(c, len)) != PAHO_SUCCESS) // send the subscribe packet{LOG_D("MQTTSerialize_publish sendPacket rc: %d", rc);goto _mqtt_disconnect;}} /* pbulish sock handler. */} /* while (1) */_mqtt_disconnect:MQTTDisconnect(c);
_mqtt_restart:if (c->offline_callback){c->offline_callback(c);}net_disconnect(c);rt_thread_delay(RT_TICK_PER_SECOND * 5);LOG_D("restart!");goto _mqtt_start;_mqtt_disconnect_exit:MQTTDisconnect(c);net_disconnect(c);_mqtt_exit:LOG_D("thread exit");return;
}

mqtt_sub_callback()

mqtt 订阅回调函数,订阅后进行信息打印。

static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';LOG_D("mqtt sub default callback: %.*s %.*s",msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data,msg_data->message->payloadlen,(char *)msg_data->message->payload);return;
}

mqtt_sub_default_callback()

订阅默认回调函数,上面的订阅函数没有订阅成功时,会使用默认的订阅,同时调用该回调函数。

static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';LOG_D("mqtt sub default callback: %.*s %.*s",msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data,msg_data->message->payloadlen,(char *)msg_data->message->payload);return;
}

mqtt_connect_callback()

这是 mqtt 线程会执行到的一个回调函数,当 mqtt 开始尝试连接时会自动调用。

static void mqtt_connect_callback(MQTTClient *c)
{LOG_I("Start to connect mqtt server");
}

mqtt_online_callback()

这是 mqtt 线程会执行到的一个回调函数,当 mqtt 成功连接时会自动调用。

static void mqtt_online_callback(MQTTClient *c)
{LOG_D("Connect mqtt server success");LOG_D("Publish message: Hello,RT-Thread! to topic: %s", sup_pub_topic);mq_publish("Hello,RT-Thread!");
}

mq_publish()

在 mqtt_online_callback() 中,调用了 mq_publish() 函数,里面完成了信息的包装和发送,发送到订阅的主题。

static void mq_publish(const char *send_str)
{MQTTMessage message;const char *msg_str = send_str;const char *topic = sup_pub_topic;message.qos = QOS1;message.retained = 0;message.payload = (void *)msg_str;message.payloadlen = strlen(message.payload);MQTTPublish(&client, topic, &message);return;
}

MQTTPublish()

更底层的 mqtt 信息发送函数,对 message 的成员进行封包,然后调用最底层的发送函数(write())发送数据。

/*** This function publish message to specified mqtt topic.* [MQTTMessage] + [payload] + [topic] + '\0'** @param c the pointer of MQTT context structure* @param topicFilter topic filter name* @param message the pointer of MQTTMessage structure** @return the error code, 0 on subscribe successfully.*/
int MQTTPublish(MQTTClient *c, const char *topicName, MQTTMessage *message)
{int rc = PAHO_FAILURE;int len, msg_len;char *data = 0;if (!c->isconnected)goto exit;msg_len = sizeof(MQTTMessage) + message->payloadlen + strlen(topicName) + 1;data = rt_malloc(msg_len);if (!data)goto exit;memcpy(data, message, sizeof(MQTTMessage));memcpy(data + sizeof(MQTTMessage), message->payload, message->payloadlen);strcpy(data + sizeof(MQTTMessage) + message->payloadlen, topicName);len = MQTT_local_send(c, data, msg_len);if (len == msg_len){rc = 0;}//LOG_D("MQTTPublish sendto %d", len);exit:if (data)rt_free(data);return rc;
}

mqtt_offline_callback()

这是 mqtt 线程会执行到的一个回调函数,当 mqtt 连接断开时会自动调用。

static void mqtt_offline_callback(MQTTClient *c)
{LOG_I("Disconnect from mqtt server");
}

LOG_D()

本实验中,我们可以将 LOG_D() 视为 rt_kprintf()

#define dbg_log_line(lvl, color_n, fmt, ...)                \do                                                      \{                                                       \_DBG_LOG_HDR(lvl, color_n);                         \rt_kprintf(fmt, ##__VA_ARGS__);                     \_DBG_LOG_X_END;                                     \}                                                       \while (0)

LOG_D 是 RT-Thread 内核里的一个日志打印函数,详情可见:《RT-Thread 文档中心——ulog 日志》

RT-Thread 的日志 API 包括:

潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记)相关推荐

  1. 潘多拉 IOT 开发板学习(RT-Thread)—— 实验16 WiFi 模块实验(学习笔记)

    本文代码参考 RT-Thread 官方 BSP 文章目录 实验功能 代码剖析 rt_hw_wlan_wait_init_done() LOG_D() rt_wlan_scan_sync() rt_wl ...

  2. 小熊派IoT开发板系列教程正式发布——免费学习

    小熊派介绍 小熊派IoT开发板一款由南京厚德物联网有限公司联合华为技术有限公司基于STM32L431RCT6设计的高性能物联网开发板.开发板充分考虑物联网感知层设备的多样性,具有强大的可扩展性,用于提 ...

  3. 2021物联网开发学习——基于小熊派IoT开发板Bear-Pi-IOT、E53_IA1_智慧农业拓展板与小熊派-鸿蒙·季Bear-Pi-HM Nano并接入Hi-Link

    2021物联网开发学习--基于小熊派IoT开发板Bear-Pi-IOT.E53_IA1_智慧农业拓展板与小熊派-鸿蒙·季Bear-Pi-HM Nano并接入Hi-Link 目录 作者介绍 目的 用到的 ...

  4. 华为认证物联网开发利器:小熊派IoT开发板

    今年8月份我和小伙伴们协同研发的基于NB-IoT的智慧路灯监控系统有幸入选华为开发者大赛IoT赛道决赛,决赛期间留意到70%以上的个人/学生开发者团队都使用到华为认证(匹配HCIP:华为认证ICT高级 ...

  5. 鸿蒙IOT开发板 小熊派上手体验

    鸿蒙IOT开发板 小熊派上手体验 一.简介 二. 上手搭建开发环境 1. 准备开发工具 2. 使用VMWare 创建虚拟机 4. 设置磁盘映射 5. 在ubuntu里获取源码 6. 编译代码 三.连接 ...

  6. 树莓派Pico W无线开发板MQTT协议通信MicroPython编程实践

    本博文介绍采用Thonny+MicroPython和umqtt.simple库MQTTClient类的对象方法编制树莓派Pico W无线开发板MQTT协议通信程序,将Pico W无线开发板.电脑或An ...

  7. 【鸿蒙】HiSpark Wifi IOT开发板资料汇总

    开发环境搭建 https://device.harmonyos.com/cn/docs/start/introduce/oem_start_guide-0000001054913231 HUAWEI ...

  8. 【华为云技术分享】小熊派IoT开发板华为物联网操作系统LiteOS内核实战教程01-IoT-Studio介绍及安装

    1. 物联网一站式开发工具 -- IoT Studio IoT Studio 是支持 LiteOS 嵌入式系统软件开发的工具,提供了代码编辑.编译.烧录 及调试等一站式开发体验,支持 C.C++.汇编 ...

  9. stm32l0的停止模式怎么唤醒_探索者 STM32F407 开发板资料连载第二十二章 待机唤醒实验

    1)实验平台:alientek 阿波罗 STM32F767 开发板 2)摘自<STM32F7 开发指南(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 第二十二章 待机唤醒实 ...

最新文章

  1. 玩不转大数据就别勉强了,或许“小数据”才是真正的终南捷径
  2. 15个超实用的php正则表达式
  3. (SpringMVC)拦截器
  4. HTTP 知识点之一:头部解释(转)
  5. Android魔法(第二弹)——一步步实现淹没、展开效果
  6. 一旦有辞职念头就干不长了吗_每天都有辞职不想上班的冲动,你有吗?
  7. C语言实现任意两种进制之间互相转换
  8. 微信支付(PC扫码支付和H5公众号支付)
  9. 将OSM地图转化成OpenDRIVE
  10. Adobe Premiere(pr)2021 安装教程【64位】
  11. xp oracle10闪退,cad2010win10闪退怎么办_win10cad2010打开就闪退的解决方法
  12. 读论文,第十五天:FingerPing: Recognizing Fine-grained Hand Poses using Active Acoustic On-body Sensing
  13. 微信小程序列表首字母排序并根据字母定位
  14. c语言大学程序设计题库,大连理工大学c语言...程序设计题库.doc
  15. 软件测试和硬件测试的区别及概念
  16. 【低功耗蓝牙BLE】连接事件和相关参数
  17. Ubuntu18.04设置root密码(初始密码)
  18. golang及beego框架单元测试小结
  19. Cocos Creator生成方形
  20. map,hash_map和unordered_map效率比较

热门文章

  1. idea页面只能显示一个项目,idea使用教程——一个窗口中同时打开多个项目
  2. SpringBoot中使用Mybatis逆向工程(实体类含数据库注释)
  3. 阿里云2018开启双11百团大战 1折购买服务器瓜分红包
  4. 嵌入式项目烂尾的真实原因
  5. JS 利用vue过滤器将阿拉伯数字转化为汉字
  6. windows子系统(WSL)与本地互相访问;挂载U盘
  7. 招聘直播方案应该如何策划?企业应该如何管理直播招聘?
  8. 计算机毕业设计成品基于Uniapp+SSM实现的设备预约管理
  9. Cadence OrCAD Capture复用参考设计时保持参考设计编号不变的情况下自动编号的方法
  10. 写一段51单片机的避障小车代码