文章目录

  • 四 ByteBuf源码分析
    • 4.1 ByteBuf源码分析
      • 4.1.1 ByteBuffer 的缺点
      • 4.1.2 工作原理
      • 4.1.3 API 介绍
      • 4.1.4 AbstractByteBuf源码分析
        • 4.1.4.1 继承关系
        • 4.1.4.2 读取方法
        • 4.1.4.3 写方法
        • 4.1.4.5 重用缓冲区
        • 4.1.4.6 skipBytes
    • 4.2 Reference 引用计数
      • 4.2.1 ReferenceCounted 类
        • 4.2.1.1 基本概述
        • 4.2.1.2 基本方法
      • 4.2.2 ReferenceCountUpdater
        • 4.2.2.1 实现原理
        • 4.2.2.2 基本方法
        • 4.2.2.3 retain 系列方法
        • 4.2.2.4 release 系列方法
    • 4.3 UnpooledHeapByteBuf 源码分析
      • 4.3.1 常用变量与构造器
      • 4.3.2 扩容机制
      • 4.3.3 字节数组复制
      • 4.3.4 转换成JDKByteBuffer
      • 4.3.5 其他方法
    • 4.4 PooledByteBuf内存池原理分析
      • 4.4.1 PoolArena
      • 4.4.2 PoolChunk
      • 4.4.3 PoolSubpage
    • 4.5 PooledDirectByteBuf
      • 4.5.1 创建字节缓存区
      • 4.5.2 复制新的字节缓冲区

四 ByteBuf源码分析

4.1 ByteBuf源码分析

4.1.1 ByteBuffer 的缺点

  • ByteBuffer 长度固定,一旦分配完成,它的容量不能动态扩展和收缩,当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常。
  • ByteBuffer 只有一个标识位置的指针 position,读写的时候需要手工调用flip()和rewind()等,使用者必须小心谨慎地处理这些API,否则很容易导致程序处理失败。
  • ByteBuffer 的API功能有限,一些高级和实用的特性它不支持,需要使用者自己编程实现。
  • 为了弥补这些不足,Netty提供了自己的 ByteBuffer实现——ByteBuf,下面我们一起学习ByteBuf的原理和主要功能。

4.1.2 工作原理

  • 不同ByteBuf实现类的工作原理不尽相同,本小节我们从 ByteBuf的设计原理出发,一起探寻Netty ByteBuf的设计理念。
  • 首先,ByteBuf依然是个 Byte数组的缓冲区,它的基本功能应该与JDK 的 ByteBuffer一致,提供以下几类基本功能。
  • 7种Java基础类型、byte数组、ByteBuffer ( ByteBuf)等的读写;缓冲区自身的copy和 slice等;。设置网络字节序;构造缓冲区实例;操作位置指针等方法。

策略

  • 由于JDK的 ByteBuffer已经提供了这些基础能力的实现,因此,Netty ByteBuf的实现可以有两种策略。
  • 参考JDK ByteBuffer 的实现,增加额外的功能,解决原ByteBuffer的缺点。
  • 聚合JDK ByteBuffer,通过Facade模式对其进行包装,可以减少自身的代码量,降低实现成本。

缺点一:读写模式来回操作
JDK ByteBuffer由于只有一个位置指针用于处理读写操作,因此每次读写的时候都需要额外调用flip()和 clear()等方法,否则功能将出错,它的典型用法如下。

package com.shu.ByteBuffer;import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;/*** @Author shu* @Date: 2021/11/12/ 16:35* @Description ByteBuffer基本用法**/
public class   TestByteBuffer {public static void main(String[] args) {// 获得FileChanneltry (FileChannel channel = new FileInputStream("shu.txt").getChannel()) {// 获得缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);int hasNext = 0;StringBuilder builder = new StringBuilder();while((hasNext = channel.read(buffer)) > 0) {// 切换模式 limit=position, position=0buffer.flip();// 当buffer中还有数据时,获取其中的数据while(buffer.hasRemaining()) {builder.append((char)buffer.get());}// 切换模式 position=0, limit=capacitybuffer.clear();}System.out.println(builder.toString());} catch (IOException e) {}}
}
  • 当我们需要写模式时需要调用filp方法来切换它的 limit被设置为 position,position设置为0,capacity不变,这样来回切换,是否麻烦。

解决方法:两个指针

  • ByteBuf通过两个位置指针来协助缓冲区的读写操作,读操作使用readerIndex,写操作使用writerIndex。
  • readerIndex和 writerIndex的取值一开始都是0,随着数据的写入 writerIndex 会增加,读取数据会使readerIndex增加,但是它不会超过 writerIndex。在读取之后,0~readerIndex的就被视为discard 的,调用discardReadBytes方法,可以释放这部分空间,它的作用类似ByteBuffer 的 compact方法。ReaderIndex和 writerIndex 之间的数据是可读取的,等价于ByteBuffer position和 limit之间的数据。WriterIndex和 capacity之间的空间是可写的,等价于ByteBuffer limit和 capacity之间的可用空间。
  • 由于写操作不修改readerIndex指针,读操作不修改 writerIndex 指针,因此读写之间不再需要调整位置指针,这极大地简化了缓冲区的读写操作,避免了由于遗漏或者不熟悉flip()操作导致的功能异常。





缺点二:ByteBuffer的动态扩容

  • 通常情况下﹐当我们对ByteBuffer进行 put操作的时候,如果缓冲区剩余可写空间不够,就会发生 BufferOverflowException
  • 为了避免发生这个问题,通常在进行put操作的时候会对剩余可用空间进行校验。
 public ByteBuffer put(ByteBuffer src) {if (src == this)throw new IllegalArgumentException();if (isReadOnly())throw new ReadOnlyBufferException();int n = src.remaining();if (n > remaining())throw new BufferOverflowException();for (int i = 0; i < n; i++)put(src.get());return this;}

解决方法

  • ByteBuf对 write操作进行了封装,由 ByteBuf 的 write操作负责进行剩余可用空间的校验,如果可用缓冲区不足,ByteBuf 会自动进行动态扩展,对于使用者而言,不需要关心底层的校验和扩展细节,只要不超过设置的最大缓冲区容量即可。当可用空间不足时,ByteBuf会帮助我们实现自动扩展,这极大地降低了 ByteBuf的学习和使用成本,提升了开发效率。
  public ByteBuf writeByte(int value) {// 确定是否可写ensureWritable0(1);_setByte(writerIndex++, value);return this;}final void ensureWritable0(int minWritableBytes) {final int writerIndex = writerIndex();final int targetCapacity = writerIndex + minWritableBytes;// using non-short-circuit & to reduce branching - this is a hot path and targetCapacity should rarely overflowif (targetCapacity >= 0 & targetCapacity <= capacity()) {ensureAccessible();return;}if (checkBounds && (targetCapacity < 0 || targetCapacity > maxCapacity)) {ensureAccessible();throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",writerIndex, minWritableBytes, maxCapacity, this));}// Normalize the target capacity to the power of 2.final int fastWritable = maxFastWritableBytes();// 计算新的容量,自动扩容int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable: alloc().calculateNewCapacity(targetCapacity, maxCapacity);// Adjust to the new capacity.// 调整容量capacity(newCapacity);}

4.1.3 API 介绍

读方法



写方法


readerlndex和writerlndex

  • Netty提供了两个指针变量用于支持顺序读取和写入操作:readerIndex 用于标识读取索引,writerIndex用于标识写入索引。两个位置指针将ByteBuf缓冲区分割成三个区域。

  • 调用ByteBuf的 read操作时,从readerIndex 处开始读取。
  • readerIndex到 writerIndex之间的空间为可读的字节缓冲区。
  • 从 writerIndex 到 capacity之间为可写的字节缓冲区。
  • 0到readerIndex 之间是已经读取过的缓冲区。
  • 可以调用discardReadBytes操作来重用这部分空间,以节约内存,防止 ByteBuf的动态扩张。
  • 这在私有协议栈消息解码的时候非常有用,因为TCP底层可能粘包,几百个整包消息被TCP粘包后作为一个整包发送,这样,通过discardReadBytes操作可以重用之前已经解码过的缓冲区,这样就可以防止接收缓冲区因为容量不足导致的扩张。但是,discardReadBytes操作是把双刃剑,不能滥用。

Discardable bytes

  • 相比于其他的 Java对象,缓冲区的分配和释放是个耗时的操作,因此,我们需要尽量重用它们。由于缓冲区的动态扩张需要进行字节数组的复制,它是个耗时的操作,因此,为了最大程度地提升性能,往往需要尽最大努力提升缓冲区的重用率。
  • 假如缓冲区包含了N个整包消息,每个消息的长度为L,消息的可写字节数为R。当读取M个整包消息后,如果不对ByteBuf做压缩或者discardReadBytes操作,则可写的缓冲区长度依然为R。如果调用discardReadBytes操作,则可写字节数会变为R= (R+M×L),之前已经读取的M个整包的空间会被重用。假如此时ByteBuf需要写入R+1个字节,则不需要动态扩张 ByteBuf。
  • 需要指出的是,调用discardReadBytes会发生字节数组的内存复制,所以,频繁调用将会导致性能下降,因此在调用它之前要确认你确实需要这样做,例如牺牲性能来换取更多的可用内存。

 @Overridepublic ByteBuf discardReadBytes() {if (readerIndex == 0) {ensureAccessible();return this;}if (readerIndex != writerIndex) {setBytes(0, this, readerIndex, writerIndex - readerIndex);writerIndex -= readerIndex;adjustMarkers(readerIndex);readerIndex = 0;} else {ensureAccessible();adjustMarkers(readerIndex);writerIndex = readerIndex = 0;}return this;}

Readable bytes和Writable bytes

  • 可读空间段是数据实际存储的区域,以read或者 skip开头的任何操作将会从readerIndex开始读取或者跳过指定的数据,操作完成之后 readerIndex 增加了读取或者跳过的字节数长度。如果读取的字节数长度大于实际可读的字节数,则抛出IndexOutOfBoundsException。当新分配、包装或者复制一个新的 ByteBuf对象时,它的readerIndex为0。
  • 可写空间段是尚未被使用可以填充的空闲空间,任何以 write开头的操作都会从writerIndex开始向空闲空间写入字节,操作完成之后 writerIndex 增加了写入的字节数长度。如果写入的字节数大于可写的字节数,则会抛出 IndexOutOfBoundsException异常。新分配一个 ByteBuf对象时,它的readerIndex为0。通过包装或者复制的方式创建一个新的 ByteBuf对象时,它的writerIndex是 ByteBuf的容量。

Clear操作
正如JDK ByteBuffer的 clear操作,它并不会清空缓冲区内容本身,例如填充为NUL( 0x00)。它主要用来操作位置指针,例如 position、limit和 mark。对于 ByteBuf,它也是用来操作readerIndex和 writerIndex,将它们还原为初始分配值。

Mark与Rest

  • 当对缓冲区进行读操作时,由于某种原因,可能需要对之前的操作进行回滚。读操作并不会改变缓冲区的内容,回滚操作主要就是重新设置索引信息。
  • 对于JDK的 ByteBuffer,调用mark 操作会将当前的位置指针备份到mark变量中,当调用rest操作之后,重新将指针的当前位置恢复为备份在mark 中的值。


查找方法

  • indexOf(int fromIndex, int toIndex, byte value):从当前ByteBuf中定位出首次出现value 的位置,起始索引为fromIndex,终点是tolndex,如果没有查找到则返回-1,否则返回第一条满足搜索条件的位置索引。
  • bytesBefore(byte value):从当前ByteBuf中定位出首次出现 value的位置,起始索引为readerIndex,终点是writerIndex,如果没有查找到则返回-1,否则返回第一条满足搜索条件的位置索引。该方法不会修改readerIndex和 writerIndex。
  • bytesBefore(int length, byte value):从当前ByteBuf中定位出首次出现value 的位置,起始索引为readerIndex,终点是readerIndex+length,如果没有查找到则返回-1,否则返回第一条满足搜索条件的位置索引。如果 length 大于当前字节缓冲区的可读字节数,则抛出IndexOutOfBoundsException异常。
  • bytesBefore(int index,int length, byte value):从当前ByteBuf中定位出首次出现value 的位置,起始索引为index,终点是index+length,如果没有查找到则返回-1,否则返回第一条满足搜索条件的位置索引。如果index+length 大于当前字节缓冲区的容量,则抛出 IndexOutOfBoundsException异常。
  • forEachByte(int index, int length,ByteBufProcessor processor):以index为起始位置,index + length为终止位置进行遍历,与ByteBufProcessor 设置的查找条件进行对比,如果满足条件,则返回位置索引,否则返回-1。
  • forEachByteDesc(ByteBufProcessor processor):遍历当前ByteBuf的可读字节数组,与 ByteBufProcessor设置的查找条件进行对比,如果满足条件,则返回位置索引,否则返回-1。注意对字节数组进行迭代的时候采用逆序的方式,也就是从 writerIndex-1开始迭代,直到rcadcrlndcx 。
  • forEachByteDesc(int index,int length,ByteBufProcessor processor):以 index为起始位置,index +length为终止位置进行遍历,与 ByteBufProcessor 设置的查找条件进行对比,如果满足条件,则返回位置索引,否则返回-1。采用逆序查找的方式,从index + length-1开始,直到index。

Derived buffers

  • duplicate:返回当前ByteBuf的复制对象,复制后返回的 ByteBuf与操作的ByteBuf共享缓冲区内容,但是维护自己独立的读写索引。当修改复制后的ByteBuf内容后,之前原 ByteBuf 的内容也随之改变,双方持有的是同一个内容指针引用。
  • copy:复制一个新的 ByteBuf对象,它的内容和索引都是独立的,复制操作本身并不修改原 ByteBuf的读写索引。
  • copy(int index,int length):从指定的索引开始复制,复制的字节长度为length,复制后的 ByteBuf内容和读写索引都与之前的独立。
  • slice:返回当前 ByteBuf的可读子缓冲区,起始位置从readerIndex到 writerIndex,返回后的 ByteBuf与原 BytcBuf 共享内容,但是读写索引独立维护。该操作并不修改原ByteBuf的 readerIndex和 writerIndex。
  • slice(int index, int length):返回当前 ByteBuf的可读子缓冲区,起始位置从index到index + length,返回后的 ByteBuf与原ByteBuf共享内容,但是读写索引独立维护。该操作并不修改原 ByteBuf 的readerIndex和 writerIndex。

转换成标准的 ByteBuffer

  • ByteBuffer nioBuffer():将当前 ByteBuf可读的缓冲区转换成ByteBuffer,两者共享同一个缓冲区内容引用,对ByteBuffer的读写操作并不会修改原ByteBuf的读写索引。需要指出的是,返回后的ByteBuffer无法感知原 ByteBuf的动态扩展操作。
  • ByteBuffer nioBuffer(int index,int length):将当前ByteBuf 从index开始长度为length 的缓冲区转换成 ByteBuffer,两者共享同一个缓冲区内容引用,对 ByteBuffer 的读写操作并不会修改原ByteBuf的读写索引。需要指出的是,返回后的 ByteBuffer无法感知原ByteBuf的动态扩展操作。

4.1.4 AbstractByteBuf源码分析

4.1.4.1 继承关系

  • 直接内存(DirectByteBuf〉字节缓冲区:非堆内存,它在堆外进行内存分配,相比于堆内存,它的分配和回收速度会慢一些,但是将它写入或者从Socket Channel中读取时,由于少了一次内存复制,速度比堆内存快。
  • 堆内存(HeapByteBuf)字节缓冲区:特点是内存的分配和回收速度快,可以被JVM自动回收;缺点就是如果进行Socket的 I/O读写,需要额外做一次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有一定程度的下降。
  • 正是因为各有利弊,所以 Netty提供了多种 ByteBuf供开发者使用,经验表明,ByteBuf的最佳实践是在IO通信线程的读写缓冲区使用DirectByteBuf,后端业务消息的编解码模块使用HeapByteBuf,这样组合可以达到性能最优。
  • 从内存回收角度看,ByteBuf也分为两类:基于对象池的ByteBuf和普通ByteBuf。两者的主要区别就是基于对象池的 ByteBuf可以重用 ByteBuf对象,它自己维护了一个内存池,可以循环利用创建的ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁GC.测试表明使用内存池后的Netty在高负载、大并发的冲击下内存和GC 更加平稳。
  • 尽管推荐使用基于内存池的 ByteBuf,但是内存池的管理和维护更加复杂,使用起来也需要更加谨慎,因此,Netty提供了灵活的策略供使用者来做选择。
4.1.4.2 读取方法

 @Overridepublic byte readByte() {checkReadableBytes0(1);int i = readerIndex;byte b = _getByte(i);readerIndex = i + 1;return b;}
  • 在读之前,首先对缓冲区的可用空间进行校验。
  private void checkReadableBytes0(int minimumReadableBytes) {ensureAccessible();if (checkBounds && readerIndex > writerIndex - minimumReadableBytes) {throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",readerIndex, minimumReadableBytes, writerIndex, this));}}
  • 如果读取的长度小于0,则抛出 IllegalArgumentException异常提示参数非法;如果可写的字节数小于需要读取的长度,则抛出 IndexOutOfBoundsException异常,由于异常中封装了详细的异常信息,所以使用者可以非常方便地进行问题定位。
4.1.4.3 写方法
  • 首先对写入字节数组的长度进行合法性校验。
    @Overridepublic ByteBuf writeByte(int value) {ensureWritable0(1);_setByte(writerIndex++, value);return this;}
  • 如果当前写入的字节数组长度虽然大于目前ByteBuf的可写字节数,但是通过自身的动态扩展可以满足新的写入请求,则进行动态扩展。可能有读者会产生疑问,既然需要写入的字节数组长度大于当前缓冲区可写的空间,为什么不像JDK 的 ByteBuffer那样抛出缓冲区越界异常呢?
 final void ensureWritable0(int minWritableBytes) {final int writerIndex = writerIndex();// 目标容量final int targetCapacity = writerIndex + minWritableBytes;// using non-short-circuit & to reduce branching - this is a hot path and targetCapacity should rarely overflowif (targetCapacity >= 0 & targetCapacity <= capacity()) {ensureAccessible();return;}// 异常处理if (checkBounds && (targetCapacity < 0 || targetCapacity > maxCapacity)) {ensureAccessible();throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",writerIndex, minWritableBytes, maxCapacity, this));}// Normalize the target capacity to the power of 2.final int fastWritable = maxFastWritableBytes();// 是否扩容int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable: alloc().calculateNewCapacity(targetCapacity, maxCapacity);// 设置新容量capacity(newCapacity);}
  • Netty 的 ByteBuffer可以动态扩展,为了保证安全性,允许使用者指定最大的容量,在容量范围内,可以先分配个较小的初始容量,后面不够用再动态扩展,这样可以达到功能和性能的最优组合。
  • 我们继续看calculateNewCapacity方法的实现:首先需要重新计算下扩展后的容量,它有一个参数,等于 writerIndex + minWritableBytes,也就是满足要求的最小容量。
@Overridepublic int calculateNewCapacity(int minNewCapacity, int maxCapacity) {checkPositiveOrZero(minNewCapacity, "minNewCapacity");if (minNewCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("minNewCapacity: %d (expected: not greater than maxCapacity(%d)",minNewCapacity, maxCapacity));}final int threshold = CALCULATE_THRESHOLD; // 4 MiB pageif (minNewCapacity == threshold) {return threshold;}// If over threshold, do not double but just increase by threshold.if (minNewCapacity > threshold) {int newCapacity = minNewCapacity / threshold * threshold;if (newCapacity > maxCapacity - threshold) {newCapacity = maxCapacity;} else {newCapacity += threshold;}return newCapacity;}// Not over threshold. Double up to 4 MiB, starting from 64.int newCapacity = 64;while (newCapacity < minNewCapacity) {newCapacity <<= 1;}return Math.min(newCapacity, maxCapacity);}
  • 首先设置门限阈值为4M,当需要的新容量正好等于门限阈值,则使用阈值作为新的缓冲区容量。如果新申请的内存空间大于阈值,不能采用倍增的方式(防止内存膨胀和浪费)扩张内存,采用每次步进4M的方式进行内存扩张。扩张的时候需要对扩张后的内存和最大内存 ( maxCapacity)进行比较,如果大于缓冲区的最大长度,则使用maxCapacity作为扩容后的缓冲区容量。
   static final int DEFAULT_INITIAL_CAPACITY = 256;static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;static final int DEFAULT_MAX_COMPONENTS = 16;static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
  • 如果扩容后的新容量小于阈值,则以64为计数进行倍增,直到倍增后的结果大于或等于需要的容量值。
  • 采用倍增或者步进算法的原因如下:如果以 minNewCapacity 作为目标容量,则本次扩容后的可写字节数刚好够本次写入使用。写入完成后,它的可写字节数会变为0,下次做写入操作的时候,需要再次动态扩张。这样就会形成第一次动态扩张后,每次写入操作都会进行动态扩张,由于动态扩张需要进行内存复制,频繁的内存复制会导致性能下降。
  • 采用先倍增后步进的原因如下:当内存比较小的情况下,倍增操作并不会带来太多的内存浪费,例如64字节–>128字节–>256字节,这样的内存扩张方式对于大多数应用系统是可以接受的。但是,当内存增长到一定阈值后,再进行倍增就可能会带来额外的内存浪费,例如10M,采用倍增后变为20M,很有可能系统只需要12M,扩张到20M后会带来8M的内存浪费。由于每个客户端连接都可能维护自己独立的接收和发送缓冲区,这样随着客户读的线性增长,内存浪费也会成比例的增加,因此,达到某个阈值后就需要以步进的方式对内存进行平滑地扩张。
4.1.4.5 重用缓冲区
@Overridepublic ByteBuf discardReadBytes() {if (readerIndex == 0) {ensureAccessible();return this;}if (readerIndex != writerIndex) {setBytes(0, this, readerIndex, writerIndex - readerIndex);writerIndex -= readerIndex;adjustMarkers(readerIndex);readerIndex = 0;} else {ensureAccessible();adjustMarkers(readerIndex);writerIndex = readerIndex = 0;}return this;}

首先对读索引进行判断,如果为0则说明没有可重用的缓冲区,直接返回。如果读索引大于О且读索引不等于写索引,说明缓冲区中既有已经读取过的被丢弃的缓冲区,也有尚未读取的可读缓冲区。调用setBytes(0, this, readerIndex, writerIndex - readerIndex)方法进行字节数组复制。将尚未读取的字节数组复制到缓冲区的起始位置,然后重新设置读写索引,读索引设置为0,写索引设置为之前的写索引减去读索引(重用的缓冲区长度)。

protected final void adjustMarkers(int decrement) {int markedReaderIndex = this.markedReaderIndex;if (markedReaderIndex <= decrement) {this.markedReaderIndex = 0;int markedWriterIndex = this.markedWriterIndex;if (markedWriterIndex <= decrement) {this.markedWriterIndex = 0;} else {this.markedWriterIndex = markedWriterIndex - decrement;}} else {this.markedReaderIndex = markedReaderIndex - decrement;markedWriterIndex -= decrement;}}
  • 首先对备份的markedReaderIndex和需要减少的decrement进行判断,如果小于需要减少的值,则将 markedReaderIndex设置为0。注意,无论 markedReaderIndex还是markedWriterIndex,它的取值都不能小于0。如果markedWriterIndex也小于需要减少的值,则markedWriterIndex置为0,否则,markedWriterIndex 减去decrement之后的值就是新的markedWriterIndex 。
  • 如果需要减小的值小于 markedReaderIndex,则它也一定也小于markedWriterIndex,markedReaderIndex和 markedWriterIndex的新值就是减去decrement之后的取值。
  • 如果rcadcrIndcx等于writcrIndcx,则说明没有可读的字节数组,那就不需要进行内存复制,直接调整mark,将读写索引设置为О即可完成缓冲区的重用。
4.1.4.6 skipBytes

在解码的时候,有时候需要丢弃非法的数据报,或者跳跃过不需要读取的字节或字节数组,此时,使用skipBytes方法就非常方便。它可以忽略指定长度的字节数组,读操作时直接跳过这些数据读取后面的可读缓冲区。

 @Overridepublic ByteBuf skipBytes(int length) {checkReadableBytes(length);readerIndex += length;return this;}

4.2 Reference 引用计数

  • 为了管理和释放资源,netty 采用了引用计数的方式,当某个对象不在被其他对象引用时,释放这个对象锁所持有的资源来优化内存使用和性能。
  • 在netty中,这个对象必须实现 ReferenceCounted 接口。
  • 引用计数的原理并不复杂: ReferenceCounted实例通常以引用计数为 1 作为开始。只要引用计数大于 0,就能保证对象不会被释放。
  • 注意:虽然释放的确切语义可能是特定于实现的,但是访问一个已经被释放的引用计数的ReferenceCounted对象,一定会抛出IllegalReferenceCountException 异常。
  • 而 AbstractReferenceCountedByteBuf 类就是处理缓存区引用计数相关的实现。

4.2.1 ReferenceCounted 类

4.2.1.1 基本概述
A reference-counted object that requires explicit deallocation.
When a new ReferenceCounted is instantiated, it starts with the reference count of 1. retain() increases the reference count, and release() decreases the reference count. If the reference count is decreased to 0, the object will be deallocated explicitly, and accessing the deallocated object will usually result in an access violation.
If an object that implements ReferenceCounted is a container of other objects that implement ReferenceCounted, the contained objects will also be released via release() when the container's reference count becomes 0.
  • 首先说明 ReferenceCounted 是一个显式要求重新分配资源的用计数对象。
  • 第二段说明了,当实例化一个新的ReferenceCounted对象时,它从引用计数1开始。retain() 方法将增加引用计数,release() 方法将减少引用计数。如果引用计数减少到0,对象将被显式释放,而访问一个被释放的对象通常会导致访问冲突。
  • 如果一个ReferenceCounted对象,它里面包含了其他的ReferenceCounted对象,那么当这个ReferenceCounted对象被释放时,它会调用它包含了的那些ReferenceCounted对象的release() 方法去释放它们。
4.2.1.2 基本方法
 // 返回此对象的引用计数。如果0 ,则表示该对象已被释放。int refCnt();// 计数器加一ReferenceCounted retain();// 按指定的increment增加引用计数。ReferenceCounted retain(int increment);// 记录此对象的当前访问位置以进行调试。如果确定该对象被泄露,该操作记录的信息将通过    ResourceLeakDetector提供给您。此方法是touch(null)的快捷方式。ReferenceCounted touch();// 记录此对象的当前访问位置以及用于调试目的的附加任意信息。如果确定该对象被泄露,该操作记录的信息将通过ResourceLeakDetector提供给您。ReferenceCounted touch(Object hint);//如果引用计数达到0 ,则将引用计数减1并释放此对象。boolean release();// 如果引用计数达到0 ,则按指定的decrement减少引用计数并释放此对象boolean release(int decrement);

4.2.2 ReferenceCountUpdater

4.2.2.1 实现原理
  • 相信很多人第一次看 ReferenceCountUpdater 实现源码时,都是一脸懵逼,感觉和我们想象中引用计数实现原理不一样啊。
  • 在我们的理解中,实现原理就是调用 retain() 方法引用计数就加一,调用release() 方法引用计数就减一,当引用计数是0的时候,就释放资源。
  • 但是netty 为了提高性能,它没有采用加法器这种实现,而是采用位运算的实现,那么它是如何工作的呢。
  • 初始时引用计数的值是 2,当调用 retain() 方法,就采用左移位运算 2 << 1 变成 4。当调用release() 方法,就采用右移位运算 4 >> 1 又变成了 2。
  • 此时你会发现,因为初始值是 2,而且每次引用的时候,都是用左移位运算(相当于乘以2),那么引用还存在的情况下,引用计数一定是一个偶数啊。
  • 当每次调用release() 方法释放时,就采用右移位运算(相当于除以2),直到到了引用计数是2 时,再次调用release() 方法,进行右移位运算,引用计数变成1,这时就应该释放对象了啊。
  • 那么我们就可以得到,引用计数是偶数表示引用还存在,引用计数是1 表示要释放对象了,这时就可以用简单地位运算就可以判断了(引用计数 & 1)。
  • (引用计数 & 1) 等于 0,那么引用计数就是偶数啊,因为偶数的最低一位是 0;当 (引用计数 & 1)不等于 0,那么引用计数就是奇数,因为奇数的最低一位是 1,而在我们上述的运算规则中,引用计数只会有一个奇数1,也就是说引用计数是奇数的话,表示这个对象被释放了。
  • 因此在ReferenceCountUpdater 实现中,它使用奇偶性来判断对象是否要释放,通过位运算代替加法器提高性能。
  • 但是你在 ReferenceCountUpdater 实现中,看到很多类似 (rawCnt == 2 || rawCnt == 4 || (rawCnt & 1) == 0) 判断。
  • 通过 (rawCnt & 1) 判断奇偶性,不就可以了么,为什么还要判断 rawCnt == 2。
    那是因为位与操作(x & y)比直接相等操作 (x == y) 操作更耗 CPU 性能,而且大部分引用计数都是 2 和 4 的数,因此 netty 为了提高性能,就先使用了相等判断,为了提高性能 netty 也是丧心病狂啊。
4.2.2.2 基本方法

AtomicIntegerFieldUpdater

   protected abstract AtomicIntegerFieldUpdater<T> updater();
  • 这是一个抽样方法,有子类实现,返回引用计数原子化更新器AtomicIntegerFieldUpdater。例如在 AbstractReferenceCountedByteBuf 的实现中。
    private static final long REFCNT_FIELD_OFFSET =ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {@Overrideprotected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {return AIF_UPDATER;}@Overrideprotected long unsafeOffset() {return REFCNT_FIELD_OFFSET;}};private volatile int refCnt = updater.initialValue();
  • AtomicIntegerFieldUpdater 的作用和 AtomicInteger 一样,采用 CAS的原理,原子化更新数据,防止并发冲突。
  • 至于为什么不直接使用 AtomicInteger,是因为AtomicInteger是一个对象,每个引用计数ReferenceCounted实例都产生一个AtomicInteger实例。而采用AtomicIntegerFieldUpdater 和 refCnt 组合, 每个引用计数ReferenceCounted实例共享AtomicIntegerFieldUpdater实例,和一个基本数据类型 refCnt。
  • 为什么不在 ReferenceCountUpdater 中定义refCnt 变量,那是因为引用计数是属于每个ReferenceCounted实例的而不是工具类ReferenceCountUpdater的。

unsafeOffset

protected abstract long unsafeOffset();
  • 返回引用计数 refCnt 的内存地址

initialValue()

    public final int initialValue() {return 2;}
  • 返回默认引用计数初始值是 2。

realRefCnt

    private static int realRefCnt(int rawCnt) {return rawCnt != 2 && rawCnt != 4 && (rawCnt & 1) != 0 ? 0 : rawCnt >>> 1;}

返回引用计数真实值。这里我们将引用计数分为原始值和真实值。

  1. 原始值:就是引用计数变量refCnt 的值,初始值是2,每次引用都是乘以2。
  2. 真实值: 就是引用计数象征意义上的值,那么它的初始值是1,每次引用就是加1。也就是说一般情况下,原始值是真实值的2倍。
  • 这里,(rawCnt & 1) != 0 表示是奇数,那就是表示被释放了,引用计数的真实值就是 0,所以直接返回 0;否则就使用右移位运算返回真实值。
  • 至于 rawCnt != 2 && rawCnt != 4 就是为了提高性能,因为相等判断比位与 & 操作快,当 rawCnt 是 2 或者 4 的时候,不用进行位与操作判断了,直接返回 rawCnt >>> 1 。

toLiveRealRefCnt

    private static int toLiveRealRefCnt(int rawCnt, int decrement) {if (rawCnt == 2 || rawCnt == 4 || (rawCnt & 1) == 0) {return rawCnt >>> 1;}// odd rawCnt => already deallocatedthrow new IllegalReferenceCountException(0, -decrement);}
  • 这个方法与realRefCnt(int) 功能一样,只不过当引用计数真实值是 0 的时候,本方法会直接抛出 IllegalReferenceCountException 异常,而realRefCnt(int) 是返回 0。
  • 这个方法一般都在 release 方法中被调用,因此当发现引用计数真实值是 0 ,就会抛出异常,提示正在 release 一个已经释放的对象。

nonVolatileRawCnt

    private int nonVolatileRawCnt(T instance) {// TODO: Once we compile against later versions of Java we can replace the Unsafe usage here by varhandles.final long offset = unsafeOffset();return offset != -1 ? PlatformDependent.getInt(instance, offset) : updater().get(instance);}
  • 获取引用计数变量refCnt 的值,即原始值。

refCnt

    public final int refCnt(T instance) {return realRefCnt(updater().get(instance));}

获取引用计数真实值,这个方法被 ReferenceCounted 实例的 refCnt() 方法调用。

isLiveNonVolatile

    public final boolean isLiveNonVolatile(T instance) {final long offset = unsafeOffset();final int rawCnt = offset != -1 ? PlatformDependent.getInt(instance, offset) : updater().get(instance);// The "real" ref count is > 0 if the rawCnt is even.return rawCnt == 2 || rawCnt == 4 || rawCnt == 6 || rawCnt == 8 || (rawCnt & 1) == 0;}
  • 判断对象是否还在被引用。返回true 表示这个对象还在被别人引用,返回false 表示这个对应已经被释放了。
  • (rawCnt & 1) == 0 表示是偶数,那么就还在被引用;rawCnt == 2 || rawCnt == 4 || rawCnt == 6 || rawCnt == 8 都是用来加快性能的。

setRefCnt(T instance, int refCnt) 和 resetRefCnt(T instance)

    public final void setRefCnt(T instance, int refCnt) {updater().set(instance, refCnt > 0 ? refCnt << 1 : 1); // overflow OK here}/*** Resets the reference count to 1*/public final void resetRefCnt(T instance) {updater().set(instance, initialValue());}
  • 直接设置引用计数。注意这里即使 refCnt << 1 溢出 int 最大值,也无所谓。
4.2.2.3 retain 系列方法
    public final T retain(T instance) {return retain0(instance, 1, 2);}public final T retain(T instance, int increment) {// 所有对原始值的更改是真实值更改的2倍,溢出是可以的int rawIncrement = checkPositive(increment, "increment") << 1;return retain0(instance, increment, rawIncrement);}// rawIncrement == increment << 1 rawIncrement 是increment的两倍private T retain0(T instance, final int increment, final int rawIncrement) {// 通过 updater(),原子化增加原始值,并返回改变前的值int oldRef = updater().getAndAdd(instance, rawIncrement);// 如果改变前的值oldRef是奇数,即对象已经被释放了,那么就要抛出异常。if (oldRef != 2 && oldRef != 4 && (oldRef & 1) != 0) {throw new IllegalReferenceCountException(0, increment);}// don't pass 0!if ((oldRef <= 0 && oldRef + rawIncrement >= 0)|| (oldRef >= 0 && oldRef + rawIncrement < oldRef)) {// overflow caseupdater().getAndAdd(instance, -rawIncrement);throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);}return instance;}
  • 因为原始值是真实值的两倍,所以增加值 rawIncrement 也是increment 的两倍。
  • 通过 updater() 的 getAndAdd 方法进行原子化更新。
  • oldRef != 2 && oldRef != 4 && (oldRef & 1) != 0 判断,是表明有线程在本线程进行getAndAdd 方法时,先将本对象释放了,那么这里就需要抛出异常。
  • don’t pass 0! 有溢出,我们就要进行处理。
4.2.2.4 release 系列方法

release

public final boolean release(T instance) {// 返回引用计数原始值int rawCnt = nonVolatileRawCnt(instance);// 当rawCnt == 2,表示就要释放了,调用 tryFinalRelease0(instance, 2) || retryRelease0(instance, 1),// 否则调用nonFinalRelease0 方法,减少引用计数原始值return rawCnt == 2 ? tryFinalRelease0(instance, 2) || retryRelease0(instance, 1): nonFinalRelease0(instance, 1, rawCnt, toLiveRealRefCnt(rawCnt, 1));
}public final boolean release(T instance, int decrement) {// 返回引用计数原始值int rawCnt = nonVolatileRawCnt(instance);// 返回引用计数真实值int realCnt = toLiveRealRefCnt(rawCnt, checkPositive(decrement, "decrement"));// 当待减值 decrement 等于引用计数真实值,表示就要释放了,调用 tryFinalRelease0 和 retryRelease0 方法,// 否则调用nonFinalRelease0 方法,减少引用计数原始值return decrement == realCnt ? tryFinalRelease0(instance, rawCnt) || retryRelease0(instance, decrement): nonFinalRelease0(instance, decrement, rawCnt, realCnt);
}

tryFinalRelease0

private boolean tryFinalRelease0(T instance, int expectRawCnt) {return updater().compareAndSet(instance, expectRawCnt, 1); // any odd number will work
}

尝试将引用计数原始值设置为 1 ,如果设置成功那么就代表这个对象就要被释放了。
retryRelease0

private boolean retryRelease0(T instance, int decrement) {// 死循环采用 CAS 更新for (;;) {// 获取引用计数原始值 rawCnt 和 引用计数真实值 realCntint rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);// 待减值 decrement 等于引用计数真实值 realCnt,就尝试进行释放if (decrement == realCnt) {// 调用 tryFinalRelease0 方法进行释放,成功就跳出死循环,返回 trueif (tryFinalRelease0(instance, rawCnt)) {return true;}// 待减值 decrement 小于 引用计数真实值 realCnt} else if (decrement < realCnt) {// 对引用计数原始值更改要乘以2,即 decrement << 1,如果compareAndSet返回true,也表示设置成功,// 跳出死循环,返回 false,对象还在被引用if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {return false;}} else {// 待减值 decrement 大于 引用计数真实值 realCnt,那么抛出异常throw new IllegalReferenceCountException(realCnt, -decrement);}// 这有利于在高争用情况下的吞吐量Thread.yield(); }
}

利用死循环采用 CAS 更新
nonFinalRelease0

private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {// 先尝试一次更新if (decrement < realCnt// all changes to the raw count are 2x the "real" change - overflow is OK&& updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {return false;}// 更新不成功,就调用 retryRelease0 方法,保证更新成功return retryRelease0(instance, decrement);
}

这个方法就是先尝试一次更新,不成功继续调用 retryRelease0 去更新。

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {private static final long REFCNT_FIELD_OFFSET =ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {@Overrideprotected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {return AIF_UPDATER;}@Overrideprotected long unsafeOffset() {return REFCNT_FIELD_OFFSET;}};// Value might not equal "real" reference count, all access should be via the updater@SuppressWarnings({"unused", "FieldMayBeFinal"})private volatile int refCnt = updater.initialValue();protected AbstractReferenceCountedByteBuf(int maxCapacity) {super(maxCapacity);}@Overrideboolean isAccessible() {// Try to do non-volatile read for performance as the ensureAccessible() is racy anyway and only provide// a best-effort guard.return updater.isLiveNonVolatile(this);}@Overridepublic int refCnt() {return updater.refCnt(this);}/*** An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly*/protected final void setRefCnt(int refCnt) {updater.setRefCnt(this, refCnt);}/*** An unsafe operation intended for use by a subclass that resets the reference count of the buffer to 1*/protected final void resetRefCnt() {updater.resetRefCnt(this);}@Overridepublic ByteBuf retain() {return updater.retain(this);}@Overridepublic ByteBuf retain(int increment) {return updater.retain(this, increment);}@Overridepublic ByteBuf touch() {return this;}@Overridepublic ByteBuf touch(Object hint) {return this;}@Overridepublic boolean release() {return handleRelease(updater.release(this));}@Overridepublic boolean release(int decrement) {return handleRelease(updater.release(this, decrement));}private boolean handleRelease(boolean result) {if (result) {deallocate();}return result;}/*** Called once {@link #refCnt()} is equals 0.*/protected abstract void deallocate();
}

4.3 UnpooledHeapByteBuf 源码分析

  • UnpooledHeapByteBuf是基于堆内存进行内存分配的字节缓冲区,它没有基于对象池技术实现,这就意味着每次IO 的读写都会创建一个新的UnpooledHeapByteBuf,频繁进行大块内存的分配和回收对性能会造成一定影响,但是相比于堆外内存的申请和释放,它的成本还是会低一些。
  • 相比于PooledHeapByteBuf,UnpooledHeapByteBuf的实现原理更加简单,也不容易出现内存管理方面的问题,因此在满足性能的情况下,推荐使用UnpooledHeapByteBuf。

4.3.1 常用变量与构造器

// 继承于引用计数器来管理内存的回收
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {// 字节缓冲区分配器private final ByteBufAllocator alloc; // 缓冲区byte[] array;//Netty ByteBuf 到JDK NIO ByteBuffer 的转换。private ByteBuffer tmpNioBuf;// 使用新分配的字节数组创建一个新的堆缓冲区。public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(maxCapacity);if (initialCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));}this.alloc = checkNotNull(alloc, "alloc");setArray(allocateArray(initialCapacity));setIndex(0, 0);}// 使用现有字节数组创建一个新的堆缓冲区。protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {super(maxCapacity);checkNotNull(alloc, "alloc");checkNotNull(initialArray, "initialArray");if (initialArray.length > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));}this.alloc = alloc;setArray(initialArray);setIndex(0, initialArray.length);}}

4.3.2 扩容机制

  • 方法入口首先对新容量进行合法性校验,如果大于容量上限或者小于0,则抛出IllegalArgumentException异常。
  • 判断新的容量值是否大于当前的缓冲区容量,如果大于则需要进行动态扩展,通过byte[] newArray = new byte[newCapacity]创建新的缓冲区字节数组,然后通过System.arraycopy进行内存复制,将旧的字节数组复制到新创建的字节数组中,最后调用setArray替换旧的字节数组。
  • 需要指出的是,当动态扩容完成后,需要将原来的视图 tmpNioBuf设置为空。
  • 如果新的容量小于当前的缓冲区容量不需要动态扩展,但是需要截取当前缓冲区创建一个新的子缓冲区,具体的算法如下:首先判断下读索引是否小于新的容量值,如果小于进一步判断写索引是否大于新的容量值,如果大于则将写索引设置为新的容量值(防止越界)。更新完写索引之后通过内存复制System.arraycopy将当前可读的字节数组复制到新创建的子缓冲区中。
  • 如果新的容量值小于读索引,说明没有可读的字节数组需要复制到新创建的缓冲区中,将读写索引设置为新的容量值即可。最后调用sctArray方法替换原来的字节数组。
@Overridepublic ByteBuf capacity(int newCapacity) {// 检查新容量的合法性checkNewCapacity(newCapacity);// 数组字段byte[] oldArray = array;// 旧缓存区长度int oldCapacity = oldArray.length;if (newCapacity == oldCapacity) {return this;}int bytesToCopy;if (newCapacity > oldCapacity) {bytesToCopy = oldCapacity;} else {trimIndicesToCapacity(newCapacity);bytesToCopy = newCapacity;}// 分配新空间byte[] newArray = allocateArray(newCapacity);// 将当前可读的字节数组复制到新创建的子缓冲区System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);// 替换旧字节数,并将tmpNioBuf为空setArray(newArray);freeArray(oldArray);return this;}

4.3.3 字节数组复制

 @Overridepublic ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {// 合法校验checkSrcIndex(index, length, srcIndex, src.length);System.arraycopy(src, srcIndex, array, index, length);return this;}/**
校验index 和length的值,如果它们小于0,则抛出Il1egalArgumentException,然后对两者之和进行判断,如果大于缓冲区的容量,则抛出 IndexOutOfBoundsException。srcIndex和 srcCapacity的校验与index类似,不再赘述。校验通过之后,调用System.arraycopy(src,srcIndex, array,index, length)方法进行字节数组的复制。
需要指出的是,ByteBuf 以 set和 get开头读写缓冲区的方法并不会修改读写索引。
**/protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {checkIndex(index, length);if (checkBounds) {checkRangeBounds("srcIndex", srcIndex, length, srcCapacity);}}

4.3.4 转换成JDKByteBuffer

  • JDK自带的Warp方法将Byte[]转换成ByteBuffer对象
 public static ByteBuffer wrap(byte[] array,int offset, int length){try {return new HeapByteBuffer(array, offset, length);} catch (IllegalArgumentException x) {throw new IndexOutOfBoundsException();}}
  • UnpooledHeapByteBuf的装换方法
@Overridepublic ByteBuffer nioBuffer(int index, int length) {// 检查正确性ensureAccessible();// 调用byteBuffer方法,介绍过了,由于每次调用nioBuffer都会创建一个新的ByteBuffer,因此此处的slice方法起不到重用缓冲区内容的效果,只能保证读写索引的独立性。return ByteBuffer.wrap(array, index, length).slice();}

4.3.5 其他方法

  • isDirect方法:如果是基于堆内存实现的 ByteBuf,它返回false。
 @Overridepublic boolean isDirect() {return false;}
  • hasArray方法:由于UnpooledHeapByteBuf基于字节数组实现,所以它的返回值。
@Overridepublic boolean hasArray() {return true;}
  • array方法:由于UnpoolcdHcapBytcBuf基于孛节数组实现,所以它的返回值是内部的字节数组成员变量。
@Overridepublic byte[] array() {ensureAccessible();return array;}

4.4 PooledByteBuf内存池原理分析

4.4.1 PoolArena

  • Arena本身是指一块区域,在内存管理中,Memory Arena是指内存中的一大块连续的区域,PoolArena就是Netty的内存池实现类。
  • 为了集中管理内存的分配和释放,同时提高分配和释放内存时候的性能,很多框架和应用都会通过预先申请一大块内存,然后通过提供相应的分配和释放接口来使用内存。这样一来,对内存的管理就被集中到几个类或者函数中,由于不再频繁使用系统调用来申请和释放内存,应用或者系统的性能也会大大提高。在这种设计思路下,预先申请的那一大块内存就被称为Memory Arena。
  • 不同的框架,Memory Arena的实现不同,Netty 的 PoolArena是由多个Chunk组成的大块内存区域,而每个 Chunk 则由一个或者多个Page组成,因此,对内存的组织和管理也就主要集中在如何管理和组织Chunk 和 Page了。
abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();enum SizeClass {Small,Normal}final PooledByteBufAllocator parent;final int numSmallSubpagePools;final int directMemoryCacheAlignment;final int directMemoryCacheAlignmentMask;private final PoolSubpage<T>[] smallSubpagePools;private final PoolChunkList<T> q050;private final PoolChunkList<T> q025;private final PoolChunkList<T> q000;private final PoolChunkList<T> qInit;private final PoolChunkList<T> q075;private final PoolChunkList<T> q100;
}

4.4.2 PoolChunk

  • Chunk 主要用来组织和管理多个Page 的内存分配和释放,在 Netty中,Chunk 中的Page被构建成一棵二叉树。假设一个Chunk 由16个 Page组成,那么这些Page将会被按照图所示的形式组织起来。

  • Page的大小是4个字节,Chunk 的大小是64个字节(4×16)。整棵树有5层,第1层(也就是叶子节点所在的层)用来分配所有Page 的内存,第4层用来分配2个 Page 的内存,依次类推。
  • 每个节点都记录了自己在整个Memory Arena中的偏移地址,当一个节点代表的内存区域被分配出去之后,这个节点就会被标记为已分配,自这个节点以下的所有节点在后面的内存分配请求中都会被忽略。举例来说,当我们请求一个16字节的存储区域时,上面这个树中的第3层中的4个节点中的一个就会被标记为已分配,这就表示整个Memroy Arena中有16个字节被分配出去了,新的分配请求只能从剩下的3个节点及其子树中寻找合适的节点。
  • 对树的遍历采用深度优先的算法,但是在选择哪个子节点继续遍历时则是随机的,并不像通常的深度优先算法中那样总是访问左边的子节点。

4.4.3 PoolSubpage

  • 对于小于一个Page的内存,Netty在 Page中完成分配。每个Page 会被切分成大小相等的多个存储块,存储块的大小由第一次申请的内存块大小决定。假如一个Page是8个字节,如果第一次申请的块大小是4个字节,那么这个Page就包含2个存储块;如果第一次申请的是8个字节,那么这个Page就被分成1个存储块。
  • 一个Page只能用于分配与第一次申请时大小相同的内存,比如,一个4字节的 Page,如果第一次分配了1字节的内存,那么后面这个Page 只能继续分配1字节的内存,如果有一个申请2字节内存的请求,就需要在一个新的Page中进行分配。
  • Page中存储区域的使用状态通过一个long 数组来维护,数组中每个long的每一位表示一个块存储区域的占用情况:0表示未占用,1表示以占用。对于一个4字节的Page来说,如果这个Page用来分配1个字节的存储区域,那么long 数组中就只有一个long类型的元素,这个数值的低4位用来指示各个存储区域的占用情况。对于一个128字节的 Page来说,如果这个Page也是用来分配1个字节的存储区域,那么long 数组中就会包含2个元素,总共128位,每一位代表一个区域的占用情况。
final class PoolSubpage<T> implements PoolSubpageMetric {final PoolChunk<T> chunk;private final int pageShifts;private final int runOffset;private final int runSize;private final long[] bitmap;PoolSubpage<T> prev;PoolSubpage<T> next;boolean doNotDestroy;int elemSize;private int maxNumElems;private int bitmapLength;private int nextAvail;private int numAvail;}

无论是Chunk还是 Page,都通过状态位来标识内存是否可用,不同之处是Chunk 通过在二叉树上对节点进行标识实现,Page是通过维护块的使用状态标识来实现。

4.5 PooledDirectByteBuf

  • PooledDirectByteBuf 基于内存池实现,与UnPooledDirectByteBuf的唯一不同就是缓冲区的分配是销毁策略不同,其他功能都是等同的,也就是说,两者唯一的不同就是内存分配策略不同。

4.5.1 创建字节缓存区

从池中获取对象,而不是创建对象

 private static final ObjectPool<PooledDirectByteBuf> RECYCLER = ObjectPool.newPool(new ObjectCreator<PooledDirectByteBuf>() {@Overridepublic PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {return new PooledDirectByteBuf(handle, 0);}});static PooledDirectByteBuf newInstance(int maxCapacity) {PooledDirectByteBuf buf = RECYCLER.get();buf.reuse(maxCapacity);return buf;}

4.5.2 复制新的字节缓冲区

  • 如果使用者确实需要复制一个新的实例,与原来的PooledDirectByteBuf独立,则调用它的copy ( int index, int length)可以达到上述目标。
 @Overridepublic ByteBuf copy(int index, int length) {checkIndex(index, length);ByteBuf copy = alloc().directBuffer(length, maxCapacity());return copy.writeBytes(this, index, length);}
  • 首先对索引和长度进行合法性校验,通过之后调用PooledByteBufAllocator 分配一个新的ByteBuf,由于PooledByteBufAllocator没有实现 directBuffer方法,所以最终会调用到AbstractByteBufAllocator 的 directBuffer方法。
  protected final void checkIndex(int index, int fieldLength) {ensureAccessible();checkIndex0(index, fieldLength);}final void checkIndex0(int index, int fieldLength) {if (checkBounds) {checkRangeBounds("index", index, fieldLength, capacity());}}

《Netty权威指南》(五)ByteBuf源码分析相关推荐

  1. tensorflow入门教程(三十五)facenet源码分析之MTCNN--人脸检测及关键点检测

    # #作者:韦访 #博客:https://blog.csdn.net/rookie_wei #微信:1007895847 #添加微信的备注一下是CSDN的 #欢迎大家一起学习 # ------韦访 2 ...

  2. Nginx学习笔记(五) 源码分析内存模块内存对齐

    Nginx源码分析&内存模块 今天总结了下C语言的内存分配问题,那么就看看Nginx的内存分配相关模型的具体实现.还有内存对齐的内容~~不懂的可以看看~~ src/os/unix/Ngx_al ...

  3. 了不起的 Webpack HMR 学习指南(含源码分析)

    学习时间:2020.06.14 学习章节:<Webpack HMR 原理解析> 一.HMR 介绍 Hot Module Replacement(以下简称:HMR 模块热替换)是 Webpa ...

  4. Netty学习笔记 - 1 (带源码分析部分)

    2021年12月 北京 xxd 一.Netty是什么 Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github 上的独立项目. Netty 是一个异步的.基于事件驱动的网络应用 ...

  5. netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush都做了哪些事》

    前言介绍 对于使用netty的小伙伴来说,ctx.writeAndFlush()再熟悉不过了,它可以将我们的消息发送出去.那么它都执行了那些行为呢,是怎么将消息发送出去的呢. I/O Requestv ...

  6. 并发编程专题五-AbstractQueuedSynchronizer源码分析

    PS:外号鸽子王不是白来的,鸽了好几天,也是因为比较忙,时间太少了,这篇东西有点多,需要慢慢消化.不知不觉居然写了4个多小时.... 一.什么是AQS aqs是AbstractQueuedSynchr ...

  7. Netty工作笔记0044---Netty案例源码分析

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 来看看前面写的案例 可以看到这个bossgroup还有workergroup的底层代码 Multi ...

  8. Netty(九)——ByteBuf源码之析

    ByteBuf在Netty占据着中重要的位置,上篇<Netty--ByteBuf功能之说>讲了ByteBuf的工作原理和重要功能介绍.这篇从源码的角度来看ByteBuf.首先,来看一下主要 ...

  9. 《Netty权威指南》

    <Netty权威指南> 基本信息 作者: 李林锋 出版社:电子工业出版社 ISBN:9787121233432 上架时间:2014-5-29 出版日期:2014 年6月 开本:16开 页码 ...

  10. [201504][Netty 权威指南][第2版][李林锋][著]

    [201504][Netty 权威指南][第2版][李林锋][著] https://github.com/wuyinxian124/nettybook2 基础篇 走进 Java NIO 第 1 章 J ...

最新文章

  1. 抓取apache2的进程pid
  2. python常见错误-Python错误及异常总结汇总
  3. modernizr.js的介绍和使用
  4. const char * 类型的实参与 char * 类型的形参不兼容_4 种 C++ 强制类型转换,你都清楚吗?...
  5. js生日计算年龄_你知道用EXCEL可以从身份证中提取生日、性别、年龄、生肖吗?...
  6. Netty工作笔记0040---Netty入门--服务端1
  7. 【高并发解决方案】1、高并发解决方案汇总
  8. 很有趣的一道题:找出有毒的瓶子
  9. 典型行业大数据应用和安全风险和解决方案
  10. 安徽太极计算机 刘建春,阅读理解的检测作业
  11. 如何用Vue开发前端和网站
  12. 后台系统登录一般流程
  13. 【Linux】Linux操作的一些基本指令
  14. 英语基础不好可以学会编程吗?
  15. 反调试/反汇编技术、TEB/PEB部分说明
  16. 2020-02-26
  17. 复星医药遭员工举报:比长生生物更恶劣 多次遭美国FDA警告
  18. 【量亿数据-量化交易学习】均线系统
  19. 互联网大佬爱情故事~
  20. shtml学习笔记 SSI 指令 语法 详细说明

热门文章

  1. flash CS6 导入音频不成功的问题
  2. OPPOR7Splus_官方线刷包_救砖包_解账户锁
  3. 计算机快速换界面,老板来了?这些好用的Windows快捷键让你一秒切换操作界面!-页面设置快捷键...
  4. 如何安装虚拟光驱大学计算机考试,用虚拟光驱安装win10专业版的方法
  5. 计算机技术排除故障网站有哪些,电脑技术交流之常见故障排除【详解】
  6. 苹果13可以用无线充电宝吗?苹果专用无线充电宝推荐
  7. 时间序列的平稳性检验方法
  8. 易语言怎么查看服务器文件,易语言文件传输查看进度
  9. 《duilib入门到精通》- duilib下载与编译(duilib视频教程)
  10. 软件安全(开发模型、需求分析、测试)总结