Netty的UDP广播发送与接收
想要的逻辑是这样的:
A向局域网内发送广播消息messageA;
B收到了messageA并直接使用既有的session或channel把需要回复的消息write回来就行了。
自己尝试了一下,记载一下使用中较为便利的写法。
客户端一般是这样写:
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(new NormalUDPClientHandler());bootstrap.bind(9999).sync().channel().closeFuture().await();} catch (InterruptedException e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}
就像创建一个普通的客户端一样,不过channel传入的通道类型为NioDatagramChannel,使用的是bind而非connect,以及option传入的网络选项为ChannelOption.SO_BROADCAST。
上面这几行代码在《Netty权威指南》第十二章有类似出现,但原文是bind(0),这里是bind(9999),毕竟真实场景中还是很少拿0作为接收端口。或者说,原文想表达的意思就是这里的bootstrap只作为发送方存在,不接收回复。
初次看这段代码时被这个bind(0)误导了,以为是发送UDP必须如此…后面发现并不是,bind(0)代表着发送方会随机选择一个端口去发送UDP,类似于”万能端口“的概念,如果绑定一个合理值,那么发送端口就会被固定下来。
另外,区分客户端与服务端的界限并不明显,尤其是在单对单的情况下,在单对多的情况下,可以由哪一方实现了childrenHandler接口为服务端。
但其实发送UDP的重点除了上述所说的通道类型与网络选项外,还在于发送的数据包的特殊,与bind(0)关系不大。这个数据包也就是DatagramPacket,这里在handler中体现:
public class NormalUDPClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {L.d(ctx.channel().remoteAddress() + "");ctx.executor().parent().execute(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 10; i++) {ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("我在广播" + i, Charset.forName("utf-8")), new InetSocketAddress("255.255.255.255", 10000)));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}}});}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {L.d(ctx.channel().remoteAddress() + "");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {DatagramPacket packet = (DatagramPacket) msg;ByteBuf byteBuf = packet.copy().content();byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes);L.d(packet.sender()+","+content);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}
}
这里需要小小注意一下,如果在Handler中有延时发送UDP的需求,请务必使用子线程。
看过netty源码就会知道,这里handler中回调的几个方法,都在同一个线程池的同一条线程中,会按照调用顺序依次调用。所以如果有延时,就使用本身自带的线程池别起一条线程去延时吧,或者不要使用sleep这种方法。
再来看这个DatagramPacket,这是netty自建的一个数据包,传入内容和目标地址即可,当然这里因为是广播,地址传入"255.255.255.255"就可以了,这里的10000端口就表示只有监听10000端口才能收到这条广播消息。
public final class DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder {public DatagramPacket(ByteBuf data, InetSocketAddress recipient) {super(data, recipient);}
}
另外这里发送方也在9999端口监听着广播是否有人回复,接下来看UDP接收端。
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(new NormalUDPServerHandler());bootstrap.bind(10000).sync().channel().closeFuture().await();} catch (InterruptedException e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}
接收端与发送端写法一模一样,只是监听的端口与handler不同。
public class NormalUDPServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {DatagramPacket packet = (DatagramPacket) msg;ByteBuf byteBuf = packet.copy().content();byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes);L.d(packet.sender().toString() + "," + content);ByteBuf byteBuf1 = new UnpooledByteBufAllocator(false).buffer();byteBuf1.writeCharSequence(content, Charset.forName("utf-8"));ctx.writeAndFlush(new DatagramPacket(byteBuf1, packet.sender()));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}
}
这里的逻辑是把接收到的广播消息,原路返回给发送方,发送的地址信息可以使用packet.sender()直接获得。
这样一来,接收端的输出为:
20:01:43.043 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播0
20:01:45.045 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播1
20:01:47.047 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播2
发送端的输出为:
20:01:43.043 (NormalUDPClientHandler.java:19)#channelActive-->:null
20:01:43.043 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播0
20:01:45.045 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播1
20:01:47.047 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播2
疑问
问题:
- 客户端启动后是怎么进入channelActive来发送广播的?channelActive什么时候才会触发吗?
- 客户端是怎么实现自己关闭的?调用ctx.close后会怎样?
其实都可以通过了解源码得知,前面的逻辑我们不再赘述,有兴趣的话可以通过另一篇从Netty源码看心跳超时机制大致了解一番。
启动
我们先来看这个UDP的客户端是怎么”启动“的。
首先注意到上文记载的UDP广播无论是发送方还是接收方其实都是用的bind方式,以传统理解其实都是作为服务端存在的,但在netty中这样的概念其实是不明显的,只考虑"端对端"的情况是可以不区分谁是客户端谁是服务端的;
当然这也是因为UDP和TCP不同,是无连接的,所以才可以都使用bind使自己固定地址和端口;
大致调用流程都是一致的,无论是客户端还是服务端都会先创建channel并注册到某线程,然后处理连接事项后补全channel的状态与信息:
AbstractBootstrap-->
AbstractChannel-->
AbstractChannelHandlerContext-->
DefaultChannelPipeline-->
AbstractChannel$AbstractUnsafe
和看心跳那篇博客的时序图基本一致,还是先关注AbstractUnsafe中的register0方法:
private void register0(ChannelPromise promise) {try {// ...pipeline.fireChannelRegistered();//1必定调用System.out.println("register0->isActive="+isActive());System.out.println("register0->firstRegistration="+firstRegistration);if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();//2看条件} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {// ...}}
其中有fireChannelRegistered和fireChannelActive,前者是必定调用的,而后者需要看isActive的状态,那么TCP建立和UDP建立会有所不同么?
这里当然是相同的,因为尚在注册阶段,一般情况下isActive均为false,
register0->isActive=false
register0->firstRegistration=true
所以fireChannelActive在这两种情况均不会调用。
那么再看AbstractUnsafe的bind方法,建立服务使用bind最终都会调用到这里来:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ...boolean wasActive = isActive();System.out.println("bind->isActive="+isActive());System.out.println("bind->wasActive="+wasActive);try {doBind(localAddress);//与本地地址绑定} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}System.out.println("bind->after doBind");System.out.println("bind->isActive="+isActive());System.out.println("bind->wasActive="+wasActive);if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();//1取决于两个判断标记}});}safeSetSuccess(promise);}
可以看到,doBind方法才是真正的绑定服务,而其后会通过wasActive标记与isActive状态的值来决定是否调用fireChannelActive。
那么这一步TCP服务和UDP服务两者会有所不同么?
TCP服务bind
bind->isActive=false
bind->wasActive=false
bind->after doBind
bind->isActive=true
bind->wasActive=false
---------------------
UDP服务bind
bind->isActive=false
bind->wasActive=false
bind->after doBind
bind->isActive=true
bind->wasActive=false
还是一样的,没想到吧。
所以就算是调用bind方法,仍然会回调到handler的channelActive方法,这是匆庸置疑的,但我们一般情况下建立TCP服务端的时候并不会为其“自身"添加handler,而是监听连接使用的childrenHandler,所以往往会忽略:
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boos, worker).channel(NioServerSocketChannel.class)//.handler(new NormalServerHandler())//看不见我.childOption(ChannelOption.TCP_NODELAY,true).childHandler(new ChannelInitializer<Channel>() {protected void initChannel(Channel ch) {ch.pipeline().addLast(new NormalServerHandler());}});
如果将上面被注释的这一行放开,那么就算只是服务端自己启动,也仍然会调用其中的channelActive方法,表示自身已经处于激活状态了。
所以至此应该可以回答第一个问题:
UDP的客户端和服务端其实本质上都是传统意义上的服务端写法,均使用bind,而bind成功后是必定要调用handler()所添加的Handler的channelActive方法的,和是否有channel连接是无关的;
childrenHandler()添加的Handler才会在有channel连接时调用其对应的channelActive方法;
或者更进一步的理解:
无论是服务端还是客户端,无论是bind或是connect,在netty中都会先创建一个channel,再使用channel的相关方法去操作,而只要这个channel被创建成功,那么肯定会调用相应channelActive方法的。
//connectprivate ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();}
//bindprivate ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();}
如果有不能进入channelRead的情况应该还是哪里写法有误,UDP这里的写法其实并不分客户端与服务端,全凭
new DatagramPacket(Unpooled.copiedBuffer("我在广播" + i, Charset.forName("utf-8")), new InetSocketAddress("255.255.255.255", 10001))
这样的包中的地址与端口来判断是否能发送,而channelRead和是否手动调用ctx.fireChannelActive()是无关的,服务会在自身绑定建立后就一直在自身的NioEventLoop线程的循环监听消息,所以应该不会影响。
关闭
如果这个”关闭“是指上文中UDP服务的关闭的话,其实客户端和服务端都是以服务形式启动的,所以关闭这个channel(通道)和关闭服务是一个意思。当然,这里建议把“channel”扩展理解为”端”,而非单纯的通道。
如果是指其他场景下,比如不挂起的情况下自动关闭:
bootstrap.bind(10000).sync().channel();
bootstrap.bind(10000).sync().channel().closeFuture().await();
前者是自动关闭的,后者是手动挂起,但其实自动关闭和手动关闭走的流程是一样的,只是自动关闭是由系统调用了手动关闭。
我们先看手动关闭。
如果在服务激活后5秒左右手动关闭是怎样的流程呢?
@Overridepublic void channelActive(final ChannelHandlerContext ctx) throws Exception {System.out.println("handler active->"+ctx.channel().remoteAddress()+"");new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(5);System.out.println("----");ctx.close();} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
ChannelHandlerContext可以看作对channel的一个包装,任意一端的建立都会产生一个新的channel,关闭了当前ChannelHandlerContext,也就意味着将当前端关闭了,所以服务也关闭了。
具体关闭源码:
private void close(final ChannelPromise promise, final Throwable cause,final ClosedChannelException closeCause, final boolean notify) {//...Executor closeExecutor = prepareToClose();if (closeExecutor != null) {closeExecutor.execute(new Runnable() {@Overridepublic void run() {try {// Execute the close.doClose0(promise);} finally {invokeLater(new Runnable() {@Overridepublic void run() {if (outboundBuffer != null) {// Fail all the queued messagesoutboundBuffer.failFlushed(cause, notify);outboundBuffer.close(closeCause);}fireChannelInactiveAndDeregister(wasActive);}});}}});} else {//...}}private void doClose0(ChannelPromise promise) {try {doClose();closeFuture.setClosed();safeSetSuccess(promise);} catch (Throwable t) {closeFuture.setClosed();safeSetFailure(promise, t);}}
还是会由AbstractChannel$AbstractUnsafe接管,调用其中的close方法,然后调用doClose0,最后调用这个doClose();
这个doClose()会由新建时传入的channel种类确定,去调用各自类中的相应方法,
.channel(NioDatagramChannel.class)
UDP建立时使用的NioDatagramChannel.class,于是会回调NioDatagramChannel中的对应方法:
@Overrideprotected void doClose() throws Exception {javaChannel().close();}
最终使用JDK中的关闭方法去关闭这个channel。
另外,如果我们不挂起的情况下,其实和手动关闭走过的方法是一样的,不同之处在于在不挂起的情况下,框架内有一套检测机制会去调用关闭,类似在NioEventLoop中:
@Overrideprotected void run() {for (;;) {//...try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}private void closeAll() {//...for (AbstractNioChannel ch: channels) {ch.unsafe().close(ch.unsafe().voidPromise());}}
当一个channel被建立后,注册到线程池,其实线程就会一直跑着,一直去判断当前是否需要关闭;
一旦检测到需要关闭,就会调用
ch.unsafe().close
方法去关闭channel,这和ctx.close最终调用到的方法是一样的,都是AbstractUnsafe接管;
而isShuttingDown()中的状态会由线程池启动后就开始一直更新自己的状态,只要在这个线程池内还有任务存在,那么就能“苟活”,否则就会马上被判定为ST_SHUTTING_DOWN ,就开始步入“死亡”了。
可以在SingleThreadEventExecutor中找到相关的源码:
private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {//...try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {FastThreadLocal.removeAll();STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.countDown();if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}
所以如果在handler里面强行挂个sleep,也是可以延长这个channel的寿命,因为这样属于“任务尚未处理完毕“。
至此,应该能大致回答第2个问题。
客户端(服务端)如果不挂起await或其他耗时任务,框架会去自动检测并关闭channel;
ctx.close其实就是关闭channel,channel不止于通道的意思,它就是端,服务端把自己的端关闭了,服务自然就关了。
以上。
Netty的UDP广播发送与接收相关推荐
- Android再进阶之广播发送、接收和注册过程
广播的注册分为静态注册和动态注册,静态注册在应用安装的时候由PackageManagerService来完成注册.这里只介绍动态注册. 动态注册 调用registerReceiver方法,它在Cont ...
- 虚拟机里udp广播发送不出去问题
在虚拟机里我写的程序UDP广播,没有广播报文 我的解决方法是在虚拟机网络设置里,使用桥接模式,勾选复制物理网络连接状态 还有本机如果是双网卡,设备连接的是有线的话,最好把无线网络禁用了
- 解决go udp广播包无法接收
1.虚拟机网卡导致广播包发送不出去或接收不到问题 2.防火墙开启限制udp包接收发送
- springboot集成netty使用udp协议实现消息接收与转发
一.转发服务 1.创建NettyServer,使用线程池实现异步处理 *** udp服务*/ public class NettyServer {private static final Logger ...
- vc udp 广播接收和发送_UDP编程与DatagramSocket类:UDP的套接字
API ----网络编程 ----DatagramPacket类 ----InetAddress类 java.lang.Object 继承者 java.net.DatagramSocket publi ...
- UDP 实现多收多发,广播发送,组播发送 TCP 实现多收多发
文章目录 TCP 实现多收多发(上线下线提醒) UDP多收多发 UDP广播发送接收 UDP 组播发送接收 TCP 实现多收多发(上线下线提醒) 创建发送端 public static void mai ...
- WIFI UDP 实时广播 发送数据
先说下结论, WIFI 单播与组播的对比 802.11 只有 unicast 和 broadcast.UDP 广播以及组播在实际传输上,根据设置的不同,可以被操作系统自动转换成 unicast 和 b ...
- 网络通信之如何广播发送
网络通信基础 1. 广播一般局限于局域网,多播即可用于局域网也可跨越广域网. 2. 广播地址应用于网络内的所有主机 1)有限广播 它不被路由但会被送到相同物理网络段上的所有主机 IP地址的网络字段和主 ...
- python UDP广播
#!/usr/bin/python python # -*- coding:UTF-8 -*- # UDP 广播接收 from socket import *HOST = '0.0.0.0' PORT ...
最新文章
- WINCE6.0 error C2220: warning treated as error问题解决
- 学习笔记——深拷贝与浅拷贝
- 计算机系统和中断的概念
- Spring Async和Java的8 CompletableFuture
- NHibernate实例化类部分属性
- LeetCode 914. 卡牌分组(最大公约数)
- centos安装golang环境
- java对象的我可变属性,不可变对象的所有属性都必须是最终的吗?
- URL安全的Base64编码
- verilog语法实例学习(3)
- 国人项目上了Github全球热榜,之后都发生了什么?
- .net抽象类和抽象函数abstract
- 揭秘ASP.NET 2.0之Page.Eval
- arcgis拓扑与修复
- 学术会议论文查重吗_会议论文需要进行查重吗?
- w10连接远程计算机控制,Win10远程桌面连接如何开启,3种方式助你远程控制
- 【转载】Unity 项目管理与优化
- 2020安洵杯--MISC
- SQL语句——处理函数
- 经营收款限制个人收款码,商户的个税会受影响吗?