netty(7)–UDP实战

UDP 协议

​ UDP 是面向无连接的通讯协议,UDP 报头由 4 个域组成,其中每个域各占用 2 个字节, 其中包括目的端口号和源端口号信息,数据报的长度域是指包括报头和数据部分在内的总字 节数,校验值域来保证数据的安全。由于通讯不需要连接,所以可以实现广播发送。

​ UDP 通讯时不需要接收方确认,属于不可靠的传输,可能会出现丢包现象,实际应用中 要求程序员编程验证。

​ UDP 与 TCP 位于同一层,但它不管数据包的顺序、错误或重发。因此,UDP 不被应用 于那些使用虚电路的面向连接的服务,UDP 主要用于那些面向查询—应答的服务,例如 NFS。 相对于 FTP 或 Telnet,这些服务需要交换的信息量较小。使用 UDP 的服务包括 NTP(网络时 间协议)和 DNS(DNS 也使用 TCP),包总量较少的通信(DNS、SNMP 等);2.视频、音频等多媒体通信(即时通信);3.限定于 LAN 等特定网络中的应用通信;4.广播通信(广播、 多播)。

UDP单播和广播

单播–发送消息给一个由唯一的地址所标识的单一的网络目的地。面·向连接的协议和无连接协议都支持这种模式。

广播–传输到网络(或者子网)上的所有主机。

Netty的 UDP 相关类

​ interface AddressedEnvelope<M, A extends SocketAddress>extends ReferenceCounted定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中 M 是消息类型; A 是地址类型。

​ class DefaultAddressedEnvelope<M, A extends SocketAddress>implements AddressedEnvelope<M,A>提供了 interface AddressedEnvelope 的默认实现

class DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器。DatagramPacket 是 final 类不能被继承,只能被使用。

  • 通过 content()来获取消息内容

  • 通过 sender()来获取发送者的消息

  • 通过 recipient()来获取接收者的消息。

    interface DatagramChannel extends Channel扩展了 Netty 的 Channel 抽象以支持 UDP 的多播组管理

    class NioDatagramChannnel extends AbstractNioMessageChannel implements DatagramChannel定义了一个能够发送和接收 Addressed-Envelope 消息的 Channel 类型

    Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程 节点通信。

UDP 单播

Question端

public class UdpQuestionSide {public final static String QUESTION = "告诉我一句古诗";public void run(int port) throws Exception{EventLoopGroup group  = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioDatagramChannel.class)/*UDP通信*/.handler(new QuestoinHandler());Channel channel = b.bind(0).sync().channel();channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(QUESTION,CharsetUtil.UTF_8),new InetSocketAddress("127.0.0.1",port))).sync();if(!channel.closeFuture().await(15000)){System.out.println("等待超时");}} catch (Exception e) {group.shutdownGracefully();}}public static void main(String [] args) throws Exception{new UdpQuestionSide().run(UdpAnswerSide.ANSWER_PORT);}
}
public class QuestoinHandler extendsSimpleChannelInboundHandler<DatagramPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg)throws Exception {//获得应答,DatagramPacket提供了content()方法取得报文的实际内容String response = msg.content().toString(CharsetUtil.UTF_8);if (response.startsWith(UdpAnswerSide.ANSWER)) {System.out.println(response);ctx.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {cause.printStackTrace();ctx.close();}
}

Answer端

public class UdpAnswerSide {public static final int ANSWER_PORT = 8080;public final static String ANSWER = "古诗来了:";public void run(int port) throws Exception{EventLoopGroup group = new NioEventLoopGroup();try {/*udp无连接,没有接受连接说法,即使是接受端,也应该用Bootstrap*/Bootstrap b = new Bootstrap();/*由于我们用的是UDP协议,所以要用NioDatagramChannel来创建*/b.group(group).channel(NioDatagramChannel.class).handler(new AnswerHandler());//没有接受客户端连接的过程,监听本地端口即可ChannelFuture f = b.bind(port).sync();System.out.println("应答服务已启动.....");f.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}public static void main(String [] args) throws Exception{new UdpAnswerSide().run(ANSWER_PORT);}
}
public class AnswerHandler extendsSimpleChannelInboundHandler<DatagramPacket> {/*应答的具体内容从常量字符串数组中取得,由nextQuote方法随机获取*/private static final String[] DICTIONARY = {"只要功夫深,铁棒磨成针。","旧时王谢堂前燕,飞入寻常百姓家。","洛阳亲友如相问,一片冰心在玉壶。","一寸光阴一寸金,寸金难买寸光阴。","老骥伏枥,志在千里,烈士暮年,壮心不已" };private static Random r = new Random();private String nextQuote(){return DICTIONARY[r.nextInt(DICTIONARY.length-1)];}@Overrideprotected void channelRead0(ChannelHandlerContext ctx,DatagramPacket packet)throws Exception {/*获得请求*/String req = packet.content().toString(CharsetUtil.UTF_8);System.out.println("接收到请求:"+req);if(UdpQuestionSide.QUESTION.equals(req)){String answer = UdpAnswerSide.ANSWER+nextQuote();System.out.println("接收到请求:"+req);/*** 重新 new 一个DatagramPacket对象,* 我们通过packet.sender()来获取发送者的消息。重新发送出去!*/ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(answer,CharsetUtil.UTF_8),packet.sender()));}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();cause.printStackTrace();}
}

UDP广播

广播端

public class LogEventBroadcaster {private final EventLoopGroup group;private final Bootstrap bootstrap;public LogEventBroadcaster(InetSocketAddress remoteAddress) {group = new NioEventLoopGroup();bootstrap = new Bootstrap();//引导该 NioDatagramChannel(无连接的)bootstrap.group(group).channel(NioDatagramChannel.class)/*设置通信选型为广播模式*/.option(ChannelOption.SO_BROADCAST,true).handler(new LogEventEncoder(remoteAddress));}public void run() throws Exception {//绑定 ChannelChannel ch = bootstrap.bind(0).sync().channel();long count = 0;//启动主处理循环,模拟日志发送for (;;) {ch.writeAndFlush(new LogMsg(null, ++count,LogConst.getLogInfo()));try {//休眠 2 秒,如果被中断,则退出循环;Thread.sleep(2000);} catch (InterruptedException e) {Thread.interrupted();break;}}}public void stop() {group.shutdownGracefully();}public static void main(String[] args) throws Exception {//创建并启动一个新的 UdpQuestionSide 的实例LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",LogConst.MONITOR_SIDE_PORT));try {broadcaster.run();}finally {broadcaster.stop();}}
}
public class LogEventEncoder extends MessageToMessageEncoder<LogMsg> {private final InetSocketAddress remoteAddress;//LogEventEncoder 创建了即将被发送到指定的 InetSocketAddress// 的 DatagramPacket 消息public LogEventEncoder(InetSocketAddress remoteAddress) {this.remoteAddress = remoteAddress;}@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext,LogMsg logMsg, List<Object> out) throws Exception {byte[] bytes = logMsg.getMsg().getBytes(CharsetUtil.UTF_8);ByteBuf buf = channelHandlerContext.alloc().buffer(8*2+bytes.length+1);buf.writeLong(logMsg.getTime());buf.writeLong(logMsg.getMsgId());buf.writeByte(LogMsg.SEPARATOR);buf.writeBytes(bytes);out.add(new DatagramPacket(buf,remoteAddress));}
}

接受端

public class LogEventMonitor {private final EventLoopGroup group;private final Bootstrap bootstrap;public LogEventMonitor(InetSocketAddress address) {group = new NioEventLoopGroup();bootstrap = new Bootstrap();//引导该 NioDatagramChannelbootstrap.group(group).channel(NioDatagramChannel.class)//设置套接字选项 SO_BROADCAST.option(ChannelOption.SO_BROADCAST, true).option(ChannelOption.SO_REUSEADDR, true).handler( new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel)throws Exception {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new LogEventDecoder());pipeline.addLast(new LogEventHandler());}} ).localAddress(address);}public Channel bind() {//绑定 Channel。注意,DatagramChannel 是无连接的return bootstrap.bind().syncUninterruptibly().channel();}public void stop() {group.shutdownGracefully();}public static void main(String[] args) throws Exception {//构造一个新的 UdpAnswerSide并指明监听端口LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(LogConst.MONITOR_SIDE_PORT));try {//绑定本地监听端口Channel channel = monitor.bind();System.out.println("Udp AnswerSide running");channel.closeFuture().sync();} finally {monitor.stop();}}
}
public class LogEventHandlerextends SimpleChannelInboundHandler<LogMsg> {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {//当异常发生时,打印栈跟踪信息,并关闭对应的 Channelcause.printStackTrace();ctx.close();}@Overridepublic void channelRead0(ChannelHandlerContext ctx,LogMsg event) throws Exception {//创建 StringBuilder,并且构建输出的字符串StringBuilder builder = new StringBuilder();builder.append(event.getTime());builder.append(" [");builder.append(event.getSource().toString());builder.append("] :[");builder.append(event.getMsgId());builder.append("] :");builder.append(event.getMsg());//打印 LogMsg 的数据System.out.println(builder.toString());}
}
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {@Overrideprotected void decode(ChannelHandlerContext ctx,DatagramPacket datagramPacket, List<Object> out)throws Exception {ByteBuf data = datagramPacket.content();//获得发送时间long sendTime = data.readLong();System.out.println("接受到"+sendTime+"发送的消息");//获得消息的idlong msgId = data.readLong();//获得分隔符SEPARATORbyte sepa = data.readByte();//获取读索引的当前位置,就是分隔符的索引+1int idx = data.readerIndex();//提取日志消息,从读索引开始,到最后为日志的信息String sendMsg = data.slice(idx ,data.readableBytes()).toString(CharsetUtil.UTF_8);//构建一个新的 LogMsg 对象,并且将它添加到(已经解码的消息的)列表中LogMsg event = new LogMsg(datagramPacket.sender(),msgId, sendMsg);//作为本handler的处理结果,交给后面的handler进行处理out.add(event);}
}

实体类与常量

public final class LogMsg {public static final byte SEPARATOR = (byte) ':';/*源的 InetSocketAddress*/private final InetSocketAddress source;/*消息内容*/private final String msg;/*消息id*/private final long msgId;/*消息发送的时间*/private final long time;//用于传入消息的构造函数public LogMsg(String msg) {this(null, msg,-1,System.currentTimeMillis());}//用于传出消息的构造函数public LogMsg(InetSocketAddress source, long msgId,String msg) {this(source,msg,msgId,System.currentTimeMillis());}public LogMsg(InetSocketAddress source, String msg, long msgId, long time) {this.source = source;this.msg = msg;this.msgId = msgId;this.time = time;}//返回发送 LogMsg 的源的 InetSocketAddresspublic InetSocketAddress getSource() {return source;}//返回消息内容public String getMsg() {return msg;}//返回消息idpublic long getMsgId() {return msgId;}//返回消息中的时间public long getTime() {return time;}
}
public class LogConst {public final static int MONITOR_SIDE_PORT = 9998;private static final String[] LOG_INFOS = {"20180912:mark-machine:Send sms to 10001","20180912:lison-machine:Send email to james@enjoyedu","20180912:james-machine:Happen Exception","20180912:peter-machine:人生不能象做菜,把所有的料都准备好了才下锅","20180912:deer-machine:牵着你的手,就象左手牵右手没感觉,但砍下去也会痛!","20180912:king-machine:我听别人说这世界上有一种鸟是没有脚的," +"它只能一直飞呀飞呀,飞累了就在风里面睡觉,这种鸟一辈子只能下地一次," +"那一次就是它死亡的时候.","20180912:mark-machine:在我出道的时候,我认识了一个人," +"因为他喜欢在东边出没,所以很多年以后,他有个绰号叫东邪.","20180912:lison-machine:做人如果没有梦想,那和咸鱼有什么区别","20180912:james-machine:恐惧让你沦为囚犯,希望让你重获自由," +"坚强的人只能救赎自己,伟大的人才能拯救别人." +"记着,希望是件好东西,而且从没有一样好东西会消逝." +"忙活,或者等死.","20180912:peter-machine:世界上最远的距离不是生和死," +"而是我站在你的面前却不能说:我爱你","20180912:deer-machine:成功的含义不在于得到什么," +"而是在于你从那个奋斗的起点走了多远.","20180912:king-machine:杀一为罪,屠万成雄。","20180912:mark-machine:世界在我掌握中,我却掌握不住对你的感情","20180912:lison-machine:我害怕前面的路,但是一想到你,就有能力向前走了。","20180912:james-machine:早就劝你别吸烟,可是烟雾中的你是那么的美," +"叫我怎么劝得下口。","20180912:peter-machine:如果你只做自己能力范围之内的事情,就永远无法进步。" +"昨天已成为历史,明天是未知的,而今天是上天赐予我们的礼物," +"这就是为什么我们把它叫做现在!","20180912:deer-machine:年轻的时候有贼心没贼胆,等到了老了吧," +"贼心贼胆都有了,可贼又没了。","20180912:king-machine:别看现在闹得欢,小心将来拉清单。"};private final static Random r = new Random();public static String getLogInfo(){return LOG_INFOS[r.nextInt(LOG_INFOS.length-1)];}
}

UDT

​ 基于 UDP 的数据传输协议(UDP-based Data Transfer Protocol,简称 UDT)是一种互联 网数据传输协议。UDT 的主要目的是支持高速广域网上的海量数据传输,最典型的例子就是 建立在光纤广域网上的网格计算,一些研究所在这样的网络上运行他们的分布式的数据密集 程式,例如,远程访问仪器、分布式数据挖掘和高分辨率的多媒体流。

​ 而互联网上的标准数据传输协议 TCP 在高带宽长距离网络上性能很差。顾名思义,UDT 建于 UDP 之上,并引入新的拥塞控制和数据可靠性控制机制。UDT 是面向连接的双向的应 用层协议。

netty(7)--UDP实战相关推荐

  1. linux netty udp服务端,Netty实现UDP服务端

    ### 前言 在之前的文章我已经讲过了利用`Netty`实现`UDP`客户端,大家有兴趣的话,可以参看下面文章: [Netty实现UDP客户端](https://www.jianshu.com/p/5 ...

  2. 《netty入门与实战》笔记-02:服务端启动流程

    为什么80%的码农都做不了架构师?>>>    1.服务端启动流程 这一小节,我们来学习一下如何使用 Netty 来启动一个服务端应用程序,以下是服务端启动的一个非常精简的 Demo ...

  3. Netty网络编程实战2,使用Netty开发聊天室功能

    目录 一.服务端 1.主程序类 2.自定义初始化器 3.自定义处理器 二.客户端 1.主程序类 2.自定义初始化器 3.自定义处理器 三.启动服务端.客户端 1.服务端:你好,我是服务端,哪吒编程 2 ...

  4. Netty介绍与实战(三)之粘包拆包

    一.传统NIO架构 step1. 我们传统的nio架构已经解决了多路复用,零拷贝等问题,已经十分优秀了,那为什么我们现在Netty如此火热呢? 1) 首先他使用简单,基本上都是模板化,我们可以更专注业 ...

  5. Netty入门与实战:仿写微信IM即时通讯系统

    转载自:Netty入门与实战:仿写微信IM即时通讯系统 Netty是互联网中间件领域使用最广泛最核心的网络通信框架,几乎所有互联网中间件或者大数据领域均离不开Netty,掌握Netty是作为初中级工程 ...

  6. Springboot+Netty搭建UDP客户端

    使用Netty+SpringBoot方式可以快速地开发一套基于UDP协议的服务端程序,同样的也可以开发客户端,一般使用UDP都是使用原生的方式,发送消息后就不管不问,也就是不需要确定消息是否收到,这里 ...

  7. Netty介绍及实战(二)之IO与NIO和多路复用与零拷贝

    Netty介绍及实战(一) 一.Netty到底是什么?什么是多路复用?什么叫做零拷贝? Netty是一个NIO客户端服务器框架,可以快速.轻松地开发协议服务器和客户端等网络应用程序.它极大地简化和简化 ...

  8. Netty之UDP协议开发

    文章的开头奉献上代码,方便大家对照学习. UDP协议简介 UDP是用户数据报协议(User Datagrame Protocol,UDP)的简称,主要作用是将网络数据流压缩成数据报的形式,提供面向事务 ...

  9. 基于Netty的UDP服务端开发

    1.前言 之前基于Netty做了一套TCP与MQTT的服务端,随着系统接入的终端类型越来越多,出现了UDP通讯的设备,虽然这样的设备并非主流,而且通讯机制存在问题,为了考虑系统的兼容性,只能将整套服务 ...

  10. Netty游戏服务器实战开发(12):线程任务组件开发

    导读: 线程在java里面很多,本文不会介绍线程使用,只会介绍大型项目线程组件的开发模式. 一个大型项目中少不了对多线程的使用,在使用多线程的时候,可以使用Java 的API提供的Thread或者Ru ...

最新文章

  1. 云计算时代的数据库运行
  2. 微信公众号开发本地环境开发_如何在5分钟内使HTTPS在本地开发环境上工作
  3. linux su -c命令
  4. WordPress工作原理之程序文件执行顺序
  5. 火灾检测、人流量统计… 这个开源项目太香了!
  6. 没有c语言基础可以学python吗-必须要有C语言基础才能学python吗
  7. python如何输入多组数据_Python 中如何实现多组的输入输出
  8. 转:链表相交问题 详解
  9. leetcode 897. 递增顺序搜索树(中序遍历)
  10. LeetCode 38. 报数
  11. 拆轮子系列--RxJava理解(一)--Map解析
  12. NSGA-Ⅱ算法C++实现(测试函数为ZDT1)
  13. 手动保存刷新微星主板BIOS图解教程
  14. MindMap学习使用
  15. linux pv修改大小,Linux下扩容系统容量和删除unknown PV
  16. 中国跨5个时区,东南西北的极点坐标信息
  17. 什么是软考?软考有什么作用?
  18. 计算机课导入语,信息技术课的提问和导入技巧
  19. PTA 10-43 计算xsda表中最矮同学的身高
  20. 京东助手抢购-购买口罩教程

热门文章

  1. 机器学习十大算法---3. SVM
  2. 儿童“益”站线上课堂 战“疫”不停学
  3. 本地局域网(内网)远程连接报错0x112f的一种解决方案-由于一个协议错误(代码: 0x112f),远程会话将被中断。请重新跟远程计算机连接
  4. DAO是什么?为什么我们需要DAO?
  5. 服务器本地文件无法复制粘贴,本地电脑与服务器突然无法直接使用复制粘贴
  6. 无法复制文件到远程桌面的解决办法
  7. 谁将打开腾讯业绩增速天花板?
  8. Java基础--封装--继承 某公司的雇员分为以下若干类:Employee:这是所有员工总的父类。 SalariedEmployee:Employee的子类
  9. 从官网下win10太慢了怎么办?
  10. amd支持服务器内存,amd专用内存和普通的内存有什么区别?