Netty框架进阶篇 - 分析和实战内置的编解码器及ChannelHandler
文章目录
- 前言
- 基于Netty构建HTTP/HTTPS应用程序
- HTTP协议相关编解码器
- HTTP聚合消息
- HTTP压缩
- 配置SSL,启用HTTPS
- 实战
- 空闲的连接和超时
- 序列化
- 基于Protocol Buffers 序列化
- 构建Protocol Buffers 消息对象模型
- 实战
- 总结
前言
Netty为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了你在那些相当繁琐的事务上本来会花费的时间与精力,本文会结合一些案例进行分析和应用内置的编解码器
基于Netty构建HTTP/HTTPS应用程序
Netty内置的一系列编解码器和ChannelHandler,其中就有可以支持用来处理HTTP和HTTPS协议的
HTTP协议相关编解码器
HTTP是基于请求/响应模式的:客户端向服务器发送一个 HTTP 请求,然后服务器将会返回一个 HTTP 响应。Netty提供了多种编码器和解码器以简化对这个协议的使用
HTTP请求的组成部分:
HTTP响应的组成部分:
从上面的图可以看出,一个HTTP 请求/响应可能由多个数据部分组成,并且总是以一个LastHttpContent部分作为结束。其中FullHttpRequest和FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequest、 LastHttpContent 等等)都实现了 HttpObject 接口
HTTP解码器和编码器:
名称 | 描述 |
---|---|
HttpRequestEncoder | 将 HttpRequest、HttpContent 和 LastHttpContent 消息编码为字节 |
HttpResponseEncoder | 将 HttpResponse、HttpContent 和 LastHttpContent 消息编码为字节 |
HttpRequestDecoder | 将字节解码为 HttpRequest、HttpContent 和 LastHttpContent 消息 |
HttpResponseDecoder | 将字节解码为 HttpResponse、HttpContent 和 LastHttpContent 消息 |
HttpClientCodec | 将客户端请求和响应做一个组合 |
HttpServerCodec | 将服务端请求和响应做一个组合 |
HTTP聚合消息
由于HTTP的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器HttpObjectAggregator,它可以将多个消息部分合并为 FullHttpRequest 或者 FullHttpResponse 消息。通过这样的方式,你将总是看到完整的消息内容
由于消息分段需要被缓存,直到可以转发一个完整的消息给下一个ChannelInboundHandler,所以这个操作有轻微的开销,其所带来的好处便是你不必关心消息碎片了
HTTP压缩
当使用 HTTP 时,都会建议开启压缩功能以尽可能多地减小传输数据的大小。虽然压缩会带来一些 CPU 时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说
Netty 为压缩提供了一个HttpContentCompressor处理器,为解压缩提供了一个HttpContentDecompressor处理器,它们同时支持gzip和deflate编码
注意:如果你使用的JDK为1.6或者更早的版本,那么需要将JZlib (www.jcraft.com/jzlib/ ) 添加到CLASSPATH中以支持压缩功能。在Maven中,需要加入依赖:
<dependency><groupId>com.jcraft</groupId><artifactId>jzlib</artifactId><version>1.1.3</version>
</dependency>
配置SSL,启用HTTPS
Netty通过一个名为SslHandler的ChannelHandler来支持SSL/TLS,其内部使用SSLEngine来完成实际的工作。而启用HTTPS只需要将 SslHandler 添加到 ChannelPipeline 的 ChannelHandler 组合中即可
实战
自定义HTTP Server
/*** 通过Netty自定义Http Server*/
public class MyHTTPServer {private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();private static ServerBootstrap bootstrap = new ServerBootstrap();private static final int PORT = 8761;private static final boolean IS_SSL = false; //是否开启SSL认证public static void startServer() throws SSLException, CertificateException{SslContext sslContext = null;if(IS_SSL){//签名认证SelfSignedCertificate selfSignedCertificate= new SelfSignedCertificate();sslContext = SslContextBuilder.forServer(selfSignedCertificate.certificate(),selfSignedCertificate.privateKey()).build();}try {bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress(PORT).childHandler(new HttpInitHandler(sslContext));ChannelFuture future=bootstrap.bind().sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally{eventLoopGroup.shutdownGracefully();}}public static void main(String[] args) {//启动服务try {startServer();} catch (SSLException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (CertificateException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}
服务端添加相关的ChannelHandler
public class HttpInitHandler extends ChannelInitializer<SocketChannel>{private SslContext sslContext;public HttpInitHandler(SslContext sslContext) {super();this.sslContext = sslContext;}@Overrideprotected void initChannel(SocketChannel ch) throws Exception {if(null!=sslContext){SSLEngine engine=sslContext.newEngine(ch.alloc());//将SslHandler添加到ChannelPipeline中以使用HTTPSch.pipeline().addFirst(new SslHandler(engine));}//添加HTTP请求的解码器ch.pipeline().addLast(new HttpRequestDecoder());//添加HTTP响应的编码器ch.pipeline().addLast(new HttpResponseEncoder());//添加HTTP请求体消息的聚合器,指定最大的消息大小为512Kch.pipeline().addLast(new HttpObjectAggregator(512*1024));//压缩ch.pipeline().addLast(new HttpContentCompressor());//添加服务业务的handlerch.pipeline().addLast(new MyHttpHandler());}
}
自定义服务端业务handler,处理客户端发送的请求
public class MyHttpHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String resultMsg="";//将读取的数据转化为http请求对象FullHttpRequest request = (FullHttpRequest) msg;try {//获得请求的uriString uri=request.uri();//获得请求体内容String body = request.content().toString(CharsetUtil.UTF_8);//获得请求方法HttpMethod method=request.method();//指定请求的根路径,非法路径直接提示错误if(!"/my/http".equals(uri)){resultMsg = "请求路径找不到!";sendMsg(ctx,resultMsg,HttpResponseStatus.NOT_FOUND);return;}System.out.println("接收到:"+method+" 请求");//发送GET请求if(HttpMethod.GET.equals(method)){resultMsg="成功接收到GET请求";sendMsg(ctx,resultMsg,HttpResponseStatus.OK);return;}} catch (Exception e) {resultMsg = "服务器端出现不可预知的错误:"+e.getMessage();sendMsg(ctx,resultMsg,HttpResponseStatus.BAD_REQUEST);return;}finally{//释放请求request.release();}}private void sendMsg(ChannelHandlerContext ctx, String resultMsg,HttpResponseStatus status) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, //标识HTTP版本status, //标识响应状态Unpooled.copiedBuffer(resultMsg,CharsetUtil.UTF_8));//设置响应头信息response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=UTF-8");//响应消息ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}
}
自定义HTTP Client
public class MyHttpClient {public static final String HOST = "127.0.0.1";public static final int PORT = 8761;private static Bootstrap bootstrap = new Bootstrap();private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();public static void startServer(){try {bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(HOST, PORT)).handler(new HttpInitClientHandler());ChannelFuture future= bootstrap.connect().sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally{eventLoopGroup.shutdownGracefully();} }public static void main(String[] args) {startServer();}
}
客户端添加相关的ChannelHandler
public class HttpInitClientHandler extends ChannelInitializer<SocketChannel>{@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加HTTP请求的编解码器ch.pipeline().addLast(new HttpClientCodec());//聚合ch.pipeline().addLast(new HttpObjectAggregator(512*1024));//解压缩ch.pipeline().addLast(new HttpContentDecompressor());//客户端业务handlerch.pipeline().addLast(new MyHttpClientHandler());}
}
自定义客户端业务handler,向服务端发送HTTP/HTTPS请求,并接收服务端的返回的响应信息
public class MyHttpClientHandler extends ChannelInboundHandlerAdapter{//读取服务端传输的数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {FullHttpResponse response = (FullHttpResponse) msg;System.out.println("获取的数据: "+response.content().toString(CharsetUtil.UTF_8));//释放内存response.release();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {URI url = new URI("/my/http");FullHttpRequest request=new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,HttpMethod.GET, url.toASCIIString(), Unpooled.copiedBuffer("Hello server!",CharsetUtil.UTF_8));//构造HTTP请求头request.headers().set(HttpHeaderNames.HOST,MyHttpClient.HOST.toString());request.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);//发送http请求ctx.writeAndFlush(request);}
}
空闲的连接和超时
上述我们基于Netty构建了一个HTTP/HTTPS应用程序,但是缺少了一块比较重要的资源管理工作,而Netty也帮我们考虑到了这种情况,为我们提供了一系列的检查空闲的连接和超时的处理器。
检测空闲连接以及超时对于及时释放资源来说是至关重要的。由于这是一项常见的任务, Netty特地为它提供了几个ChannelHandler实现:
名称 | 描述 |
---|---|
IdleStateHandler | 当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件。然后,你 可以通过在你的 ChannelInboundHandler 中重写 userEventTriggered()方法来处理该 IdleStateEvent 事件 |
ReadTimeoutHandler | 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个 Read-TimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught()方法来检测该 Read-TimeoutException |
WriteTimeoutHandler | 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个 Write-TimeoutException 并关闭对应的 Channel 。可以通过重写你的 ChannelHandler 的 exceptionCaught()方法检测该 WriteTimeout-Exception |
实现一个发送心跳到远程节点的Handler:
/*** 发送心跳*/
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(//IdleStateHandler将在被触发时发送一个IdleStateEvent事件new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));//将一个HeartbeatHandler添加到ChannelPipeline中pipeline.addLast(new HeartbeatHandler());}//实现userEventTriggered()方法以发送心跳消息public static final class HeartbeatHandlerextends ChannelInboundHandlerAdapter {//发送到远程节点的心跳消息private static final ByteBuf HEARTBEAT_SEQUENCE =Unpooled.copiedBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));@Overridepublic void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception {//发送心跳消息,并在发送失败时关闭该连接if (evt instanceof IdleStateEvent) {ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {//不是 IdleStateEvent 事件,所以将它传递给下一个 ChannelInboundHandlersuper.userEventTriggered(ctx, evt);}}}
}
实现流程:如果连接超过60s没有接收或者发送任何的数据,那么IdleStateHandler将会使用一个IdleStateEvent 事件来调用fireUserEventTriggered方法,而上述自定义的HeartbeatHandler重写了userEventTriggered方法,当这个方法检测到IdleStateEvent事件时,它就会发送心跳信息,并且添加一个将在发送操作失败时关闭该连接的ChannelFutureListener监听
序列化
通常跨进程进行服务调用时,都需要把被传输的Java对象编码为字节数组或者 ByteBuffer 对象。而当远程服务读取到 ByteBuffer 对象或者字节数组时,需要将其解码为发送时的 Java 对象。这被称为 Java 对象编解码技术。而Java 序列化仅仅是 Java 编解码技术的一种,由于它的种种缺陷,衍生出了多种序列化技术和框架
序列化框架的两个主要目标:
- 网络传输
- 对象持久化
Java序列化的缺陷:
- 无法跨语言
- 序列化后的字节数组太大
- 序列化性能太低
基于Protocol Buffers 序列化
Protocol Buffers以一种紧凑而高效的方式对结构化的数据进行编码以及解码,它具有许多和编程语言绑定,使得它很适合跨语言的项目
Netty为支持Protocol Buffers所提供的ChannelHandler:
名称 | 描述 |
---|---|
ProtobufDecoder | 使用protobuf对消息进行解码 |
ProtobufEncoder | 使用protobuf对消息进行编码 |
ProtobufVarint32FrameDecoder | 根据消息中的 Google Protocol Buffers的“Base 128 Varints” 整型长度字段值动态地分割所接收的ByteBuf |
ProtobufVarint32LengthFieldPrepender | 向ByteBuf前追加一个Google Protocol Buffers的“Base 128 Varints” 整型长度字段值 |
Protobuf安装包下载地址:https://github.com/protocolbuffers/protobuf/releases/tag/v2.5.0
构建Protocol Buffers 消息对象模型
编写proto文件
Protobuf定义数据格式的文件一般保存在“.proto” 文件中,每一个 message代表了一类结构化的数据
需要序列化的Bean:
public class Person {String name;int id;String email;
}
对应的proto文件:
package protocol_Buffer;option java_package = "cn.nettystudy";
option java_outer_classname = "PersonProtoc";message Person {required string name = 1;required int32 id = 2;optional string email = 3;
}
将".proto"文件编译成Java类
# 在终端输入下列命令进行编译
protoc -I = $ SRC_DIR --java_out = $ DST_DIR $ SRC_DIR / addressbook.proto
参数说明:
- SRC_DIR:源目录,指定需要编译的.proto文件目录 (如没有提供则使用当前目录)
- DST_DIR:目标目录,编译后代码生成的目录 (通常设置与SRC_DIR相同)
- 需要编译的.proto 文件的具体路径
最后就会对应生成一个:PersonProtoc.java 文件,然后将该文件引入到工程中
实战
客户端会基于ProtobufEncoder处理器对发往服务端的消息进行序列化
public class ProtobufClient {private static final int PORT = 8761;private static final String HOST = "127.0.0.1";private static Bootstrap bootstrap = new Bootstrap();private static EventLoopGroup eventLoopGroup=new NioEventLoopGroup();public static void startServer(){try {bootstrap.group(eventLoopGroup).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加报文长度字段ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());//添加 ProtobufEncoder 进行序列化将实体类编码为字节ch.pipeline().addLast(new ProtobufEncoder());//添加业务handlerch.pipeline().addLast(new ProtobufClientHandler());}});ChannelFuture future=bootstrap.connect(new InetSocketAddress(HOST, PORT)).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally{eventLoopGroup.shutdownGracefully();}}public static void main(String[] args) {startServer();}
}
客户端的业务Handler,根据生成的Protoc类拿到要传输的目标实例,然后设置要传输的数据,并写往对端
public class ProtobufClientHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("准备生成数据===========>");//生成实体类PersonPersonProtoc.Person.Builder builder = PersonProtoc.Person.newBuilder();for(int i=0;i<10;i++){builder.setName("test");builder.setId(1);builder.setEmail("test@189.cn");System.out.println("发送数据===========>"+builder.getName());//写往对端,由编码器进行编码ctx.writeAndFlush(builder.build());}}
}
服务端基于ProtobufDecoder处理器对传输的字节进行反序列化成目标实例
public class ProtobufServer {public static final int PORT = 8761;private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();private static ServerBootstrap bootstrap = new ServerBootstrap();public static void startServer(){try {bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress(PORT).childHandler(new ChannelInitializer<SocketChannel>() {//添加各种handler@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//基于消息头+消息体方式的解决粘包和半包//Netty集成的protobuf默认就提供了基于消息头+消息体方式解决粘包和半包的handler//添加ProtobufVarint32FrameDecoder分离数据帧ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());//添加ProtobufDecoder反序列化器,将字节解码为目标实例ch.pipeline().addLast(new ProtobufDecoder(PersonProtoc.Person.getDefaultInstance()));//添加业务handlerch.pipeline().addLast(new ProtobufServerHandler());}});ChannelFuture future= bootstrap.bind().sync(); future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}}public static void main(String[] args) {startServer();}
}
服务端业务Handler,接收到客户端传输的数据
public class ProtobufServerHandler extends ChannelInboundHandlerAdapter{//读取到客户端发送的数据,这里msg已经被反序列化成目标实例了@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {PersonProtoc.Person person=(Person) msg;System.out.println("服务端经过protobuf反序列化后得到的数据:name="+person.getName()+",email="+person.getEmail());}}
总结
本文只介绍了Netty的一些的常见的内置编解码器和ChannelHandler,想更多以及更深入的了解,可以到Netty的官网提供的API文档进行查看,由于篇幅有限,还有一块关于集成第三方的序列化框架的实战,会放在下一篇文章进行分析
Netty框架进阶篇 - 分析和实战内置的编解码器及ChannelHandler相关推荐
- Vue.js实战——内置指令(二)
参考<Vue.js实战>梁灏 Vue内置指令第一部分:Vue.js实战--内置指令(一) 1 方法与事件 1.1 基本用法 直接上代码,这样最实在: <!DOCTYPE html&g ...
- Netty系列进阶篇一:阻塞和多路复用到底是个啥?
文章目录 一.进阶篇:Netty封装了什么? 二.刨根问底:到底什么是阻塞?什么是多路复用器? 1.操作系统基础 1-1 用户态与内核态 1-2 系统调用 1-3 File Descriptor 文件 ...
- MySQL 表的增删改查(进阶篇②)· 联合查询 内连接 外连接 · 自连接 · 子查询 exists · 合并查询 union
接进阶篇①,我们继续学习. 一.联合查询 1.1 内连接 1.2 外连接 1.3 内连接和左右外连接的区别 二.自连接 三.子查询 3.1 单行子查询 3.2 多行子查询 使用 in 范围匹配多行 另 ...
- 【框架篇】Mybatis的内置方法,撸代码更快捷~~
掌握内置方法,在撸代码的过程中就可以让我们的代码更加简洁优雅,提高我们的开发效率,以下是我在日常开发中常用的方法,希望能帮助到大家,话不多说,直接上Demo~~~ 一.insert --------- ...
- python内置函数面向对象_Pyhton——面向对象进阶二:类的内置函数补充、描述符...
Pyhton--面向对象进阶二: 一.类的内置函数补充 1.isinstance(obj,cls)--检查obj是否是该类的对象 class Hoo: def __init__(self,name,t ...
- python内置的读取文件函数_Python函数篇(3)-内置函数、文件处理(已更新)
1.内置函数 上一篇文章中,我重点写了reduce.map.filter3个内置函数,在本篇章节中,会补充其他的一些常规内置函数,并重点写max,min函数,其他没有说明的函数,会在后面写到类和面向对 ...
- Vue.js实战——内置指令(一)
参考:<Vue.js实战>梁灏 1. 基本指令 1.1 v-cloak <!DOCTYPE html> <html><head><meta cha ...
- linux内置变量大全,Linux进阶之bash编程四(内置变量)
一:基础回顾 1:文件清空 [craft@vp143 test]$ >log.txt 2:正常和错误重定向输出 //正常和错误都追加输出到同样地方 [craft@vp143 test]$ ifc ...
- “约见”面试官系列之常见面试题之第六十八篇之本地对象 内置对象 宿主对象(建议收藏)
首先解释下宿主环境:一般宿主环境由外壳程序创建与维护,只要能提供js引擎执行的环境都可称之为外壳程序.如:web浏览器,一些桌面应用系统等.即由web浏览器或是这些桌面应用系统早就的环境即宿主环境. ...
最新文章
- SCRUM的三个工件
- vue 拍照人脸识别_安排上了!PC人脸识别登录,出乎意料的简单
- Hibernate中session的get方法和load方法的区别
- java 图像梯度检测_数字图像处理-边缘检测
- 开源天生就不是为了赚钱!
- 飞利浦 TASY 电子病历系统中存在严重漏洞,可暴露患者记录
- ubuntu 把软件源修改为国内源和更新(转载)
- html颜色代码表_html颜色代码表
- 计算机专业毕业设计资料免费下载
- noi2016试题C语言,noip2016普及组初赛试题和答案.pdf
- NPDP是什么考试?产品经理必知
- 头歌 初识Redis
- steam游戏存档迁移
- GIS与虚拟仿真下直观、完整、立体地园区实景展示
- win10下的VS2017安装MPI
- android listview 点击获取焦点,android – ListView项目焦点行为
- 解决导航栏按钮背景色切换,刷新页面,按钮背景色切换,页面和路径没有切换问题
- 我母亲在一家计算机公司工作,关于母亲的作文700字5篇
- 名词性从句(1)——同位语从句(1)
- SitePoint播客#34:对斜线表示抱歉
热门文章
- 初学者 linux版本,最适合于初学者的 Linux 发行版 | Linux 中国
- Linux之用户管理
- 如何利用命令打开截图工具?
- 【进度2】从阿里云迁至腾讯云,并添加网站备案号
- Linux_ntp服务
- 如何编写弹出窗口不被IE阻止的程序
- linux下malloc申请大内存,Linux malloc大内存的方法
- Ubuntu下查看IP地址
- 什么人适合做计算机工作吗,适合“性格内向”的人选择的几大专业,工作稳定,薪资可观!...
- drawrect java_java - g.drawRect在背景后面绘制矩形 - 堆栈内存溢出