最近真是的好一个劲的折腾,算是完全搞明白了如何在STM32上实现MQTT协议了。

目录 [显示]

一、本教程中说明的内容

先说说本文化的适用范围吧:

一、使用的芯片是STM32F103C8T6,但是并没有任何与平台相关的代码,应该在所有STM32芯片中都是可以用的。

二、本文使用的是SIM800C模块,驱动是用C++实现的,基于ARM mbed平台写的。但是从原理上来讲,C和C++差别不大,本文的代码经过修改也可以直接用于其他平台的使用。

三、本文数据传输使用的是“透传模式”,对于所有的透传模块,本文都有很大的参考意义。

二、MQTT的使用

首先,推荐一个MQTT的库:Paho,这个库支持非常多的平台,当然也包括了嵌入式平台:GitHub – paho.mqtt.embedded-c。将该库中的MQTTPacket文件夹下载下来,MQTTPacket文件夹下面主要有三个文件夹,我们使用的文件主要集中在src文件夹和samples文件夹中。

src文件夹中存放着MQTT核心功能的代码,而samples中存放着三个例子:pub00sub1、pub0sub1_nc、qos0pub和网络驱动(transport.c和transport.h)。

由于三个驱动都有一个main函数,所以无法同时存在,本文中只使用了pub0sub,所以将此文件夹内容精减到只有pub0sub1.c、transport.c、transport.h三个文件。

当然,在实现使用时可能会改变目录结构,使目录结构更加清楚,可以根据自己的喜好来进行更改,并不影响使用。

将transport.h的内容精减到以下内容:

1

2

3

4

int transport_sendPacketBuffer(unsigned char* buf, int buflen);

int transport_getdata(unsigned char* buf, int count);

int transport_open(char* host, int port);

int transport_close();

主要的工作有:

1、为了方便表示,删除了版权信息,有实际使用时请保留。

2、没有使用pub0sub1_nc这个例子,所以将transport_getdatanb方法去除。

3、透传模块中使用不到socket,所以将与socket相关的参数去掉。

这些方法实现的主要功能是:

1、transport_open的作用是初始化模块连网的信息、transport_close作用是关闭链接。

2、transport_sendPacketBuffer用于发送数据、transport_getdata用于接收数据。

然后用transport.c来实现transport.h中声明的4个函数。

三、在ARM mbed中使用MQTT

首先说句题外话,自我感觉mbed是一个非常不错的平台,很大程度上提高了代码的可重用性。但也有一个问题,就是其支持是以开发板为单位的,所以并不是对每一种芯片的支持都很好。

首先介绍一个例子,HelloMQTT – a mercurial repository | mbed。但这个例子其中有很多不完善的地方,而且该例子使用的网络驱动也不是GPRS模块。

如果要用不同的连网方式,那么就写一个驱动,驱动中至少要包含以下两个方法:

1

2

int read(unsigned char* buffer, int len, int timeout);

int write(unsigned char* buffer, int len, int timeout);

这两个方法会在MQTTClient中自动调用,timeout表示毫秒。返回值为读或写的字节数。

对此,我写了驱动程序:

MQTTGRPSEthernet.h

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

#pragma once

#if !defined(MQTTGPRSETHERNET_H)

#define MQTTGPRSETHERNET_H

#define DEFAULT_GPRS_TIMEOUT 6000000  //6s

#define SERIAL_BUFFER_SIZE 256

#include "mbed.h"

class MQTTGPRSEthernet

{

public:

MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate = 115200);

~MQTTGPRSEthernet();

bool initNet(const char* apn, const char* userName = "", const char* passWord = "", int timeout = DEFAULT_GPRS_TIMEOUT, bool isReconnect = false);

bool connect(char* hostname, int port, int timeout = DEFAULT_GPRS_TIMEOUT);

int read_line(char* buffer, int timeout = DEFAULT_GPRS_TIMEOUT);

int read(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);

int write(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);

bool disconnect();

private:

bool initNet();

Serial eth;

bool command(const char* cmd, const char* ack = "");

bool connected = false;

bool initialized = false;

char* localIP;

const char *_apn;

const char *_passWord;

const char *_userName;

};

#endif

MQTTGRPSEthernet.cpp

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

#include "MQTTGPRSEthernet.h"

#include "StringHelper.h"

#ifdef DEBUG

Serial pc(PB_10, PB_11, 115200);

#define LOG(args...)    pc.printf(args)

#define LOGC(args...)   pc.putc(args)

#else

#define LOG(args...)    (args)

#define LOGC(args...)   (args)

#endif // DEBUG

//还需要增加一个表示已经初始化成功的变量

//后续可能要增加表示6条连接的东东,如果host,port,连接状态等

MQTTGPRSEthernet::MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate)

: eth(tx, rx, baudrate){}

bool MQTTGPRSEthernet::initNet(const char* apn, const char* userName, const char* passWord, int timeout, bool isReconnect)

{

_apn = apn;

_userName = userName;

_passWord = passWord;

command("ATE0");

wait_ms(800);

command("AT+CIPMUX=0");

wait_ms(800);

//检查 GPRS 附着状态

while (!command("AT+CGATT?\r\n", "+CGATT: 1"))

{

LOG("GPRS NOT ATTACHED!\r\n");

wait_ms(1200);

eth.printf("+++");

wait_ms(1200);

command("AT");

command("AT+CIPSHUT\r\n");

}

//Select multiple connection

//单链路模式

//command("AT+CIPMUX=0");

//wait_ms(800);

//透传模式

command("AT+CIPMODE=1");

wait_ms(800);

command("AT+CIPCCFG=5,2,1024,1,0,1460,50");

wait_ms(800);

// Set APN

command(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"", apn, userName, passWord));

wait_ms(800);

LOG(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"\r\n", apn, userName, passWord));

uint32_t start = us_ticker_read();

do

{

// Brings up wireless connection

//建立无线链路(GPRS 或者 CSD)

bool flag = command("AT+CIICR", "OK");

// Get local IP address

eth.printf("AT+CIFSR\r\n");

char ip_addr_buf[32];

if (read_line(ip_addr_buf) <= 0) {

//LOG("failed to join network\r\n");

initialized = false;

}

else if (StringHelper::CheckIP(ip_addr_buf))

{

LOG("IP ADDRESS:");

LOG(ip_addr_buf);

LOG("\r\n");

localIP = ip_addr_buf;

initialized = true;

break;

}

else

{

initialized = false;

}

if (flag == false && initialized == false && isReconnect == false)

{

//表明此时建立无线链路返回error,得不到IP地址

//此时应使用AT+CIPSHUT关闭PDP上下文后再重新进行连接

//防止在数据状态

wait_ms(1200);

eth.printf("+++");

wait_ms(1200);

command("AT");

command("AT+CIPSHUT\r\n");

initialized = false;

return initNet(_apn, _userName, _passWord, timeout, true);

}

} while (us_ticker_read() - start < timeout);

//command("AT+CRPRXGET=0");

return initialized;

}

bool MQTTGPRSEthernet::initNet()

{

return initNet(_apn, _userName, _passWord);

}

bool MQTTGPRSEthernet::connect(char* hostname, int port, int timeout)

{

uint32_t start = us_ticker_read();

do

{

if (initialized == false)

initNet();

else

break;

} while (us_ticker_read() - start < timeout);

start = us_ticker_read();

do

{

//AT+CIPSTART=0,”TCP”,”116.228.221.51”,”8500”

char strPort[10];

itoa(port, strPort, 10);

char response[64] = { 0, };

int connectStart = us_ticker_read();

LOG(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));

eth.printf(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));

do

{

read_line(response);

if (strstr(response, "CONNECT") != NULL)

{

connected = true;

return connected;

}

} while (us_ticker_read() - connectStart < timeout);

} while (us_ticker_read() - start < timeout);

connected = false;

return connected;

}

//向SIM800C发送命令

//cmd:发送的命令字符串(不需要添加回车了),当cmd<0XFF的时候,发送数字(比如发送0X1A),大于的时候发送字符串.

//ack:期待的应答结果,如果为空,则表示不需要等待应答,对于那些并不关心返回结果的直接返回true

//此函数只能对应立即进行读取的情况。

//返回值:0,发送成功(得到了期待的应答结果)

//       1,发送失败

bool MQTTGPRSEthernet::command(const char* cmd, const char* ack)

{

char response[64] = { 0, };

if (StringHelper::EndWith(cmd, "\r\n"))

{

LOG(cmd);

eth.printf(cmd);

}

else if (StringHelper::EndWith(cmd, "\r"))

{

LOG(StringHelper::Add(cmd, "\n"));

eth.printf(StringHelper::Add(cmd, "\n"));

}

else

{

LOG(StringHelper::Add(cmd, "\r\n"));

eth.printf(StringHelper::Add(cmd, "\r\n"));

}

read_line(response);

if (strstr(response, ack) != NULL) {

return true;

}

return false;

}

int MQTTGPRSEthernet::read_line(char* buffer, int timeout)

{

int bytes = 0;

uint32_t start = us_ticker_read();

while (true) {

if (eth.readable()) {

char ch = eth.getc();

if ((ch == '\n' || ch == '\r') && bytes == 0)

{

//此时说明以\r\n开头,在无回显的情况下经常会出现

//此时忽略空行\r\n

continue;

}

if (ch == '\n') {

if (bytes > 0 && buffer[bytes - 1] == '\r') {

//表明读取到\r\n,此行结束

bytes--;

}

if (bytes > 0) {

buffer[bytes] = '\0';

return bytes;

}

}

else {

buffer[bytes] = ch;

bytes++;

}

}

else {

if ((uint32_t)(us_ticker_read() - start) > timeout) {

return bytes;

}

}

}

//此时表示读到最后一个字节还没有读到\n

//有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0

return bytes;

}

int MQTTGPRSEthernet::read(unsigned char* buffer, int len, int timeout)

{

int bytes = 0;

uint32_t start = us_ticker_read();

while (bytes < len) {

if (eth.readable()) {

char ch = eth.getc();

buffer[bytes] = ch;

bytes++;

}

else {

if ((uint32_t)(us_ticker_read() - start) > timeout) {

return bytes;

}

}

}

//此时表示读到最后一个字节还没有读到\n

//有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0

return bytes;

}

int MQTTGPRSEthernet::write(unsigned char* buffer, int len, int timeout)

{

uint32_t start = us_ticker_read();

while (!connected)

{

if ((us_ticker_read() - start) > timeout)

{

return -1;

}

initNet();

}

for (size_t i = 0; i < len; i++)

{

LOGC(buffer[i]);

eth.putc(buffer[i]);

}

return len;

}

bool MQTTGPRSEthernet::disconnect()

{

wait_ms(1200);

eth.printf("+++");

wait_ms(1200);

eth.printf("AT\r\n");

eth.printf("AT+CIPSHUT\r\n");

connected = false;

return true;

}

MQTTGPRSEthernet::~MQTTGPRSEthernet()

{

if (connected)

{

disconnect();

}

}

主程序

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

#include "mbed.h"

#define APN         "uninet"

#define USERNAME    NULL

#define PASSWORD    NULL

#define INIT_TIMES  5            //初始化次数

#ifdef DEBUG

Serial pc2(PB_10, PB_11, 115200);

#define LOG(args...)    pc2.printf(args)

#else

#define LOG(args...)    (args)

#endif // DEBUG

#include "MQTTClient.h"

#include "MQTTmbed.h"

#include "MQTTGPRSEthernet.h"

#include "StringHelper.h"

int arrivedcount = 0;

void messageArrived(MQTT::MessageData& md)

{

MQTT::Message &message = md.message;

LOG("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);

LOG("Payload %.*s\n", message.payloadlen, (char*)message.payload);

++arrivedcount;

}

int main(int argc, char* argv[])

{

MQTTGPRSEthernet ethernet(PB_6, PB_7);

for (int i = 0; i < INIT_TIMES; i++)

{

if (ethernet.initNet(APN, USERNAME, PASSWORD))

break;

else

LOG("SIM CARD INIT FAILED!\r\n");

}

MQTT::Client client = MQTT::Client(ethernet);

char* hostname = "iot.eclipse.org";

int port = 1883;

for (int i = 0; i < INIT_TIMES; i++)

{

if (ethernet.connect(hostname, port))

break;

else

LOG("CAN NOT CONNECT TO SERVER!\r\n");

}

char* topic = "tson-topic-18669888635";

MQTTPacket_connectData data = MQTTPacket_connectData_initializer;

data.MQTTVersion = 3;

data.clientID.cstring = "tson-client-18669888635";

int rc;

for (int i = 0; i < INIT_TIMES; i++)

{

if ((rc = client.connect(data)) != 0)

//无法连接到MQTT服务器

LOG("CAN NOT CONNECT TO MQTT SERVER!\r\n");

else

break;

}

if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)

//无法订阅

LOG("CAN NOT SUBSCRIBE THE TOPIC!\r\n");

MQTT::Message message;

// QoS 0

char buf[100];

sprintf(buf, "Hello World!  QoS 0 message from tson\r\n");

message.qos = MQTT::QOS0;

message.retained = false;

message.dup = false;

message.payload = (void*)buf;

message.payloadlen = strlen(buf) + 1;

rc = client.publish(topic, message);

while (arrivedcount == 0)

client.yield(100000);

// QoS 1

sprintf(buf, "Hello World!  QoS 1 message from tson\n");

message.qos = MQTT::QOS1;

message.payloadlen = strlen(buf) + 1;

rc = client.publish(topic, message);

while (arrivedcount == 1)

client.yield(100000);

// QoS 2

sprintf(buf, "Hello World!  QoS 2 message from tson\n");

message.qos = MQTT::QOS2;

message.payloadlen = strlen(buf) + 1;

rc = client.publish(topic, message);

while (arrivedcount == 2)

client.yield(100000);

if ((rc = client.unsubscribe(topic)) != 0)

printf("rc from unsubscribe was %d\n", rc);

if ((rc = client.disconnect()) != 0)

printf("rc from disconnect was %d\n", rc);

ethernet.disconnect();

return 0;

}

client的yield函数中用调用messageArrived函数,之前的示例给出的是100,可能是由于间太短的缘故,总是调用不了回调函数,所以我将其改的非常大,便于调试。实际使用时可以使用1000。

四、总结

其实paho embeded-c用起来还是挺方便的,但是代码的重要性不高,所以移植起来往往会让人无从下手。但深放研究就会发现其实使用起来是非常简单的。

—————————–可爱的分割线——————————————————

有路过的吃瓜群众说想要StringHelper这个类,当时这个类用处不大,里面多数功能都可以用std::string中的功能来实现。不过自己刚从C转过来自己并不太了解,所以自己又写了一个,功能并不是很完善。也分享给大家:StringHelper

相关主题2017-05-28 阿里云IoT套件中MQTT协议的使用(0) 今天,阿里云给我发来邮件,说我申请的IoT套件已经可以申请了,虽然我不记得自己什么时候申请过了,但请我试用就试用一下呗。我看这个阿里云的这个东东有两个节点,一 […]

2017-05-12 通过Paho客户端接入OneNet(1) 首先说本文想要说明一个什么问题:OneNet平台支持MQTT协议,但给的资料非常有限。而Paho是一个开源的,MQTT的各种服务器、客户端的集成。本文要做的就 […]

2016-02-07 使用代理(有验证)连接TcpClient(2) 先分享代码给大家:

static TcpClient connectViaHTTPProxy(

string targetHost,

[…]

2016-03-01 Git使用心得之在线管理(0) 本文是在Git使用心得之Git与GitHub的关系和Git使用心得之本地管理两篇文章的基础上进行的,有不明白的可以参照上面两篇文章。

首先给 […]

2016-03-01 Git使用心得之本地管理(2) 如果对Git没有完整的概念,大家可以参考:Git使用心得之Git与GitHub的关系

首先把所有工具都安装上去:

1、Git:h […]

2017-05-15 VisualGDB项目的移植问题(0) 很多人可能还不知道VisualGDB是什么东东,我给大家普及一下(自己的理解,不对勿喷):

1、VisualGDB是一个Visual […]

gprs模块http mqtt_在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议 | TsonTec:测量解决方案提供者...相关推荐

  1. gprs模块http mqtt_基于GPRS模块的MQTT至MODBUS协议转换器

    [报名阶段需要填写的内容] 1.参赛者姓名(必填项): 方海钰 2.单位或学校名称(选填项): 3.当前职务或职称(选填项): 4.参赛作品的名字(必填项): 基于GPRS模块的MQTT至MODBUS ...

  2. SIM900A模块开发:通过GPRS连接OneNet平台发送GPS信息

    SIM900A模块开发:通过GPRS连接OneNet平台发送GPS信息 1. SIM900A模块介绍 1.1 SIM900A模块具有以下特点: 1.2 模块TTL接口图 1.3 硬件连接方法 2. O ...

  3. 开源一个安信可A9g小项目微信小程序定位器项目②GPS模块如何定位经纬度并且上报到MQTT服务器,实现远程查看模块的经纬度;

    本系列博客学习由非官方人员 半颗心脏 潜心所力所写,仅仅做个人技术交流分享,不做任何商业用途.如有不对之处,请留言,本人及时更改. 1. 如何在windows10上环境搭建,编译烧录代码固件,查看运行 ...

  4. STM32+移远MC20模块采用MQTT协议登录OneNet上传GPS数据

    一.环境介绍 MCU:  STM32F103C8T6 GSM模块: 移远MC20 (MT2503D)(GSM+GPS共存)功能很强大 开发软件: Keil5 MQTT协议采用OneNet的旧版协议,登 ...

  5. javascript模块_JavaScript模块第2部分:模块捆绑

    javascript模块 by Preethi Kasireddy 通过Preethi Kasireddy JavaScript模块第2部分:模块捆绑 (JavaScript Modules Part ...

  6. pythonos模块使用方法_Python OS模块常用方法总结

    Python OS模块常用方法总结 Python OS模块方法: 操作 说明 os.getcwd() 得到当前工作目录,即当前Python脚本工作的目录路径 os.listdir() 返回指定目录下的 ...

  7. MVC4做网站后台:模块管理1、修改模块信息

    网站可能会包含一些模块:像文章.产品.图片.留言等. 栏目模块主要实现功能,启用或禁用模块,模块权限设置,模块上传设置等. 权限设置和上传设置以后专门考虑,先来显示或禁用模块. 1.在顶部导航栏添加管 ...

  8. 处理程序“WebServiceHandlerFactory-Integrated”在其模块列表中有一个错误模块“ManagedPipelineHandler”

    HTTP 错误 500.21 - Internal Server Error 处理程序"WebServiceHandlerFactory-Integrated"在其模块列表中有一个 ...

  9. python logging模块的作用_Python 日志模块logging分析及使用-2

    本文作为Python日志模块的补充,主要介绍日志回滚RotatingFileHandler和TimedRotatingFileHandler的使用,以及其所带来的问题.Logger对象的日志等级是如何 ...

最新文章

  1. c4android资源,OpenC4Android开发环境搭.doc
  2. 一种javascript链式多重继承的方式(__proto__原型链)
  3. 一个栗子上手CSS3动画
  4. android 单个模块编译的方法
  5. AI开发者大会之计算机视觉技术实践与应用:2020年7月3日《RPA+AI助力政企实现智能时代的人机协同》、《5G风口到来,边缘计算引领数据中心变革》、《数字化时代金融市场与AI算法如何结合?》
  6. python 利用 for ... else 跳出双层嵌套循环
  7. Spring Boot Jpa多数据源配置
  8. windows下手动安装composer
  9. linux显示内存状态,Linux显示内存状态
  10. P2574 XOR的艺术
  11. android studio导入jar包和so库,Android实战技巧之十二:Android Studio导入第三方类库、jar包和so库(示例代码)...
  12. 如何新建Outlook电子邮件规则实现邮件自动分类
  13. python pandas库的应用(类比mysql语言)
  14. poi-3.17版本 和若依框架结合--excel导出,excel图片导出
  15. 心有景旗,志存远方——湖南安全技术职业学院美和易思愿景图活动
  16. 如何合理选择AI加速器?
  17. python网络游戏脚本_用Python写一个游戏脚本,你会吗?
  18. 暗黑破坏神2中的符文系统,一共有多少个符文,可以介绍其中1个符文组合吗?...
  19. 智能聊天机器人微信小程序
  20. 产品定位,什么是产品?什么是产品定位?

热门文章

  1. extjs jquery使用场合
  2. .NET 的 WCF 和 WebService 有什么区别?(转载)
  3. 附加一个:为什么要用简单工厂模式
  4. python正则表达式中group
  5. syn重发_什么是“SYN”请求?ISN又是什么?
  6. 网络爬虫模拟登陆获取数据并解析实战(二)
  7. Opencv--Mat属性step,size,step1,elemSize,elemSize1
  8. 牛客xiao白月赛32-- 拼三角(暴力却有坑)
  9. python剪刀石头布_如何用python写剪刀石头布
  10. python批量下载网页文件夹_Python实现批量从不同的Linux服务器下载文件