MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用
参看官方文档:
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm
* Java 为 MQ Telemetry Transport 创建异步发布程序
*在此任务中,您将遵循教程来修改第一个发布程序。通过修改,
*使应用程序能够发送发布而不等待传递确认信息。传递确认
*信息由您创建的回调类来接收。
*
*
*
*4.使客户机断开连接
* a.除去其中包含 token.waitForCompletion 表达式的语句。 主线程将继续执行,而不等待传递发布。
* b.测试客户机是否已断开连接。 将错误返回到 MqttCallback 中的 lostConnection 方法之后,MQTT 客户机将断开连接,客户机应用程序也可能断开连接。测试是否有打开的连接。
* c.使用常量 Example.quiesceTimeout 来设置使客户机停顿的最长时间。
* if (client.isConnected())
* client.disconnect(Example.quiesceTimeout);
*当满足下面三种情况的组合形式时,客户机就完成了:
* a.已经对在此会话中(如果重新启动了会话,则是在先前会话中)已发布的所有消息调用了回调。
* b.消息未完成,然而停顿时间间隔已到期。缺省情况下,停顿时间间隔为 30 秒。通过将要等待的毫秒数作为 client.disconnect 的一个参数来传递,即可更改停顿超时。
* c.在发布了某些消息并由客户机进行排队之后,但是在发送这些消息之前调用了 client.disconnect。已排队的消息尚未处于“未完成”状态。如果会话可重新启动,那么重新启动会话时就会重新发送消息。
* 缺省情况下,停顿时间间隔为 30 秒。
MQTT的消息发布代码:
- package com.etrip.wsmqtt.server;
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
- /**
- * 使用 Java 为 MQ Telemetry Transport 创建异步发布程序
- *
- *
- *
- *
- * 消息发布的类的具体的实现
- *
- * @author longgangbai
- *
- */
- public class WSMQTTServerPubAsync {
- public static void main(String[] args) {
- try {
- //创建MqttClient对象
- MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);
- //创建MQTT相关的主题
- MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);
- //创建MQTT的消息体
- MqttMessage message = new MqttMessage();
- //设置消息传输的类型
- message.setQos(2);
- //设置是否在服务器中保存消息体
- message.setRetained(false);
- //设置消息的内容
- message.setPayload(WSMQTTServerCommon.publication.getBytes());
- //创建一个MQTT的回调类
- WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);
- //MqttClient绑定
- client.setCallback(callback);
- //MqttClient连接
- client.connect();
- System.out.println("Publishing \"" + message.toString()
- + "\" on topic \"" + topic.getName() + "\" with QoS = "
- + message.getQos());
- System.out.println("For client instance \"" + client.getClientId()
- + "\" on address " + client.getServerURI() + "\"");
- //发送消息并获取回执
- MqttDeliveryToken token = topic.publish(message);
- System.out.println("With delivery token \"" + token.hashCode()
- + " delivered: " + token.isComplete());
- Thread.sleep(100000000000000l);
- //关闭连接
- if (client.isConnected())
- client.disconnect(WSMQTTServerCommon.quiesceTimeout);
- System.out.println("Disconnected: delivery token \"" + token.hashCode()
- + "\" received: " + token.isComplete());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
MQTT消息发布回调代码:
- package com.etrip.wsmqtt.server;
- import com.ibm.micro.client.mqttv3.*;
- /**
- * 发布消息的回调类
- *
- * 必须实现MqttCallback的接口并实现对应的相关接口方法
- * ◦CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。
- * ◦必须在回调类中实现三个方法:
- *
- * public void messageArrived(MqttTopic topic, MqttMessage message)
- * 接收已经预订的发布。
- *
- * public void connectionLost(Throwable cause)
- * 在断开连接时调用。
- *
- * public void deliveryComplete(MqttDeliveryToken token))
- * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
- *
- *
- * ◦由 MqttClient.connect 激活此回调。
- *
- * @author longgangbai
- */
- public class WSMQTTServerCallBack implements MqttCallback {
- private String instanceData = "";
- public WSMQTTServerCallBack(String instance) {
- instanceData = instance;
- }
- /**
- * 接收到消息的回调的方法
- */
- public void messageArrived(MqttTopic topic, MqttMessage message) {
- try {
- System.out.println("Message arrived: \"" + message.toString()
- + "\" on topic \"" + topic.toString() + "\" for instance \""
- + instanceData + "\"");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 消息连接丢失
- */
- public void connectionLost(Throwable cause) {
- System.out.println("Connection lost on instance \"" + instanceData
- + "\" with cause \"" + cause.getMessage() + "\" Reason code "
- + ((MqttException)cause).getReasonCode() + "\" Cause \""
- + ((MqttException)cause).getCause() + "\"");
- cause.printStackTrace();
- }
- /**
- *
- */
- public void deliveryComplete(MqttDeliveryToken token) {
- try {
- System.out.println("Delivery token \"" + token.hashCode()
- + "\" received by instance \"" + instanceData + "\"");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
常量类:
- package com.etrip.wsmqtt.server;
- import java.util.UUID;
- /**
- *
- * 消息发布消息的常量字段
- *
- * @author longgangbai
- */
- public final class WSMQTTServerCommon {
- //发布broker的ip和端口
- public static final String TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
- //客户端的Id
- public static String clientId =String.format("%-23.23s", System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace('-', '_');
- //发布消息的主题
- public static final String topicString = System.getProperty("topicString", "china/beijing");
- //发布的消息
- public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
- //超时时间
- public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
- public static final int sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
- public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));
- public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));
- public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));
- }
MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用相关推荐
- 华为帐号服务学习笔记(四):Authorization Code模式服务端开发
笔者在<华为帐号服务学习笔记(二):OAuth2.0协议详解>中已经给大家介绍了Authorization Code模式是需要有后台服务器才能使用的,并且在<华为帐号服务学习笔记(三 ...
- Qt学习心得之网络编程简单的局域网聊天服务端建立
学而不思则罔,思而不学则殆.学习和思考是相辅相成的,通过这几天对网络编程的学习,收获颇丰.接下来我将利用Qt做的一个以TcpIp协议为传输方式的简单的局域网聊天服务端与大家分享下: 首先谈谈我个人对T ...
- MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例
使用IBM MQTTv3实现相关的发布订阅功能 MQTTv3的发布消息的实现: Java代码 package com.etrip.mqttv3; import com.ibm.micro.clie ...
- JPA学习 —— 第五课、JPA常用API详解
实体状态和转换 JPA提供一个持久化上下文作为一级缓存,提供自动脏检查.对应某个id的实例在持久化上下文中只有一个对象. 查询时总是尝试在当前上下文中先搜索对象,不存在再触发数据库查询. 托管状态的b ...
- 学习笔记(08):Python网络编程并发编程-实现服务端可以对多个客户端提供服务
立即学习:https://edu.csdn.net/course/play/24458/296237?utm_source=blogtoedu 链接循环,一个服务器服务多个客户端, 思路1:服务器一个 ...
- 理论篇五: 如何设计游戏棋牌平台 - 服务端 - 棋牌设计 - 一切皆步骤
整体架构图 之前几篇简单介绍了游戏中心这边的整体架构,关于后台中心的暂时不过多的介绍了. 今天开始进入棋牌设计阶段. 一切皆步骤 无论是竞技棋牌类的或是回合制类的游戏,甚至是很多其他类型的游戏,都可以 ...
- 学习太极创客 — MQTT(四)服务端连接操作
视频链接:https://www.bilibili.com/video/BV1T54y1k7MQ/?spm_id_from=trigger_reload&vd_source=b91967c49 ...
- 服务端Skynet(五)——如何搭建一个实例
服务端Skynet(五)--如何搭建一个实例 文章目录 服务端Skynet(五)--如何搭建一个实例 1.配置文件 2.服务消息分发与回应(call/send) 3.通信(server/client) ...
- [C/C++后端开发学习] 9 服务端百万并发测试
服务端百万并发测试 服务端并发的概念 常见单机服务模型 并发测试方法 socket数量的限制(描述符数量的限制) 客户端测试代码 测试结果 改进代码提升测试并发量 单线程多端口同时监听 修改代码: 测 ...
最新文章
- css如何让图片不平铺,css怎么设置图片平铺方式?
- java同步方法完成案例_Java同步代码块和同步方法原理与应用案例详解
- Redis-08Redis数据结构--基数HyperLogLog
- 获取最大轮廓 opencv
- Python常见问题(2):编程问题 Programming FAQ
- 移动端常见的一些兼容性问题
- 提高redis cluster集群的安全性,增加密码验证
- shell自动生成的文件有一个问号的后缀
- ios framework 调用第三方 framework_Python基础:标准库和常用的第三方库
- Python批量整理文件名小案例(附公众号第一批赠书活动中奖名单)
- ASP.NET前端解决方案之一:Ext.Net入门随笔1
- HTML inline 与block元素
- 好用的在线PS编辑器
- vue中的attribute 和 property 是什么意思
- 深度学习论文-Cyclical Learning Rates for Training Neural Networks
- nRF5340开发指南目录汇总
- x265 (HEVC编码器,基于x264) 介绍
- 生成对抗网络——原理解释和数学推导
- ubuntu20.04使用USB转串口进行串口调试
- 阿里云虚拟主机项目根目录指向public目录下
热门文章
- Windows 2008 R2中的NAP新功能详解
- Java学习笔记二十五:Java面向对象的三大特性之多态
- 36.intellij idea 如何一键清除所有断点
- Android学习笔记之Android Studio添加新的Activity
- 微软批量授权版WINDOWS 10资料(截至到2015年11月,此处无下载地址)
- 《Programming WPF》翻译 第7章 3.笔刷和钢笔
- 算法设计与分析之循环与递归
- 利用AutoSPSourceBuilder和Autospinstaller自动安装SharePoint Server 2013图解教程——Part 1...
- 关于Jfinal的分享代码托管GitHub
- [转]android selector 背景选择器