前言

由于项目需要,目前需要使用Emqttd搭建一个聊天室,自己写了个demo,特记录下来

代码

使用IDEA搭建一个Spring Boot工程

pom.xml文件,此处我只列出dependencies部分

com.alibaba

fastjson

1.2.49

org.fusesource.mqtt-client

mqtt-client

1.14

org.springframework.boot

spring-boot-starter-freemarker

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-devtools

runtime

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

application.properties文件

server.port=8080

emqt.server=填你的Emqttd服务器地址

emqt.port=1883

emqt.host=tcp://${emqt.server}:${emqt.port}

emqt.clientId=spring-boot-client-${server.port}

emqt.subcribe.topic=/live_test/#

EmqtConfig.java,用于初始化MQTT客户端,配置监听器等

import org.fusesource.hawtbuf.Buffer;

import org.fusesource.hawtbuf.UTF8Buffer;

import org.fusesource.hawtdispatch.DispatchQueue;

import org.fusesource.mqtt.client.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

/**

* @author yangxin

* @date 2019/4/23

*/

@Configuration

public class EmqtConfig {

@Value("${emqt.host}")

private String host;

@Value("${emqt.clientId}")

private String clientId;

@Value("${emqt.subcribe.topic}")

private String topicName;

private static Lock lock = new ReentrantLock();

private static Map onlineMap = new ConcurrentHashMap<>();

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Bean

public MQTT mqtt() {

try {

logger.info("====连接到mqtt===");

MQTT mqtt = new MQTT();

mqtt.setHost(host);

mqtt.setClientId(clientId);

mqtt.setReconnectDelay(100);

mqtt.setKeepAlive((short) 20);

return mqtt;

} catch (Exception e) {

logger.error(e.getMessage(), e);

return null;

}

}

@Bean

public CallbackConnection callbackConnection(MQTT mqtt) {

try {

CallbackConnection connection = mqtt.callbackConnection();

connection.listener(new Listener() {

@Override

public void onConnected() {

logger.info("连接成功");

}

@Override

public void onDisconnected() {

logger.info("断开连接");

}

@Override

public void onPublish(UTF8Buffer topic, Buffer message, Runnable callback) {

try {

lock.lock();

logger.info("收到topic:" + topic.toString() + "消息为:" + message.utf8());

//表示监听成功

String topicName = topic.toString();

if (topicName.startsWith("/liveOnline")) {

Long liveId = findNum(topicName);

Integer integer = onlineMap.get(liveId);

if (integer == null) {

integer = 0;

}

onlineMap.put(liveId, ++integer);

}

}finally {

callback.run();

lock.unlock();

}

}

@Override

public void onFailure(Throwable throwable) {

logger.error(throwable.getMessage(), throwable);

}

});

connection.connect(new Callback() {

@Override

public void onSuccess(Void aVoid) {

//连接成功后会默认订阅主题($client/mengsu)

logger.info(clientId + "连接成功");

}

@Override

public void onFailure(Throwable throwable) {

logger.error(throwable.getMessage(), throwable);

}

});

//新建一个主题

Topic[] topic = new Topic[]{new Topic(topicName, QoS.EXACTLY_ONCE),new Topic("/liveOnline/#",QoS.EXACTLY_ONCE)};

connection.subscribe(topic, new Callback() {

@Override

public void onSuccess(byte[] bytes) {

logger.info(clientId + " topic订阅成功");

}

@Override

public void onFailure(Throwable throwable) {

logger.info(clientId + " topic订阅 失败");

logger.error(throwable.getMessage(), throwable);

}

});

/* connection.publish("/live/1", "这是服务器自己发出来的消息".getBytes(), QoS.AT_LEAST_ONCE, true,new Callback() {

@Override

public void onSuccess(Void aVoid) {

System.out.println("发送成功");

}

@Override

public void onFailure(Throwable throwable) {

throwable.printStackTrace();

}

});*/

DispatchQueue dispatchQueue = connection.getDispatchQueue();

dispatchQueue.execute(new Runnable() {

public void run() {

//在这里进行相应的订阅、发布、停止连接等等操作

System.out.println("在这里进行相应的订阅、发布、停止连接等等操作");

}

});

return connection;

} catch (Exception e) {

logger.error(e.getMessage(), e);

return null;

}

}

private static Long findNum(String str) {

String regEx="[^0-9]";

Pattern p = Pattern.compile(regEx);

Matcher m = p.matcher(str);

String result = m.replaceAll("").trim();

return Long.valueOf(result);

}

public int getOnlineCount(Long liveId){

try {

lock.lock();

Integer integer = onlineMap.get(liveId);

return integer == null ? 0 : integer;

}finally {

lock.unlock();

}

}

}

至此,MQTT客户端就配置好了,下面是controller

IndexController.java

import com.alibaba.fastjson.JSONObject;

import com.example.emqtdemo.emqt.EmqtConfig;

import org.fusesource.mqtt.client.*;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

import javax.servlet.http.HttpServletRequest;

/**

* @author yangxin

* @date 2019/4/23

*/

@Controller

public class IndexController {

@Autowired

private HttpServletRequest request;

@Resource

private CallbackConnection callbackConnection;

@Resource

private EmqtConfig emqtConfig;

@RequestMapping("/")

public String index(Long liveId,String username){

request.setAttribute("liveId", liveId);

request.setAttribute("username", username);

request.setAttribute("clientId","liveroom" + liveId + username);

request.setAttribute("topic","/live_test/" + liveId);

return "index";

}

// 发送消息

@RequestMapping("/send")

@ResponseBody

public Object send(String topic, String clientId,String msg) {

JSONObject content = new JSONObject();

content.put("clientId", clientId);

content.put("msg", msg);

callbackConnection.publish(topic, content.toJSONString().getBytes(), QoS.EXACTLY_ONCE, false,new Callback() {

@Override

public void onSuccess(Void aVoid) {

}

@Override

public void onFailure(Throwable throwable) {

}

});

JSONObject jsonObject = new JSONObject();

jsonObject.put("success", true);

jsonObject.put("content", msg);

return jsonObject;

}

// 获取在线人数

@RequestMapping("/getOnlineCount")

@ResponseBody

public Object getOnlineCount(Long liveId) {

int onlineCount = emqtConfig.getOnlineCount(liveId);

JSONObject jsonObject = new JSONObject();

jsonObject.put("success", true);

jsonObject.put("content", onlineCount);

return jsonObject;

}

}

index.ftl文件,用于展示

title

${username!}成功进入了${liveId!}直播间

当前在线人数:0


输入消息:

发送


取消

var clientId = '${clientId!}';

var username = '${username!}';

var topic = '${topic!}';

var liveId = '${liveId!}';

index.js,用于控制页面的一些逻辑

$(document).ready(function () {

// 将在全局初始化一个 mqtt 变量

console.log(mqtt)

// 连接选项

const options = {

connectTimeout: 4000, // 超时时间

// 认证信息

clientId: clientId, // 客户端id 这个自己填 尽量唯一

username: username, // 取当前用户的名字

password: '123',

}

const client = mqtt.connect('ws://your address:8083/mqtt', options)

// let topic = "/live_dev/${liveId}"

client.on('connect', (e) => {

console.log('成功连接服务器')

$("#connectionTip").html("成功连接到消息服务器")

// 订阅一个主题

client.subscribe(topic, { qos: 2 }, (error) => {

if (!error) {

console.log('订阅成功')

}

},onSubscribeSuccess)

})

/*

// 取消订阅

client.unubscribe(

// topic, topic Array, topic Array-Onject

'hello',

onUnubscribeSuccess,

)

*/

client.on('reconnect', (error) => {

console.log('正在重连:' + error)

})

client.on('error', (error) => {

console.log('连接失败:' + error)

})

function onSubscribeSuccess() {

client.publish('/liveOnline/' + liveId, liveId, { qos: 2, rein: false }, (error) => {

console.log(error || '发布成功')

})

}

function onUnubscribeSuccess() {

console.log("onUnubscribeSuccess")

}

// 监听接收消息事件

client.on('message', (topic, message,callback) => {

console.log('收到来自', topic, '的消息', message.toString());

let html = ` 收到来自 ${topic} 的消息:${ message.toString()}
`;

$("#msgContent").append(html);

})

$("#send").click(function () {

let conent = $("input[name='content']").val();

$.post("/send",{clientId:clientId,msg:conent,topic:topic},function (data) {

if (data.success){

console.log("发送成功")

}

},'json')

})

setInterval(function () {

$.post("/getOnlineCount",{liveId:liveId},function (data) {

if (data.success){

$("#onlineCount").html(data.content)

}

},'json')

},3000)

})

参考的文章

代码地址

emqttd java 即时通讯_使用Emqttd搭建一个聊天室相关推荐

  1. emqttd java 即时通讯_[emqttd] (EMQ)

    [emqttd] (EMQ)是采用Erlang语言开发,全面支持MQTT V3.1.1协议,支持集群和大规模连接的开源MQTT消息服务器. [emqttd]致力于发布一个基于Erlang/OTP语言平 ...

  2. 2021-06-14 Socketio学习使用搭建一个聊天室

    Socketio搭建一个聊天室 前言 本次实验所用编程语言为HTML以及javascript和JQurey语言和Socketio框架,所用编辑文本工具为VS code. 注意事项 (1)前端编程注意H ...

  3. java netty聊天室_netty实现消息中心(二)基于netty搭建一个聊天室

    前言 上篇博文(netty实现消息中心(一)思路整理 )大概说了下netty websocket消息中心的设计思路,这篇文章主要说说简化版的netty聊天室代码实现,支持群聊和点对点聊天. 此demo ...

  4. 用webstorm做一个跑马灯_用Workman做一个聊天室

    php中文网最新课程 每日17点准时技术干货分享 为什么要写这篇文章? 我学习Workman好几次了,每次都失败(没做成想要的功能,原谅我比较笨).但是这次也花了好几个小时,把之前没做成的功能实现了. ...

  5. Java聊天室程序源码 Java即时通讯代码 Java局域网聊天系统 Java即时通讯 Java聊天系统

    Java聊天室程序源码 Java即时通讯代码 Java局域网聊天系统  Java即时通讯 Java聊天系统 public Swingtest002() {// 设置标题setTitle("请 ...

  6. java聊天室小程序论文_在Java项目中利用continue与break制作一个聊天室小程序

    在Java项目中利用continue与break制作一个聊天室小程序 发布时间:2020-12-08 16:03:27 来源:亿速云 阅读:98 作者:Leah 在Java项目中利用continue与 ...

  7. 抓住语音社交风口,1天快速搭建语音聊天室

    语音聊天室孵化 一起KTV.众人大合唱.语音开黑.狼人杀.剧本杀.多人配音.观影.语音电台.相亲联谊社交等,一般都是在语音聊天室中进行,那么语音聊天室产品如此火热的原因有哪些呢? 一对一社交适用于朋友 ...

  8. 音视频---速搭建语音聊天室技术分析

    语音聊天室孵化 一起KTV.众人大合唱.语音开黑.狼人杀.剧本杀.多人配音.观影.语音电台.相亲联谊社交等,一般都是在语音聊天室中进行,那么语音聊天室产品如此火热的原因有哪些呢? 一对一社交适用于朋友 ...

  9. Java进阶:基于TCP的网络实时聊天室(socket通信案例)

    目录 开门见山 一.数据结构Map 二.保证线程安全 三.群聊核心方法 四.聊天室具体设计 0.用户登录服务器 1.查看当前上线用户 2.群聊 3.私信 4.退出当前聊天状态 5.离线 6.查看帮助 ...

最新文章

  1. 走向.NET架构设计—第三章—分层设计,初涉架构
  2. swoole 异步MYSQL
  3. 如何手动卸载 SQL Server 2005 实例(官方)
  4. 如何修改linux的MAC地址
  5. 制作wordpress页面的学习记录
  6. ITU衡量信息社会报告:我国ICT发展指数进入亚太前十
  7. vs 下如何调试js
  8. Delphi6及SqlServer对于生僻字䶮的支持测试
  9. 亚马逊警用刷脸计划小小受阻,但原因并不是贝佐斯妥协
  10. 直播平台实现视频监控
  11. 2019.5.25 Noip模拟测试2 T1题解
  12. android studio 包重复
  13. 免费下载遥感数据的网址
  14. 基于c#的区块链编程_3.区块链 · C#区块链编程入门教程-巴比特图书
  15. 安卓手机远程控制app
  16. 防晒新时代,小红书美妆品牌营销趋势洞察
  17. CSS绘制平行四边形
  18. 大疆文档(8)-Android教程-模拟器App
  19. 尚硅谷谷粒商城项目P16前端项目renren-fast-vue的bug,耗时三天终于运行起来了
  20. 网络爬虫之Selenium(可视化)爬虫

热门文章

  1. android 视频电话
  2. 如何确认W5500网络芯片物理连接是否正常?
  3. 扑克牌洗牌发牌java代码_java实战(一)之Java模仿斗地主洗牌发牌小游戏
  4. 芋道源码的周天(2018.06.06)
  5. Linux入门之 vi 编辑器使用
  6. 游戏弹窗程序卸载完重启又出现,探究解决办法中发现与360浏览器有关
  7. 前端基本面试题 重点掌握** vue 20220110
  8. 软件建模概述 UML模型图
  9. redis protobuf java_Protobuf序列化对象放到redis
  10. UE4 Slate独立引用程序(摘抄大象无形)