emqttd 集群在上一节我们已经配置完毕。emqttd默认占用1883端口用于MQTT连接,8083端口用于HTTP接口,通过HTTP接口从应用程序向MQTT客户端发布消息。下面我们通过java实现消息的发布和订阅。

1.pom

使用mqtt-client java客户端,添加依赖jar

org.fusesource.mqtt-client

mqtt-client

1.12

2.代码实现

订阅者订阅node2节点端口

package mqtt;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.Future;

import org.fusesource.mqtt.client.FutureConnection;

import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.Message;

import org.fusesource.mqtt.client.QoS;

import org.fusesource.mqtt.client.Topic;

/**

* 客户端订阅消息

*/

public class Client {

private final static String CONNECTION_STRING = "tcp://192.168.1.91:1883";

private final static boolean CLEAN_START = true;

private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s

private final static String CLIENT_ID = "client";

public static Topic[] topics = {

new Topic("mqtt/aaa", QoS.EXACTLY_ONCE), // 2 只有一次

new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE), // 1 至少一次

new Topic("mqtt/ccc", QoS.AT_MOST_ONCE) }; // 0 至多一次

public final static long RECONNECTION_ATTEMPT_MAX = 6;

public final static long RECONNECTION_DELAY = 2000;

public final static int SEND_BUFFER_SIZE = 64 ;// 发送最大缓冲为2M

public static void main(String[] args) {

// 创建MQTT对象

MQTT mqtt = new MQTT();

try {

// 设置mqtt broker的ip和端口

mqtt.setHost(CONNECTION_STRING);

// 连接前清空会话信息

mqtt.setCleanSession(CLEAN_START);

// 设置重新连接的次数

mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);

// 设置重连的间隔时间

mqtt.setReconnectDelay(RECONNECTION_DELAY);

// 设置心跳时间

mqtt.setKeepAlive(KEEP_ALIVE);

// 设置缓冲的大小

mqtt.setSendBufferSize(SEND_BUFFER_SIZE);

//设置客户端id

mqtt.setClientId(CLIENT_ID);

// 获取mqtt的连接对象BlockingConnection ,采用Future模式 订阅主题

final FutureConnection connection = mqtt.futureConnection();

connection.connect();

connection.subscribe(topics);

while (true) {

Future futrueMessage = connection.receive();

Message message = futrueMessage.await();

System.out.println("MQTTFutureClient.Receive Message " + "Topic Title :" + message.getTopic() + " context :"

+ String.valueOf(message.getPayloadBuffer()));

}

} catch (URISyntaxException e) {

e.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

} finally {

}

}

}发布者通过node1发布消息

package mqtt;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.FutureConnection;

import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.QoS;

import org.fusesource.mqtt.client.Topic;

/**

* 服务端发布消息

*/

public class Server {

private final static String CONNECTION_STRING = "tcp://192.168.1.90:1883";

private final static boolean CLEAN_START = true;

private final static String CLIENT_ID = "server";

private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s

public static Topic[] topics = {

new Topic("mqtt/aaa", QoS.EXACTLY_ONCE), // 2 只有一次

new Topic("mqtt/bbb", QoS.AT_LEAST_ONCE), // 1 至少一次

new Topic("mqtt/ccc", QoS.AT_MOST_ONCE) }; // 0 至多一次

public final static long RECONNECTION_ATTEMPT_MAX = 6;

public final static long RECONNECTION_DELAY = 2000;

public final static int SEND_BUFFER_SIZE = 64;// 发送最大缓冲

public static void main(String[] args) {

MQTT mqtt = new MQTT();

try {

//==MQTT设置说明

//设置服务端的ip

mqtt.setHost(CONNECTION_STRING);

//连接前清空会话信息 ,若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true

mqtt.setCleanSession(CLEAN_START);

//设置心跳时间 ,定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待

mqtt.setKeepAlive(KEEP_ALIVE);

//设置客户端id,用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。

//此ID应少于23个字符,默认根据本机地址、端口和时间自动生成

mqtt.setClientId(CLIENT_ID);

//==失败重连接设置说明

//设置重新连接的次数 ,客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1

mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);

//设置重连的间隔时间 ,首次重连接间隔毫秒数,默认为10ms

mqtt.setReconnectDelay(RECONNECTION_DELAY);

//设置socket发送缓冲区大小,默认为65536(64k)

mqtt.setSendBufferSize(SEND_BUFFER_SIZE);

设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输

mqtt.setTrafficClass(8);

//==带宽限制设置说明

mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制

mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制

//使用Future创建连接

final FutureConnection connection= mqtt.futureConnection();

connection.connect();

int count=1;

while(true){

count++;

// 用于发布消息,目前手机段不需要向服务端发送消息

//主题的内容

String message="Hello "+count+" MQTT...";

String topic = "mqtt/bbb";

connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE,

false);

System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message);

}

} catch (URISyntaxException e) {

e.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

}finally{

}

}

}运行代码测试正常接收、发布消息。

也可以通过http请求的方式发布消息,如下:

curl -v --basic -u user:passwd -d "qos=2&retain=0&topic=mqtt/aaa&message=hello" -k http://192.168.1.90:8083/mqtt/publish

Name

Description

client

ClientId

qos

QoS(0, 1, 2)

retain

Retain(0, 1)

topic

Topic

message

Message

没有添加认证,用户密码可以随便填写。

RETAIN(保持)

仅针对PUBLISH消息。不同值,不同含义:

1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。

备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

0:仅仅为当前订阅者推送此消息。

假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。

参考:https://github.com/emqtt/emqttd/wiki/HTTP%20Publish

java使用emqtt实现即时聊天_emqttd java代码测试相关推荐

  1. 在线即时通讯工具的网页即时聊天的html代码

    <a target=blank href=tencent://message/?uin=你的QQ号码&Site=您的网站/&Menu=yes><img border= ...

  2. java xmpp 框架_即时聊天IM之三 XMPP协议客户端库的和Android端框架概述

    合肥程序员群:49313181.    合肥实名程序员群:128131462 (不愿透露姓名和信息者勿加入) Q  Q:408365330     E-Mail:egojit@qq.com smack ...

  3. java实现仿QQ即时聊天

    这是我的java大作业,这里就直接贴上我的实验报告了. 2.0版已更新地址:Java仿QQ2.0版 项目已开源:github地址:imitate-qq 欢迎fork与star 仿微信App:canar ...

  4. java仿qq_「java qq」仿QQ聊天软件java实现(一) - seo实验室

    java qq 之前学java通信的时候写过简单的通信程序,但比较简陋,于是重新写了一个仿照QQ的聊天软件,主要在界面上做了优化,增加了一些功能.实现的功能有注册.登录.好友列表.分组.黑名单.添加好 ...

  5. Java+Swing+mysql仿QQ聊天工具

    Java+Swing+mysql仿QQ聊天工具 一.系统介绍 二.功能展示 1.用户登陆 2.好友列表 3.好友聊天 4.服务器日志 三.系统实现 四.其它 1.其他系统实现 2.获取源码 一.系统介 ...

  6. Java实现即时聊天:聊天服务器+聊天客户端+Web管理控制台

    来源:cnblogs.com/blogtimes/p/14767484.html 一.前言 说实话,写这个玩意儿是我上周刚刚产生的想法,本想写完后把代码挂上来赚点积分也不错.写完后发现这东西值得写一篇 ...

  7. im聊天软件Java即时通讯源码原生四端

    基本功能说明及介绍: 客户端:安卓,苹果,(可赠送web,pc) 开发语言: Java OC C# 运行软件:eclipse Java xcode 数据库:mongodb 环境:Linux Cento ...

  8. 【java毕业设计】基于java+原生Sevlet+socket的聊天室系统设计与实现(毕业论文+程序源码)——聊天室系统

    基于java+原生Sevlet+socket的聊天室系统设计与实现(毕业论文+程序源码) 大家好,今天给大家介绍基于java+原生Sevlet+socket的聊天室系统设计与实现,文章末尾附有本毕业设 ...

  9. 毕设 JAVA JSP 简单的OICQ聊天程序论文

    备注原文来源于:六月雪计算机毕业设计 JAVA即时通讯工具JICQ的设计与开发 摘  要 即时通讯(Instant Messaging)是目前Internet上最为流行的通讯方式,各种各样的即时通讯软 ...

最新文章

  1. VMware 收购 Kubernetes 初创公司 Heptio
  2. 服务器软RAID和LVM的实现
  3. 简化MVVM属性设置和修改 - .NET CORE(C#) WPF开发
  4. jsp过滤器示例_Java 8过滤器,地图,收集和流示例
  5. 设计模式----java的单例模式
  6. python获取当前时间和前一天时间
  7. python自定义函数画图_python matplotlib自定义colorbar颜色条-以及matplotlib中的内置色条...
  8. POJ2356 Find a multiple 鸽巢原理
  9. CAD复制,如何自由复制CAD图形?
  10. MIUI11线刷包精简
  11. 2018年迎春杯复赛入围名单(五年级)
  12. 企业微信机器人定时发送信息
  13. JS基础—选项卡套选项卡(函数传参)
  14. 华为荣耀X1相机或图库图标被删除后的恢复方法(不需要恢复出厂设置)!
  15. 基于jQuery实现表单提交验证
  16. Mac电脑如何给IDEA配置IDEA 公司发行的适合程序员编程字体
  17. 八拜之交是指哪八拜?
  18. 数据库系统概论之数据模型
  19. Windows自带远程桌面和远程协助用法
  20. 三农数据(1990-2020)六:生产性固定资产原值、耕地面积、可再生资源利用、水利等

热门文章

  1. Dell及其他电脑开启停电后来电重新开机
  2. 利用java集合框架实现扑克牌比大小游戏
  3. Android UI开发第四篇——实现像handcent sms或者chomp sms那样的气泡短信样式
  4. Android 12 SplashScreen(闪屏页)适配
  5. 【RocketMQ|源码分析】namesrv启动停止过程都做了什么
  6. Matlab中power函数的使用
  7. R报错tar: Failed to set default locale
  8. CSDN 会员版块问题解决日志
  9. 关于外网访问本地服务器 (家庭版)
  10. 爬虫工程师的进阶一览图(爬虫工程师水平对照表)根据崔庆才崔大神的文章总结的