netty编解码器注意事项及粘包和拆包解决方案
netty编解码器
- 当 Netty 发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如 java 对象);如果是出站消息,它会被编码成字节。
- Netty 提供一系列实用的编解码器,他们都实现了 ChannelInboundHadnler 或者 ChannelOutboundHandler 接口。在这些类中,channelRead 方法已经被重写了。以入站为例,对于每个从入站 Channel 读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的 decode() 方法进行解码,
并将已经解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler
。
解码器- ByteToMessageDecoder
关系继承图
小细节
- 不论解码器 handler 还是编码器 handler 即接收的消息类型必须与待处理的消息类型一致,否则该 handler 不会被执行。
- 当我们在handler中调用ctx.writeAndFlush()方法后,就会将数据交给ChannelOutboundHandler进行出站处理(如果下一个handler是编码器并且数据类型符合编码器处理的数据类型则会传递给编码器),只是我们没有去定义出站类而已,若有需求可以自己去实现ChannelOutboundHandler出站类。
- 解码器一般不需要设置泛型(待处理数据的类型)。
TCP粘包和拆包基本介绍
1.TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的 socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
2.由于 TCP 无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题
TCP粘包和拆包解决方案
具体事例:
1.要求客户端发送 5 个 Message 对象,客户端每次发送一个 Message 对象
2.服务器端每次接收一个 Message,分 5 次进行解码,每读取到一个 Message,会回复一个 Message 对象给客户端。
代码:
MessageProtocol.class
package com.haust.tcp;public class MessageProtocol {//协议包private int len; //关键private byte[] content;//内容public int getLen() {return len;}public void setLen(int len) {this.len = len;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}
}
MyServer.class
package com.haust.tcp;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class MyServer {public static void main(String[] args) throws Exception{EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
MyServerInitializer.class
package com.haust.tcp;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class MyServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyMessageDecoder());//解码器pipeline.addLast(new MyMessageEncoder());//编码器pipeline.addLast(new MyServerHandler());}
}
MyServerHandler.class
package com.haust.tcp;import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.UUID;//处理业务的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//cause.printStackTrace();ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {// 接收到数据,并处理int len = msg.getLen();byte[] content = msg.getContent();System.out.println("服务器接收到信息如下");System.out.println("长度=" + len);System.out.println("内容=" + new String(content, Charset.forName("utf-8")));System.out.println("服务器接收到消息包数量=" + (++this.count));//回复消息System.out.println("服务端开始回复消息------");String responseContent = UUID.randomUUID().toString();int responseLen = responseContent.getBytes("utf-8").length;byte[] responseContent2 = responseContent.getBytes("utf-8");//构建一个协议包MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(responseLen);messageProtocol.setContent(responseContent2);//客户端需要输入信息,创建一个扫描器
// MessageProtocol messageProtocol = new MessageProtocol();
// int inLen=0;
// byte[] incontent=null ;
// Scanner scanner = new Scanner(System.in);
// while (scanner.hasNextLine()) {// String msg1 = scanner.nextLine();
// //将字符串加工成协议包类型
// inLen = msg1.getBytes().length;
// incontent = msg1.getBytes();
// messageProtocol.setLen(inLen);
// messageProtocol.setContent(incontent);
// //通过ctx 发送到服务器端ctx.writeAndFlush(messageProtocol);
// }}
}
MyClient.class
package com.haust.tcp;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;import java.util.Scanner;public class MyClient {public static void main(String[] args) throws Exception{EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new MyClientInitializer()); //自定义一个初始化类ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}
MyClientInitializer.class
package com.haust.tcp;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class MyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyMessageEncoder()); //加入编码器pipeline.addLast(new MyMessageDecoder()); //加入解码器pipeline.addLast(new MyClientHandler());}
}
MyClientHandler.class
package com.haust.tcp;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.nio.charset.Charset;public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//使用客户端发送10条数据 "今天天气冷,吃火锅" 编号for(int i = 0; i< 5; i++) {String mes = "今天天气冷,吃火锅";byte[] content = mes.getBytes(Charset.forName("utf-8"));int length = mes.getBytes(Charset.forName("utf-8")).length;//创建协议包对象MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);ctx.writeAndFlush(messageProtocol);}}// @Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {int len = msg.getLen();byte[] content = msg.getContent();System.out.println("客户端接收到消息如下");System.out.println("长度=" + len);System.out.println("内容=" + new String(content, Charset.forName("utf-8")));System.out.println("客户端接收消息数量=" + (++this.count));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异常消息=" + cause.getMessage());ctx.close();}
}
MyMessageDecoder.class
package com.haust.tcp;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;import java.util.List;public class MyMessageDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println();System.out.println();System.out.println("MyMessageDecoder decode 被调用");//需要将得到二进制字节码-> MessageProtocol 数据包(对象)int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);//封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);//放入out传给下一个hanlder进行处理out.add(messageProtocol);}
}
MyMessageEncoder.class
package com.haust.tcp;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {System.out.println("MyMessageEncoder encode 方法被调用");out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}
}
效果:
server端输出
MyMessageDecoder decode 被调用
服务器接收到信息如下
长度=27
内容=今天天气冷,吃火锅
服务器接收到消息包数量=1
服务端开始回复消息------
MyMessageEncoder encode 方法被调用MyMessageDecoder decode 被调用
服务器接收到信息如下
长度=27
内容=今天天气冷,吃火锅
服务器接收到消息包数量=2
服务端开始回复消息------
MyMessageEncoder encode 方法被调用MyMessageDecoder decode 被调用
服务器接收到信息如下
长度=27
内容=今天天气冷,吃火锅
服务器接收到消息包数量=3
服务端开始回复消息------
MyMessageEncoder encode 方法被调用MyMessageDecoder decode 被调用
服务器接收到信息如下
长度=27
内容=今天天气冷,吃火锅
服务器接收到消息包数量=4
服务端开始回复消息------
MyMessageEncoder encode 方法被调用MyMessageDecoder decode 被调用
服务器接收到信息如下
长度=27
内容=今天天气冷,吃火锅
服务器接收到消息包数量=5
服务端开始回复消息------
MyMessageEncoder encode 方法被调用
client端输出
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用MyMessageDecoder decode 被调用
客户端接收到消息如下
长度=36
内容=f71b9a7d-96c0-4f26-a2ed-5684c30f0a5a
客户端接收消息数量=1MyMessageDecoder decode 被调用
客户端接收到消息如下
长度=36
内容=53cee25c-4a81-4d06-8ec8-bf6a664729e8
客户端接收消息数量=2MyMessageDecoder decode 被调用
客户端接收到消息如下
长度=36
内容=556d84ce-907b-4228-a722-9965b54e589e
客户端接收消息数量=3MyMessageDecoder decode 被调用
客户端接收到消息如下
长度=36
内容=774c3992-a39d-4aa2-9a20-e7fdbd3d4abd
客户端接收消息数量=4MyMessageDecoder decode 被调用
客户端接收到消息如下
长度=36
内容=69f8777f-dc61-4f2f-8f28-fdb0680c96d9
客户端接收消息数量=5
netty编解码器注意事项及粘包和拆包解决方案相关推荐
- 面试官问:你来讲下Netty通信中的粘包、拆包?
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 作者:Java技术剑 来源:https://urlify.cn/I ...
- Netty 中的粘包和拆包详解
Netty 底层是基于 TCP 协议来处理网络数据传输.我们知道 TCP 协议是面向字节流的协议,数据像流水一样在网络中传输那何来 "包" 的概念呢? TCP是四层协议不负责数据逻 ...
- 什么是粘包和拆包,Netty如何解决粘包拆包?
Netty粘包拆包 TCP 粘包拆包是指发送方发送的若干包数据到接收方接收时粘成一包或某个数据包被拆开接收. 如下图所示,client 发送了两个数据包 D1 和 D2,但是 server 端可能会收 ...
- TCP 粘包和拆包及解决方案
TCP 粘包和拆包基本介绍 1.TCP 是面向连接的,面向流的,提供高可靠性服务.收发两端(客户端和服务器端)都要有一一成对的 socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使 ...
- 20-Netty TCP 粘包和拆包及解决方案
TCP粘包和拆包的基本介绍 TCP是面向连接的, 面向流的, 提供可靠性服务, 收发两端(客户端和服务器端) 都有一一成对的Socket,因此发送端为了将多个发给接收端的包, 更有效的发给对方, 使用 ...
- TCP的粘包和拆包及Netty中的解决方案
1.基本介绍 TCP 是面向连接的,面向流的,提供高可靠性服务.收发两端(客户端和服务器端)都要有一一成对的 socket, 因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(N ...
- Netty(二)——TCP粘包/拆包
转载请注明出处:http://www.cnblogs.com/Joanna-Yan/p/7814644.html 前面讲到:Netty(一)--Netty入门程序 主要内容: TCP粘包/拆包的基础知 ...
- netty的编解码、粘包拆包问题、心跳检测机制原理
文章目录 1. 编码解码器 2. 编解码序列化机制的性能优化 3. Netty粘包拆包 4. Netty心跳检测机制 5. Netty断线自动重连实现 1. 编码解码器 当你通过netty发送或者接受 ...
- 【Netty】Netty解决粘包和拆包问题的四种方案
在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接.由于微服务往对方发送信息的时候,所有的请求都是使 ...
最新文章
- Tomcat+Apache 负载均衡
- ASP.NET用户登录模块代码
- linux里的dd权限不够怎么办,Linux dd 遇到 容量不足 的 resize 解法
- NRF52810能不能替代NRF52832
- VC++ 开发pop3收邮件程序的相关问题
- 日志-周报-月报(2019年2月)
- android menu item 显示,Android 如何通过menu id来得到menu item 控件 .
- ES6新特性_ES6箭头函数的实践以及应用场景---JavaScript_ECMAScript_ES6-ES11新特性工作笔记010
- (一) 双目立体视觉介绍
- leetcode blind 75
- 中职计算机专业英语说课稿,中职英语说课稿模板.doc
- Canvas API - 江苏黑马 - 博客园
- 贵阳市交通大数据中心
- tomcat 日志拆分
- ps如何放大缩小图层
- python实现将不同的附件发邮件到不同的地区
- 消息队列:SpringBoot集成RocketMQ的那些坑(真实有效、附源码)
- 联通4G业务或沿用沃品牌 不推无限量套餐
- 基于近场动力学的二维疲劳裂纹扩展模型_近场动力学数值模拟的程序实现(1)——引言...
- Microsoft Data Access Components(MDAC) version 2.6 or later