Java UDP Server的轻量级实现
Java UDP Server的轻量级实现
实现方法
接收线程:只处理收包,收完后之后放入工作线程
发送线程:负责发送udp包到其它的server
工作线程:解析包体,实现业务逻辑
工作线程消息处理:在工作线程中解析出协议包体后,根据messageId实现消息处理
主要的java类
ServerManager.java
SenderThread.java
ReceiverThread.java
WorkThread.java
ServerManager.java 相关代码
public static void startListener() throws UDPException{ new Thread(new WorkThread()).start(); } catch (SocketException e) { |
SenderThread.java 相关代码:
逻辑:读取队列中的消息。并实现发送
@Overridepublic void run() {log.info("UDP Sender thread started");while(ServerManager.tdUdpServerIsStart) try {UDPMessage reqBody = senderQueue.take();//队列中没有时会一直等待byte[] bodyBit = reqBody.getMsgBits();if(reqBody.getHeader() == null){log.warn("Header is null");continue;}if(reqBody.getHeader().getMessageId() == null){log.warn("MessageId is null!");continue;}// log.debug("Send head:"+reqBody.getHeader().toString()); // log.info(String.format("Preprocess messageId:%s,len:%s,sessionId:%s to %s", reqBody.getHeader().getMessageId(),bodyBit.length, reqBody.getHeader().getSessionId(), reqBody.getHeader().getSocketAddress())); // byte[] body = ProtMsgData.makeMsgStream(reqBody.getSessionId(), reqBody.getMessageId().getMessageId(), bodyBit);byte[] body = reqBody.getMsgBits();DatagramPacket dataPacket = new DatagramPacket(body, body.length,reqBody.getHeader().getSocketAddress());//测试CODE // DatagramPacket dataPacket = new DatagramPacket(body, body.length, // InetAddress.getByName("10.10.10.100"), 6060);// DatagramSocket dataSocket = new DatagramSocket(reqBody.getHeader().getInetSocketAddress().getPort()); // DatagramSocket dataSocket = new DatagramSocket(45677);DatagramSocket dataSocket = getSocket(reqBody.getHeader().getInetSocketAddress().getPort());if(dataSocket == null){log.warn(String.format("Send error!Not found socket by port:%s",reqBody.getHeader().getInetSocketAddress().getPort()));continue;}log.info(String.format("Send message to %s,id:%s,len:%s,data:%s",reqBody.getHeader().getSocketAddress().toString(),reqBody.getHeader().getMessageId(),body.length,ProtUtils.getHexByString(body)));dataSocket.send(dataPacket);//如果需要接收消息,先将记录保存起来// if (reqBody.isNeedReceive()) { // UDPSessionManager.add(reqBody); // }} catch (Exception e) {log.error("Send message error", e);}} |
ReceiverThread.java 相关代码
封装数据,给工作线程使用。也可以在这里解析协议的头,方便后续使用
@Overridepublic void run() {Init();}public void Init() {while(UCSIServerManager.tdUdpServerIsStart){try {byte[] receiveBody = new byte[BUFFER_SIZE];DatagramPacket dataPacketBody = new DatagramPacket(receiveBody, receiveBody.length);if(dataPacketBody == null) continue;ServerManager.receive(dataPacketBody);if(dataPacketBody == null) continue;//添加到工作线程队列ProtRespInfo respInfo = new ProtRespInfo(null,dataPacketBody); UDPWorkThread.push(respInfo);} catch (Exception e) {}} } |
WorkThread.java 相关代码
@Overridepublic void run() {while(UCSIServerManager.tdUdpServerIsStart){try {ProtRespInfo packet = workQueue.take();UDPReceiverHandler handler = new UDPReceiverHandler(packet);handler.receive();} catch (InterruptedException e) {e.printStackTrace();continue;}} }public static void push(ProtRespInfo respInfo){try {workQueue.put(respInfo);} catch (Exception e) {e.printStackTrace();} } |
UDPReceiverHandler.java
实现在解析协议头和协议体。并根据messageId来调用对应的消息实现,我这里使用的注解的方式,程序启时扫描相关注解,如果新加协议只需要实现对应的接口,并加上注解。
public void receive(){ log.debug("UDPReceiver receive..."); UDPHeader header = getUDPHeader(respInfo.getDeviceType());if(header == null){log.warn("Not found head by port:"+respInfo.getDataPacket().getPort());return;}header.setSocketAddress(respInfo.getDataPacket().getSocketAddress());header.data = respInfo.getDataPacket().getData();header.len = header.getHeaderSize();header.decode();header.setDeviceType(respInfo.getDeviceType());respInfo.setHeader(header);if(respInfo.getHeader() == null || respInfo.getHeader().getMessageId() == null){log.warn("消息头错误,来自IP:"+respInfo.getHeader().getRemoteAddr()+",端口:"+respInfo.getHeader().getRemotePort());return;}try {IReceiverHandler handler = ReceiverHandlerManager.get(respInfo.getHeader().getMessageId().getMessageId());if(handler == null){log.error("没有找到对应的消息Handler,ID:"+respInfo.getHeader().getMessageId());return;}handler.handler(header,respInfo);} catch (Exception e) {log.error("",e);}} |
小结
丢包严重:可以尝试增加DatagramSocket.setReceiveBufferSize来增加缓冲区大小
数据解析:定义好协议后,将协议头和协议体抽象出来。我这里抽象了一个UDPCoder(一些数据的公用read方法和write方法),UDPHeader,UDPBody。
PS:有空之后一定把源码整理出来
Java UDP Server的轻量级实现相关推荐
- 为什么 jmeter 分布式测试,一定要设置 java.rmi.server.hostname
之前总结了 jmeter 分布式测试的过程,在部署过程中提到,要在 system.properties 中配置自己的 IP. 至于为什么要这么做,源于这一次 debug 的过程. 运行环境 mint, ...
- python udp 大文件_Python UDP服务器发送文本文件的行(Python UDP Server send lines of a text file)...
Python UDP服务器发送文本文件的行(Python UDP Server send lines of a text file) 我需要模拟一个UDP服务器,它在无限循环中逐行发送文本文件的内容. ...
- JAVA UDP套接字编程
JAVA UDP套接字编程 UDP套接字 无连接 非可靠传输 面向数据报 package com.lius.udp;import java.io.IOException; import java.ne ...
- issue no route to host 为什么 jmeter 分布式测试,一定要设置 java.rmi.server.hostname--(有效)
之前总结了 jmeter 分布式测试的过程,在部署过程中提到,要在 system.properties 中配置自己的 IP. 至于为什么要这么做,源于这一次 debug 的过程. 运行环境 技术分享图 ...
- java UDP实现一个聊天工具
题目: 假设Tom和Jerry利用Java UDP进行聊天,请为他们编写程序.具体如下: (1).Tom和Jerry聊天的双方都应该具有发送端和接收端: (2).利用DatagramSocket与Da ...
- Java UDP 编程简介.
一.UDP 协议简介 UPD协议 是常见的 网络传输协议之一, 当然另1个是TCP协议. UPD协议 是一种不靠的协议. 是因为发送方不会关心接受方的状态, 直接向接收方发送数据包, 也就是说这个数据 ...
- Tomcat 之 启动tomcat时 错误: 代理抛出异常 : java.rmi.server.ExportException: Port already in use: 1099;...
错误: 代理抛出异常 : java.rmi.server.ExportException: Port already in use: 1099; nested exception is: java. ...
- Jodd - Java界的瑞士军刀轻量级工具包
转载自 Jodd - Java界的瑞士军刀轻量级工具包! Jodd介绍 Jodd是对于Java开发更便捷的开源迷你框架,包含工具类.实用功能的集合,总包体积不到1.7M. Jodd构建于通用场景使开发 ...
- java.rmi.server.port_java.rmi.server.ExportException: internal error: ObjID already in use报错处理...
由于在server.xml文件中使用配置了 在catalina.sh中也指定了对应 CATALINA_OPTS="$CATALINA_OPTS -Dcom.sun.management.jm ...
最新文章
- 无法提升彼此,夫妻关系就要终结?
- cdi name 日志_CDI 2.0更新
- 【Socket网络编程】16.UDP 循环读取recvfrom() 与 循环发送 sendto()
- python pip如何安装wheel文件?.whl(pip install [wheel])
- kafka消费并导出_如何使用Docker内的Kafka服务?消息服务测试实践篇
- python在sql添加数据库_使用Python创建MySQL数据库实现字段动态增加以及动态的插入数据...
- snapchat注册不到_从Snapchat获得开发人员职位中学到的经验教训
- Linux(14)-正则表达式
- nvidia-smi‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件
- 用xargs处理带空格文件名
- 解决忽略vscode中pylint报错的问题
- 充一次电使用一年的手机_超级手机电池:充一次电用一年多?
- IDO已经OUT ?3分钟了解NFT的新玩法INO
- “科目四”竟是民间杜撰出来的?
- 蓝桥杯 2018 C++ A组 初赛部分题解
- Spring JMS CLIENT_ACKNOWLEDGE
- 利用AutoSSH实现远程管理内网服务器
- AD ADSI入门
- iframe例子 (
- 2022年值得关注的5个区块链项目 数字藏品平台开发搭建
热门文章
- FineReport构建银行金融租赁考核系统
- python27和python36 共存时安装pip方法,解决python27文件夹下没有script文件方法
- 计算机组成原理期末复习第三章-3(唐朔飞)
- 使用Minitab热图可视化的五种热门方法
- [UE4]不错的音效插件WWISE
- mysql数据库系统原理_数据库系统原理及MySQL应用教程
- iPhone4S安装Linux系统,Ubuntu系统下iPhone4S降级6.1.3教程
- local class incompatible: stream classdesc serialVersionUID = 1, local class serialVersionUID = 2427
- 《游戏设计艺术(第2版)》——学习笔记(7)第7章 游戏始于一个创意
- 图书推荐|计算机组成与设计(原书第5版) 硬件软件接口 RISC-V