目录

  • 什么是mqtt
    • 简介
    • 特性
    • 实现方式
  • apollo1.7服务器的下载和使用
    • 下载
    • 使用
  • Springboot整合mqtt客户端
    • Springboot和mqtt的依赖
    • 客户端代码
    • Controller代码
    • 启动类
  • 演示
    • 连接apollo服务器
    • 客户端之间的交互
  • 遇到的问题
  • 源码

什么是mqtt

简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级协议,该协议构建于TCP/IP协议之上,MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
   MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议 是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

特性

MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

(1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

(2)对负载内容屏蔽的消息传输。

(3)使用TCP/IP提供网络连接。

主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

(4)有三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

(5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

这就是为什么在介绍里说它非常适合“在物联网领域,传感器与服务器的通信,信息的收集”,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。

实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

(2)payload(message),可以理解为消息的内容,是指订阅者具体要使用的内容。

apollo1.7服务器的下载和使用

下载

apache官网对于apollo已经不更新了,我传上来了,大家可以去我这里,0积分就能下。

使用

下载后解压,然后cmd进入bin目录下,


然后执行命令,apollo.cmd create mybroker

create mybroker1之后会在bin目录下生成mybroker1文件夹,里面包含有很多信息,其中etc\apollo.xml文件下是配置服务器信息的文件,etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,默认账号和密码分别为,admin和password。
接着进入到mybroker1的bin目录下,并执行命令apollo-broker.cmd run,启动服务器。

最后可以在浏览器中输入http://127.0.0.1:61680/查看是否安装成功。

输入用户名:admin,密码:password,登录。

Springboot整合mqtt客户端

Springboot和mqtt的依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

客户端代码

@Component
public class client1 {public client1() throws MqttException {}//主题名String topic = "test1";//QoS服务质量等级int qos = 1;//访问服务器地址private String broker="tcp://127.0.0.1:61613";//账号String userName="admin";//密码String password="password" ;String clientId = "Client1";// 内存存储MemoryPersistence persistence = new MemoryPersistence();// 创建客户端MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
/初始化设置订阅的回调public void init(){sampleClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {}//当有订阅的消息时会从这里接收public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("clien1收到主题为:"+topic+"的消息。------\n"+"消息为::"+new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------"+ token.isComplete());}});}
///
public void connect() throws MqttException {// 创建链接参数MqttConnectOptions connOpts = new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接connOpts.setCleanSession(true);// 设置连接的用户名connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());// 建立连接IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);boolean r=iMqttToken.isComplete();if(r){System.out.println("client1连接到服务器成功");}else {System.out.println("client1连接到服务器失败");}}

public void publish(String mes) throws MqttException {// 创建消息MqttMessage message = new MqttMessage(mes.getBytes());// 设置消息的服务质量message.setQos(qos);// 发布消息sampleClient.publish(topic, message);}
/
public void subscribe(String topic) throws MqttException {//订阅消息sampleClient.subscribe(topic, qos);}///public void disconnect() throws MqttException {// 断开连接sampleClient.disconnect();// 关闭客户端sampleClient.close();}
}

我一共建了3客户端类,分别是client1、client2、client3,代码基本上都一样,上面的是client1。这三个类加上@Component注解后,在创建Spring容器时就会在容器中创建这三个类的实例,要使用时用@Autowired注解注入一下就好了。
然后对于client1我这里让他发布消息时,主题默认是"test1",client2的默认为"test2",client3的默认为"test3"(偷个懒)。(client1、client2、client3代码绝大部分都相同,之后topic和clientId这两个变量不一样)

Controller代码

@RestController
@RequestMapping("mqttDemo")
public class MqttDemoController {@Autowiredpublic client1 c1;@Autowiredpublic client2 c2;@Autowiredpublic client3 c3;@RequestMapping("init")//先让用户client1、client2、client3连接上服务器public void init() throws MqttException {c1.init();c1.connect();c2.init();c2.connect();c3.init();c3.connect();}@RequestMapping("client1/sub/{topic}")//让用户client1订阅一个话题public void client1_sub( @PathVariable String topic) throws MqttException {c1.subscribe(topic);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"---client1订阅了名为:"+topic+"的。");}@RequestMapping("client1/pub/{mes}")//让用户主题client1发布一个消息(话题默认为"test1")public void client1_pub( @PathVariable String mes) throws MqttException {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));c1.publish(mes);}/@RequestMapping("client2/sub/{topic}")//让用户client2订阅一个话题public void client2_sub( @PathVariable String topic) throws MqttException {c2.subscribe(topic);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"---client2订阅了名为:"+topic+"的主题。");}@RequestMapping("client2/pub/{mes}")//让用户client2发布一个消息(话题默认为"test2")public void client2_pub( @PathVariable String mes) throws MqttException {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));c2.publish(mes);}@RequestMapping("client3/sub/{topic}")//让用户client3订阅一个话题public void client3_sub( @PathVariable String topic) throws MqttException {c3.subscribe(topic);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"---client3订阅了名为:"+topic+"的主题。");}@RequestMapping("client3/pub/{mes}")//让用户client3发布一个消息(话题默认为"test3")public void client_pub( @PathVariable String mes) throws MqttException {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));c3.publish(mes);}}

通过游览器输入网址的方式执行这些业务方法。主要就是一个客户都有两个动作,即订阅和发布。然后都是通过@PathVariable获取路径中的参数,效果都在控制台打印出来。

启动类

@SpringBootApplication
public class SpringbootApplication {public static void main(String[] args) throws MqttException {SpringApplication.run(SpringbootApplication.class, args);}}

演示

连接apollo服务器

先连接上apollo服务器,通过访问http://localhost:8080/mqttDemo/init,执行MqttDemoController里的–init()–方法。

控制台会打印信息,

去apollo的控制台,检查是否真的连接上了服务器。


连接成功。

客户端之间的交互

连接到服务器后,客户端之间就可以交互了。
比如我现在要让client3订阅一个名为test1的话题(这个话题的信息默认由client1发布),访问http://localhost:8080/mqttDemo/client3/sub/test1,执行MqttDemoController里的–client3_sub()–方法。


订阅成功。

然后现在我让client1发一条信息(当然默认主题是test1),client3就会收到。访问http://localhost:8080/mqttDemo/client1/pub/我是client1,执行MqttDemoController里的–client1_pub()–方法。

接收成功。

最后这里可以实现一对多、多对一,就不一一演示了。

遇到的问题

刚接触mqtt这个协议不久,在做这个demo时遇到一个小问题,
就是在这个连接函数里的connOpts.setCleanSession(true);这句代码的意思是每次连接到服务器都以新的身份连接,然后我一开始是false,导致我上次的测试时的客户端之间的订阅关系都被记录了,影响了我下一次的测试。

比如,上次的测的时候client1订阅了test2,然后这次测时,无论client有没有订阅test2,都会收到这个话题的信息。

源码

demo源码

Springboot整合mqtt客户端,实现客户端之间的交互(mqtt服务器为apollo1.7)【windows下]】相关推荐

  1. SpringBoot整合WebSocket(获取客户端真实ip)

    遇到"后台推送"之类的需求,自然是躲不开websocket了.这一次遇到的需求有点特殊,客户端的ip是固定,需要根据客户端的ip来分辨具体是哪个客户端. 不过,为了方便以后使用,我 ...

  2. SpringBoot整合第三方技术学习笔记(自用)

    SpringBoot整合第三方技术学习笔记 搬运黑马视频配套笔记 KF-4.数据层解决方案 KF-4-1.SQL 回忆一下之前做SSMP整合的时候数据层解决方案涉及到了哪些技术?MySQL数据库与My ...

  3. SpringBoot整合emqtt

    SpringBoot整合emqtt 一.emqtt安装 二.相关配置 1. 在pom文件下添加以下maven依赖: 2. 在yml文件中进行mqtt的连接配置 三.springboot-emqtt的整 ...

  4. 2019.12.24笔记——SpringBoot整合Elasticsearch及其使用

    目录 Elasticsearch与springboot整合的方式 原生客户端 REST Jest Spring Data Spring Data的配置 Spring Data的使用 插入或修改数据 删 ...

  5. 【学习笔记】在windows下进行基于TCP的本地客户端和服务端socket通信

    文章目录 socket介绍 java中使用socket 基于tcp的socket通信 使用ServerSocket类创建一个web服务器:(java) windows下的基于tcp的socket编程( ...

  6. redis客户端Jedis和Luttuce的区别,并使用springboot整合

    Lettuce 和 Jedis 的定位都是Redis的client,所以他们当然可以直接连接redis server.也就是说这两种都可以是redis的客户端. Jedis Jedis在实现上是直接连 ...

  7. SpringBoot整合WebService(服务端+客户端)

    SpringBoot整合WebService(服务端+客户端) 文章目录 SpringBoot整合WebService(服务端+客户端) 一.服务端 1.项目结构 2.创建好SpringBoot项目后 ...

  8. 关于SpringBoot整合Netty客户端和服务端实现JT808协议

    关于SpringBoot整合Netty客户端和服务端实现JT808协议 最近做了一个使用netty实现交通部JT808协议的项目,对比了mina和netty两种框架的使用,先整理一下netty的实现过 ...

  9. 物联网:SpringBoot 集成Websocket 前后端客户端 及 mqtt 实现设备联动

    项目背景: 实现人离开房间超过一定时间,自动关闭空调联动控制功能. 实现原理: 通过客流密度摄像机监测客流人数变化,发送订阅消息到mqtt 消息服务器,WEB后台服务器订阅mqtt主题,接收客流密度摄 ...

  10. SpringBoot整合Redis客户端

    一.SpringBoot整合Redis的步骤 1. 导入SpringBoot整合Redis坐标,starter <dependency><groupId>org.springf ...

最新文章

  1. java h5获取ip,websocket中获取客户端通信的真实IP
  2. php 值不进行解码,无法解码PHP中的JSON值
  3. Quartz 2 Scheduler示例
  4. 如何通俗理解计算机视觉、计算机图形、图像处理之间的区别与联系
  5. AMD、CMD、CommonJs、ES6的对比
  6. Python数据分析学习笔记:Python数据可视化入门
  7. 【干货下载】2020新基建展望:新战略、新动力、新格局.pdf(附下载链接)
  8. 各种【icon】矢量图
  9. 开源游戏java引擎_基于Java的开源3D游戏引擎jMonkeyEngine
  10. 产品经理面试习题大汇总
  11. 官方高清标准地图素材下载地址
  12. OneZero第一次会议(非正式)
  13. STM32debug模式下可以执行,但是不能单步调试和跳转
  14. 毕业论文图片、公式自动编号和交叉索引教程——真的超好用
  15. 拼多多登陆 JS 密码字段加密解析
  16. HTML静态网页作业:使用html+css制作北京黎红学院学校网站 (4个页面)
  17. Meta-Classifier in Membership Inference
  18. Vue3+Ts(coderwhy)超详细学习笔记(二)邂逅Vue3开发
  19. linux网络配置ifconfig
  20. 小米蓝牙音箱固件、升级工具

热门文章

  1. 微信内置浏览器不支持下载文件的解决方案
  2. 存储过程和函数的操作
  3. SmartAdmin开源简单的门户网站管理系统
  4. Jenkins教程(八)实现 GitLab 触发 Jenkins 自动按模块发布前端
  5. Mongo 多语言模糊匹配
  6. iso是什么意思/iso9001质量管理体系认证有哪些标准
  7. GPU图形加速型云服务器是什么?
  8. 富士康计划将苹果生产线转移到越南,是什么原因呢?
  9. Thor 1.5.3中文版 (使用自定义按键启动软件)
  10. 海康摄像头如何查看IP,重置密码