文章目录

  • 前言
  • 基于Netty构建HTTP/HTTPS应用程序
    • HTTP协议相关编解码器
    • HTTP聚合消息
    • HTTP压缩
    • 配置SSL,启用HTTPS
    • 实战
  • 空闲的连接和超时
  • 序列化
    • 基于Protocol Buffers 序列化
      • 构建Protocol Buffers 消息对象模型
      • 实战
  • 总结

前言

Netty为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了你在那些相当繁琐的事务上本来会花费的时间与精力,本文会结合一些案例进行分析和应用内置的编解码器

基于Netty构建HTTP/HTTPS应用程序

Netty内置的一系列编解码器和ChannelHandler,其中就有可以支持用来处理HTTP和HTTPS协议的

HTTP协议相关编解码器

HTTP是基于请求/响应模式的:客户端向服务器发送一个 HTTP 请求,然后服务器将会返回一个 HTTP 响应。Netty提供了多种编码器和解码器以简化对这个协议的使用

HTTP请求的组成部分:

HTTP响应的组成部分:

从上面的图可以看出,一个HTTP 请求/响应可能由多个数据部分组成,并且总是以一个LastHttpContent部分作为结束。其中FullHttpRequest和FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequest、 LastHttpContent 等等)都实现了 HttpObject 接口

HTTP解码器和编码器:

名称 描述
HttpRequestEncoder 将 HttpRequest、HttpContent 和 LastHttpContent 消息编码为字节
HttpResponseEncoder 将 HttpResponse、HttpContent 和 LastHttpContent 消息编码为字节
HttpRequestDecoder 将字节解码为 HttpRequest、HttpContent 和 LastHttpContent 消息
HttpResponseDecoder 将字节解码为 HttpResponse、HttpContent 和 LastHttpContent 消息
HttpClientCodec 将客户端请求和响应做一个组合
HttpServerCodec 将服务端请求和响应做一个组合

HTTP聚合消息

由于HTTP的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器HttpObjectAggregator,它可以将多个消息部分合并为 FullHttpRequest 或者 FullHttpResponse 消息。通过这样的方式,你将总是看到完整的消息内容

由于消息分段需要被缓存,直到可以转发一个完整的消息给下一个ChannelInboundHandler,所以这个操作有轻微的开销,其所带来的好处便是你不必关心消息碎片了

HTTP压缩

当使用 HTTP 时,都会建议开启压缩功能以尽可能多地减小传输数据的大小。虽然压缩会带来一些 CPU 时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说

Netty 为压缩提供了一个HttpContentCompressor处理器,为解压缩提供了一个HttpContentDecompressor处理器,它们同时支持gzip和deflate编码

注意:如果你使用的JDK为1.6或者更早的版本,那么需要将JZlib (www.jcraft.com/jzlib/ ) 添加到CLASSPATH中以支持压缩功能。在Maven中,需要加入依赖:

<dependency><groupId>com.jcraft</groupId><artifactId>jzlib</artifactId><version>1.1.3</version>
</dependency>

配置SSL,启用HTTPS

Netty通过一个名为SslHandler的ChannelHandler来支持SSL/TLS,其内部使用SSLEngine来完成实际的工作。而启用HTTPS只需要将 SslHandler 添加到 ChannelPipeline 的 ChannelHandler 组合中即可

实战

自定义HTTP Server

/*** 通过Netty自定义Http Server*/
public class MyHTTPServer {private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();private static ServerBootstrap bootstrap = new ServerBootstrap();private static final int PORT = 8761;private static final boolean IS_SSL = false; //是否开启SSL认证public static void startServer() throws SSLException, CertificateException{SslContext sslContext = null;if(IS_SSL){//签名认证SelfSignedCertificate selfSignedCertificate= new SelfSignedCertificate();sslContext = SslContextBuilder.forServer(selfSignedCertificate.certificate(),selfSignedCertificate.privateKey()).build();}try {bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress(PORT).childHandler(new HttpInitHandler(sslContext));ChannelFuture future=bootstrap.bind().sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally{eventLoopGroup.shutdownGracefully();}}public static void main(String[] args) {//启动服务try {startServer();} catch (SSLException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (CertificateException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}

服务端添加相关的ChannelHandler

public class HttpInitHandler extends ChannelInitializer<SocketChannel>{private SslContext sslContext;public HttpInitHandler(SslContext sslContext) {super();this.sslContext = sslContext;}@Overrideprotected void initChannel(SocketChannel ch) throws Exception {if(null!=sslContext){SSLEngine engine=sslContext.newEngine(ch.alloc());//将SslHandler添加到ChannelPipeline中以使用HTTPSch.pipeline().addFirst(new SslHandler(engine));}//添加HTTP请求的解码器ch.pipeline().addLast(new HttpRequestDecoder());//添加HTTP响应的编码器ch.pipeline().addLast(new HttpResponseEncoder());//添加HTTP请求体消息的聚合器,指定最大的消息大小为512Kch.pipeline().addLast(new HttpObjectAggregator(512*1024));//压缩ch.pipeline().addLast(new HttpContentCompressor());//添加服务业务的handlerch.pipeline().addLast(new MyHttpHandler());}
}

自定义服务端业务handler,处理客户端发送的请求

public class MyHttpHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String resultMsg="";//将读取的数据转化为http请求对象FullHttpRequest request = (FullHttpRequest) msg;try {//获得请求的uriString uri=request.uri();//获得请求体内容String body = request.content().toString(CharsetUtil.UTF_8);//获得请求方法HttpMethod method=request.method();//指定请求的根路径,非法路径直接提示错误if(!"/my/http".equals(uri)){resultMsg = "请求路径找不到!";sendMsg(ctx,resultMsg,HttpResponseStatus.NOT_FOUND);return;}System.out.println("接收到:"+method+" 请求");//发送GET请求if(HttpMethod.GET.equals(method)){resultMsg="成功接收到GET请求";sendMsg(ctx,resultMsg,HttpResponseStatus.OK);return;}} catch (Exception e) {resultMsg = "服务器端出现不可预知的错误:"+e.getMessage();sendMsg(ctx,resultMsg,HttpResponseStatus.BAD_REQUEST);return;}finally{//释放请求request.release();}}private void sendMsg(ChannelHandlerContext ctx, String resultMsg,HttpResponseStatus status) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, //标识HTTP版本status, //标识响应状态Unpooled.copiedBuffer(resultMsg,CharsetUtil.UTF_8));//设置响应头信息response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=UTF-8");//响应消息ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}
}

自定义HTTP Client

public class MyHttpClient {public static final String HOST = "127.0.0.1";public static final int PORT = 8761;private static Bootstrap bootstrap = new Bootstrap();private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();public static void startServer(){try {bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(HOST, PORT)).handler(new HttpInitClientHandler());ChannelFuture future= bootstrap.connect().sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally{eventLoopGroup.shutdownGracefully();} }public static void main(String[] args) {startServer();}
}

客户端添加相关的ChannelHandler

public class HttpInitClientHandler extends ChannelInitializer<SocketChannel>{@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加HTTP请求的编解码器ch.pipeline().addLast(new HttpClientCodec());//聚合ch.pipeline().addLast(new HttpObjectAggregator(512*1024));//解压缩ch.pipeline().addLast(new HttpContentDecompressor());//客户端业务handlerch.pipeline().addLast(new MyHttpClientHandler());}
}

自定义客户端业务handler,向服务端发送HTTP/HTTPS请求,并接收服务端的返回的响应信息

public class MyHttpClientHandler extends ChannelInboundHandlerAdapter{//读取服务端传输的数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {FullHttpResponse response = (FullHttpResponse) msg;System.out.println("获取的数据: "+response.content().toString(CharsetUtil.UTF_8));//释放内存response.release();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {URI url = new URI("/my/http");FullHttpRequest request=new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,HttpMethod.GET, url.toASCIIString(), Unpooled.copiedBuffer("Hello server!",CharsetUtil.UTF_8));//构造HTTP请求头request.headers().set(HttpHeaderNames.HOST,MyHttpClient.HOST.toString());request.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);//发送http请求ctx.writeAndFlush(request);}
}

空闲的连接和超时

上述我们基于Netty构建了一个HTTP/HTTPS应用程序,但是缺少了一块比较重要的资源管理工作,而Netty也帮我们考虑到了这种情况,为我们提供了一系列的检查空闲的连接和超时的处理器。

检测空闲连接以及超时对于及时释放资源来说是至关重要的。由于这是一项常见的任务, Netty特地为它提供了几个ChannelHandler实现:

名称 描述
IdleStateHandler 当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件。然后,你 可以通过在你的 ChannelInboundHandler 中重写 userEventTriggered()方法来处理该 IdleStateEvent 事件
ReadTimeoutHandler 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个 Read-TimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught()方法来检测该 Read-TimeoutException
WriteTimeoutHandler 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个 Write-TimeoutException 并关闭对应的 Channel 。可以通过重写你的 ChannelHandler 的 exceptionCaught()方法检测该 WriteTimeout-Exception

实现一个发送心跳到远程节点的Handler:

/*** 发送心跳*/
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(//IdleStateHandler将在被触发时发送一个IdleStateEvent事件new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));//将一个HeartbeatHandler添加到ChannelPipeline中pipeline.addLast(new HeartbeatHandler());}//实现userEventTriggered()方法以发送心跳消息public static final class HeartbeatHandlerextends ChannelInboundHandlerAdapter {//发送到远程节点的心跳消息private static final ByteBuf HEARTBEAT_SEQUENCE =Unpooled.copiedBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));@Overridepublic void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception {//发送心跳消息,并在发送失败时关闭该连接if (evt instanceof IdleStateEvent) {ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {//不是 IdleStateEvent 事件,所以将它传递给下一个 ChannelInboundHandlersuper.userEventTriggered(ctx, evt);}}}
}

实现流程:如果连接超过60s没有接收或者发送任何的数据,那么IdleStateHandler将会使用一个IdleStateEvent 事件来调用fireUserEventTriggered方法,而上述自定义的HeartbeatHandler重写了userEventTriggered方法,当这个方法检测到IdleStateEvent事件时,它就会发送心跳信息,并且添加一个将在发送操作失败时关闭该连接的ChannelFutureListener监听

序列化

通常跨进程进行服务调用时,都需要把被传输的Java对象编码为字节数组或者 ByteBuffer 对象。而当远程服务读取到 ByteBuffer 对象或者字节数组时,需要将其解码为发送时的 Java 对象。这被称为 Java 对象编解码技术。而Java 序列化仅仅是 Java 编解码技术的一种,由于它的种种缺陷,衍生出了多种序列化技术和框架

序列化框架的两个主要目标:

  1. 网络传输
  2. 对象持久化

Java序列化的缺陷:

  1. 无法跨语言
  2. 序列化后的字节数组太大
  3. 序列化性能太低

基于Protocol Buffers 序列化

Protocol Buffers以一种紧凑而高效的方式对结构化的数据进行编码以及解码,它具有许多和编程语言绑定,使得它很适合跨语言的项目

Netty为支持Protocol Buffers所提供的ChannelHandler:

名称 描述
ProtobufDecoder 使用protobuf对消息进行解码
ProtobufEncoder 使用protobuf对消息进行编码
ProtobufVarint32FrameDecoder 根据消息中的 Google Protocol Buffers的“Base 128 Varints” 整型长度字段值动态地分割所接收的ByteBuf
ProtobufVarint32LengthFieldPrepender 向ByteBuf前追加一个Google Protocol Buffers的“Base 128 Varints” 整型长度字段值

Protobuf安装包下载地址:https://github.com/protocolbuffers/protobuf/releases/tag/v2.5.0

构建Protocol Buffers 消息对象模型

编写proto文件

Protobuf定义数据格式的文件一般保存在“.proto” 文件中,每一个 message代表了一类结构化的数据

需要序列化的Bean:

public class Person {String name;int id;String email;
}

对应的proto文件:

package protocol_Buffer;option java_package = "cn.nettystudy";
option java_outer_classname = "PersonProtoc";message Person {required string name = 1;required int32 id = 2;optional string email = 3;
}

将".proto"文件编译成Java类

#  在终端输入下列命令进行编译
protoc -I = $ SRC_DIR --java_out = $ DST_DIR $ SRC_DIR / addressbook.proto

参数说明:

  1. SRC_DIR:源目录,指定需要编译的.proto文件目录 (如没有提供则使用当前目录)
  2. DST_DIR:目标目录,编译后代码生成的目录 (通常设置与SRC_DIR相同)
  3. 需要编译的.proto 文件的具体路径

最后就会对应生成一个:PersonProtoc.java 文件,然后将该文件引入到工程中

实战

客户端会基于ProtobufEncoder处理器对发往服务端的消息进行序列化

public class ProtobufClient {private static final int PORT = 8761;private static final String HOST = "127.0.0.1";private static Bootstrap bootstrap = new Bootstrap();private static EventLoopGroup eventLoopGroup=new NioEventLoopGroup();public static void startServer(){try {bootstrap.group(eventLoopGroup).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加报文长度字段ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());//添加 ProtobufEncoder 进行序列化将实体类编码为字节ch.pipeline().addLast(new ProtobufEncoder());//添加业务handlerch.pipeline().addLast(new ProtobufClientHandler());}});ChannelFuture future=bootstrap.connect(new InetSocketAddress(HOST, PORT)).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally{eventLoopGroup.shutdownGracefully();}}public static void main(String[] args) {startServer();}
}

客户端的业务Handler,根据生成的Protoc类拿到要传输的目标实例,然后设置要传输的数据,并写往对端

public class ProtobufClientHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("准备生成数据===========>");//生成实体类PersonPersonProtoc.Person.Builder builder = PersonProtoc.Person.newBuilder();for(int i=0;i<10;i++){builder.setName("test");builder.setId(1);builder.setEmail("test@189.cn");System.out.println("发送数据===========>"+builder.getName());//写往对端,由编码器进行编码ctx.writeAndFlush(builder.build());}}
}

服务端基于ProtobufDecoder处理器对传输的字节进行反序列化成目标实例

public class ProtobufServer {public static final int PORT = 8761;private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();private static ServerBootstrap bootstrap = new ServerBootstrap();public static void startServer(){try {bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress(PORT).childHandler(new ChannelInitializer<SocketChannel>() {//添加各种handler@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//基于消息头+消息体方式的解决粘包和半包//Netty集成的protobuf默认就提供了基于消息头+消息体方式解决粘包和半包的handler//添加ProtobufVarint32FrameDecoder分离数据帧ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());//添加ProtobufDecoder反序列化器,将字节解码为目标实例ch.pipeline().addLast(new ProtobufDecoder(PersonProtoc.Person.getDefaultInstance()));//添加业务handlerch.pipeline().addLast(new ProtobufServerHandler());}});ChannelFuture future= bootstrap.bind().sync();   future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}}public static void main(String[] args) {startServer();}
}

服务端业务Handler,接收到客户端传输的数据

public class ProtobufServerHandler extends ChannelInboundHandlerAdapter{//读取到客户端发送的数据,这里msg已经被反序列化成目标实例了@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {PersonProtoc.Person person=(Person) msg;System.out.println("服务端经过protobuf反序列化后得到的数据:name="+person.getName()+",email="+person.getEmail());}}

总结

本文只介绍了Netty的一些的常见的内置编解码器和ChannelHandler,想更多以及更深入的了解,可以到Netty的官网提供的API文档进行查看,由于篇幅有限,还有一块关于集成第三方的序列化框架的实战,会放在下一篇文章进行分析

Netty框架进阶篇 - 分析和实战内置的编解码器及ChannelHandler相关推荐

  1. Vue.js实战——内置指令(二)

    参考<Vue.js实战>梁灏 Vue内置指令第一部分:Vue.js实战--内置指令(一) 1 方法与事件 1.1 基本用法 直接上代码,这样最实在: <!DOCTYPE html&g ...

  2. Netty系列进阶篇一:阻塞和多路复用到底是个啥?

    文章目录 一.进阶篇:Netty封装了什么? 二.刨根问底:到底什么是阻塞?什么是多路复用器? 1.操作系统基础 1-1 用户态与内核态 1-2 系统调用 1-3 File Descriptor 文件 ...

  3. MySQL 表的增删改查(进阶篇②)· 联合查询 内连接 外连接 · 自连接 · 子查询 exists · 合并查询 union

    接进阶篇①,我们继续学习. 一.联合查询 1.1 内连接 1.2 外连接 1.3 内连接和左右外连接的区别 二.自连接 三.子查询 3.1 单行子查询 3.2 多行子查询 使用 in 范围匹配多行 另 ...

  4. 【框架篇】Mybatis的内置方法,撸代码更快捷~~

    掌握内置方法,在撸代码的过程中就可以让我们的代码更加简洁优雅,提高我们的开发效率,以下是我在日常开发中常用的方法,希望能帮助到大家,话不多说,直接上Demo~~~ 一.insert --------- ...

  5. python内置函数面向对象_Pyhton——面向对象进阶二:类的内置函数补充、描述符...

    Pyhton--面向对象进阶二: 一.类的内置函数补充 1.isinstance(obj,cls)--检查obj是否是该类的对象 class Hoo: def __init__(self,name,t ...

  6. python内置的读取文件函数_Python函数篇(3)-内置函数、文件处理(已更新)

    1.内置函数 上一篇文章中,我重点写了reduce.map.filter3个内置函数,在本篇章节中,会补充其他的一些常规内置函数,并重点写max,min函数,其他没有说明的函数,会在后面写到类和面向对 ...

  7. Vue.js实战——内置指令(一)

    参考:<Vue.js实战>梁灏 1. 基本指令 1.1 v-cloak <!DOCTYPE html> <html><head><meta cha ...

  8. linux内置变量大全,Linux进阶之bash编程四(内置变量)

    一:基础回顾 1:文件清空 [craft@vp143 test]$ >log.txt 2:正常和错误重定向输出 //正常和错误都追加输出到同样地方 [craft@vp143 test]$ ifc ...

  9. “约见”面试官系列之常见面试题之第六十八篇之本地对象 内置对象 宿主对象(建议收藏)

    首先解释下宿主环境:一般宿主环境由外壳程序创建与维护,只要能提供js引擎执行的环境都可称之为外壳程序.如:web浏览器,一些桌面应用系统等.即由web浏览器或是这些桌面应用系统早就的环境即宿主环境. ...

最新文章

  1. SCRUM的三个工件
  2. vue 拍照人脸识别_安排上了!PC人脸识别登录,出乎意料的简单
  3. Hibernate中session的get方法和load方法的区别
  4. java 图像梯度检测_数字图像处理-边缘检测
  5. 开源天生就不是为了赚钱!
  6. 飞利浦 TASY 电子病历系统中存在严重漏洞,可暴露患者记录
  7. ubuntu 把软件源修改为国内源和更新(转载)
  8. html颜色代码表_html颜色代码表
  9. 计算机专业毕业设计资料免费下载
  10. noi2016试题C语言,noip2016普及组初赛试题和答案.pdf
  11. NPDP是什么考试?产品经理必知
  12. 头歌 初识Redis
  13. steam游戏存档迁移
  14. GIS与虚拟仿真下直观、完整、立体地园区实景展示
  15. win10下的VS2017安装MPI
  16. android listview 点击获取焦点,android – ListView项目焦点行为
  17. 解决导航栏按钮背景色切换,刷新页面,按钮背景色切换,页面和路径没有切换问题
  18. 我母亲在一家计算机公司工作,关于母亲的作文700字5篇
  19. 名词性从句(1)——同位语从句(1)
  20. SitePoint播客#34:对斜线表示抱歉

热门文章

  1. 初学者 linux版本,最适合于初学者的 Linux 发行版 | Linux 中国
  2. Linux之用户管理
  3. 如何利用命令打开截图工具?
  4. 【进度2】从阿里云迁至腾讯云,并添加网站备案号
  5. Linux_ntp服务
  6. 如何编写弹出窗口不被IE阻止的程序
  7. linux下malloc申请大内存,Linux malloc大内存的方法
  8. Ubuntu下查看IP地址
  9. 什么人适合做计算机工作吗,适合“性格内向”的人选择的几大专业,工作稳定,薪资可观!...
  10. drawrect java_java - g.drawRect在背景后面绘制矩形 - 堆栈内存溢出