前言

字节的流动形成了流,Netty作为优秀的通信框架他的字节是如何流动的,本文就理一下这个事。梳理完Netty的字节流动与JDK提供的ByteBuffer一对比看下Netty方便在哪里。本分从官方文档概念原理入手梳理,然后看下源码解读下这些原理如何实现的,体验一把Netty写入数据自动扩容,探究下这个过程如何实现的。

一、基本概念

1.ByteBuf创建

使用Unpooled类来创建ByteBuf,不建议使用ByteBuf的构造函数自己去创建。

2.读写索引

ByteBuf提供了两个指针readerIndex和writerIndex,分别记录读、写的开始位置。两个指针将ByteBuf分成了三个区域。

3.discardable bytes

这个区间的范围为0~readerIndex,已经被读过的、可废弃的区域。通过调用discardReadBytes(),可以释放discardable bytes区域。这个区域释放后,可写区域(writable bytes)部分增多。

4.readable bytes

可读区域的范围为(writerIndex-readerIndex)

5.writable bytes

可写区域的范围为(capacity-writerIndex)

6.清理索引

调用Buffer.clear()后,读写索引全部归零,缓存buffer被释放。

二、ByteBuf的构建

接下来通过示例窜下上面的知识点,看下源码是如何实现的,示例中将字符串写入ByteBuf中,然后再读出来打印。

@Test
public void testWriteUtf81() {String str1 = "瓜农";ByteBuf buf = Unpooled.buffer(1);buf.writeBytes(str1.getBytes(CharsetUtil.UTF_8));ByteBuf readByteBuf = ByteBufUtil.readBytes(UnpooledByteBufAllocator.DEFAULT,buf,str1.getBytes(CharsetUtil.UTF_8).length);System.out.print(readByteBuf.toString(CharsetUtil.UTF_8));
}

源码解读

public static ByteBuf buffer(int initialCapacity) {return ALLOC.heapBuffer(initialCapacity); // 注解@1
}public ByteBuf heapBuffer(int initialCapacity) {return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); // 注解@2
}InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(alloc, initialCapacity, maxCapacity);
}public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(maxCapacity);if (initialCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));}this.alloc = checkNotNull(alloc, "alloc");setArray(allocateArray(initialCapacity)); // 注解@3setIndex(0, 0); // 注解@4
}

注解@1 使用ByteBufAllocator来分配ByteBuf,默认为UnpooledByteBufAllocator。

注解@2 initialCapacity为初始容量例子中给的为16,maxCapacity为默认的DEFAULT_MAX_CAPACITY=Integer.MAX_VALUE。

注解@3 allocateArray()的方法如下,此时使用JDK的byte[]初始化缓存区。通过setArray(),UnpooledHeapByteBuf持有byte[]缓存区。

 protected byte[] allocateArray(int initialCapacity) {return new byte[initialCapacity];}private void setArray(byte[] initialArray) {array = initialArray;tmpNioBuf = null;
}

注解@4 初始化readerIndex和writerIndex,均为0。

小结 ByteBuf的构建通过Unpooled来分配,示例中通过UnpooledByteBufAllocator持有byte[]、 readerIndex、writerIndex、maxCapacity完成ByteBuf的初始化。示例中array数组大小为16;readerIndex=writerIndex=0;maxCapacity=Integer.MAX_VALUE。

三、写入数据

public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {ensureWritable(length); // 注解@5setBytes(writerIndex, src, srcIndex, length); // 注解@6writerIndex += length; // 注解@7return this;
}

注解@5 确保剩余的空间能够容纳需写入的数据。

具体逻辑如下:如果写入的数据长度小于已经分配的容量空间capacity则允许直接返回;

如果写入的数据长度超过允许的最大容量maxCapacity直接抛出IndexOutOfBoundsException拒绝;

如果写入数据长度大于已经分配的空间capacity但是小于最大最大允许空间maxCapacity,则需要扩容。

final void ensureWritable0(int minWritableBytes) {final int writerIndex = writerIndex(); // 注解@5.1final int targetCapacity = writerIndex + minWritableBytes; // 注解@5.2if (targetCapacity <= capacity()) { // 注解@5.3ensureAccessible();return;}if (checkBounds && targetCapacity > maxCapacity) { // 注解@5.4 ensureAccessible();throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",writerIndex, minWritableBytes, maxCapacity, this));}// Normalize the target capacity to the power of 2.final int fastWritable = maxFastWritableBytes(); // 注解@5.5 int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable: alloc().calculateNewCapacity(targetCapacity, maxCapacity); // 注解@5.6// Adjust to the new capacity.capacity(newCapacity); // 注解@5.7
}

注解@5.1  获取当前写索引

注解@5.2 计算需要的容量

注解@5.3 与当前已分配的容量capacity进行比较

注解@5.4 不能超过最大允许的容量maxCapacity

注解@5.5 fastWritable = capacity() - writerIndex

注解@5.6 newCapacity的判断通常走到这里应该为,剩余的空间不够了。所以通常会进入alloc().calculateNewCapacity(targetCapacity, maxCapacity)。

public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {// ...final int threshold = CALCULATE_THRESHOLD; // 4 MiB pageif (minNewCapacity == threshold) { // 注解@5.6.1return threshold;}// If over threshold, do not double but just increase by threshold.if (minNewCapacity > threshold) { // 注解@5.6.2int newCapacity = minNewCapacity / threshold * threshold;if (newCapacity > maxCapacity - threshold) {newCapacity = maxCapacity;} else {newCapacity += threshold;}return newCapacity;}// Not over threshold. Double up to 4 MiB, starting from 64.int newCapacity = 64;while (newCapacity < minNewCapacity) { // 注解@5.6.3newCapacity <<= 1;}return Math.min(newCapacity, maxCapacity);}

注解@5.6.1 如果写入的数据长度刚好为4M则返回threshold=4M

注解@5.6.2 如果写入的数据长度大于4M,newCapacity不再翻倍增长,通过minNewCapacity / threshold * threshold计算刚容下需要的数据即可。

注解@5.6.3 如果写入的数据长度小于4M,则newCapacity从64翻倍增长(128、256、512...),直到newCapacity能够容纳需要写入的数据。

注解@5.7 确定了要扩容的容量newCapacity后,我们看下如何扩容的。

public ByteBuf capacity(int newCapacity) {checkNewCapacity(newCapacity);byte[] oldArray = array;int oldCapacity = oldArray.length;if (newCapacity == oldCapacity) {return this;}int bytesToCopy;if (newCapacity > oldCapacity) {bytesToCopy = oldCapacity;} else {trimIndicesToCapacity(newCapacity);bytesToCopy = newCapacity;}byte[] newArray = allocateArray(newCapacity); // 注解@5.7.1System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy); // 注解@5.7.2setArray(newArray); // 注解@5.7.3freeArray(oldArray); // 注解@5.7.4return this;
}

注解@5.7.1 使用新的容量初始化newArray=new byte[initialCapacity]

注解@5.7.2 将旧的oldArray数据拷贝到新的newArray=new中

注解@5.7.3 将UnpooledHeapByteBuf的byte[]引用替换为newArray

注解@5.7.4 oldArray清理操作

注解@6 写入数据,通过System.arraycopy将数据写入array中。

public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {checkSrcIndex(index, length, srcIndex, src.length);System.arraycopy(src, srcIndex, array, index, length);return this;
}

注解@7 移动writerIndex指针。

小结: 将上面例子的initialCapacity设置成1,促使写入数据时扩充容量。下面运行时截图:array被扩容到64,writerIndex从0位置移动到6.

在写入数据时,判断剩余容量是否足够;不够则需要扩容,如果写入的数据小于4M,则双倍增长,直到容纳写写入的数据。如果写入的数据大于4M,通过(minNewCapacity / threshold * threshold)计算需要扩容的大小。

四、读出数据

从buf中把刚才写入的数据(”瓜农“)读出来,通过工具类ByteBufUtil.readBytes来实现。

ByteBuf readByteBuf = ByteBufUtil.readBytes(UnpooledByteBufAllocator.DEFAULT,buf,str1.getBytes(CharsetUtil.UTF_8).length);
System.out.print(readByteBuf.toString(CharsetUtil.UTF_8));
public static ByteBuf readBytes(ByteBufAllocator alloc, ByteBuf buffer, int length) {boolean release = true;ByteBuf dst = alloc.buffer(length); // 注解@8try {buffer.readBytes(dst); // 注解@9release = false;return dst;} finally {if (release) {dst.release();}}
}

注解@8 重新构造了一个ByteBuf(dst)用于存储读取的数据

注解@9 读取数据,并移动读索引。

@Override
public ByteBuf readBytes(ByteBuf dst, int length) {if (checkBounds) {if (length > dst.writableBytes()) {throw new IndexOutOfBoundsException(String.format("length(%d) exceeds dst.writableBytes(%d) where dst is: %s", length, dst.writableBytes(), dst));}}readBytes(dst, dst.writerIndex(), length); // 注解@9.1dst.writerIndex(dst.writerIndex() + length); // 注解@9.2return this;
}

注解@9.1 读取字节到新的ByteBuf。

public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {checkReadableBytes(length);getBytes(readerIndex, dst, dstIndex, length); // 注解@9.1.1readerIndex += length; // 注解@9.1.2return this;
}

注解@9.1.1 通过native api UNSAFE.copyMemory() 实现byte数组之间的拷贝

注解@9.1.2 源byteBuf读索引readerIndex向前移动

注解@9.2 数据读入新构建的缓存区dst,dst的写索引向前移动

小结: 示例中通过构造一个新的ByteBuf(dst),将源ByteBuf(buf)的数据读入到dst。数据读取结束后,源ByteBuf(buf)readerIndex向前移动;ByteBuf(dst)的writerIndex向前移动。

Netty8# Netty之ByteBuf初探相关推荐

  1. Netty 的 ByteBuf 是如何支持 堆内存非池化 实现的

    Netty的ByteBuf是如何支持堆内存非池化实现的 ByteBuffer 从实现方式上分成 HeapByteBuffer 和 DirectByteBuffer 两种内存实现方式, HeapByte ...

  2. Netty谈谈ByteBuf

    前言 在网络传输过程中,字节是最基本也是最小的单元.JAVA NIO有提供一个ByteBuffer容器去装载这些数据,但是用起来会有点复杂,经常要在读写间进行切换以及不支持动态扩展等等.而netty为 ...

  3. Netty 教程 – ByteBuf详解

    ByteBuffer存在的问题 ByteBuffer是JDK1.4中提供的java.nio.Buffer, 在内存中预留指定大小的存储空间来存放临时数据,其他Buffer的子类有:CharBuffer ...

  4. Netty(九)——ByteBuf源码之析

    ByteBuf在Netty占据着中重要的位置,上篇<Netty--ByteBuf功能之说>讲了ByteBuf的工作原理和重要功能介绍.这篇从源码的角度来看ByteBuf.首先,来看一下主要 ...

  5. Netty中ByteBuf 的零拷贝

    转载:https://www.jianshu.com/p/1d1fa2fe1ed9 此文章已同步发布在我的 segmentfault 专栏. 根据 Wiki 对 Zero-copy 的定义: &quo ...

  6. Netty Associated -- ByteBuf

    ByteBuf ByteBuf是Netty的Server与Client之间通信的数据传输载体.他提供了一个byte数组(byte[])的抽象视图 buffer创建 我们推荐通过一个Unpooled的帮 ...

  7. Netty之ByteBuf详解

    1. ByteBuf的创建 在Netty中,有一个比较常见的对象ByteBuf,它其实等同于Java Nio中的ByteBuffer,但是ByteBuf对Nio中的ByteBuffer的功能做了很作增 ...

  8. Netty中ByteBuf的copy、duplicate、slice方法对比

    Jdk注释翻译 /** *返回ByteBuf的可读字节的拷贝.修改返回的ByteBuf内容与当前ByteBuf完全不会相互影响. *此方法不会修改当前ByteBuf的readerIndex或write ...

  9. 【Netty】Netty为什么要手动释放ByteBuf资源?

    ByteBuf是Netty网络通信框架中一个重要的组件.先进和友好的设计理念让开发者受益匪浅. 两个指针操作ByteBuf -> 读和写 对象池技术 -> 非垃圾回收机制 对象池技术 对象 ...

  10. Netty ByteBuf(图解之 2)| 秒懂

    目录 Netty ByteBuf(图解二):API 图解 源码工程 写在前面 ByteBuf 的四个逻辑部分 ByteBuf 的三个指针 ByteBuf 的三组方法 ByteBuf 的引用计数 Byt ...

最新文章

  1. Linux下安装数据库
  2. c 路径 空格 参数_好听的炫舞名字空格最新_好听的炫舞名字空格2020
  3. Android SearchView 搜索框
  4. [Ubuntu] ThinkPad T410i linux下如何调节亮度
  5. MPU6050开发 -- 卡尔曼滤波(转)
  6. Code Runner for VS Code 突破 1000 万下载量!支持运行超过 40 种语言
  7. 车联网 python_利用百度车联网提供的天气查询接口用python查询天气信息
  8. java 树状 子节点_java构建树形列表(带children属性)
  9. 基于JAVA+SpringMVC+MYSQL的在线考试系统
  10. linux ssh环境,在Linux下ssh 环境的登录 和 文件拷贝
  11. Redis07-对象结构体redisObject
  12. 【SpringBoot】解决拦截器注入 Service 为空问题
  13. 数学归纳法证明时间复杂度
  14. HttpClient、HttpURLConnection、OKHttp和Volley
  15. PHP网站常见一些安全漏洞及防御方法
  16. LSD_SLAM 编译、安装到运行demo
  17. uniapp的分享到朋友圈和朋友(APP)
  18. MATLAB代码:含冰蓄冷空调的冷热电联供型微网多时间尺度优化调度
  19. 今日头条校园招聘历年经典面试题汇总:C++研发岗
  20. tag untag pvid 理解

热门文章

  1. Ngnix 搭建视频直播服务器
  2. CSS设置背景和渐变色
  3. winen中文_enWin使用部分中文字库
  4. Blender烘焙光照贴图
  5. Jetson TX2使用记录
  6. 新元宇宙奇科幻小说原创作品每周连载地球人奇游天球记第六回冬奥登月
  7. springboot starter封装永中预览
  8. 计算机进入安全模式的原因,电脑只能进入安全模式的原因及处理方法
  9. Snort的TILE64移植
  10. MPLS-虚拟专用网络