在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接。由于微服务往对方发送信息的时候,所有的请求都是使用的同一个连接,这样就会产生粘包和拆包的问题。本文首先会对粘包和拆包问题进行描述,然后介绍其常用的解决方案,最后会对Netty提供的几种解决方案进行讲解。

1. 粘包和拆包

​ 产生粘包和拆包问题的主要原因是,操作系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。如下图展示了粘包和拆包的一个示意图:

​ 上图中演示了粘包和拆包的三种情况:

  • A和B两个包都刚好满足TCP缓冲区的大小,或者说其等待时间已经达到TCP等待时长,从而还是使用两个独立的包进行发送;
  • A和B两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端;
  • B包比较大,因而将其拆分为两个包B_1和B_2进行发送,而这里由于拆分后的B_2比较小,其又与A包合并在一起发送。

2. 常见解决方案

​ 对于粘包和拆包问题,常见的解决方案有四种:

  • 客户端在发送数据包的时候,每个包都固定长度,比如1024个字节大小,如果客户端发送的数据长度不足1024个字节,则通过补充空格的方式补全到指定长度;
  • 客户端在每个包的末尾使用固定的分隔符,例如\r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的\r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包;
  • 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息;
  • 通过自定义协议进行粘包和拆包的处理。

3. Netty提供的粘包拆包解决方案

3.1 FixedLengthFrameDecoder

​ 对于使用固定长度的粘包和拆包场景,可以使用FixedLengthFrameDecoder,该解码器会每次读取固定长度的消息,如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足。其使用也比较简单,只需要在构造函数中指定每个消息的长度即可。这里需要注意的是,FixedLengthFrameDecoder只是一个解码器,Netty也只提供了一个解码器,这是因为对于解码是需要等待下一个包的进行补全的,代码相对复杂,而对于编码器,用户可以自行编写,因为编码时只需要将不足指定长度的部分进行补全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder来进行粘包和拆包处理:

public class EchoServer {public void bind(int port) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为20ch.pipeline().addLast(new FixedLengthFrameDecoder(20));// 将前一步解码得到的数据转码为字符串ch.pipeline().addLast(new StringDecoder());// 这里FixedLengthFrameEncoder是我们自定义的,用于将长度不足20的消息进行补全空格ch.pipeline().addLast(new FixedLengthFrameEncoder(20));// 最终的数据处理ch.pipeline().addLast(new EchoServerHandler());}});ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new EchoServer().bind(8080);}
}

​ 上面的pipeline中,对于入栈数据,这里主要添加了FixedLengthFrameDecoderStringDecoder,前面一个用于处理固定长度的消息的粘包和拆包问题,第二个则是将处理之后的消息转换为字符串。最后由EchoServerHandler处理最终得到的数据,处理完成后,将处理得到的数据交由FixedLengthFrameEncoder处理,该编码器是我们自定义的实现,主要作用是将长度不足20的消息进行空格补全。下面是FixedLengthFrameEncoder的实现代码:

public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {private int length;public FixedLengthFrameEncoder(int length) {this.length = length;}@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)throws Exception {// 对于超过指定长度的消息,这里直接抛出异常if (msg.length() > length) {throw new UnsupportedOperationException("message length is too large, it's limited " + length);}// 如果长度不足,则进行补全if (msg.length() < length) {msg = addSpace(msg);}ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));}// 进行空格补全private String addSpace(String msg) {StringBuilder builder = new StringBuilder(msg);for (int i = 0; i < length - msg.length(); i++) {builder.append(" ");}return builder.toString();}
}

​ 这里FixedLengthFrameEncoder实现了decode()方法,在该方法中,主要是将消息长度不足20的消息进行空格补全。EchoServerHandler的作用主要是打印接收到的消息,然后发送响应给客户端:

public class EchoServerHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("server receives message: " + msg.trim());ctx.writeAndFlush("hello client!");}
}

​ 对于客户端,其实现方式基本与服务端的使用方式类似,只是在最后进行消息发送的时候与服务端的处理方式不同。如下是客户端EchoClient的代码:

public class EchoClient {public void connect(String host, int port) throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 对服务端发送的消息进行粘包和拆包处理,由于服务端发送的消息已经进行了空格补全,// 并且长度为20,因而这里指定的长度也为20ch.pipeline().addLast(new FixedLengthFrameDecoder(20));// 将粘包和拆包处理得到的消息转换为字符串ch.pipeline().addLast(new StringDecoder());// 对客户端发送的消息进行空格补全,保证其长度为20ch.pipeline().addLast(new FixedLengthFrameEncoder(20));// 客户端发送消息给服务端,并且处理服务端响应的消息ch.pipeline().addLast(new EchoClientHandler());}});ChannelFuture future = bootstrap.connect(host, port).sync();future.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new EchoClient().connect("127.0.0.1", 8080);}
}

​ 对于客户端而言,其消息的处理流程其实与服务端是相似的,对于入站消息,需要对其进行粘包和拆包处理,然后将其转码为字符串,对于出站消息,则需要将长度不足20的消息进行空格补全。客户端与服务端处理的主要区别在于最后的消息处理handler不一样,也即这里的EchoClientHandler,如下是该handler的源码:

public class EchoClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("client receives message: " + msg.trim());}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush("hello server!");}
}

​ 这里客户端的处理主要是重写了channelActive()channelRead0()两个方法,这两个方法的主要作用在于,channelActive()会在客户端连接上服务器时执行,也就是说,其连上服务器之后就会往服务器发送消息。而channelRead0()主要是在服务器发送响应给客户端时执行,这里主要是打印服务器的响应消息。对于服务端而言,前面我们我们可以看到,EchoServerHandler只重写了channelRead0()方法,这是因为服务器只需要等待客户端发送消息过来,然后在该方法中进行处理,处理完成后直接将响应发送给客户端。如下是分别启动服务端和客户端之后控制台打印的数据:

// server
server receives message: hello server!
// client
client receives message: hello client!

3.2 LineBasedFrameDecoder与DelimiterBasedFrameDecoder

​ 对于通过分隔符进行粘包和拆包问题的处理,Netty提供了两个编解码的类,LineBasedFrameDecoderDelimiterBasedFrameDecoder。这里LineBasedFrameDecoder的作用主要是通过换行符,即\n或者\r\n对数据进行处理;而DelimiterBasedFrameDecoder的作用则是通过用户指定的分隔符对数据进行粘包和拆包处理。同样的,这两个类都是解码器类,而对于数据的编码,也即在每个数据包最后添加换行符或者指定分割符的部分需要用户自行进行处理。这里以DelimiterBasedFrameDecoder为例进行讲解,如下是EchoServer中使用该类的代码片段,其余部分与前面的例子中的完全一致:

@Override
protected void initChannel(SocketChannel ch) throws Exception {String delimiter = "_$";// 将delimiter设置到DelimiterBasedFrameDecoder中,经过该解码一器进行处理之后,源数据将会// 被按照_$进行分隔,这里1024指的是分隔的最大长度,即当读取到1024个字节的数据之后,若还是未// 读取到分隔符,则舍弃当前数据段,因为其很有可能是由于码流紊乱造成的ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.wrappedBuffer(delimiter.getBytes())));// 将分隔之后的字节数据转换为字符串数据ch.pipeline().addLast(new StringDecoder());// 这是我们自定义的一个编码器,主要作用是在返回的响应数据最后添加分隔符ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));// 最终处理数据并且返回响应的handlerch.pipeline().addLast(new EchoServerHandler());
}

​ 上面pipeline的设置中,添加的解码器主要有DelimiterBasedFrameDecoderStringDecoder,经过这两个处理器处理之后,接收到的字节流就会被分隔,并且转换为字符串数据,最终交由EchoServerHandler处理。这里DelimiterBasedFrameEncoder是我们自定义的编码器,其主要作用是在返回的响应数据之后添加分隔符。如下是该编码器的源码:

public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {private String delimiter;public DelimiterBasedFrameEncoder(String delimiter) {this.delimiter = delimiter;}@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {// 在响应的数据后面添加分隔符ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));}
}

​ 对于客户端而言,这里的处理方式与服务端类似,其pipeline的添加方式如下:

@Override
protected void initChannel(SocketChannel ch) throws Exception {String delimiter = "_$";// 对服务端返回的消息通过_$进行分隔,并且每次查找的最大大小为1024字节ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(delimiter.getBytes())));// 将分隔之后的字节数据转换为字符串ch.pipeline().addLast(new StringDecoder());// 对客户端发送的数据进行编码,这里主要是在客户端发送的数据最后添加分隔符ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));// 客户端发送数据给服务端,并且处理从服务端响应的数据ch.pipeline().addLast(new EchoClientHandler());
}

​ 这里客户端的处理方式与服务端基本一致,关于这里没展示的代码,其与示例一中的代码完全一致,这里则不予展示。

3.3 LengthFieldBasedFrameDecoder与LengthFieldPrepender

​ 这里LengthFieldBasedFrameDecoderLengthFieldPrepender需要配合起来使用,其实本质上来讲,这两者一个是解码,一个是编码的关系。它们处理粘拆包的主要思想是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。LengthFieldBasedFrameDecoder会按照参数指定的包长度偏移量数据对接收到的数据进行解码,从而得到目标消息体数据;而LengthFieldPrepender则会在响应的数据前面添加指定的字节数据,这个字节数据中保存了当前消息体的整体字节数据长度。LengthFieldBasedFrameDecoder的解码过程如下图所示:

LengthFieldPrepender的编码过程如下图所示:

​ 关于LengthFieldBasedFrameDecoder,这里需要对其构造函数参数进行介绍:

  • maxFrameLength:指定了每个包所能传递的最大数据包大小;
  • lengthFieldOffset:指定了长度字段在字节码中的偏移量;
  • lengthFieldLength:指定了长度字段所占用的字节长度;
  • lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度;
  • initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节。

​ 这里我们以json序列化为例对LengthFieldBasedFrameDecoderLengthFieldPrepender的使用方式进行讲解。如下是EchoServer的源码:

public class EchoServer {public void bind(int port) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据// 进行长度字段解码,这里也会对数据进行粘包和拆包处理ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));// LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段ch.pipeline().addLast(new LengthFieldPrepender(2));// 对经过粘包和拆包处理之后的数据进行json反序列化,从而得到User对象ch.pipeline().addLast(new JsonDecoder());// 对响应数据进行编码,主要是将User对象序列化为jsonch.pipeline().addLast(new JsonEncoder());// 处理客户端的请求的数据,并且进行响应ch.pipeline().addLast(new EchoServerHandler());}});ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new EchoServer().bind(8080);}
}

​ 这里EchoServer主要是在pipeline中添加了两个编码器和两个解码器,编码器主要是负责将响应的User对象序列化为json对象,然后在其字节数组前面添加一个长度字段的字节数组;解码器主要是对接收到的数据进行长度字段的解码,然后将其反序列化为一个User对象。下面是JsonDecoder的源码:

public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {byte[] bytes = new byte[buf.readableBytes()];buf.readBytes(bytes);User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);out.add(user);}
}

JsonDecoder首先从接收到的数据流中读取字节数组,然后将其反序列化为一个User对象。下面我们看看JsonEncoder的源码:

public class JsonEncoder extends MessageToByteEncoder<User> {@Overrideprotected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)throws Exception {String json = JSON.toJSONString(user);ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));}
}

JsonEncoder将响应得到的User对象转换为一个json对象,然后写入响应中。对于EchoServerHandler,其主要作用就是接收客户端数据,并且进行响应,如下是其源码:

public class EchoServerHandler extends SimpleChannelInboundHandler<User> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {System.out.println("receive from client: " + user);ctx.write(user);}
}

​ 对于客户端,其主要逻辑与服务端的基本类似,这里主要展示其pipeline的添加方式,以及最后发送请求,并且对服务器响应进行处理的过程:

@Override
protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));ch.pipeline().addLast(new LengthFieldPrepender(2));ch.pipeline().addLast(new JsonDecoder());ch.pipeline().addLast(new JsonEncoder());ch.pipeline().addLast(new EchoClientHandler());
}
public class EchoClientHandler extends SimpleChannelInboundHandler<User> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.write(getUser());}private User getUser() {User user = new User();user.setAge(27);user.setName("zhangxufeng");return user;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {System.out.println("receive message from server: " + user);}
}

​ 这里客户端首先会在连接上服务器时,往服务器发送一个User对象数据,然后在接收到服务器响应之后,会打印服务器响应的数据。

3.4 自定义粘包与拆包器

​ 对于粘包与拆包问题,其实前面三种基本上已经能够满足大多数情形了,但是对于一些更加复杂的协议,可能有一些定制化的需求。对于这些场景,其实本质上,我们也不需要手动从头开始写一份粘包与拆包处理器,而是通过继承LengthFieldBasedFrameDecoderLengthFieldPrepender来实现粘包和拆包的处理。

​ 如果用户确实需要不通过继承的方式实现自己的粘包和拆包处理器,这里可以通过实现MessageToByteEncoderByteToMessageDecoder来实现。这里MessageToByteEncoder的作用是将响应数据编码为一个ByteBuf对象,而ByteToMessageDecoder则是将接收到的ByteBuf数据转换为某个对象数据。通过实现这两个抽象类,用户就可以达到实现自定义粘包和拆包处理的目的。如下是这两个类及其抽象方法的声明:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}

4. 小结

​ 本文首先对粘包和拆包的问题原理进行描述,帮助读者理解粘包和拆包问题所在。然后对处理粘包和拆包的几种常用解决方案进行讲解。接着通过辅以示例的方式对Netty提供的几种解决粘包和拆包问题的解决方案进行了详细讲解。

【Netty】Netty解决粘包和拆包问题的四种方案相关推荐

  1. Netty 解决粘包和拆包问题的四种方案

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | https://my.oschina.net/ ...

  2. Netty解决粘包和拆包问题的四种方案

    在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接.由于微服务往对方发送信息的时候,所有的请求都是使 ...

  3. netty 高低位转码_Netty解决粘包和拆包问题的四种方案

    在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接.由于微服务往对方发送信息的时候,所有的请求都是使 ...

  4. 什么是粘包和拆包,Netty如何解决粘包拆包?

    Netty粘包拆包 TCP 粘包拆包是指发送方发送的若干包数据到接收方接收时粘成一包或某个数据包被拆开接收. 如下图所示,client 发送了两个数据包 D1 和 D2,但是 server 端可能会收 ...

  5. Netty是如何解决粘包和拆包问题的

    本文来说下Netty是如何解决粘包和拆包问题的 文章目录 概述 粘包和拆包 常见解决方案 Netty提供的粘包拆包解决方案 FixedLengthFrameDecoder LineBasedFrame ...

  6. Netty 中的粘包和拆包详解

    Netty 底层是基于 TCP 协议来处理网络数据传输.我们知道 TCP 协议是面向字节流的协议,数据像流水一样在网络中传输那何来 "包" 的概念呢? TCP是四层协议不负责数据逻 ...

  7. Netty如何解决粘包拆包?(二)

    前言 TCP是个流协议,所谓流,就是没有界限的一串数据.大家可以想想河里的流水,是连成一片的,其间并没有分界线.TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所 ...

  8. 【Netty】TCP粘包和拆包

    一.前言 前面已经基本上讲解完了Netty的主要内容,现在来学习Netty中的一些可能存在的问题,如TCP粘包和拆包. 二.粘包和拆包 对于TCP协议而言,当底层发送消息和接受消息时,都需要考虑TCP ...

  9. Netty如何解决粘包半包问题

    何为粘包 / 半包? 比如,我们发送两条消息:ABC 和 DEF,那么对方收到的就一定是 ABC 和 DEF 吗? 不一定,对方可能一次就把两条消息接收完了,即 ABCDEF:也可能分成了好多次,比如 ...

最新文章

  1. 正则表达式(Regular Expressions)
  2. 计算机病毒的危害主要体现于对计算机系统的信息破坏和,2014年中央电大专科信息技术应用理论题.doc...
  3. java list类_java_List集合及其实现类
  4. go redis 序列化_求求你不要手写Redis缓存
  5. 【I】ZF2安装 和 创建一个新项目
  6. 6 redis 编译失败_Redis(NoSQL数据库)基础篇
  7. 实用工具网站(经纬度、短链接、图片格式转换、长微博、图片压缩、uuid、繁体字、md5破解)
  8. JAVA--异常(1)
  9. 176条DevOps人员常用的Linux命令速查表
  10. 修改服务器后账套不存在,金蝶KIS专业版环境配置常见问题
  11. 反应测试_SUPERCRC 微反应量热仪DARC差分加速量热仪 PT-DSC压力跟踪差示扫描量热仪...
  12. 华杉讲透《孙子兵法》阅读有感(一)
  13. 开源项目zheng学习
  14. 最近羊毛小更新 青龙面板 薅羊毛 22/6/6更新
  15. 计算机应用专业毕业感言,大学毕业感言一句话
  16. android 获取alertdialog的view,Android开发实现AlertDialog中View的控件设置监听功能分析...
  17. 充分利用微博加快社区发展
  18. html 网页不可以复制粘贴,网页上的文字不能复制,三种方法教你复制全网文字...
  19. linux服务器重启原因排查
  20. 使用Ollydbg破解注册机的两种套路

热门文章

  1. Nature综述:菌根共生的独特性和共性
  2. 人类或起源于古菌?真假?
  3. QIIME 2用户文档. 20命令行界面q2cli(2019.7)
  4. R语言psych包的corr.test函数计算相关性并给出所有相关性的显著性(Correlation matrix and tests of significance via corr.test())
  5. R语言使用caret包的confusionMatrix函数计算混淆矩阵、使用编写的自定义函数可视化混淆矩阵(confusion matrix)
  6. R语言临床预测模型的评价指标与验证指标实战:综合判别改善指数IDI(Integrated Discrimination Improvement, IDI)
  7. linux bash字符串截取
  8. desc mysql 连表查询_Mysql连表查询
  9. 抓图软件_Faststone capture8.3
  10. python librosa 或 ffmpeg 改变音频采样率