Reactor Netty参考指南目录


原文地址

Reactor Netty提供了易于使用、易于配置的UdpClient。它隐藏了创建UDP客户端所需的大部分Netty的功能,并增加了Reactive Streams背压。

8.1.连接和断开

要将UDP客户端连接到给定的端点,您必须创建并且配置一个UdpClient实例。默认情况下,host为localhost,post为12012。下面是创建和连接一个UDP客户端的例子:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/create/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create()                            //<1>.connectNow(Duration.ofSeconds(30)); //<2>connection.onDispose().block();}
}

<1> 创建一个UdpClient实例用来进行的配置操作。

<2> 用阻塞的方式进行连接操作,并且等待它初始化完成。

返回的Connection对象提供了简单的连接相关的API,包括disposeNow(),调用这个方法会以阻塞的方式关闭客户端。

8.1.1.Host和Port

想要连接特定的hostport,您可以使用以下方式来配置UDP客户端。示例如下:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/address/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com") //<1>.port(80)            //<2>.connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

<1> 指定客户端需要连接的host

<2> 指定客户端需要连接的port

8.2.预先初始化

默认情况下,UdpClient初始化资源的操作在需要使用的时候才进行。这意味着初始化加载的时候connect operation会占用额外的时间:

  • 事件循环组

  • 主机名解析器

  • native传输库(当使用了native传输的时候)

当您需要预加载这些资源的时候,您可以按照以下方式来配置UdpClient

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/warmup/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;import java.time.Duration;public class Application {public static void main(String[] args) {UdpClient udpClient = UdpClient.create().host("example.com").port(80).handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello")));udpClient.warmup() //<1>.block();Connection connection = udpClient.connectNow(Duration.ofSeconds(30)); //<2>connection.onDispose().block();}
}

<1> 初始化和加载事件循环组,主机名解析器和native传输库

<2> 在连接远程节点的时候会进行主机名解析

8.3.写出数据

如果要发送数据到一个已有的端点,您必须添加一个I/O处理器。这个I/O处理器可以通过UdpOutbound来写出数据。

下面是一个发送hello字符串的例子:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/send/Application.java

import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).handle((udpInbound, udpOutbound) -> udpOutbound.sendString(Mono.just("hello"))) //<1>.connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

<1> 发送一个hello字符串到远端

8.4.消费数据

如果要接收从已有端点发过来的数据,您必须添加一个I/O处理器。这个I/O处理器可以通过UdpInbound来读取数据。示例如下:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/read/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).handle((udpInbound, udpOutbound) -> udpInbound.receive().then()) //<1>.connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

<1> 接收从已有端点发送过来的数据

8.5.生命周期回调

下面的生命周期回调用参数是提供给您用来扩展UdpClient的:

Callback Description
doAfterResolve 在成功解析远程地址之后调用。
doOnChannelInit 在初始化channel的时候调用。
doOnConnect 当channel将要连接的时候调用。
doOnConnected 当channel已经连接上的时候调用。
doOnDisconnected 当channel断开的时候被调用。
doOnResolve 当远程地址将要被解析的时候被调用。
doOnResolveError 在远程地址解析失败的情况下被调用。

下面是使用doOnConnecteddoOnChannelInit回调的例子:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/lifecycle/Application.java

import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).doOnConnected(conn -> conn.addHandler(new LineBasedFrameDecoder(8192))) //<1>.doOnChannelInit((observer, channel, remoteAddress) ->channel.pipeline().addFirst(new LoggingHandler("reactor.netty.examples")))      //<2>.connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

<1> 当一个channel连接上之后添加了一个LineBasedFrameDecoderNetty pipeline。

<2> 当初始化channel的时候添加了一个LoggingHandlerNetty pipeline。

8.6.连接配置

这一章节将给您介绍三种UDP层的配置方式:

  • Channel Options

  • Wire Logger

  • Event Loop Group

8.6.1.Channel Options

默认情况下,UDP客户端配置了以下options:

./…/…/reactor-netty-core/src/main/java/reactor/netty/udp/UdpClientConnect.java

UdpClientConnect() {this.config = new UdpClientConfig(ConnectionProvider.newConnection(),Collections.singletonMap(ChannelOption.AUTO_READ, false),() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
}

如果需要添加新的option或者修改已有的option,您可以使用如下的方式:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/channeloptions/Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000).connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

您可以通过以下的链接找到更多关于Netty channel options的信息:

  • ChannelOption
  • Socket Options

8.6.2.Wire Logger

Reactor Netty提供了线路记录(wire logging)用来检查点对点的流量。默认情况下,线路记录是关闭的。如果想要开启它,您必须将日志reactor.netty.udp.UdpClient的设置为DEBUG等级并且按如下方式进行配置:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/wiretap/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;
import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).wiretap(true) //<1>.connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

<1> 开启线路记录

默认情况下,线路记录在输出内容的时候会使用AdvancedByteBufFormat#HEX_DUMP。您也可以通过配置UdpClient改为AdvancedByteBufFormat#SIMPLE或者AdvancedByteBufFormat#TEXTUAL:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/wiretap/custom/Application.java

import io.netty.handler.logging.LogLevel;
import reactor.netty.Connection;
import reactor.netty.transport.logging.AdvancedByteBufFormat;
import reactor.netty.udp.UdpClient;import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) //<1>.connectNow(Duration.ofSeconds(30));}}

<1> 开启线路记录并使用AdvancedByteBufFormat#TEXTUAL来输出内容。

8.6.3.Event Loop Group

默认情况下,UDP客户端使用"Event Loop Group",工作线程数等于初始化的时候可以用的处理器数量(但最小是4)。您也可以使用LoopResource#create其中的一个方法来修改配置。

默认的Event Loop Group配置如下:

./…/…/reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/*** Default worker thread count, fallback to available processor* (but with a minimum value of 4)*/
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/*** Default selector thread count, fallback to -1 (no selector thread)*/
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/*** Default worker thread count for UDP, fallback to available processor* (but with a minimum value of 4)*/
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/*** Default quiet period that guarantees that the disposal of the underlying LoopResources* will not happen, fallback to 2 seconds.*/
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/*** Default maximum amount of time to wait until the disposal of the underlying LoopResources* regardless if a task was submitted during the quiet period, fallback to 15 seconds.*/
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";/*** Default value whether the native transport (epoll, kqueue) will be preferred,* fallback it will be preferred when available*/
public static final String NATIVE = "reactor.netty.native";

如果需要修改这些设置,您也可以通过如下方式进行配置:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/eventloop/Application.java

import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpClient;
import java.time.Duration;public class Application {public static void main(String[] args) {LoopResources loop = LoopResources.create("event-loop", 1, 4, true);Connection connection =UdpClient.create().host("example.com").port(80).runOn(loop).connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

8.7.度量

UDP客户端支持与Micrometer的内置集成。它暴露了所有前缀为reactor.netty.udp.client的度量。

下面的表格提供了UDP客户端度量的相关信息:

度量名称 类型 描述
reactor.netty.udp.client.data.received DistributionSummary 收到的数据量,以字节为单位
reactor.netty.udp.client.data.sent DistributionSummary 发送的数据量,以字节为单位
reactor.netty.udp.client.errors Counter 发生的错误数量
reactor.netty.udp.client.connect.time Timer 连接远程地址所花费的时间
reactor.netty.udp.client.address.resolver Timer 解析远程地址花费的时间

下面额外的度量也是可用的:

ByteBufAllocator度量

度量名称 类型 描述
reactor.netty.bytebuf.allocator.used.heap.memory Gauge 堆内存的字节数
reactor.netty.bytebuf.allocator.used.direct.memory Gauge 堆外内存的字节数
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge 堆内存的个数(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge 堆外内存的个数(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge threadlocal的缓存数量(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge 微小缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge 小缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge 一般缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge 一个区域的块大小(当使用PooledByteBufAllocator的时候)

下面是开启集成的度量的例子:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/metrics/Application.java

import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).metrics(true) //<1>.connectNow(Duration.ofSeconds(30));connection.onDispose().block();}
}

<1> 开启内建集成的Micrometer

如果您想让UDP客户端度量与除了Micrometer之外的系统集成或者想提供自己与Micrometer的集成来添加自己的度量记录器,您可以按如下方式实现:

https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/client/metrics/custom/Application.java

import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpClient;import java.net.SocketAddress;
import java.time.Duration;public class Application {public static void main(String[] args) {Connection connection =UdpClient.create().host("example.com").port(80).metrics(true, CustomChannelMetricsRecorder::new) //<1>.connectNow(Duration.ofSeconds(30));connection.onDispose().block();}}

<1> 开启UDP客户端度量并且提供ChannelMetricsRecorder的实现。

Suggest Edit to “UDP Client”


Reactor Netty参考指南目录


版权声明:如需转载,请带上本文链接、注明来源和本声明。否则将追究法律责任。https://www.immuthex.com/posts/reactor-netty-reference-guide/udp-client

[翻译]Reactor Netty参考指南 - 8.UDP客户端相关推荐

  1. 526页Netty参考指南,电子版已就绪,面试大厂不再被网络通信难倒

    有人调侃我们说: 程序员不如送外卖.送外卖是搬运食物,自己是搬运代码,都不产出新的东西-- 透支体力,又消耗健康,可替代性极强,30岁之后就要面临被优化的危险-- 想跳槽,但是更高的平台难进,同级别的 ...

  2. 《Netty权威指南 第2版》学习笔记(1)---服务端与客户端开发入门

    前言 Netty权威指南中以时间服务器为入门案例,演示了如何通过Netty完成了服务端与客户端之间的交互过程. 在开始使用Netty开发之前,先回顾一下使用NIO进行服务端开发的步骤. 创建Serve ...

  3. POCO库中文编程参考指南(11)如何使用Reactor框架?

    1 Reactor 框架概述 POCO 中的 Reactor 框架是基于 Reactor 设计模式进行设计的.其中由 Handler 将某 Socket 产生的事件,发送到指定的对象的方法上,作为回调 ...

  4. Springboot+Netty搭建UDP客户端

    使用Netty+SpringBoot方式可以快速地开发一套基于UDP协议的服务端程序,同样的也可以开发客户端,一般使用UDP都是使用原生的方式,发送消息后就不管不问,也就是不需要确定消息是否收到,这里 ...

  5. Spring Boot 中文参考指南

    Spring Boot 版本 2.7.8 原文:https://docs.spring.io/spring-boot/docs/2.7.8/reference/htmlsingle/ - 笔者注: S ...

  6. 《Netty权威指南》

    <Netty权威指南> 基本信息 作者: 李林锋 出版社:电子工业出版社 ISBN:9787121233432 上架时间:2014-5-29 出版日期:2014 年6月 开本:16开 页码 ...

  7. Spring5参考指南:IOC容器

    文章目录 为什么使用Spring5 什么是IOC容器 配置元数据 实例化容器 XML嵌套 groovy bean定义DSL 使用容器 最近在翻译Spring Framework Documentati ...

  8. java 面试指南_Java面试参考指南–第1部分

    java 面试指南 JAVA面向对象的概念 Java in基于面向对象的概念,它允许更高级别的抽象以实际方式解决任何问题. 面向对象的方法将实际对象中的问题解决方案概念化,更易于在整个应用程序中重用. ...

  9. Java面试参考指南–第1部分

    JAVA面向对象的概念 Java基于面向对象的概念,它允许更高级别的抽象以实际方式解决任何问题. 面向对象的方法将实际对象中的问题解决方案概念化,从而更易于在整个应用程序中重用. 例如椅子,风扇,狗, ...

最新文章

  1. 前紫光展锐CTO创业造芯:主打边缘AI视觉,融资10亿,9个月流片
  2. HDFS应用场景、部署、原理与基本架构
  3. android Q版本外部存储问题以及获取空间大小问题
  4. XGBoost-原理推导(上)
  5. 发布web应用程序是出现unsafe code
  6. 牛客网 java刷题_牛客网刷题(纯java题型 1~30题)
  7. 【分治】动态点分治 ([ZJOI2007]捉迷藏)
  8. 2013年值得我们学习的网页设计作品【系列二】
  9. ASP.NET AJAX应用
  10. 列出本机JCE支持的Provider、消息摘要算法、公钥私钥算法
  11. 华为NP课程笔记13-PIM
  12. python反编译class文件_简单的反编译class文件并重新编译的方法
  13. 51单片机用C语言实现交通灯(红绿灯)源程序实训QY-KP1
  14. 最新2020版IDEA下载安装教程
  15. ruoyi框架默认的导出Excel功能代码简析
  16. 阿拉丁年会 - 小程序开发者年度盛会 报告内容摘要
  17. Cannot load C:\Users\12778\AppData\Local\JetBrains\IntelliJIdea2021.1\tomcat\c0cf5d96-4221-48ee-b343
  18. C# throw new Exception的使用
  19. 微信小程序-H5-uniapp css制作上下跳动的柱状图——频谱
  20. QQ邮箱获取邮件客户端授权码

热门文章

  1. 常见英文语法错误案例分享
  2. VS Code实现python代码语法检查、格式规范化、自动换行字数限制
  3. ctfshow萌新赛web
  4. python+pyecharts画地图
  5. Arqit公司将于2023年用卫星发送量子密钥;QC Ware发布量子线性代数API | 全球量子科技与工业快讯第二十六期
  6. mac上安装vbox虚拟机运行win7系统,mac上运行exe
  7. PDF太大如何压缩变小?
  8. FPGA采集AD7606串行输源码加讲解
  9. AD7606 SPI模式 网上问题汇总
  10. C语言报错警告合集(转)