前言

MQTT 是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。MQTT 基于发布/订阅范式,工作在 TCP/IP协议族上,MQTT 协议轻量、简单、开放并易于实现,这些特点使它适用范围非常广泛。

MQTT 基于客户端-服务器通信模式,MQTT 服务端称为 MQTT Broker,目前行业内可选的 MQTT Broker 较多,其优劣与功能差别比较本文不再赘述。本文以开源社区中最流行的 MQTT 消息服务器 EMQ X 为例,使用 EMQ 提供的公共 Broker broker.emqx.io ,通过一个简单客户端连接 Broker 并发布、处理消息的例子,整理总结不同编程语言、平台下 MQTT 客户端库的使用方式与样例。

入选客户端库如下:

  • Eclipse Paho C 与 Eclipse Paho Embedded C
  • Eclipse Paho Java Client
  • Eclipse Paho MQTT Go client
  • emqtt : EMQ 提供的 Erlang MQTT 客户端库
  • MQTT.js Web 端 & Node.js 平台 MQTT 客户端
  • Eclipse Paho Python

MQTT 社区收录了许多 MQTT 客户端库,读者可以在此处查看。

样例应用介绍

MQTT 客户端整个生命周期的行为可以概括为:建立连接、订阅主题、接收消息并处理、向指定主题发布消息、取消订阅、断开连接。

标准的客户端库在每个环节都暴露出相应的方法,不同库在相同环节所需方法参数含义大致相同,具体选用哪些参数、启用哪些功能特性需要用户深入了解 MQTT 协议特性并结合实际应用场景而定。

本文以一个客户端连接并发布、处理消息为例,给出每个环节大致需要使用的参数:

  • 建立连接

    • 指定 MQTT Broker 基本信息接入地址与端口
    • 指定传输类型是 TCP 还是 MQTT over WebSocket
    • 如果启用 TLS 需要选择协议版本并携带相应的的证书
    • Broker 启用了认证鉴权则客户端需要携带相应的 MQTT Username Password 信息
    • 配置客户端参数如 keepalive 时长、clean session 回话保留标志位、MQTT 协议版本、遗嘱消息(LWT)等
  • 订阅主题:连接建立成功后可以订阅主题,需要指定主题信息
    • 指定主题过滤器 Topic,订阅的时候支持主题通配符 +# 的使用
    • 指定 QoS,根据客户端库和 Broker 的实现可选 Qos 0 1 2,注意部分 Broker 与云服务提供商不支持部分 QoS 级别,如 AWS IoT 、阿里云 IoT 套件、Azure IoT Hub 均不支持 QoS 2 级别消息
    • 订阅主题可能因为网络问题、Broker 端 ACL 规则限制而失败
  • 接收消息并处理
    • 一般是在连接时指定处理函数,依据客户端库与平台的网络编程模型不同此部分处理方式略有不同
  • 发布消息:向指定主题发布消息
    • 指定目标主题,注意该主题不能包含通配符 +#,若主题中包含通配符可能会导致消息发布失败、客户端断开等情况(视 Broker 与客户端库实现方式)
    • 指定消息 QoS 级别,同样存在不同 Broker 与平台支持的 QoS 级别不同,如 Azure IoT Hub 发布 QoS 2 的消息将断开客户端连接
    • 指定消息体内容,消息体内容大小不能超出 Broker 设置最大消息大小
    • 指定消息 Retain 保留消息标志位
  • 取消订阅
    • 指定目标主题即可
  • 断开连接
    • 主动断开连接,将发布遗嘱消息(LWT)

Eclipse Paho C 与 Eclipse Paho Embedded C

Eclipse Paho C 与 Eclipse Paho Embedded C 均为 Eclipse Paho 项目下的客户端库,均为使用 ANSI C 编写的功能齐全的 MQTT 客户端,Eclipse Paho Embedded C 可以在桌面操作系统上使用,但主要针对 mbed,Arduino和 FreeRTOS 等嵌入式环境。

该客户端有同步/异步两种 API ,分别以 MQTTClient 和 MQTTAsync 开头:

  • 同步 API 旨在更简单,更有用,某些调用将阻塞直到操作完成为止,使用编程上更加容易;
  • 异步 API 中只有一个调用块 API-waitForCompletion ,通过回调进行结果通知,更适用于非主线程的环境。

两个库的下载、使用详细说明请移步至项目主页查看,本文使用 Eclipse Paho C,直接提供样例代码如下:

#include "stdio.h"
#include "stdlib.h"
#include "string.h"#include "MQTTClient.h"#define ADDRESS     "tcp://broker.emqx.io:1883"
#define CLIENTID    "emqx_test"
#define TOPIC       "testtopic/1"
#define PAYLOAD     "Hello World!"
#define QOS         1
#define TIMEOUT     10000Lint main(int argc, char* argv[])
{MQTTClient client;MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;MQTTClient_message pubmsg = MQTTClient_message_initializer;MQTTClient_deliveryToken token;int rc;MQTTClient_create(&client, ADDRESS, CLIENTID,MQTTCLIENT_PERSISTENCE_NONE, NULL);// Connection parametersconn_opts.keepAliveInterval = 20;conn_opts.cleansession = 1;if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("Failed to connect, return code %d\n", rc);exit(-1);}// Publish messagepubmsg.payload = PAYLOAD;pubmsg.payloadlen = strlen(PAYLOAD);pubmsg.qos = QOS;pubmsg.retained = 0;MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);printf("Waiting for up to %d seconds for publication of %s\n""on topic %s for client with ClientID: %s\n",(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);printf("Message with delivery token %d delivered\n", token);// DisconnectMQTTClient_disconnect(client, 10000);MQTTClient_destroy(&client);return rc;
}

Eclipse Paho Java Client

Eclipse Paho Java Client 是用 Java 编写的 MQTT 客户端库,可用于 JVM 或其他 Java 兼容平台(例如Android)。

Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API。

通过 Maven 安装:

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>

连接样例代码如下:

App.java

package io.emqx;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://broker.emqx.io:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// Connection optionsMqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// Retain connectionconnOpts.setCleanSession(true);// Set callbackclient.setCallback(new PushCallback());// Setup connectionSystem.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// Publishclient.subscribe(subTopic);// Required parameters for publishing messageMqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}

回调消息处理类 OnMessageCallback.java

package io.emqx;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// Reconnect after lost connection.System.out.println("Connection lost, and re-connect here.");}public void messageArrived(String topic, MqttMessage message) throws Exception {// Message handler after receiving messageSystem.out.println("Topic:" + topic);System.out.println("QoS:" + message.getQos());System.out.println("Payload:" + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}
}

Eclipse Paho MQTT Go client

Eclipse Paho MQTT Go Client 为 Eclipse Paho 项目下的 Go 语言版客户端库,该库能够连接到 MQTT Broker 以发布消息,订阅主题并接收已发布的消息,支持完全异步的操作模式。

客户端依赖于 Google 的 proxy 和 websockets 软件包,通过以下命令完成安装:

go get github.com/eclipse/paho.mqtt.golang

连接样例代码如下:

package mainimport ("fmt""log""os""time""github.com/eclipse/paho.mqtt.golang"
)var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("TOPIC: %s\n", msg.Topic())fmt.Printf("MSG: %s\n", msg.Payload())
}func main() {mqtt.DEBUG = log.New(os.Stdout, "", 0)mqtt.ERROR = log.New(os.Stdout, "", 0)opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")opts.SetKeepAlive(60 * time.Second)// Message callback handleropts.SetDefaultPublishHandler(f)opts.SetPingTimeout(1 * time.Second)c := mqtt.NewClient(opts)if token := c.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}// Subscriptionif token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// Publish messagetoken := c.Publish("testtopic/1", 0, false, "Hello World")token.Wait()time.Sleep(6 * time.Second)// Cancel subscriptionif token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// Disconnectc.Disconnect(250)time.Sleep(1 * time.Second)
}

emqtt : EMQ 提供的 Erlang MQTT 客户端库

emqtt 是开源 MQTT Broker EMQ X 官方 EMQ 提供的客户端库,适用于 Erlang 语言。

Erlang 生态有多个 MQTT Broker 实现,如通过插件支持 MQTT 的 RabbitMQ ,VerenMQ、EMQ X 等。但是 MQTT 客户端库几乎没有选择的余地,MQTT 社区收录的 Erlang 客户端库中 emqtt 是最佳选择。

emqtt 完全由 Erlang 实现,完成支持 MQTT v3.1.1 和 MQTT v5.0 协议版本,支持 SSL 单双向认证与 WebSocket 连接。另一款 MQTT 基准测试工具 emqtt_bench 就基于该客户端库构建。

emqtt 使用方式如下:

ClientId = <<"test">>.
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]).
{ok, _Props} = emqtt:connect(ConnPid).
Topic = <<"guide/#">>.
QoS = 1.
{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}).
{ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS).
%% If the qos of publish packet is 0, `publish` function would not return packetid.
ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0).%% Recursively get messages from mail box.
Y = fun (Proc) -> ((fun (F) -> F(F) end)((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end.
Rec = fun(Receive) -> fun()-> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end.
(Y(Rec))().%% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell.
Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end.
Receive().{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>).ok = emqtt:disconnect(ConnPid).

MQTT.js Web 端 & Node.js 平台 MQTT 客户端

MQTT.js 是 JavaScript 编写的,实现了 MQTT 协议客户端功能的模块,可以在 Node.js 或浏览器环境中使用。在 Node.js 中使用时,即可以 -g 全局安装以命令行的形式使用,又可以将其集成到项目中调用。

由于 JavaScript 单线程特性,MQTT.js 是全异步 MQTT 客户端,MQTT.js 支持 MQTT 与 MQTT over WebSocket,在不同运行环境支持程度如下:

  • 浏览器环境:MQTT over WebSocket(包括微信小程序、支付宝小程序等定制浏览器环境)
  • Node.js 环境:MQTT、MQTT over WebSocket

不同环境里除了少部分连接参数不同,其他 API 均是相同的。

使用 npm 安装:

npm i mqtt

使用 CDN 安装(浏览器):

<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>// Initialize a global mqtt variableconsole.log(mqtt)</script>

样例代码:

// const mqtt = require('mqtt')
import mqtt from 'mqtt'// Connection option
const options = {clean: true, // Retain connectionconnectTimeout: 4000, // Timeout// AuthticationclientId: 'emqx_test',username: 'emqx_test',password: 'emqx_test',
}// Connection string
// ws: unsecured WebSocket
// wss: secured WebSocket connection
// mqtt: unsecured TCP connection
// mqtts: secured TCP connection
const connectUrl = 'wss://broker.emqx.io:8084/mqtt'
const client = mqtt.connect(connectUrl, options)client.on('reconnect', (error) => {console.log('reconnect:', error)
})client.on('reconnect', (error) => {console.log('reconnect:', error)
})client.on('message', (topic, message) => {console.log('message:', topic, message.toString())
})

Eclipse Paho Python

Eclipse Paho Python 为 Eclipse Paho 项目下的 Python 语言版客户端库,该库能够连接到 MQTT Broker 以发布消息,订阅主题并接收已发布的消息。

使用 PyPi 包管理工具安装:

pip install paho-mqtt

代码样例:

import paho.mqtt.client as mqtt# Successful Connection Callback
def on_connect(client, userdata, flags, rc):print('Connected with result code '+str(rc))client.subscribe('testtopic/#')# Message delivery callback
def on_message(client, userdata, msg):print(msg.topic+" "+str(msg.payload))client = mqtt.Client()# Set callback handler
client.on_connect = on_connect
client.on_message = on_message# Set up connection
client.connect('broker.emqx.io', 1883, 60)
# Publish message
client.publish('emqtt',payload='Hello World',qos=0)client.loop_forever()

总结

关于 MQTT 协议、MQTT 客户端库使用流程、常用 MQTT 客户端的简介就到这里,欢迎读者通过 EMQ X 进行MQTT 学习、项目开发使用。

常用 MQTT 客户端库简介相关推荐

  1. MQTT客户端库-Paho GO

    为了加深理解,本文是翻译文章.原文地址 Paho GO Client   语言 GO 协议 EPL AND EDL 官网地址 http://www.eclipse.org/paho/ API类型 As ...

  2. 整理的常用JAVA开源库简介

    Jakarta common: Commons Logging Jakarta Commons Logging (JCL)提供的是一个日志(Log)接口(interface),同时兼顾轻量级和不依赖于 ...

  3. 几种常用图像处理开源库简介及使用总结

    1.OpenCV,主要以算法形式,展示其实现:也就是说,它实际提供的是各种图像处理算法.若需具体应用,需要组合其算法以实现某个功能. OpenCV 的全称 Open Source Computer V ...

  4. mqtt客户端工具_如何在 Rust 中使用 MQTT

    Rust 是由 Mozilla 主导开发的通用.编译型编程语言.该语言的设计准则为:安全.并发.实用,支持 函数式.并发式.过程式以及面向对象的编程风格.Rust 速度惊人且内存利用率极高.由于没有运 ...

  5. paho | 支持10种语言编写mqtt客户端,总有一款适合你!

    1. 轻量级物联网协议 - MQTT MQTT全称 Message Queuing Telemetry Transport,即消息队列遥测传输协议,是一种基于发布/订阅(publish/subscri ...

  6. Android Studio实现MQTT客户端

    初学Android Studio,在实现MQTT客户端的过程中遇到了很多坑,而在查阅博文的时候发现各个博文能提供的解决方法很零碎,我也是结合了诸多博文才最终解决了问题,于是打算做一个小总结 我用的版本 ...

  7. 吐血整理——python常用的第三方库——库名称简介

    python常用的第三方库--库名称简介(一) python常用的第三方库--库名称简介(一) python常用的第三方库--库名称简介(一) 库名称简介 文件处理 库名称简介 Chardet字符编码 ...

  8. Python语言学习:Python常用自带库(imageio、pickle)简介、使用方法之详细攻略

    Python语言学习:Python常用自带库(imageio.pickle)简介.使用方法之详细攻略 目录 imageio简介及其常见使用方法 pickle简介及其常见使用方法 简介 使用方法 简介及 ...

  9. linux 使用paho C库实现mqtt客户端

    一.下载 github 下载paho mqtt c库源码,编译安装库文件. 地址:https://github.com/eclipse/paho.mqtt.c 关键API:Paho Asynchron ...

  10. MQTT客户端(基于mosquitto库)上报温度到腾讯云

    Linux C MQTT上报温度到腾讯云(基于mosquitto库) 一.创建产品 1.进入腾讯云官网,登陆或注册账号 2.进入控制台 3.鼠标滑到云产品,进入物联设备服务中的物联网通信 4.点击创建 ...

最新文章

  1. 谷歌兄弟公司Wing将于10月开始试点无人机配送
  2. Fedora 30将获得Bash 5.0,淘汰Yum推迟到Fedora 31
  3. R语言应用实战-OLS模型算法原理及应用示例
  4. (一)操作系统概论复习要点笔记
  5. 电脑能上网,手机连上wifi不能上网
  6. 【PAT甲级 BigInteger】1019 General Palindromic Number (20 分) Java版 7/7通过
  7. Redis 与 string 相关的常用命令
  8. Speaking of the impact of the epidemic
  9. [论文笔记]RoBERTa: A Robustly Optimized BERT Pretraining Approach
  10. 我的世界手机版javaui材质包_我的世界手机版大乱斗卡比模组
  11. html中选择收货地址时候,选择收货地址.html
  12. hibernatexml方式和注解方式实现单实体映射和继承关系映射,eclipse实现
  13. OpenCV源码剖析之ImageDecoder
  14. Redisson(2-1)分布式锁实现对比 VS Java的ReentrantLock之tryLock
  15. Mybatis的特性详解——动态SQL
  16. [Windows] 获取设备唯一标识
  17. 软件测试 - 项目实战篇
  18. 年薪百万的好苗头!不俗套的情人节,爱之丘比特走心了
  19. {}怎样进行邮件推广
  20. Java匿名类的用法及注意点

热门文章

  1. [Linux系统编程/网络编程] 笔记目录
  2. 最适合写python程序的软件
  3. 用微PE安装KALI LINUX到U盘,【U盘安装kali】U盘+kali+pe三合一教程!装机,存储(自己用来做U盘使用的空间)...
  4. decimal.JS 快速入门
  5. [Unity官方教程]Tanks!单机双人坦克大战源码和素材
  6. 算法笔记(一)(教你系统学习基础算法)
  7. windows/system32/winload.exe系统无法登录报错428的快速解决方法
  8. 爬取西刺代理的免费IP
  9. Centos7安装Caffe教程
  10. plsqldev12 工具栏图标设置