Netty教程(九)——解码器
https://juejin.cn/post/6889632659979862029
ByteToMessageDecoder
看看ByteToMessageDecoder
这个解码器的channelRead
方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {//存放解析出来的结果CodecOutputList out = CodecOutputList.newInstance();try {first = cumulation == null;//累加字节流cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);//解码callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {if (cumulation != null && !cumulation.isReadable()) {numReads = 0;cumulation.release();cumulation = null;} else if (++numReads >= discardAfterReads) {numReads = 0;discardSomeReadBytes();}int size = out.size();firedChannelRead |= out.insertSinceRecycled();//向后传播ByteBuffireChannelRead(ctx, out, size);} finally {out.recycle();}}} else {ctx.fireChannelRead(msg);}
}
主要过程分为3步
- 累加字节流
- 调用子类的decode方法进行解析
- 将解析到的ByteBuf向后传播
累加字节流
这部分代码片段如下
//累加字节流
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
通过一个累加器cumulator
累加字节流
首先会判断是否是第一次累加,来决定是否要分配空的bytebuf
cumulator
是Cumulator
类的实例,来看看它的cumulate
方法
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {if (!cumulation.isReadable() && in.isContiguous()) {?/省略}try {final int required = in.readableBytes();if (required > cumulation.maxWritableBytes() ||(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||cumulation.isReadOnly()) {//读入的数据大于剩余容量,进行扩容return expandCumulation(alloc, cumulation, in);}//读入数据cumulation.writeBytes(in, in.readerIndex(), required);//调整bytebuf的写索引in.readerIndex(in.writerIndex());return cumulation;} finally {//省略}
}
可以看到,就是通过一个bytebuf累加字节流,在适当的时候进行扩容
调用子类的decode方法进行解析
接下来看看callDecode
方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {while (in.isReadable()) {//记录解码前out的大小int outSize = out.size();if (outSize > 0) {//如果已经有解码结果,将其向后传播fireChannelRead(ctx, out, outSize);out.clear();if (ctx.isRemoved()) {//如果节点被移除了,退出循环break;}outSize = 0;}//记录一下decode前的可读字节数int oldInputLength = in.readableBytes();//调用子类的decode方法decodeRemovalReentryProtection(ctx, in, out);if (ctx.isRemoved()) {//如果节点被移除了,退出循环break;}if (outSize == out.size()) {if (oldInputLength == in.readableBytes()) {//目前的字节数不足以解析出结果break;} else {continue;}}if (oldInputLength == in.readableBytes()) {//没有使用读入的字节,但是却解析出结果,抛出异常throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}if (isSingleDecode()) {//如果每次只解析一条message,退出循环break;}}} catch (DecoderException e) {throw e;} catch (Exception cause) {throw new DecoderException(cause);}
}
继续看看decodeRemovalReentryProtection
方法
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {//调用子类的decode方法decode(ctx, in, out);} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {//该节点正在等待被移除,向后传播结果然后移除该节点fireChannelRead(ctx, out, out.size());out.clear();handlerRemoved(ctx);}}
}
向后传播ByteBuf
来看看finally块里的fireChannelRead
方法
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {for (int i = 0; i < numElements; i ++) {ctx.fireChannelRead(msgs.getUnsafe(i));}
}
可以看到,这里就是将解析出来的bytebuf一个个向后传播
FixedLengthFrameDecoder
FixedLengthFrameDecoder
是基于固定长度(字节的长度)的解码器,它集成自ByteToMessageDecoder
,在构造函数里传入frameLength
表示长度
来看看它的decode
方法
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);if (decoded != null) {out.add(decoded);}
}protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (in.readableBytes() < frameLength) {return null;} else {return in.readRetainedSlice(frameLength);}
}
可以看到,解码的逻辑很简单
- 如果当前累加器里可读的字节数小于frameLength,返回null,out里不会添加解析结果
- 如果当前累加器里可读的字节数大于等于frameLength,那么读取frameLength个字节构建新的bytebuf,添加到out结果里
LineBasedFrameDecoder
LineBasedFrameDecoder
是基于行的解码器
先来看看他的几个比较重要的成员变量
//能够解码的最大长度
private final int maxLength;
//超出最大长度后是否要立即抛出异常,默认为false
private final boolean failFast;
//解析出来的结果是否丢弃换行符,默认为true
private final boolean stripDelimiter;
接下来看看它的decode方法
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);if (decoded != null) {out.add(decoded);}
}
同样,它的核心方法在另一个decode方法里
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {//寻找行换行符,如果使用'\r\n'换行,那么位置指向\rfinal int eol = findEndOfLine(buffer);if (!discarding) {//非丢弃模式if (eol >= 0) {//如果找到了行末final ByteBuf frame;//计算该行长度(不包括换行符长度)final int length = eol - buffer.readerIndex();//根据换行符长度final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;if (length > maxLength) {//行长度超过最大长度,读入数据并且抛出异常buffer.readerIndex(eol + delimLength);fail(ctx, length);return null;}if (stripDelimiter) {//如果去掉换行符,frame = buffer.readRetainedSlice(length);buffer.skipBytes(delimLength);} else {frame = buffer.readRetainedSlice(length + delimLength);}//返回解析到的结果return frame;} else {//没找到换行符final int length = buffer.readableBytes();if (length > maxLength) {//超出最大长度,将读入的数据都丢弃,进入丢弃模式discardedBytes = length;buffer.readerIndex(buffer.writerIndex());discarding = true;offset = 0;if (failFast) {fail(ctx, "over " + discardedBytes);}}return null;}} else {if (eol >= 0) {//丢弃模式下,读取到了换行符,退出丢弃模式final int length = discardedBytes + eol - buffer.readerIndex();final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;buffer.readerIndex(eol + delimLength);discardedBytes = 0;discarding = false;if (!failFast) {fail(ctx, length);}} else {//丢弃模式下,没有读取到换行符,继续丢弃数据discardedBytes += buffer.readableBytes();buffer.readerIndex(buffer.writerIndex());// We skip everything in the buffer, we need to set the offset to 0 again.offset = 0;}return null;}
}
逻辑也比较简单
- 非丢弃模式下
- 若读取到了换行符,那么读取出该行然后返回结果
- 若没有读取到换行符,并且累加器内的字节数超过最大长度,那么丢弃这部分数据,进入丢弃模式
- 丢弃模式下
- 若读取到了换行符,丢弃直到换行符的数据,退出丢弃模式
- 若没有读取到换行符,继续丢弃数据
LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder
是基于长度域解码器
这个类有几个比较重要的成员变量
//长度域的起始偏移
private final int lengthFieldOffset;
//长度域的长度
private final int lengthFieldLength;
//长度矫正(有时候长度域存储的值是整个message的长度,需要设置该值得到真正数据段的长度)
private final int lengthAdjustment;
//跳过的起始字节数(有时候长度域前面有header部分,需要设置该值来跳过这部分找到长度域)
private final int initialBytesToStrip;
//长度域前的header长度加上长度域的长度
private final int lengthFieldEndOffset;
更详细的解释可以去查看官方文档
接下来看看他的decode方法,与之前一样,真正的解码逻辑在另一个decode方法里
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (discardingTooLongFrame) {//丢弃过长的数据discardingTooLongFrame(in);}if (in.readableBytes() < lengthFieldEndOffset) {//累加器中的可读字节数太少return null;}//计算长度域在累加器里的实际偏移数int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;//获取无矫正前的真正数据段的长度(即没有将lengthAdjustment考虑进去)long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);if (frameLength < 0) {//长度小于0,抛出异常failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);}//将lengthAdjustment考虑进去,计算整个frame的长度frameLength += lengthAdjustment + lengthFieldEndOffset;if (frameLength < lengthFieldEndOffset) {//整个frame的长度<长度域前的header长度+长度域长度,抛出异常failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);}if (frameLength > maxFrameLength) {//超出最大长度exceededFrameLength(in, frameLength);return null;}// never overflows because it's less than maxFrameLengthint frameLengthInt = (int) frameLength;if (in.readableBytes() < frameLengthInt) {//累加器内的可读字节数小于frame长度return null;}if (initialBytesToStrip > frameLengthInt) {//跳过的字节数超过frame长度,抛出异常failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);}//跳过指定字节数in.skipBytes(initialBytesToStrip);// extract frameint readerIndex = in.readerIndex();int actualFrameLength = frameLengthInt - initialBytesToStrip;//解析出frame数据包的数据ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);//设置累加器的读索引in.readerIndex(readerIndex + actualFrameLength);return frame;
}
逻辑比较简单,概括下来就是这几步
- 计算数据包长度的长度
- 处理跳过的字节
- 解析出数据包
Netty教程(九)——解码器相关推荐
- PVE系列教程(九)、openWRT设置主路由与旁路由模式
PVE系列教程(九).openWRT设置主路由与旁路由模式 为了更好的浏览体验,欢迎光顾勤奋的凯尔森同学个人博客http://nas.hepcloud.top:7000 Hello,小伙伴们,这是凯尔 ...
- Java Netty 教程
Netty是用于Java的高性能IO工具包. Netty是开源的,因此可以自由使用它,甚至可以为它做出贡献.该Netty教程将解释Netty的工作方式以及如何开始使用Netty.但本教程不会涵盖Net ...
- java netty 教程_Java NIO框架Netty教程(十六)
该图是OneCoder通过阅读Netty源码,逐渐记录下来的.基本可以说明Netty服务的启动流程.这里在具体讲解一下. 首先说明,我们这次顺利的流程是基于NioSocketServer的.也就是基于 ...
- Netty 教程 – 解码器详解
TCP以流的方式进行数据传输,上层的应用为了对消息进行区分,往往采用如下方式 固定消息长度,累计读取到长度和定长LEN的报文后,就认为读取到了个完整的消息,然后将计数器位置重置在读取下一个报文内容 将 ...
- 【Netty】自定义解码器、编码器、编解码器(十五)
文章目录 前言 一.自定义基于换行的解码器 1.1 LineBasedFrameDecoder 类 1.2 定义解码器 1.3 定义 ChannelHandler 1.4 定义 ChannelInit ...
- Netty实战九之单元测试
ChannelHandler是Netty应用程序的关键元素,所以彻底地测试他们应该是你的开发过程的一个标准部分.最佳实践要求你的测试不仅要能够证明你的实现是正确的,而且还要能够很容易地隔离那些因修改代 ...
- uCOS-II 基础入门教程(九)
建立任务,OSTaskCreate() 想让µC/OS-Ⅱ管理用户的任务,用户必须要先建立任务.用户可以通过传递任务地址和其它参数到以下两个函数之一来建立任务:OSTaskCreate() 或 OST ...
- netty实战-自定义解码器处理半包消息
概述 在李林锋的Netty系列之Netty编解码框架分析中介绍了各种解码器,也推荐组合 LengthFieldBasedFrameDecoder ByteToMessageDecoder 这两个解码器 ...
- Netty固定长度解码器
FixedLengthFrameDecoder是固定长度解码器,它根据指定的长度自动对消息进行解码,开发者不需要考虑TCP拆包/粘包问题. 下面通过一个例子进行说明. 服务端: 在服务端的Channe ...
最新文章
- android程序贴吧,【Android 教程总结贴】归纳所有android贴
- [HNOI2008]Cards
- SegmentFault 技术周刊 Vol.21 - 程序人生(二):2016 这一年
- rosserial_java_编写ros串口节点,使用官方serial包(示例代码)
- 洛谷P4630 [APIO2018] Duathlon 铁人两项 【圆方树】
- npm命令,开发依赖,版本号【正解】
- 事务管理基础:数据库的并发控制相关知识笔记
- 为什么不应该使用(长期存在的)功能分支
- 一步步编写操作系统 29 cpu缓存简介
- package和import
- [Leetcode]141. Linked List Cycle
- 2016计算机二级公共知识,2016计算机二级《公共基础知识》章节训练与答案
- mongodb batchInsert
- 7wifi模块多少钱_APP开发要多少钱?创业者估算成本的四个方法
- 18 Strings for Mac(Xcode文件翻译工具)
- js 生成二维码 vue项目
- 使用RedRocket方便的查看证券数据
- android手机获取qq闪照的方法,QQ闪照怎么保存 闪照保存到手机的方法教程
- 简单linux命令之备份文件
- Java爬虫Jsoup的使用
热门文章
- 使用Scrutiny在您的网站中查找不同网址中的重复内容的方法
- 数字模拟电子计算机,数字与模拟电路计算机45汇编.ppt
- 由一道CTF对10种反调试的探究
- KMeans 算法复习
- html怎么设置出场动画,css3炫酷coming soon分步显示动画特效
- css position:absolute 父元素高度塌陷
- 解决Office PowerPoint 2007 输入汉字卡死
- 为微信开发填坑:微信网页支付的开发流程及填坑技巧 1
- 传华为将抢先苹果首发卫星通信 ;1200亿亿次每秒,阿里上线世界第一智算中心;wxPython团队辟谣已死传闻|极客头条
- (四)Scala中apply的应用