Rust 是由 Mozilla 主导开发的通用、编译型编程语言。该语言的设计准则为:安全、并发、实用,支持 函数式、并发式、过程式以及面向对象的编程风格。Rust 速度惊人且内存利用率极高。由于没有运行时和垃圾回收,它能够胜任对性能要求特别高的服务,可以在嵌入式设备上运行,还能轻松和其他语言集成。Rust 丰富的类型系统和所有权模型保证了内存安全和线程安全,让您在编译期就能够消除各种各样的错误。

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

本文主要介绍如何在 Rust 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。

项目初始化

本项目使用 Rust 1.44.0 进行开发测试,并使用 Cargo 1.44.0 包管理工具进行项目管理,读者可用如下命令查看当前的 Rust 版本。

~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)

选择 MQTT 客户端库

paho-mqtt 是目前 Rust 中,功能完善且使用较多的 MQTT 客户端,最新的 0.7.1 版本支持 MQTT v5、3.1.1、3.1,支持通过标准 TCP、SSL / TLS、WebSockets 传输数据,QoS 支持 0、1、2 等。

初始化项目

执行以下命令创建名为 mqtt-example 的 Rust 新项目。

~ cargo new mqtt-exampleCreated binary (application) `mqtt-example` package

编辑项目中的 Cargo.toml 文件,在 dependencies 中添加 paho-mqtt 库的地址,以及指定订阅、发布代码文件对应的二进制文件。

[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }[[bin]]
name = "sub"
path = "src/sub/main.rs"[[bin]]
name = "pub"
path = "src/pub/main.rs"

Rust MQTT 的使用

创建客户端连接

本文将使用 EMQ X 提供的 免费公共 MQTT 服务器 作为测试连接的 MQTT 服务器,该服务基于 EMQ X 的 MQTT 物联网云平台 创建。服务器接入信息如下:

  • Broker: http://broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

配置 MQTT Broker 连接参数

配置 MQTT Broker 连接地址(包括端口)、topic (这里我们配置了两个 topic ),以及客户端 id。

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];

编写 MQTT 连接代码

编写 MQTT 连接代码,为了提升使用体验,可在执行二进制文件时通过命令行参数的形式传入连接地址。通常我们需要先创建一个客户端,然后将该客户端连接到 broker.emqx.io

let host = env::args().nth(1).unwrap_or_else(||DFLT_BROKER.to_string()
);// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new().server_uri(host).client_id(DFLT_CLIENT.to_string()).finalize();// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);process::exit(1);
});// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new().keep_alive_interval(Duration::from_secs(20)).clean_session(true).finalize();// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:nt{:?}", e);process::exit(1);
}

发布消息

这里我们总共发布五条消息,根据循环的奇偶性,分别向 rust/mqttrust/test 这两个主题发布。

for num in 0..5 {let content =  "Hello world! ".to_string() + &num.to_string();let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);if num % 2 == 0 {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);} else {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);}let tok = cli.publish(msg);if let Err(e) = tok {println!("Error sending message: {:?}", e);break;}
}

订阅消息

在客户端连接之前,需要先初始化消费者。这里我们会循环处理消费者中的消息队列,并打印出订阅的 topic 名称及接收到的消息内容。

fn subscribe_topics(cli: &mqtt::Client) {if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {println!("Error subscribes topics: {:?}", e);process::exit(1);}
}fn main() {...// Initialize the consumer before connecting.let rx = cli.start_consuming();...// Subscribe topics.subscribe_topics(&cli);println!("Processing requests...");for msg in rx.iter() {if let Some(msg) = msg {println!("{}", msg);}else if !cli.is_connected() {if try_reconnect(&cli) {println!("Resubscribe topics...");subscribe_topics(&cli);} else {break;}}}...
}

完整代码

消息发布代码

use std::{env,process,time::Duration
};extern crate paho_mqtt as mqtt;const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;fn main() {let host = env::args().nth(1).unwrap_or_else(||DFLT_BROKER.to_string());// Define the set of options for the create.// Use an ID for a persistent session.let create_opts = mqtt::CreateOptionsBuilder::new().server_uri(host).client_id(DFLT_CLIENT.to_string()).finalize();// Create a client.let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);process::exit(1);});// Define the set of options for the connection.let conn_opts = mqtt::ConnectOptionsBuilder::new().keep_alive_interval(Duration::from_secs(20)).clean_session(true).finalize();// Connect and wait for it to complete or fail.if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:nt{:?}", e);process::exit(1);}// Create a message and publish it.// Publish message to 'test' and 'hello' topics.for num in 0..5 {let content =  "Hello world! ".to_string() + &num.to_string();let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);if num % 2 == 0 {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);} else {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);}let tok = cli.publish(msg);if let Err(e) = tok {println!("Error sending message: {:?}", e);break;}}// Disconnect from the broker.let tok = cli.disconnect(None);println!("Disconnect from the broker");tok.unwrap();
}

消息订阅代码

为了提升使用体验,消息订阅做了断开重连的处理,并在重新建立连接后对主题进行重新订阅。

use std::{env,process,thread,time::Duration
};extern crate paho_mqtt as mqtt;const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{println!("Connection lost. Waiting to retry connection");for _ in 0..12 {thread::sleep(Duration::from_millis(5000));if cli.reconnect().is_ok() {println!("Successfully reconnected");return true;}}println!("Unable to reconnect after several attempts.");false
}// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {println!("Error subscribes topics: {:?}", e);process::exit(1);}
}fn main() {let host = env::args().nth(1).unwrap_or_else(||DFLT_BROKER.to_string());// Define the set of options for the create.// Use an ID for a persistent session.let create_opts = mqtt::CreateOptionsBuilder::new().server_uri(host).client_id(DFLT_CLIENT.to_string()).finalize();// Create a client.let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);process::exit(1);});// Initialize the consumer before connecting.let rx = cli.start_consuming();// Define the set of options for the connection.let lwt = mqtt::MessageBuilder::new().topic("test").payload("Consumer lost connection").finalize();let conn_opts = mqtt::ConnectOptionsBuilder::new().keep_alive_interval(Duration::from_secs(20)).clean_session(false).will_message(lwt).finalize();// Connect and wait for it to complete or fail.if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:nt{:?}", e);process::exit(1);}// Subscribe topics.subscribe_topics(&cli);println!("Processing requests...");for msg in rx.iter() {if let Some(msg) = msg {println!("{}", msg);}else if !cli.is_connected() {if try_reconnect(&cli) {println!("Resubscribe topics...");subscribe_topics(&cli);} else {break;}}}// If still connected, then disconnect now.if cli.is_connected() {println!("Disconnecting");cli.unsubscribe_many(DFLT_TOPICS).unwrap();cli.disconnect(None).unwrap();}println!("Exiting");
}

运行与测试

编译二进制文件

执行以下命令,会在 mqtt-example/target/debug 目录下生成消息订阅、发布对应的 subpub 二进制文件。

cargo build

消息订阅

执行 sub 二进制文件,等待消费发布。

消息发布

执行 pub 二进制文件,可以看到分别往 rust/testrust/mqtt 这两个主题发布了消息。

同时在消息订阅中可看到发布的消息

至此,我们完成了使用 paho-mqtt 客户端连接到 公共 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。

版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.io/cn/blog/how-to-use-mqtt-in-rust

mqtt客户端工具_如何在 Rust 中使用 MQTT相关推荐

  1. mqtt判断设备是否在线_如何在 Python 中使用 MQTT

    Python 是一种广泛使用的解释型.高级编程.通用型编程语言.Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词).Python 让开发者能 ...

  2. python mqtt库_如何在 Python 中使用 MQTT

    Python 是一种广泛使用的解释型.高级编程.通用型编程语言.Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词).Python 让开发者能 ...

  3. photoshop标尺工具_如何在Photoshop中使用和掌握非常困难的钢笔工具

    photoshop标尺工具 Photoshop and Illustrator both have a dark point many users choose to avoid-the notori ...

  4. hashmap储存有向图_如何在Rust中构建向量的HashMap?

    将[]放到HashMap上是(现已弃用)get(..)函数的糖,该声明是: fn get(&'a self, k: &K) -> &'a V 并返回一个常量(&) ...

  5. aes离线解密工具_如何在Python中解密OpenSSL AES加密文件?

    OpenSSL为AES加密提供了一种流行的(但不安全 - 见下文!)命令行界面: openssl aes-256-cbc -salt -in filename -out filename.enc Py ...

  6. figma设计_如何在Figma中构建设计入门套件(第1部分)

    figma设计 Figma教程 (Figma Tutorial) Do you like staring at a blank canvas every time you start a new pr ...

  7. 在excel日期比对大小_如何在Excel中防止分组日期

    在excel日期比对大小 As a teenager, group dates can be fun. If you have strict parents, that might be the on ...

  8. 表格在整个html居中显示,html 表格字符居中显示_如何在HTML中居中显示表格?

    html 表格字符居中显示_如何在HTML中居中显示表格? html 表格字符居中显示_如何在HTML中居中显示表格? html 表格字符居中显示 HTML table provides the ab ...

  9. MQTT客户端工具介绍

    简述 由于在工作当中需要监测设备的数据,所以用MQTT客户端工具来订阅和发布消息.我了解到的客户端,本文希望能对你有所帮助 MQTT客户端工具 1.MQTT-spy(基于java) 1.下载链接: M ...

最新文章

  1. 微信小程序顶部tab切换以及滑动
  2. 《OpenCV3编程入门》学习笔记5 Core组件进阶(六)输入输出XML和YAML文件
  3. 二叉排序树经典算法速成
  4. 期货与期权(part3)--期货合约和期权合约
  5. UVA 12501 Bulky process of bulk reduction ——(线段树成段更新)
  6. 【VMware虚拟机】使用SSH连接VMware上的Linux虚拟机(主机互通也可访问外网)
  7. SPA单页应用的优缺点
  8. C语言实现常用排序算法——基数排序
  9. java中list、set和map 的区别(转)
  10. DbgView远程调试
  11. Xshell6与Xftp6下载
  12. java面试题——常见项目真实面试题(实际面试被问到)
  13. 花了一万多买的web前端全套教程,现在分享给大家
  14. 第十三届蓝桥杯模拟赛(第三期)试题与题解 C++
  15. Excel制作+导出
  16. 小米电视4A Android8,小米电视4A 删除内置应用及其去广告攻略
  17. Arduino教程-11. PIR传感器
  18. 简单说说rebuttal
  19. c语言中字符表,C语言指令表与符号表
  20. JAVA代码实现扫码购带圆图二维码生成

热门文章

  1. Oracle的SQL语法提示30例,INDEX_JOIN,ORDERED,USE_NL,LEADING
  2. 子程序入口参数是什么_三菱FX PLC | 什么是中断服务?没事多看几遍
  3. bigdecimal 科学计数转普通计数_LoaRunner性能测试教程:Windows计数器(2)
  4. 电脑显示未安装任何音频输出设备_音频频谱分析仪插件Voxengo SPAN Plus介绍及安装教程...
  5. Android添加gdb symbols
  6. ckati与ninja构建demo
  7. Typora MarkDown语法笔记(一)
  8. ios实现图片动画效果
  9. mysql数据库逻辑备份与恢复_Mysqldump逻辑备份与恢复
  10. 设计代码说明什么是多态性?如何实现多态?(代码中要写注释解释)_狗屎一样的代码!快,重构我!...