依赖

<!--socket io --><dependency><groupId>io.socket</groupId><artifactId>socket.io-client</artifactId><version>1.0.1</version></dependency><dependency><groupId>com.corundumstudio.socketio</groupId><artifactId>netty-socketio</artifactId><version>${netty-socketio.version}</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>${kafka.version}</version></dependency>

代码

/*** socket io 资源管理器* @author ken* @date 2021/1/6 16:18*/
@Slf4j
@Component
public class SocketIOClientManager {@Autowiredprivate KafkaConnectionManager kafkaConnectionManager;@Autowiredprivate MqttConnectionManager mqttConnectionManager;@Resourceprivate WebSocketEventHandler webSocketEventHandler;// 用来存已连接的客户端唯一ID, <KAFKA+" : "+URL+" : "+topics, <sessionID, CLIENT>>private Map<String, Map<String, SocketIOClient>> clientMap = Collections.synchronizedMap(new HashMap<>());public void addClient(SocketIOClient client) {String sessionID = client.getSessionId().toString();String resourceID = getParamsByClient(client);if (resourceID == null) {log.error("客户端未配置参数");client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");}if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);if (!subMap.containsKey(sessionID)) {subMap.put(sessionID, client);clientMap.put(resourceID, subMap);}} else {final HashMap<String, SocketIOClient> subMap = new HashMap<>();subMap.put(sessionID, client);clientMap.put(resourceID, subMap);}log.info("在线客户端: " + clientMap.toString());}public void removeClient(SocketIOClient client) {String sessionID = client.getSessionId().toString();String resourceID = getParamsByClient(client);if (resourceID == null) {log.error("客户端未配置参数");client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");return;}if (clientMap.containsKey(resourceID)) {final Map<String, SocketIOClient> subMap = clientMap.get(resourceID);final Iterator<Map.Entry<String, SocketIOClient>> iterator = subMap.entrySet().iterator();while (iterator.hasNext()) {final Map.Entry<String, SocketIOClient> clientEntry = iterator.next();if (clientEntry.getKey().equals(sessionID)) {iterator.remove();log.info("移除客户端: {}", sessionID);// 如果移除session后对应url没有对应session,那么移除urlif (subMap.size() == 0) {clientMap.remove(resourceID);log.info("移除ID: {}", resourceID);if (resourceID.startsWith(String.valueOf(ResourceType.KAFKA))) {kafkaConnectionManager.removeConnection(resourceID);}if (resourceID.startsWith(String.valueOf(ResourceType.MQTT))) {mqttConnectionManager.removeConnection(resourceID);}} else {clientMap.put(resourceID, subMap);}}}} else {log.info("没有 {} 对应的{} 客户端", resourceID, sessionID);}}public void pushClientMesg2Kafka(SocketIOClient client, String topic, String mesg) throws ExecutionException, InterruptedException {String resourceID = getParamsByClient(client);KafkaPubSubServer kafkaServer = (KafkaPubSubServer) kafkaConnectionManager.getServerByResourceID(resourceID);if (kafkaServer == null) {throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");}if (clientMap.containsKey(resourceID)) {kafkaServer.pushMesg(topic, mesg);}}public void pushKafkaMesg2Client(String resourceID, String mesg) {if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);for (SocketIOClient ioClient : subMap.values()) {ioClient.sendEvent(webSocketEventHandler.getClientSubKafkaEvent(), mesg.toString());}}}public void pushClientMesg2MQTT(SocketIOClient client, String topic, String mesg) throws MqttException {String resourceID = getParamsByClient(client);MqttPubSubServer mqttServer = (MqttPubSubServer) mqttConnectionManager.getServerByResourceID(resourceID);if (mqttServer == null) {throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");}if (clientMap.containsKey(resourceID)) {mqttServer.pushMesg(topic, mesg);}}public void pushMQTTMesg2Client(String resourceID, String mesg) {if (clientMap.containsKey(resourceID)) {Map<String, SocketIOClient> subMap = clientMap.get(resourceID);for (SocketIOClient ioClient : subMap.values()) {ioClient.sendEvent(webSocketEventHandler.getClientSubEmqEvent(), mesg.toString());}}}/*** 此方法为获取client连接中的参数,可根据需求更改** @param client* @return*/private String getParamsByClient(SocketIOClient client) {// 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)final String resourceID = client.getHandshakeData().getSingleUrlParam("resourceID");return resourceID;}}
@Configuration
public class SocketIOConfig {@Value("${socket-io.host}")private String host;@Value("${socket-io.port}")private int port;public String getUrl() {return "http://" + host + ":" + port;}public SocketIOConfig() {}@Beanpublic SocketIOServer socketIOServer() {//创建Socket,并设置监听端口com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();// 设置主机名,默认是0.0.0.0config.setHostname(host);// 设置监听端口config.setPort(port);// 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间config.setUpgradeTimeout(10000);// Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔config.setPingInterval(25000);// Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件config.setPingTimeout(60000);return new SocketIOServer(config);}@Beanpublic SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {return new SpringAnnotationScanner(socketServer);}
}
@Component
@Slf4j
public class WebSocketEventHandler {@Autowiredprivate SocketIOClientManager socketIOClientManager;public String getClientSubKafkaEvent() {return clientSubKafkaEvent;}public String getClientPubKafkaEvent() {return clientPubKafkaEvent;}public String getClientSubEmqEvent() {return clientSubEmqEvent;}public String getClientPubEmqEvent() {return clientPubEmqEvent;}private final String clientSubKafkaEvent = "subKafka";private final String clientPubKafkaEvent = "pubKafka";private final String clientSubEmqEvent = "subEmq";private final String clientPubEmqEvent = "pubEmq";@OnConnectpublic void onConnect(SocketIOClient client) {log.info("客户端发起连接. sessionId->{}", client.getSessionId());socketIOClientManager.addClient(client);}@OnDisconnectpublic void onDisconnect(SocketIOClient client) {final String sessionID = client.getSessionId().toString();log.info("客户端断开连接, sessionId->{}" + sessionID);socketIOClientManager.removeClient(client);client.disconnect();}// kafka消息接收入口@OnEvent(value = clientPubKafkaEvent)public void pushKafka(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {if (StrUtil.isEmpty(topic)) {ackRequest.sendAckData(400, "topic不能为空");}if (StrUtil.isEmpty(mesg)) {ackRequest.sendAckData(400, "mesg不能为空");}try {socketIOClientManager.pushClientMesg2Kafka(client, topic, mesg);ackRequest.sendAckData(200, "id");} catch (Exception e) {e.printStackTrace();ackRequest.sendAckData(500, e.getMessage());}}// emq信息接收入口@OnEvent(value = clientPubEmqEvent)public void pushEmq(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {if (StrUtil.isEmpty(topic)) {ackRequest.sendAckData(400, "topic不能为空");}if (StrUtil.isEmpty(mesg)) {ackRequest.sendAckData(400, "mesg不能为空");}try {socketIOClientManager.pushClientMesg2MQTT(client, topic, mesg);ackRequest.sendAckData(200, "id");} catch (Exception e) {e.printStackTrace();ackRequest.sendAckData(500, e.getMessage());}}}
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {private final SocketIOServer server;private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);@Autowiredpublic ServerRunner(SocketIOServer server) {this.server = server;}@Overridepublic void run(String... args) {logger.info("SocketIO 启动...");server.start();}
}
@Slf4j
public class SocketClientEMQTest {public static void main(String[] args) {final SocketClientEMQTest socketClientTest = new SocketClientEMQTest();try {socketClientTest.run();} catch (Exception e) {e.printStackTrace();}}public void run(String... args) throws Exception {URI uri = URI.create("http://127.0.0.1:9201");IO.Options options = new IO.Options();options.transports = new String[]{"websocket"};options.reconnectionAttempts = 2;options.query = "resourceID=" + "mqtt$$tcp://localhost:1883$$test";Socket socket = IO.socket(uri, options);socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("connect: {}", args);}});socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("disconnect: {}", args);}});socket.on("subEmq", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_mqtt {}", args);}});/*  socket.on("push_kafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}" , args);}});*/final ArrayList<String> arrayList = new ArrayList<>();
//        arrayList.add("")int i = 0;while (true) {i += 1;socket.emit("pubEmq", "test", "testmesg" + i, new Ack() {@Overridepublic void call(Object... objects) {log.info("userChat ack:{}|{}", objects[0], objects[1]);}});if (i >= 10) {break;}Thread.sleep(2000);}socket.connect();LockSupport.park();}
}
@Slf4j
public class SocketClientKAFKATest {public static void main(String[] args) {final SocketClientKAFKATest socketClientTest = new SocketClientKAFKATest();try {socketClientTest.run();} catch (Exception e) {e.printStackTrace();}}public void run(String... args) throws Exception {URI uri = URI.create("http://127.0.0.1:9201");IO.Options options = new IO.Options();options.transports = new String[]{"websocket"};options.reconnectionAttempts = 2;options.query = "resourceID=" + "kafka$$localhost:9092$$test12399";
//        options.query = "loginUserNum=" + "mqtt$$tcp://localhost:1883$$test";Socket socket = IO.socket(uri, options);socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("connect: {}", args);}});socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("disconnect: {}", args);}});socket.on("subKafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}", args);}});/*  socket.on("push_kafka", new Emitter.Listener() {@Overridepublic void call(Object... args) {log.info("push_kafka {}" , args);}});*/final ArrayList<String> arrayList = new ArrayList<>();
//        arrayList.add("")int i = 0;while (true) {i += 1;socket.emit("pubKafka", "TEST", "testmesg" + i, new Ack() {@Overridepublic void call(Object... objects) {log.info("userChat ack:{}|{}", objects[0], objects[1]);}});if (i >= 10) {break;}Thread.sleep(2000);}socket.connect();LockSupport.park();}
}

基于corundumstudio建立websocket长连接相关推荐

  1. Android通过WebSocket建立一个长连接(带心跳检测)从服务器端接收消息

    最近公司要做一款内部使用的工具类app,方便销售部门打电话(其实就是在后台有好多用户数据,之前销售部门同事拨打电话,需要自己从销售后台查看用户手机号等信息,然后自己拿自己手机拨号,然后打出去.现在想实 ...

  2. java nio socket长连接_netty学习实战—实现websocket长连接和socket之间进程通信

    netty学习-实现websocket长连接和socket之间通信 最近正在学习netty,跟着教程写了一个基于WebSocket的网页聊天室,对netty有了一定的了解,现在正好项目使用到长连接,选 ...

  3. WebSocket长连接因为网络波动而导致客户端的“假离线”---问题发现、分析到解决

    文章目录 简介 问题的现象.场景和解决方案 基本的部署架构 问题是什么呢? 假离线到底是怎么来的? 验证猜想 解决问题 如何发现问题的呢? 客户端离线预警 奇怪的现象来了 该怎么去发现呢 到底是谁改的 ...

  4. 网络编程懒人入门(八):手把手教你写基于TCP的Socket长连接

    转自即时通讯网:http://www.52im.net/ 本文原作者:"水晶虾饺",原文由"玉刚说"写作平台提供写作赞助,原文版权归"玉刚说" ...

  5. 实现单台测试机6万websocket长连接

    本文由作者郑银燕授权网易云社区发布. 本文是我在测试过程中的记录,实现了单台测试机发起最大的websocket长连接数.在一台测试机上,连接到一个远程服务时的本地端口是有限的.根据TCP/IP协议,由 ...

  6. java与微信小程序通讯_java与微信小程序实现websocket长连接

    本文实例为大家分享了java与微信小程序实现websocket长连接的具体代码,供大家参考,具体内容如下 背景: 需要在小程序实现地图固定坐标下实时查看消息 java环境 :tomcat7 jdk1. ...

  7. WebSocket长连接

    WebSocket长连接 1.概述 1.1 定义 1.2 原理 2.Django中配置WebSocket 2.1安装第三方法包 pip install channels 2.2 Django 中的配置 ...

  8. websocket 获取连接id_Swoole学习笔记七:搭建WebSocket长连接 之 使用 USER_ID 作为身份凭证...

    Swoole学习笔记七:搭建WebSocket长连接 之 使用 USER_ID 作为身份凭证 2年前 阅读 3678 评论 0 喜欢 0 ### 0.前言 前面基本的WebSocket操作,我们基本都 ...

  9. 微信是与服务器长连接,java与微信小程序实现websocket长连接.pdf

    java与与微微信信小小程程序序实实现现websocket长长连连接接 本文实例为大家分享了j ava与微信小程序实现websocket长连接的具体代码,供大家参考,具体内容 下 背背景景:: 需要在 ...

最新文章

  1. 大数据之公开数据的价值
  2. mysql 的select语句_MySQLSELECT语句_MySQL
  3. 我的理想计算机系100字,我的理想作文100字(通用5篇)
  4. tensorflow tf.train.ExponentialMovingAverage().variables_to_restore()函数 (用于加载模型时将影子变量直接映射到变量本身)
  5. 【ARM】ARM处理器寻址方式
  6. 猎豹移动(金山网络)2015校园招聘(c++project师)
  7. Builder Pattern 在 Objective-C 中的使用
  8. python 中文apichm_python api 中文 chm
  9. php取结果集,php获取数据库结果集方法(推荐)
  10. NE40E面板ALM报警亮灯
  11. 自己试着在阿里云布了个服务器
  12. JSP指令:page指令,errorPage和isErrorPage
  13. 微信小程序登录授权开发
  14. 数据挖掘之决策树与决策规则
  15. Language Models are Unsupervised Multitask Learners翻译
  16. Java的getbytes()方法使用
  17. operator int()用法
  18. 第20课:技术转型的实践路线(图文篇)
  19. 一招学会绘制UI图标超椭圆
  20. 2015武汉大学计算机学院录取,2015年武汉大学计算机专业研究生录取名单

热门文章

  1. PID控制器开发笔记之五:变积分PID控制器的实现
  2. PKU 学生的反馈 2009 –2
  3. vscode如何连接新设备_台州要用“超级平台”连接300万台工业设备,成为全省新示范...
  4. JAVA入门级教学之(IDEA工具的快捷键和简单设置)
  5. git for windows_手把手教会舍友玩 Git (包教包会,再也不用担心他的学习)
  6. js 点击闭包_学习Javascript闭包(Closure)
  7. 最长不重复子串python_python经典算法题:无重复字符的最长子串
  8. mysql57数据库命令_MySQL 5.7 mysql command line client 使用命令详解
  9. 拓扑排序排课系统_视频结构化人脸布控系统
  10. java substring 越界_我在java中用substrng()提取某一字符串的子串是老是出现越界的问题,求指教~~~...