netty编解码器与序列化框架分析
netty编解码器分析
编码(Encode)也称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
反之,解码(Decode)也称为反序列化(deserialization),用于把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。
进行远程跨进程服务调用时(例如 RPC 调用),需要使用特定的编解码技术,对需要进行网络传输的对象做编码或者解码,以便完成远程调用。
Java序列化
Java默认提供的序列化机制,需要序列化的Java对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInputStream和java.io.ObjectOutputStrem序列化和反序列化。
但是由于它自身存在很多缺点,因此大多数的RPC框架并没有选择它。Java序列化的主要缺点如下:
- 无法跨语言:是Java序列化最致命的问题。对于跨进程的服务调用,服务提供者可能会使用C++或者其它语言开发,当我们需要和异构语言进程交互时,Java序列化就难以胜任。由于Java序列化技术是Java语言内部的私有协议,其它语言并不支持,对于用户来说它完全是黑盒。Java序列化后的字节数组,别的语言无法进行反序列化,这就严重阻碍了它的应用范围。
- 序列化后的码流太大,例如使用二进制编解码技术对同一个复杂的POJO对象进行编码,它的码流仅仅为Java序列化之后的20%左右;目前主流的编解码框架,序列化之后的码流都远远小于原生的Java序列化。
- 序列化性能太低。
在netty中使用java的序列化,只需要添加编码器ObjectEncoder和解码器ObjectDecoder即可,netty都封装好了。
服务器端代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ObjectEncoder()); // 编码ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); // 解码ch.pipeline().addLast(new ServerHandler());}});
服务器端业务代码:
public class ServerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {System.out.println("receive from client: " + msg);UserResponse response = new UserResponse();response.setCode(200);response.setMessage("success");ctx.writeAndFlush(response);}}
思考:不需要解决粘包半包问题?ObjectEncoder和ObjectDecoder中已经对粘包半包问题进行了处理,使用的是自定义消息长度的方式,下面看源代码。
io.netty.handler.codec.serialization.ObjectEncoder的源代码:
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {private static final byte[] LENGTH_PLACEHOLDER = new byte[4];@Overrideprotected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {int startIdx = out.writerIndex();ByteBufOutputStream bout = new ByteBufOutputStream(out);ObjectOutputStream oout = null;try {bout.write(LENGTH_PLACEHOLDER); // 先占位4个字节oout = new CompactObjectOutputStream(bout);oout.writeObject(msg);oout.flush();} finally {if (oout != null) {oout.close();} else {bout.close();}}int endIdx = out.writerIndex();out.setInt(startIdx, endIdx - startIdx - 4); // 计算报文的长度,放在开始的4个字节里面}
}
io.netty.handler.codec.serialization.ObjectDecoder解码器的源代码:
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {... ...public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {super(maxObjectSize, 0, 4, 0, 4);this.classResolver = classResolver;}... ...
}
发现ObjectDecoder继承LengthFieldBasedFrameDecoder,构造方法中调用了父类的构造方法super(maxObjectSize, 0, 4, 0, 4);
,指定了解码时前4个字节为报文的长度,解码后丢弃前面的4个字节,传递给后面的Handler。
Google的Protobuf
Protobuf全称Google Protocol Buffers,它由谷歌开源而来,在谷歌内部久经考验。它将数据结构以.proto 文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性。
它的特点如下:
- 结构化数据存储格式(XML,JSON等)。
- 高效的编解码性能。
- 语言无关、平台无关、扩展性好。
使用方法如下:
先下载对应操作系统的工具,下载地址:https://github.com/protocolbuffers/protobuf/releases,这里使用的是protoc-3.13.0-win64.zip
,解压缩后bin目录下有个protoc.exe。
编写UserResponse.proto文件:
syntax = "proto3";
option java_package = "com.morris.netty.serialize.protobuf";
option java_outer_classname = "UserResponseProto";message UserResponse {int32 code = 1;string message = 2;
}
然后使用protoc.exe生成对应的实体类:
> protoc.exe --java_out=. UserResponse.proto
把生成的实体类,拷贝到工程对应的目录。
项目中引入protobuf的maven依赖:
dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.6.1</version>
</dependency>
服务器端代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 处理粘包半包的解码器ch.pipeline().addLast(new ProtobufDecoder(UserRequestProto.UserRequest.getDefaultInstance())); // protobuf解码器ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 处理粘包半包的编码器ch.pipeline().addLast(new ProtobufEncoder()); // protobuf编码器ch.pipeline().addLast(new ServerHandler()); // 具体业务处理}});
服务器端业务代码:
public class ServerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("receive from client: " + msg);UserResponseProto.UserResponse response = UserResponseProto.UserResponse.newBuilder().setCode(200).setMessage("success").buildPartial();ctx.writeAndFlush(response);}}
JBoss Marshalling
JBoss Marshalling是一个Java对象的序列化API包,修正了JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容;同时增加了一些可调的参数和附加的特性,并且这些参数和特性可通过工厂类进行配置。
相比于传统的Java序列化机制,它的优点如下:
- 可插拔的类解析器,提供更加便捷的类加载定制策略,通过一个接口即可实现定制;
- 可插拔的对象替换技术,不需要通过继承的方式;
- 可插拔的预定义类缓存表,可以减小序列化的字节数组长度,提升常用类型的对象序列化性能;
- 无须实现java.io.Serializable接口,即可实现Java序列化;
- 通过缓存技术提升对象的序列化性能。
使用方法如下:
引入marshalling的maven依赖:
<dependency><groupId>org.jboss.marshalling</groupId><artifactId>jboss-marshalling-serial</artifactId><version>2.0.6.Final</version>
</dependency>
创建一个生产marshalling编码器和marshalling解码器的工厂类:
package com.morris.netty.serialize.marshalling;import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;public final class MarshallingCodeCFactory {public static MarshallingDecoder buildMarshallingDecoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);return decoder;}public static MarshallingEncoder buildMarshallingEncoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);MarshallingEncoder encoder = new MarshallingEncoder(provider);return encoder;}
}
服务器端代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // 解码器ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // 编码器ch.pipeline().addLast(new ServerHandler());}});
这里不需要处理粘包和半包问题的原因与java序列化不需要处理的原因一致,具体可以参考MarshallingEncoder和MarshallingDecoder的源代码。
服务器端业务代码与java序列化的服务器端业务代码一致。
相比于前面介绍的两种编解码框架,JBoss Marshalling更多是在JBoss内部使用,应用范围有限,netty是JBoss公司的,所以netty要支持一下自己家的产品。
自定义编解码器MessagePack
编码器相关基类:
- 将消息编码为字节:MessageToByteEncoder。
- 将消息编码为消息:MessageToMessageEncoder,T代表源数据的类型。
解码器相关基类:
- 将字节解码为消息:ByteToMessageDecoder。
- 将一种消息类型解码为另一种消息类型:MessageToMessageDecoder。
编解码器类:
- ByteToMessageCodec。
- MessageToMessageCodec。
下面通过自定义编码器和解码器,将MessagePack作为序列化框架嵌入到项目中来:
引入MessagePack的maven依赖:
<dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version>
</dependency>
编码器:
package com.morris.netty.serialize.messagepack;import com.morris.netty.serialize.pojo.UserRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;public class MessagePackEncoder extends MessageToByteEncoder<UserRequest> {@Overrideprotected void encode(ChannelHandlerContext ctx, UserRequest msg, ByteBuf out)throws Exception {MessagePack messagePack = new MessagePack();byte[] raw = messagePack.write(msg);out.writeBytes(raw);}
}
解码器:
package com.morris.netty.serialize.messagepack;import com.morris.netty.serialize.pojo.UserRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;import java.io.Serializable;
import java.util.List;public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {byte[] bytes = new byte[in.readableBytes()];in.readBytes(bytes);MessagePack messagePack = new MessagePack();Serializable serializable = messagePack.read(bytes, UserRequest.class);out.add(serializable);}
}
实体类,注意加上@Message注解:
package com.morris.netty.serialize.pojo;import lombok.Data;
import org.msgpack.annotation.Message;import java.io.Serializable;@Message
@Data
public class UserRequest implements Serializable {private int age;private String name;
}
服务器端启动类关键代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // 解决粘包半包问题ch.pipeline().addLast(new MessagePackDecoder()); // 解码器ch.pipeline().addLast(new ServerHandler());}});
服务器端业务处理类:
package com.morris.netty.serialize.messagepack;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ServerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {System.out.println("receive from client: " + msg);ctx.writeAndFlush(Unpooled.copiedBuffer("success\n".getBytes()));ctx.fireChannelRead(msg);}}
客户端启动类关键代码:
Bootstrap b = new Bootstrap();
b.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LengthFieldPrepender(2)); // 解决粘包半包问题// 服务器端发送过来的是以换行符分割的文本,所以这里使用LineBasedFrameDecoder处理粘包半包问题ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new MessagePackEncoder()); // 编码器ch.pipeline().addLast(new ClientHandler());}});
客户端业务处理类:
package com.morris.netty.serialize.messagepack;import com.morris.netty.serialize.pojo.UserRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ClientHandler extends SimpleChannelInboundHandler {@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 0; i < 100; i++) {UserRequest request = new UserRequest();request.setAge(i);request.setName("morris");ctx.writeAndFlush(request);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {ByteBuf receiveByteBuf = (ByteBuf) msg;byte[] bytes = new byte[receiveByteBuf.readableBytes()];receiveByteBuf.readBytes(bytes);System.out.println("receive from server: " + new String(bytes));}
}
当然,除了上述介绍的编解码框架和技术之外,比较常用的还有kryo、hession和Json等。
netty编解码器与序列化框架分析相关推荐
- Netty系列之Netty编解码框架分析
1. 背景 1.1. 编解码技术 通常我们也习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输.数据持久化或者其它用途. 反之,解码(Decod ...
- [强烈推荐] 新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析
新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析 1.引言 Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端. 本文基 ...
- java基础巩固-宇宙第一AiYWM:为了维持生计,手写RPC~Version07(RPC原理、序列化框架们、网络协议框架们 、RPC 能帮助我们做什么呢、RPC异常排查:ctrl+F搜超时)整起
上次Version06说到了咱们手写迷你版RPC的大体流程, 对咱们的迷你版RPC的大体流程再做几点补充: 为什么要封装网络协议,别人说封装好咱们就要封装?Java有这个特性那咱就要用?好像是这样.看 ...
- hive序列生成_常见的序列化框架及Protobuf原理
享学课堂作者:逐梦々少年 转载请声明出处! 上次我们详细的学习了Java中的序列化机制,但是我们日常开发过程中,因为java的序列化机制的压缩效率问题,以及序列化大小带来的传输的效率问题,一般很少会使 ...
- Netty使用kryo序列化传输对象
Netty使用kryo序列化传输对象 横渡 Netty使用kryo序列化传输对象 - 简书参考文章:https://blog.csdn.net/eguid_1/article/details/7931 ...
- Netty 编解码器详解
什么是编解码器? 在网络中都是以字节码的数据形式来传输数据的,如何将其和目标应用程序的自定义消息对象数据格式进行相互转换.这种转换逻辑就需要编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将 ...
- Java 集合框架分析:JAVA集合中的一些边边角角的知识
相关文章: Java 集合框架分析:Set http://blog.csdn.net/youyou1543724847/article/details/52733723 Java 集合框架分析:Lin ...
- Netty ChannelInactive 断链场景分析
本文档主要列举离会.关闭进程.断网.重连等会导致sdk与服务端断开连接的场景的设计与实现,并试图解释其原理 1.Netty断链场景分析 1. Netty对断链的处理 简单来说Netty在检测到断开连接 ...
- RPC系列之Netty实现自定义RPC框架
进行这个章节之前,需要去看一下RMI的实现哈,如果了解过的童鞋可以直接跳过,如果没有或者不知道RMI的童鞋,移驾到下面的链接看完之后再回来继续看这篇 RPC系列之入门_阿小冰的博客-CSDN博客RPC ...
最新文章
- java 实现雷达图,如何使用y轴为0到100的chart.js创建雷达图?
- 堆和堆傻傻分不清?一文告诉你 Java 集合中「堆」的最佳打开方式
- linux otl oracle,linux otl 连接数据库
- protobuf oc
- 微信小程序怎么取mysql_微信小程序如何加载数据库真实数据?
- sequelize 外键关联_mysql – Sequelize.js外键
- uniGUI试用笔记(四)
- RK3399Pro Android Rock-X 人工智能开发系列(1)
- .NET中Redis安装部署及使用方法
- 如何使用 Python 构建一个“谷歌搜索”系统? | 内附代码
- 相对URL拼接为绝对URL的过程
- 斯坦福大学深度学习公开课cs231n学习笔记(1)softmax函数理解与应用
- 安川机器人梯形图指令(一)
- 最全Pycharm教程(6)——将Pycharm作为Vim编辑器使用
- 2021年全球以太网供电(POE)控制器收入大约269.2百万美元,预计2028年达到363.9百万美元
- PCIE标准共享,以及其机械尺寸图
- Maven Setting.xml配置文件下载 阿里云镜像 下载可用
- 【windows】找不到MSVCR100.dll、VCRUNTIME140.dll
- 2014年5月份第1周51Aspx源码发布详情
- “新能源拐点”热议潮未退,小鹏却已经从华尔街赶到了港交所