mqtt客户端工具_如何在 Rust 中使用 MQTT
Rust 是由 Mozilla 主导开发的通用、编译型编程语言。该语言的设计准则为:安全、并发、实用,支持 函数式、并发式、过程式以及面向对象的编程风格。Rust 速度惊人且内存利用率极高。由于没有运行时和垃圾回收,它能够胜任对性能要求特别高的服务,可以在嵌入式设备上运行,还能轻松和其他语言集成。Rust 丰富的类型系统和所有权模型保证了内存安全和线程安全,让您在编译期就能够消除各种各样的错误。
MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。
本文主要介绍如何在 Rust 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。
项目初始化
本项目使用 Rust 1.44.0 进行开发测试,并使用 Cargo 1.44.0 包管理工具进行项目管理,读者可用如下命令查看当前的 Rust 版本。
~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)
选择 MQTT 客户端库
paho-mqtt 是目前 Rust 中,功能完善且使用较多的 MQTT 客户端,最新的 0.7.1
版本支持 MQTT v5、3.1.1、3.1,支持通过标准 TCP、SSL / TLS、WebSockets 传输数据,QoS 支持 0、1、2 等。
初始化项目
执行以下命令创建名为 mqtt-example
的 Rust 新项目。
~ cargo new mqtt-exampleCreated binary (application) `mqtt-example` package
编辑项目中的 Cargo.toml
文件,在 dependencies
中添加 paho-mqtt
库的地址,以及指定订阅、发布代码文件对应的二进制文件。
[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }[[bin]]
name = "sub"
path = "src/sub/main.rs"[[bin]]
name = "pub"
path = "src/pub/main.rs"
Rust MQTT 的使用
创建客户端连接
本文将使用 EMQ X 提供的 免费公共 MQTT 服务器 作为测试连接的 MQTT 服务器,该服务基于 EMQ X 的 MQTT 物联网云平台 创建。服务器接入信息如下:
- Broker: http://broker.emqx.io
- TCP Port: 1883
- Websocket Port: 8083
配置 MQTT Broker 连接参数
配置 MQTT Broker 连接地址(包括端口)、topic (这里我们配置了两个 topic ),以及客户端 id。
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
编写 MQTT 连接代码
编写 MQTT 连接代码,为了提升使用体验,可在执行二进制文件时通过命令行参数的形式传入连接地址。通常我们需要先创建一个客户端,然后将该客户端连接到 broker.emqx.io
。
let host = env::args().nth(1).unwrap_or_else(||DFLT_BROKER.to_string()
);// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new().server_uri(host).client_id(DFLT_CLIENT.to_string()).finalize();// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);process::exit(1);
});// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new().keep_alive_interval(Duration::from_secs(20)).clean_session(true).finalize();// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:nt{:?}", e);process::exit(1);
}
发布消息
这里我们总共发布五条消息,根据循环的奇偶性,分别向 rust/mqtt
、 rust/test
这两个主题发布。
for num in 0..5 {let content = "Hello world! ".to_string() + &num.to_string();let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);if num % 2 == 0 {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);} else {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);}let tok = cli.publish(msg);if let Err(e) = tok {println!("Error sending message: {:?}", e);break;}
}
订阅消息
在客户端连接之前,需要先初始化消费者。这里我们会循环处理消费者中的消息队列,并打印出订阅的 topic 名称及接收到的消息内容。
fn subscribe_topics(cli: &mqtt::Client) {if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {println!("Error subscribes topics: {:?}", e);process::exit(1);}
}fn main() {...// Initialize the consumer before connecting.let rx = cli.start_consuming();...// Subscribe topics.subscribe_topics(&cli);println!("Processing requests...");for msg in rx.iter() {if let Some(msg) = msg {println!("{}", msg);}else if !cli.is_connected() {if try_reconnect(&cli) {println!("Resubscribe topics...");subscribe_topics(&cli);} else {break;}}}...
}
完整代码
消息发布代码
use std::{env,process,time::Duration
};extern crate paho_mqtt as mqtt;const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;fn main() {let host = env::args().nth(1).unwrap_or_else(||DFLT_BROKER.to_string());// Define the set of options for the create.// Use an ID for a persistent session.let create_opts = mqtt::CreateOptionsBuilder::new().server_uri(host).client_id(DFLT_CLIENT.to_string()).finalize();// Create a client.let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);process::exit(1);});// Define the set of options for the connection.let conn_opts = mqtt::ConnectOptionsBuilder::new().keep_alive_interval(Duration::from_secs(20)).clean_session(true).finalize();// Connect and wait for it to complete or fail.if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:nt{:?}", e);process::exit(1);}// Create a message and publish it.// Publish message to 'test' and 'hello' topics.for num in 0..5 {let content = "Hello world! ".to_string() + &num.to_string();let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);if num % 2 == 0 {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);} else {println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);}let tok = cli.publish(msg);if let Err(e) = tok {println!("Error sending message: {:?}", e);break;}}// Disconnect from the broker.let tok = cli.disconnect(None);println!("Disconnect from the broker");tok.unwrap();
}
消息订阅代码
为了提升使用体验,消息订阅做了断开重连的处理,并在重新建立连接后对主题进行重新订阅。
use std::{env,process,thread,time::Duration
};extern crate paho_mqtt as mqtt;const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{println!("Connection lost. Waiting to retry connection");for _ in 0..12 {thread::sleep(Duration::from_millis(5000));if cli.reconnect().is_ok() {println!("Successfully reconnected");return true;}}println!("Unable to reconnect after several attempts.");false
}// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {println!("Error subscribes topics: {:?}", e);process::exit(1);}
}fn main() {let host = env::args().nth(1).unwrap_or_else(||DFLT_BROKER.to_string());// Define the set of options for the create.// Use an ID for a persistent session.let create_opts = mqtt::CreateOptionsBuilder::new().server_uri(host).client_id(DFLT_CLIENT.to_string()).finalize();// Create a client.let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {println!("Error creating the client: {:?}", err);process::exit(1);});// Initialize the consumer before connecting.let rx = cli.start_consuming();// Define the set of options for the connection.let lwt = mqtt::MessageBuilder::new().topic("test").payload("Consumer lost connection").finalize();let conn_opts = mqtt::ConnectOptionsBuilder::new().keep_alive_interval(Duration::from_secs(20)).clean_session(false).will_message(lwt).finalize();// Connect and wait for it to complete or fail.if let Err(e) = cli.connect(conn_opts) {println!("Unable to connect:nt{:?}", e);process::exit(1);}// Subscribe topics.subscribe_topics(&cli);println!("Processing requests...");for msg in rx.iter() {if let Some(msg) = msg {println!("{}", msg);}else if !cli.is_connected() {if try_reconnect(&cli) {println!("Resubscribe topics...");subscribe_topics(&cli);} else {break;}}}// If still connected, then disconnect now.if cli.is_connected() {println!("Disconnecting");cli.unsubscribe_many(DFLT_TOPICS).unwrap();cli.disconnect(None).unwrap();}println!("Exiting");
}
运行与测试
编译二进制文件
执行以下命令,会在 mqtt-example/target/debug
目录下生成消息订阅、发布对应的 sub
、pub
二进制文件。
cargo build
消息订阅
执行 sub
二进制文件,等待消费发布。
消息发布
执行 pub
二进制文件,可以看到分别往 rust/test
、rust/mqtt
这两个主题发布了消息。
同时在消息订阅中可看到发布的消息
至此,我们完成了使用 paho-mqtt 客户端连接到 公共 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。
版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.io/cn/blog/how-to-use-mqtt-in-rust
mqtt客户端工具_如何在 Rust 中使用 MQTT相关推荐
- mqtt判断设备是否在线_如何在 Python 中使用 MQTT
Python 是一种广泛使用的解释型.高级编程.通用型编程语言.Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词).Python 让开发者能 ...
- python mqtt库_如何在 Python 中使用 MQTT
Python 是一种广泛使用的解释型.高级编程.通用型编程语言.Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词).Python 让开发者能 ...
- photoshop标尺工具_如何在Photoshop中使用和掌握非常困难的钢笔工具
photoshop标尺工具 Photoshop and Illustrator both have a dark point many users choose to avoid-the notori ...
- hashmap储存有向图_如何在Rust中构建向量的HashMap?
将[]放到HashMap上是(现已弃用)get(..)函数的糖,该声明是: fn get(&'a self, k: &K) -> &'a V 并返回一个常量(&) ...
- aes离线解密工具_如何在Python中解密OpenSSL AES加密文件?
OpenSSL为AES加密提供了一种流行的(但不安全 - 见下文!)命令行界面: openssl aes-256-cbc -salt -in filename -out filename.enc Py ...
- figma设计_如何在Figma中构建设计入门套件(第1部分)
figma设计 Figma教程 (Figma Tutorial) Do you like staring at a blank canvas every time you start a new pr ...
- 在excel日期比对大小_如何在Excel中防止分组日期
在excel日期比对大小 As a teenager, group dates can be fun. If you have strict parents, that might be the on ...
- 表格在整个html居中显示,html 表格字符居中显示_如何在HTML中居中显示表格?
html 表格字符居中显示_如何在HTML中居中显示表格? html 表格字符居中显示_如何在HTML中居中显示表格? html 表格字符居中显示 HTML table provides the ab ...
- MQTT客户端工具介绍
简述 由于在工作当中需要监测设备的数据,所以用MQTT客户端工具来订阅和发布消息.我了解到的客户端,本文希望能对你有所帮助 MQTT客户端工具 1.MQTT-spy(基于java) 1.下载链接: M ...
最新文章
- 微信小程序顶部tab切换以及滑动
- 《OpenCV3编程入门》学习笔记5 Core组件进阶(六)输入输出XML和YAML文件
- 二叉排序树经典算法速成
- 期货与期权(part3)--期货合约和期权合约
- UVA 12501 Bulky process of bulk reduction ——(线段树成段更新)
- 【VMware虚拟机】使用SSH连接VMware上的Linux虚拟机(主机互通也可访问外网)
- SPA单页应用的优缺点
- C语言实现常用排序算法——基数排序
- java中list、set和map 的区别(转)
- DbgView远程调试
- Xshell6与Xftp6下载
- java面试题——常见项目真实面试题(实际面试被问到)
- 花了一万多买的web前端全套教程,现在分享给大家
- 第十三届蓝桥杯模拟赛(第三期)试题与题解 C++
- Excel制作+导出
- 小米电视4A Android8,小米电视4A 删除内置应用及其去广告攻略
- Arduino教程-11. PIR传感器
- 简单说说rebuttal
- c语言中字符表,C语言指令表与符号表
- JAVA代码实现扫码购带圆图二维码生成
热门文章
- Oracle的SQL语法提示30例,INDEX_JOIN,ORDERED,USE_NL,LEADING
- 子程序入口参数是什么_三菱FX PLC | 什么是中断服务?没事多看几遍
- bigdecimal 科学计数转普通计数_LoaRunner性能测试教程:Windows计数器(2)
- 电脑显示未安装任何音频输出设备_音频频谱分析仪插件Voxengo SPAN Plus介绍及安装教程...
- Android添加gdb symbols
- ckati与ninja构建demo
- Typora MarkDown语法笔记(一)
- ios实现图片动画效果
- mysql数据库逻辑备份与恢复_Mysqldump逻辑备份与恢复
- 设计代码说明什么是多态性?如何实现多态?(代码中要写注释解释)_狗屎一样的代码!快,重构我!...