想要的逻辑是这样的:
A向局域网内发送广播消息messageA;
B收到了messageA并直接使用既有的session或channel把需要回复的消息write回来就行了。

自己尝试了一下,记载一下使用中较为便利的写法。

客户端一般是这样写:

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(new NormalUDPClientHandler());bootstrap.bind(9999).sync().channel().closeFuture().await();} catch (InterruptedException e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}

就像创建一个普通的客户端一样,不过channel传入的通道类型为NioDatagramChannel,使用的是bind而非connect,以及option传入的网络选项为ChannelOption.SO_BROADCAST。

上面这几行代码在《Netty权威指南》第十二章有类似出现,但原文是bind(0),这里是bind(9999),毕竟真实场景中还是很少拿0作为接收端口。或者说,原文想表达的意思就是这里的bootstrap只作为发送方存在,不接收回复。

初次看这段代码时被这个bind(0)误导了,以为是发送UDP必须如此…后面发现并不是,bind(0)代表着发送方会随机选择一个端口去发送UDP,类似于”万能端口“的概念,如果绑定一个合理值,那么发送端口就会被固定下来。
另外,区分客户端与服务端的界限并不明显,尤其是在单对单的情况下,在单对多的情况下,可以由哪一方实现了childrenHandler接口为服务端。

但其实发送UDP的重点除了上述所说的通道类型与网络选项外,还在于发送的数据包的特殊,与bind(0)关系不大。这个数据包也就是DatagramPacket,这里在handler中体现:

public class NormalUDPClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {L.d(ctx.channel().remoteAddress() + "");ctx.executor().parent().execute(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 10; i++) {ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("我在广播" + i, Charset.forName("utf-8")), new InetSocketAddress("255.255.255.255", 10000)));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}}});}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {L.d(ctx.channel().remoteAddress() + "");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {DatagramPacket packet = (DatagramPacket) msg;ByteBuf byteBuf = packet.copy().content();byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes);L.d(packet.sender()+","+content);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}
}

这里需要小小注意一下,如果在Handler中有延时发送UDP的需求,请务必使用子线程
看过netty源码就会知道,这里handler中回调的几个方法,都在同一个线程池的同一条线程中,会按照调用顺序依次调用。所以如果有延时,就使用本身自带的线程池别起一条线程去延时吧,或者不要使用sleep这种方法。

再来看这个DatagramPacket,这是netty自建的一个数据包,传入内容和目标地址即可,当然这里因为是广播,地址传入"255.255.255.255"就可以了,这里的10000端口就表示只有监听10000端口才能收到这条广播消息。

public final class DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder {public DatagramPacket(ByteBuf data, InetSocketAddress recipient) {super(data, recipient);}
}

另外这里发送方也在9999端口监听着广播是否有人回复,接下来看UDP接收端。

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).handler(new NormalUDPServerHandler());bootstrap.bind(10000).sync().channel().closeFuture().await();} catch (InterruptedException e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}

接收端与发送端写法一模一样,只是监听的端口与handler不同。

public class NormalUDPServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {DatagramPacket packet = (DatagramPacket) msg;ByteBuf byteBuf = packet.copy().content();byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes);L.d(packet.sender().toString() + "," + content);ByteBuf byteBuf1 = new UnpooledByteBufAllocator(false).buffer();byteBuf1.writeCharSequence(content, Charset.forName("utf-8"));ctx.writeAndFlush(new DatagramPacket(byteBuf1, packet.sender()));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}
}

这里的逻辑是把接收到的广播消息,原路返回给发送方,发送的地址信息可以使用packet.sender()直接获得。

这样一来,接收端的输出为:

20:01:43.043 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播0
20:01:45.045 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播1
20:01:47.047 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播2

发送端的输出为:

20:01:43.043 (NormalUDPClientHandler.java:19)#channelActive-->:null
20:01:43.043 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播0
20:01:45.045 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播1
20:01:47.047 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播2

疑问

问题:

  1. 客户端启动后是怎么进入channelActive来发送广播的?channelActive什么时候才会触发吗?
  2. 客户端是怎么实现自己关闭的?调用ctx.close后会怎样?

其实都可以通过了解源码得知,前面的逻辑我们不再赘述,有兴趣的话可以通过另一篇从Netty源码看心跳超时机制大致了解一番。

启动

我们先来看这个UDP的客户端是怎么”启动“的。
首先注意到上文记载的UDP广播无论是发送方还是接收方其实都是用的bind方式,以传统理解其实都是作为服务端存在的,但在netty中这样的概念其实是不明显的,只考虑"端对端"的情况是可以不区分谁是客户端谁是服务端的;
当然这也是因为UDP和TCP不同,是无连接的,所以才可以都使用bind使自己固定地址和端口;

大致调用流程都是一致的,无论是客户端还是服务端都会先创建channel并注册到某线程,然后处理连接事项后补全channel的状态与信息:

AbstractBootstrap-->
AbstractChannel-->
AbstractChannelHandlerContext-->
DefaultChannelPipeline-->
AbstractChannel$AbstractUnsafe

和看心跳那篇博客的时序图基本一致,还是先关注AbstractUnsafe中的register0方法:

       private void register0(ChannelPromise promise) {try {// ...pipeline.fireChannelRegistered();//1必定调用System.out.println("register0->isActive="+isActive());System.out.println("register0->firstRegistration="+firstRegistration);if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();//2看条件} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {// ...}}

其中有fireChannelRegistered和fireChannelActive,前者是必定调用的,而后者需要看isActive的状态,那么TCP建立和UDP建立会有所不同么?
这里当然是相同的,因为尚在注册阶段,一般情况下isActive均为false,

register0->isActive=false
register0->firstRegistration=true

所以fireChannelActive在这两种情况均不会调用。
那么再看AbstractUnsafe的bind方法,建立服务使用bind最终都会调用到这里来:

        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ...boolean wasActive = isActive();System.out.println("bind->isActive="+isActive());System.out.println("bind->wasActive="+wasActive);try {doBind(localAddress);//与本地地址绑定} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}System.out.println("bind->after doBind");System.out.println("bind->isActive="+isActive());System.out.println("bind->wasActive="+wasActive);if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();//1取决于两个判断标记}});}safeSetSuccess(promise);}

可以看到,doBind方法才是真正的绑定服务,而其后会通过wasActive标记与isActive状态的值来决定是否调用fireChannelActive。
那么这一步TCP服务和UDP服务两者会有所不同么?

TCP服务bind
bind->isActive=false
bind->wasActive=false
bind->after doBind
bind->isActive=true
bind->wasActive=false
---------------------
UDP服务bind
bind->isActive=false
bind->wasActive=false
bind->after doBind
bind->isActive=true
bind->wasActive=false

还是一样的,没想到吧。
所以就算是调用bind方法,仍然会回调到handler的channelActive方法,这是匆庸置疑的,但我们一般情况下建立TCP服务端的时候并不会为其“自身"添加handler,而是监听连接使用的childrenHandler,所以往往会忽略:

         ServerBootstrap serverBootstrap = new ServerBootstrap().group(boos, worker).channel(NioServerSocketChannel.class)//.handler(new NormalServerHandler())//看不见我.childOption(ChannelOption.TCP_NODELAY,true).childHandler(new ChannelInitializer<Channel>() {protected void initChannel(Channel ch) {ch.pipeline().addLast(new NormalServerHandler());}});

如果将上面被注释的这一行放开,那么就算只是服务端自己启动,也仍然会调用其中的channelActive方法,表示自身已经处于激活状态了。
所以至此应该可以回答第一个问题:
UDP的客户端和服务端其实本质上都是传统意义上的服务端写法,均使用bind,而bind成功后是必定要调用handler()所添加的Handler的channelActive方法的,和是否有channel连接是无关的;
childrenHandler()添加的Handler才会在有channel连接时调用其对应的channelActive方法;
或者更进一步的理解:

无论是服务端还是客户端,无论是bind或是connect,在netty中都会先创建一个channel,再使用channel的相关方法去操作,而只要这个channel被创建成功,那么肯定会调用相应channelActive方法的。

//connectprivate ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();}
//bindprivate ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();}

如果有不能进入channelRead的情况应该还是哪里写法有误,UDP这里的写法其实并不分客户端与服务端,全凭

new DatagramPacket(Unpooled.copiedBuffer("我在广播" + i, Charset.forName("utf-8")), new InetSocketAddress("255.255.255.255", 10001))

这样的包中的地址与端口来判断是否能发送,而channelRead和是否手动调用ctx.fireChannelActive()是无关的,服务会在自身绑定建立后就一直在自身的NioEventLoop线程的循环监听消息,所以应该不会影响。

关闭

如果这个”关闭“是指上文中UDP服务的关闭的话,其实客户端和服务端都是以服务形式启动的,所以关闭这个channel(通道)和关闭服务是一个意思。当然,这里建议把“channel”扩展理解为”端”,而非单纯的通道。
如果是指其他场景下,比如不挂起的情况下自动关闭:

bootstrap.bind(10000).sync().channel();
bootstrap.bind(10000).sync().channel().closeFuture().await();

前者是自动关闭的,后者是手动挂起,但其实自动关闭和手动关闭走的流程是一样的,只是自动关闭是由系统调用了手动关闭。
我们先看手动关闭。
如果在服务激活后5秒左右手动关闭是怎样的流程呢?

 @Overridepublic void channelActive(final ChannelHandlerContext ctx) throws Exception {System.out.println("handler active->"+ctx.channel().remoteAddress()+"");new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(5);System.out.println("----");ctx.close();} catch (InterruptedException e) {e.printStackTrace();}}}).start();}

ChannelHandlerContext可以看作对channel的一个包装,任意一端的建立都会产生一个新的channel,关闭了当前ChannelHandlerContext,也就意味着将当前端关闭了,所以服务也关闭了。
具体关闭源码:

        private void close(final ChannelPromise promise, final Throwable cause,final ClosedChannelException closeCause, final boolean notify) {//...Executor closeExecutor = prepareToClose();if (closeExecutor != null) {closeExecutor.execute(new Runnable() {@Overridepublic void run() {try {// Execute the close.doClose0(promise);} finally {invokeLater(new Runnable() {@Overridepublic void run() {if (outboundBuffer != null) {// Fail all the queued messagesoutboundBuffer.failFlushed(cause, notify);outboundBuffer.close(closeCause);}fireChannelInactiveAndDeregister(wasActive);}});}}});} else {//...}}private void doClose0(ChannelPromise promise) {try {doClose();closeFuture.setClosed();safeSetSuccess(promise);} catch (Throwable t) {closeFuture.setClosed();safeSetFailure(promise, t);}}

还是会由AbstractChannel$AbstractUnsafe接管,调用其中的close方法,然后调用doClose0,最后调用这个doClose();
这个doClose()会由新建时传入的channel种类确定,去调用各自类中的相应方法,

.channel(NioDatagramChannel.class)

UDP建立时使用的NioDatagramChannel.class,于是会回调NioDatagramChannel中的对应方法:

    @Overrideprotected void doClose() throws Exception {javaChannel().close();}

最终使用JDK中的关闭方法去关闭这个channel。
另外,如果我们不挂起的情况下,其实和手动关闭走过的方法是一样的,不同之处在于在不挂起的情况下,框架内有一套检测机制会去调用关闭,类似在NioEventLoop中:

    @Overrideprotected void run() {for (;;) {//...try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}private void closeAll() {//...for (AbstractNioChannel ch: channels) {ch.unsafe().close(ch.unsafe().voidPromise());}}

当一个channel被建立后,注册到线程池,其实线程就会一直跑着,一直去判断当前是否需要关闭;
一旦检测到需要关闭,就会调用

ch.unsafe().close

方法去关闭channel,这和ctx.close最终调用到的方法是一样的,都是AbstractUnsafe接管;
而isShuttingDown()中的状态会由线程池启动后就开始一直更新自己的状态,只要在这个线程池内还有任务存在,那么就能“苟活”,否则就会马上被判定为ST_SHUTTING_DOWN ,就开始步入“死亡”了。
可以在SingleThreadEventExecutor中找到相关的源码:

    private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {//...try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {FastThreadLocal.removeAll();STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.countDown();if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}

所以如果在handler里面强行挂个sleep,也是可以延长这个channel的寿命,因为这样属于“任务尚未处理完毕“。
至此,应该能大致回答第2个问题。
客户端(服务端)如果不挂起await或其他耗时任务,框架会去自动检测并关闭channel;
ctx.close其实就是关闭channel,channel不止于通道的意思,它就是端,服务端把自己的端关闭了,服务自然就关了。

以上。

Netty的UDP广播发送与接收相关推荐

  1. Android再进阶之广播发送、接收和注册过程

    广播的注册分为静态注册和动态注册,静态注册在应用安装的时候由PackageManagerService来完成注册.这里只介绍动态注册. 动态注册 调用registerReceiver方法,它在Cont ...

  2. 虚拟机里udp广播发送不出去问题

    在虚拟机里我写的程序UDP广播,没有广播报文 我的解决方法是在虚拟机网络设置里,使用桥接模式,勾选复制物理网络连接状态 还有本机如果是双网卡,设备连接的是有线的话,最好把无线网络禁用了

  3. 解决go udp广播包无法接收

    1.虚拟机网卡导致广播包发送不出去或接收不到问题 2.防火墙开启限制udp包接收发送

  4. springboot集成netty使用udp协议实现消息接收与转发

    一.转发服务 1.创建NettyServer,使用线程池实现异步处理 *** udp服务*/ public class NettyServer {private static final Logger ...

  5. vc udp 广播接收和发送_UDP编程与DatagramSocket类:UDP的套接字

    API ----网络编程 ----DatagramPacket类 ----InetAddress类 java.lang.Object 继承者 java.net.DatagramSocket publi ...

  6. UDP 实现多收多发,广播发送,组播发送 TCP 实现多收多发

    文章目录 TCP 实现多收多发(上线下线提醒) UDP多收多发 UDP广播发送接收 UDP 组播发送接收 TCP 实现多收多发(上线下线提醒) 创建发送端 public static void mai ...

  7. WIFI UDP 实时广播 发送数据

    先说下结论, WIFI 单播与组播的对比 802.11 只有 unicast 和 broadcast.UDP 广播以及组播在实际传输上,根据设置的不同,可以被操作系统自动转换成 unicast 和 b ...

  8. 网络通信之如何广播发送

    网络通信基础 1. 广播一般局限于局域网,多播即可用于局域网也可跨越广域网. 2. 广播地址应用于网络内的所有主机 1)有限广播 它不被路由但会被送到相同物理网络段上的所有主机 IP地址的网络字段和主 ...

  9. python UDP广播

    #!/usr/bin/python python # -*- coding:UTF-8 -*- # UDP 广播接收 from socket import *HOST = '0.0.0.0' PORT ...

最新文章

  1. WINCE6.0 error C2220: warning treated as error问题解决
  2. 学习笔记——深拷贝与浅拷贝
  3. 计算机系统和中断的概念
  4. Spring Async和Java的8 CompletableFuture
  5. NHibernate实例化类部分属性
  6. LeetCode 914. 卡牌分组(最大公约数)
  7. centos安装golang环境
  8. java对象的我可变属性,不可变对象的所有属性都必须是最终的吗?
  9. URL安全的Base64编码
  10. verilog语法实例学习(3)
  11. 国人项目上了Github全球热榜,之后都发生了什么?
  12. .net抽象类和抽象函数abstract
  13. 揭秘ASP.NET 2.0之Page.Eval
  14. arcgis拓扑与修复
  15. 学术会议论文查重吗_会议论文需要进行查重吗?
  16. w10连接远程计算机控制,Win10远程桌面连接如何开启,3种方式助你远程控制
  17. 【转载】Unity 项目管理与优化
  18. 2020安洵杯--MISC
  19. SQL语句——处理函数
  20. 经营收款限制个人收款码,商户的个税会受影响吗?

热门文章

  1. 大话设计模式-工厂模式
  2. PMP报考流程以及注意事项
  3. FJUT-cwl的女朋友3 凑钱最少张数问题
  4. web实现消息推送及桌面提醒
  5. python socket 心跳包_Socket之心跳包实现思路
  6. 写一个杀戮尖塔存档修改器
  7. 系统集成项目管理工程师证书介绍
  8. 6-10 找出大于num的最小素数 (10 分)
  9. hdf5 mysql_PythonHDF5目录
  10. 青龙新毛——闪电世界