mqtt 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放,简单,轻量级,且易于实现,这些优点使得他受用于任何环境

该协议的特点有:

使用发布/订阅消息的模式,提供一对多的消息发布,解除应用程序耦合

对负载内容屏蔽的消息传输

使用TCP/IO 提供的网络连接

有三种消息发布服务质量:

"至多一次",消息发布完全依赖底层TCP/IP 网络,会发生消息丢失或者重复,这一级别可用于如下情况,环境,传感器数据,丢失一次度记录无所谓,因为不久之后会有第二次发送

"至少一次" 确保消息到达,但消息重复可能发生

“只有一次",确保消息到达一次,这一级别可用于如下情况,在计费系统中,消息重复或者丢失导致不正确的结果

小型传输,开销很小(固定床都的头部是2个字节),协议变换最小化,以降低网络流量

使用Last will和Testament 特性通知有关客户端异常中断的机制

1:配置环境

2:服务器端程序

package com.example;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/*** Description*   服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题* Company Beijing * author  youxuan  E-mail:xuanyouwu@163.com* date createTime:16/7/13* version*/
public class Server {public static final String HOST = "tcp://101.200.133.189:1883";public static final String TOPIC = "toclient/124";public static final String TOPIC125 = "toclient/125";private static final String clientid = "server";private MqttClient client;private MqttTopic topic;private MqttTopic topic125;private String userName = "admin";private String passWord = "password";private MqttMessage message;public Server() throws MqttException {// MemoryPersistence设置clientid的保存形式,默认为以内存保存client = new MqttClient(HOST, clientid, new MemoryPersistence());connect();}private void connect() {MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(false);options.setUserName(userName);options.setPassword(passWord.toCharArray());// 设置超时时间options.setConnectionTimeout(10);// 设置会话心跳时间options.setKeepAliveInterval(20);try {client.setCallback(new PushCallback());client.connect(options);topic = client.getTopic(TOPIC);topic125 = client.getTopic(TOPIC125);} catch (Exception e) {e.printStackTrace();}}public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,MqttException {MqttDeliveryToken token = topic.publish(message);token.waitForCompletion();System.out.println("message is published completely! "+ token.isComplete());}public static void main(String[] args) throws MqttException {Server server = new Server();server.message = new MqttMessage();server.message.setQos(2);server.message.setRetained(true);server.message.setPayload("给客户端124推送的信息:wuyouxuan".getBytes());server.publish(server.topic, server.message);server.message = new MqttMessage();server.message.setQos(2);server.message.setRetained(true);server.message.setPayload("给客户端125推送的信息:wuyouxuan".getBytes());server.publish(server.topic125, server.message);System.out.println(server.message.isRetained() + "------ratained状态");}
}

回调监听

package com.example;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;  /**  * 发布消息的回调类  *   * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。  * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。* 在回调中,将它用来标识已经启动了该回调的哪个实例。  * 必须在回调类中实现三个方法:  *   *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。  *   *  public void connectionLost(Throwable cause)在断开连接时调用。  *   *  public void deliveryComplete(MqttDeliveryToken token))  *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。  *  由 MqttClient.connect 激活此回调。  *   */
public class PushCallback implements MqttCallback {  public void connectionLost(Throwable cause) {  // 连接丢失后,一般在这里面进行重连  System.out.println("连接断开,可以做重连");  }  public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());  }public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面  System.out.println("接收消息主题 : " + topic);  System.out.println("接收消息Qos : " + message.getQos());  System.out.println("接收消息内容 : " + new String(message.getPayload()));  }
}

3:客户端程序

package com.example;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.concurrent.ScheduledExecutorService;/*** Description*   客户端程序* Company Beijing * author  youxuan  E-mail:xuanyouwu@163.com* date createTime:16/7/13* version*/
public class Client {public static final String HOST = "tcp://101.200.133.189:1883";public static final String TOPIC = "toclient/124";  private static final String clientid = "client124";  private MqttClient client;  private MqttConnectOptions options;  private String userName = "admin";private String passWord = "password";private ScheduledExecutorService scheduler;  private void start() {  try {  // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存  client = new MqttClient(HOST, clientid, new MemoryPersistence());  // MQTT的连接设置  options = new MqttConnectOptions();  // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接  options.setCleanSession(true);  // 设置连接的用户名
            options.setUserName(userName);  // 设置连接的密码
            options.setPassword(passWord.toCharArray());  // 设置超时时间 单位为秒  options.setConnectionTimeout(10);  // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制  options.setKeepAliveInterval(20);  // 设置回调  client.setCallback(new PushCallback());  MqttTopic topic = client.getTopic(TOPIC);  //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息    options.setWill(topic, "close".getBytes(), 2, true);  client.connect(options);  //订阅消息  int[] Qos  = {1};  String[] topic1 = {TOPIC};  client.subscribe(topic1, Qos);  } catch (Exception e) {  e.printStackTrace();  }  }  public static void main(String[] args) throws MqttException {     Client client = new Client();  client.start();  }
}

参考网站:

https://segmentfault.com/a/1190000002809450#articleHeader0

github 开源项目   https://github.com/fusesource/mqtt-client

运行效果:

转载于:https://www.cnblogs.com/jiangzhaowei/p/9152646.html

mqtt 异步消息 长连接 解析相关推荐

  1. MQTT与TCP 长连接

    前言 在接触到MQTT之后,总是会有疑问,为什么用MQTT不用TCP长连接透传?看起来[TCP长连接+私有协议透传]和[MQTT+业务主题]似乎都能达到同样的目的,甚至用MQTT会使得设备端逻辑实现. ...

  2. google的api key调用次数是多少_Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?...

    前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也 ...

  3. 长连接与短连接、全双工与半双工,单工

    长连接: 连接建立后,需要通过心跳继续维持连接,这样发消息的时候不用每次都建立连接. 通信的过程:建立连接 --> 数据传输 -->  维持心跳  --> 数据传输  -->  ...

  4. 移动互联网消息推送原理:长连接+心跳机制(MQTT协议)

    互联网推送消息的方式很常见,特别是移动互联网上,手机每天都能收到好多推送消息,经过研究发现,这些推送服务的原理都是维护一个长连接(要不不可能达到实时效果),但普通的socket连接对服务器的消耗太大了 ...

  5. 转 互联网推送服务原理:长连接+心跳机制(MQTT协议)

    http://blog.csdn.net/zhangzeyuaaa/article/details/39028369 目录(?)[-] 无线移动网络的特点 android系统的推送和IOS的推送有什么 ...

  6. mqtt连接失败_Netty实战:如何让单机下Netty支持百万长连接?

    单机下能不能让我们的网络应用支持百万连接?可以,但是有很多的工作要做.而且要考虑到单机的系统资源消耗能否支撑百万并发 一.操作系统优化 首先就是要突破操作系统的限制. 在Linux平台上,无论编写客户 ...

  7. 如何实现android和服务器长连接呢?推送消息的原理

    前言:现在的大多数移动端应用都有实时得到消息的能力,简单来说,有发送消息的主动权和接受消息的被动权.例如:微信,QQ,天气预报等等,相信好处和用户体验相信大家都知道吧. 提出问题:这种功能必须涉及cl ...

  8. mqtt如何发送心跳 安卓_互联网推送服务原理:长连接+心跳机制(MQTT协议)

    互联网推送消息的方式很常见,特别是移动互联网上,手机每天都能收到好多推送消息,经过研究发现,这些推送服务的原理都是维护一个长连接(要不不可能达到实时效果),但普通的socket连接对服务器的消耗太大了 ...

  9. 互联网推送服务原理:长连接+心跳机制(MQTT协议)

    互联网推送消息的方式很常见,特别是移动互联网上,手机每天都能收到好多推送消息,经过研究发现,这些推送服务的原理都是维护一个长连接(要不不可能达到实时效果),但普通的socket连接对服务器的消耗太大了 ...

最新文章

  1. CSS3选择非第一个子元素
  2. ARM系统中断产生流程
  3. CCCC-GPLT L1-033. 出生年 天梯赛
  4. Introduction to Computer Networking学习笔记(十五):End to End Delay 端对端延迟
  5. (搬运)手机卫星通信详细科普图漫版
  6. Louvain 算法原理 及设计实现
  7. python判断字符串为空,Python判断字符串是否为空和null方法实例
  8. js方法禁止查看源文件、防止复制、禁止右键、防被框架的方法总结
  9. win7开机密码_win7忘记开机密码怎样才能打开电脑?别再用那些错误的方法了
  10. centos7 wget无法解析主机域名的解决办法
  11. C语言实现洗牌发牌程序,用C语言实现的扑克牌洗牌程序
  12. 【开源访谈】ECharts 作者 林峰 访谈实录
  13. python倒数切片_python切片
  14. 利用INFOPATH2007VS2005开发MOSS工作流详解 --收藏
  15. spyder替换_Spyder简单使用
  16. Tableau 中多张表的联接
  17. mac pdf去水印_PDF水印工具for Mac-PDF水印工具Mac版下载 V1.7-PC6苹果网
  18. mysql 设置 sql_mode
  19. 哪些食物会使皮肤变黑?
  20. ThreadPoolExecutor详解及线程池优化

热门文章

  1. SpringMVC处理自定义异常,通过读取配置文件把错误信息显示在前台页面
  2. 专访阿里云域名与网站业务总经理宋瑛桥:域名未来将更加个性化、生态化和规范化...
  3. Guice系列之用户指南(十)
  4. 使用***搭建javaweb环境
  5. Win32基础知识5 - Win32汇编语言006
  6. C#3.0新特性小结(2)
  7. 互联网与CTI技术结合之商业应用
  8. 优化算法-共轭梯度法
  9. (转载)机器学习知识点(十一)隐马尔可夫模型
  10. 日志分析平台ELK部署初学