1.前言

之前基于Netty做了一套TCP与MQTT的服务端,随着系统接入的终端类型越来越多,出现了UDP通讯的设备,虽然这样的设备并非主流,而且通讯机制存在问题,为了考虑系统的兼容性,只能将整套服务做全。

2.UDP通讯优缺点

UDP 是一种面向非连接的协议,面向非连接指的是在正式通信前不必与对方先建立连接,不管对方状态就直接发送数据。至于对方是否可以接收到这些数据,UDP 协议无法控制,所以说 UDP 是一种不可靠的协议。

UDP 协议适用于一次只传送少量数据、对可靠性要求不高的应用环境。

与TCP 协议一样,UDP 协议直接位于 IP 协议之上。实际上,IP 协议属于 OSI 参考模型的网络层协议,而 UDP 协议和 TCP 协议都属于传输层协议。

因为 UDP 是面向非连接的协议,没有建立连接的过程,因此它的通信效率很高,但也正因为如此,它的可靠性不如 TCP 协议。

UDP 协议的主要作用是完成网络数据流和数据报之间的转换在信息的发送端,UDP 协议将网络数据流封装成数据报,然后将数据报发送出去;在信息的接收端,UDP 协议将数据报转换成实际数据内容。

可以认为 UDP 协议的 socket 类似于码头,数据报则类似于集装箱。码头的作用就是负责友送、接收集装箱,而 socket 的作用则是发送、接收数据报。因此,对于基于 UDP 协议的通信双方而言,没有所谓的客户端和服务器端的概念。

3.源码示例

UDP的源码包含了我这边的整个系统架构层面的内容,并非完整的源码,可以根据自己的系统结构进行调整

3.1.启动UDP服务

/*** 网关数据监听* @author lenny* @date 20220314*/
@Component
public class ApplicationEventListener implements CommandLineRunner {//因为我的网关服务是按终端协议节点进行区分的,当终端上下行数据都需要指定这个节点名称然后给到MQ@Value("${spring.application.name}")private String nodeName;//UDP服务的监听端口@Value("${gnss.udpserver.udpPort}")private int udpPort;@Overridepublic void run(String... args) throws Exception {//启动UDP服务startUdpServer();//清除Redis所有此节点的在线终端(业务需要,Demo可以不用)RedisService redisService = SpringBeanService.getBean(RedisService.class);redisService.deleteAllOnlineTerminals(nodeName);//将所有此节点的终端设置为离线(业务需要,Demo可以不用)RabbitMessageSender messageSender = SpringBeanService.getBean(RabbitMessageSender.class);messageSender.noticeAllOffline(nodeName);}/*** 启动udp服务** @throws Exception*/private void startUdpServer() throws Exception {//计数器,必须等到所有服务启动成功才能进行后续的操作final CountDownLatch countDownLatch = new CountDownLatch(1);//启动UDP服务UdpServer udpServer = new UdpServer(udpPort, ProtocolEnum.UDP, countDownLatch);udpServer.start();//等待启动完成countDownLatch.await();}
}

3.2.UdpServer类

/*** UDP服务类* @author lenny* @date 20220316*/
@Slf4j
public class UdpServer extends Thread{private int port;private ProtocolEnum protocolType;private EventLoopGroup workerGroup;private Bootstrap bootstrap = new Bootstrap();private CountDownLatch countDownLatch;public UdpServer(int port, ProtocolEnum protocolType, CountDownLatch countDownLatch) {this.port = port;this.protocolType = protocolType;this.countDownLatch = countDownLatch;final EventExecutorGroup executorGroup = SpringBeanService.getBean("executorGroup", EventExecutorGroup.class);workerGroup = SpringBeanService.getBean("workerGroup", EventLoopGroup.class);bootstrap.group( workerGroup).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(new ChannelInitializer<NioDatagramChannel>(){@Overrideprotected void initChannel(NioDatagramChannel  ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(UdpConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));//数据初步解析,并进行组包拆包处理ch.pipeline().addLast(new ProtocolDecoder(protocolType));//登录鉴权业务ch.pipeline().addLast(UdpLoginHandler.INSTANCE);//数据完全解码,并根据数据进行业务处理ch.pipeline().addLast(executorGroup, UdpBusinessHandler.INSTANCE);}});}@Overridepublic void run() {bind();}private void bind(){bootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {log.info("UDP服务器启动,端口:{}" , port);countDownLatch.countDown();} else {log.error("UDP服务器启动失败,端口:{}", port, future.cause());System.exit(-1);}});}}

3.3.ProtocolDecoder数据预处理

/*** <p>Description: 常规消息体初步解析解释器</p>** @author lenny* @version 1.0.1* @date 2022-03-16*/
@Slf4j
public class ProtocolDecoder extends MessageToMessageDecoder<DatagramPacket> {/*** 协议类型*/private ProtocolEnum protocolType;public ProtocolDecoder(ProtocolEnum protocolType) {this.protocolType=protocolType;}@Overrideprotected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {ByteBuf in=datagramPacket.content();log.info("收到:{}", ByteBufUtil.hexDump(in));Object decoded = null;//decodePacket数据粘包与半包处理Object obj= PacketUtil.decodePacket(in);if(obj!=null) {//按协议进行数据初步解析,得到对应的设备ID与消息体,通过设备ID可以进行鉴权处理decoded = decodeMessage((ByteBuf) obj);} else {return;}if (decoded != null) {out.add(decoded);}}/*** 预处理包(使用的是博实结的一款UDP通讯的产品)* @param frame* @return*/private Object decodeMessage(ByteBuf frame) {//将完整包输出为16进制字符串String rowData=ByteBufUtil.hexDump(frame);frame.readShort();//信令int msgId=frame.readUnsignedByte();//消息长度(包长字节位置后的第一字节开始直到包尾的长度)int bodyLen=frame.readUnsignedShort();//伪IPbyte[] terminalNumArr = new byte[4];frame.readBytes(terminalNumArr);//终端IDString terminalNum= CommonUtil.parseTerminalNum(terminalNumArr);//剩余消息体byte[] msgBodyArr = new byte[bodyLen-6];frame.readBytes(msgBodyArr);//校验—+包尾frame.readShort();//处理成统一标准给到处理器去处理后续的解析UdpMessage message=new UdpMessage();message.setTerminalNumArr(terminalNumArr);message.setTerminalNum(terminalNum);message.setMsgId(msgId);message.setProtocolType(protocolType);message.setMsgBodyArr(msgBodyArr);message.setMsgBody(Unpooled.wrappedBuffer(msgBodyArr));return message;}
}

3.4.登录鉴权

我这边是通过当前设备是否在系统注册进行鉴权处理的

/*** 登录业务处理* @author Lenny* @date 20200314*/
@Slf4j
@ChannelHandler.Sharable
public class UdpLoginHandler extends SimpleChannelInboundHandler<UdpMessage> {public static final UdpLoginHandler INSTANCE = new UdpLoginHandler();private RedisService redisService;private RabbitMessageSender messageSender;private Environment environment;private UdpLoginHandler() {redisService = SpringBeanService.getBean(RedisService.class);messageSender = SpringBeanService.getBean(RabbitMessageSender.class);environment = SpringBeanService.getBean(Environment.class);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, UdpMessage msg) throws Exception {//协议类型ProtocolEnum protocolType = msg.getProtocolType();//查询终端号码有无在平台注册String terminalNum = msg.getTerminalNum();TerminalProto terminalInfo = redisService.getTerminalInfoByTerminalNum(terminalNum);if (terminalInfo == null) {log.error("终端登录失败,未找到终端信息,协议:{},终端号:{},消息:{}", protocolType, terminalNum, msg);if (msg.getMsgBody() != null) {ReferenceCountUtil.release(msg.getMsgBody());}ctx.close();return;}//设置协议类型terminalInfo.setProtocolType(protocolType);//设置节点名terminalInfo.setNodeName(environment.getProperty("spring.application.name"));//保存终端信息和消息流水号到上下文属性中Session session = new Session(terminalInfo);ChannelHandlerContext oldCtx = SessionUtil.bindSession(session, ctx);if (oldCtx == null) {log.info("终端登录成功,协议:{},终端ID:{},终端号:{}", protocolType, terminalInfo.getTerminalStrId(), terminalNum);} else {log.info("终端重复登录关闭上一个连接,协议:{},终端ID:{},终端号:{}", protocolType, terminalInfo.getTerminalStrId(), terminalNum);oldCtx.close();}//通知上线messageSender.noticeOnline(terminalInfo);//登录验证通过后移除此handlerctx.pipeline().remove(this);ctx.fireChannelRead(msg);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

3.5.业务处理

/*** 业务处理* @author lenny* @date 20220314*/
@Slf4j
@Sharable
public class UdpBusinessHandler extends SimpleChannelInboundHandler<UdpMessage> {public static final UdpBusinessHandler INSTANCE = new UdpBusinessHandler();private RabbitMessageSender messageSender;private MessageServiceProvider messageServiceProvider;private UdpBusinessHandler() {messageSender = SpringBeanService.getBean(RabbitMessageSender.class);messageServiceProvider = SpringBeanService.getBean(MessageServiceProvider.class);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, UdpMessage msg) throws Exception {//根据消息类型获取对应的消息处理器int msgId = msg.getMsgId();//找到对应的业务处理器BaseMessageService messageService = messageServiceProvider.getMessageService(msgId);ByteBuf msgBody = msg.getMsgBody();try {//通过业务处理器进行处理messageService.process(ctx, msg, msgBody);} catch (Exception e) {printExceptionLog(msg, messageService, e);} finally {ReferenceCountUtil.release(msgBody);}}/*** 打印异常日志** @param msg* @param messageService* @param e*/private void printExceptionLog(UdpMessage msg, BaseMessageService messageService, Exception e) {byte[] msgBodyArr = msg.getMsgBodyArr();log.error("收到{}({}),消息异常,协议:{},终端号码:{},消息体:{}", messageService.getDesc(), NumberUtil.formatMessageId(msg.getMsgId()), msg.getProtocolType(), msg.getTerminalNum(), ByteBufUtil.hexDump(msgBodyArr), e);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Session session = SessionUtil.getSession(ctx.channel());TerminalProto terminalInfo = session.getTerminalInfo();boolean unbindResult = SessionUtil.unbindSession(ctx);if (unbindResult) {//通知离线messageSender.noticeOffline(terminalInfo);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {TerminalProto terminalInfo = SessionUtil.getTerminalInfo(ctx);log.error("终端连接异常,终端ID:{},终端号码:{}", terminalInfo.getTerminalStrId(), terminalInfo.getTerminalNum(), cause);ctx.close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {TerminalProto terminalInfo = SessionUtil.getTerminalInfo(ctx);if (terminalInfo == null) {log.info("{}秒未读到数据,关闭连接", UdpConstant.READER_IDLE_TIME);} else {log.info("{}秒未读到数据,关闭连接,终端ID:{},终端号码:{}", UdpConstant.READER_IDLE_TIME, terminalInfo.getTerminalStrId(), terminalInfo.getTerminalNum());}ctx.close();}}
}

3.6.业务处理器举例

比如我们收到一个消息ID为0x81的数据

/*** 点名查看* @author Lenny* @date 20220314*/
@Slf4j
@MessageService(messageId = 0x81, desc = "点名查看")
public class Message81Service extends BaseMessageService<UdpMessage> {@Autowiredprivate RabbitMessageSender messageSender;@Overridepublic Object process(ChannelHandlerContext ctx, UdpMessage msg, ByteBuf msgBodyBuf) throws Exception {//这里需要对业务进行实际解码,目前并未处理,目的是先验证整个流程的通畅性log.info(JSON.toJSONString(msg));return null;}
}

4.说明

目前我这边已经利用了Netty完成了TCP、MQTT、UDP的通讯服务层,集成了近百款终端协议产品,包含了部标全套的服务,以及第三方企业标准标准几十家,有兴趣的朋友可以联系我。

基于Netty的UDP服务端开发相关推荐

  1. linux netty udp服务端,Netty实现UDP服务端

    ### 前言 在之前的文章我已经讲过了利用`Netty`实现`UDP`客户端,大家有兴趣的话,可以参看下面文章: [Netty实现UDP客户端](https://www.jianshu.com/p/5 ...

  2. netty获取玩家chanel_基于netty的TCP服务端如何给客户端发送消息,但是如何拿到客户端连接时的SocketChannel呢,菜鸟求助?...

    1.思路1 每个客户端连接时的SocketChannel保存在会话类sessionManager中的sessionIdMap中 问题: 1.客户端连接时确实将SocketChannel保存在会话类se ...

  3. 基于C++的http服务端开发

    1.同时支持get.post接口请求 2.支持文件流下载接口 完整源代码下载地址:https://download.csdn.net/download/GUMU12345/81103130 // // ...

  4. rds基于什么开发_为什么不学基于TypeScript的Node.js服务端开发?

    为什么不学?学不动了吗?!别躺下啊,我扶你起来! 我们早就知道,如今的JavaScript已经不再是当初那个在浏览器网页中写写简单的表单验证.没事弹个alert框吓吓人的龙套角色了.借助基于v8引擎的 ...

  5. 送给即将春秋招的同学--一名服务端开发工程师的校招面经总结

    前言:作为一名21年大学毕业的Java服务端开发工程师,从19年10月份(大三上)开始进行日常实习面试,期间获得小米.快手.领英.Tencent等offer,因疫情爆发无法准时入职,20年3月份春招成 ...

  6. 关于SpringBoot整合Netty客户端和服务端实现JT808协议

    关于SpringBoot整合Netty客户端和服务端实现JT808协议 最近做了一个使用netty实现交通部JT808协议的项目,对比了mina和netty两种框架的使用,先整理一下netty的实现过 ...

  7. java服务端开发 php_PHP使用thrift做服务端开发

    php中文网最新课程 每日17点准时技术干货分享 php使用thrift做服务端开发 thrift采用接口描述语言定义和创建服务,用二进制格式传输数据,体积更小.效率更高,对于高并发.数据量大和多语言 ...

  8. Aooms_基于SpringCloud的微服务基础开发平台实战_002_工程构建

    为什么80%的码农都做不了架构师?>>>    一.关于框架更名的一点说明 最近在做年终总结.明年规划.还有几个项目需要了结.出解决方案,事情还比较多,死了不少脑细胞,距离上一篇文章 ...

  9. 第13章 Kotlin 集成 SpringBoot 服务端开发(1)

    第13章 Kotlin 集成 SpringBoot 服务端开发 本章介绍Kotlin服务端开发的相关内容.首先,我们简单介绍一下Spring Boot服务端开发框架,快速给出一个 Restful He ...

最新文章

  1. .html天气预报上蔡,上蔡天气预报15天
  2. linux下获取线程号
  3. 返回值类型与函数类型不匹配_golang基础语法,定义函数类型 为已存在的数据类型起别名...
  4. 机器学习知识总结系列-机器学习中的优化算法总结(1-4)
  5. Google服务你都用了哪些?
  6. PHP获取IP所在地地址
  7. 为什么电商越来越难做了?
  8. nginx 配置图片服务器 文件大小,nginx 配置图片服务器 文件大小
  9. Programmer Competency Matrix
  10. window7DOS常用命令
  11. 联盟链之hyperledger-fabric
  12. SpringBoot格式化日期
  13. python3 录屏
  14. DTD与shema学习
  15. 交换机与路由器的基本工作原理
  16. 污水处理问题多,泵站自动化控制系统是这样解决的
  17. 模拟cmos集成电路(6)
  18. 收集了一些distinct性能相关的文章 希望有用
  19. Python爬取高清无版权美图
  20. 基于Python的拼音汉字转换程序

热门文章

  1. Flutter - 控件之 Picker
  2. vscode 终端运行yarn 报错 “因为在此系统上禁止运行脚本”
  3. 苹果 IAP 支付服务端处理完整流程及注意事项(包含订阅商品处理)
  4. 新手在IDEA如何创建一个Web项目
  5. 关于Excel无法打开,因为文件格式或文件扩展名无效的解决方法
  6. cannot find -lxxx
  7. msvcr100.dll丢失的解决方法
  8. 插入U盘后 计算机未响应,电脑插入U盘后没有反应怎么办?
  9. REDO文件损坏修复
  10. 利用WireShark抓包进行数据分析