Netty的队列有何不一样

其实呢,追踪一下常用的 Spring 等框架,会发现正常运转的情况下,一个线程最多也就三四十个 ThreadLocal 变量,那么,Netty 为何还要大费周章搞一个 FastThreadLocal 呢?

这是由于 Netty 的使用场景导致的,不管是对象池还是内存池,亦或者是前面讲到的请求处理的过程,都大量使用了线程本地变量,且操作频繁,而 Java 原生的 ThreadLocal 使用的是线性探测法实现的哈希表,使得哈希冲突的概率太大且解决冲突的方式也不友好,且解决冲突之后更容易引起哈希冲突,所以,Netty 必须定义一个全新的 ThreadLocal 用来存储本地变量,
简单点说,就是 Java 原生的 ThreadLocal 太慢了,无法应对 Netty 这种多缓存高频率的场景。

上面我们提到了 “场景” 两个字,其实,在 Netty 中,很多地方都针对特定的场景使用了特定的技术,比如,我们今天要说的一揽子队列 ——MpscArrayQueue、MpscChunkedArrayQueue、MpscUnboundedArrayQueue、MpscAtomicArrayQueue、MpscGrowableAtomicArrayQueue、MpscUnboundedAtomicArrayQueue 等。

可以发现,这些队列都有统一的前缀 Mpsc-,它是什么意思呢?
这些队列又是使用在什么样的场景呢?
相比于 Java 原生的队列,这些队列又有哪些好处呢?
它们又是怎么实现的呢?

Java 原生队列回顾

1 非并发安全的队列

1 LinkedList,没错,你没看错,LInkedList 确实实现了 Queue 接口,可以把它当作一个队列来使用;
2 PriorityQueue,优先级队列,使用 “堆” 这种数据结构实现的队列,主要用于堆排序、中位数、99% 位数等场景;

2 阻塞队列

ArrayBlockingQueue,最简单的阻塞队列,使用数组实现,有界,使用一个 ReentrantLock 及两个 Condition 控制并发安全,效率低下;

LinkedBlockingQueue,使用链表实现,有界或无界,使用两个 ReentrantLock 分别控制入队和出队,效率相对 ArrayBlockingQueue 要高一些;

SynchronizedQueue,俗称无缓冲队列,里面不存储任何元素,所有入队的元素都移交给另一个线程来处理,如果放入元素时没有线程来消费,那么,调用者线程会阻塞;同样地,如果取元素时没有生产者放入元素,那么消费线程也会阻塞;

PriorityBlockingQueue,优先级队列的阻塞模式,可在多线程环境中用于堆排序、中位数、99% 位数等场景;

LinkedTransferQueue,这是个强大的阻塞队列,它使用了一种叫作 “双重队列” 的数据结构,而且它相当于是 LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue 三者的集合体,且比它们更高效;

DelayQueue,延时队列,它在优先级队列的基础上加入了 “延时” 的概念,出队时,如果堆顶的元素还没有到期,是不会出队的,主要运用在定时任务的场景中,比如,Java 的定时任务线程池 ScheduledThreadPoolExecutor,不过它是自己又实现了一遍延时队列,叫作 DelayedWorkQueue,而没有使用现成的 DelayQueue。

3 并发安全的队列

ConcurrentLinkedQueue,它是并发安全的队列却不是阻塞队列,内部使用 自旋 + CAS 实现,是一种无锁队列,但是它无法使用在线程池中。

阻塞队列一定是并发安全的队列,关于以上所有队列的源码分析,可以参考文末链接解锁。

MpscArrayQueue
如果把避免伪共享添加的这些字段去掉,那么,MpscArrayQueue 就只剩下这么几个字段了:

public class FakeMpscArrayQueue<E> {// 掩码,用来计算数组下标,加1就成了容量protected final long mask;// 存储数据的环形数组protected final E[] buffer;// 生产者索引,有volatileprivate volatile long producerIndex;// 生产者索引的最大值,有volatileprivate volatile long producerLimit;// 消费者索引,无volatileprotected long consumerIndex;}

可以发现,producerIndex 和 producerLimit 加了 volatile,而 consumerIndex 却没有,这是为什么呢?

请记住我们的场景是 MPSC,多生产者单消费者,所以,生产者的索引修改必须立马对其它线程可见,而只有一个消费者,它并不需要对别的线程立马可见,当然,生产者在特定情况下也是需要 consumerIndex 的最新值的,比如,环形数组的头性相接了,它是怎么实现的呢?让我们跟着源码来一起学习吧。

源码剖析
调试用例
这一节的用例就比较好写了,因为 MpscArrayQueue 实现了 Queue 接口,所以,我们只要按其它的队列一样来写用例就可以了:

public class MpscArrayQueueTest {public static final MpscArrayQueue<String> QUEUE = new MpscArrayQueue<>(5);public static void main(String[] args) {// 入队,如果队列满了则会抛出异常QUEUE.add("1");// 入队,返回是否成功QUEUE.offer("2");QUEUE.offer("3");QUEUE.offer("4");// 存储了多少元素System.out.println("队列大小:" + QUEUE.size());// 容量,可以存储多少元素,会按2次方对齐,所以这里为8System.out.println("队列容量" + QUEUE.capacity());// 出队,如果队列为空则会抛出异常System.out.println("出队:" + QUEUE.remove());// 出队,如果队列为空返回nullSystem.out.println("出队:" + QUEUE.poll());// 查看队列头元素,如果队列为空则会抛出异常System.out.println("查看队列头元素:" + QUEUE.element());// 查看队列头元素,如果队列为空则返回nullSystem.out.println("查看队列头元素:" + QUEUE.peek());}
}

接口 Queue 中对于入队、出队、查看队首元素各定义了两种方法,一类是抛出异常,一类是返回特定值:
抛出异常 返回特定值
入队 add(e) offer(e)
出队 remove() poll()
查看队首元素 element() peek()
其中,
抛出异常的方法最终也还是调用的返回特定值的方法,
而查看队首元素跟出队方法是比较类似的,
所以,这里我们主要看 offer (e)、poll () 这两个方法的源码实现。

入队 offer (e)
好了,让我们先来看看入队方法 offer (e) 的实现,在 QUEUE.offer(“2”); 处打一个断点,此时,已经入队一个元素了,跟踪进去:

// 入队
@Override
public boolean offer(final E e)
{// 空值检查if (null == e){throw new NullPointerException();}// 掩码final long mask = this.mask;// 生产者的最大的索引值,初始时为传入的容量,即5long producerLimit = lvProducerLimit(); // LoadLoadlong pIndex;do{// 生产者索引pIndex = lvProducerIndex(); // LoadLoad// 如果生产者索引达到了最大值,防止追尾if (pIndex >= producerLimit){// 消费者索引,以volatile的形式获取,保证获取的是最新的值final long cIndex = lvConsumerIndex(); // LoadLoad// 修改为当前消费者的索引加上数组的大小producerLimit = cIndex + mask + 1;// 如果依然达到了最大值,则返回false,表示队列满了,再放元素就追尾了if (pIndex >= producerLimit){return false; // FULL :(}else{// 否则更新最大索引为新值soProducerLimit(producerLimit);}}}// CAS更新生产者索引,更新成功了则跳出循环,说明数组中这个下标被当前这个生产者占有了// 此时即使更新索引成功了,数组中依然还没有放入元素// 如果更新失败,说明其它生产者(线程)先占用了这个位置,重新来过while (!casProducerIndex(pIndex, pIndex + 1));// 计算这个索引在数组中的下标偏移量final long offset = calcElementOffset(pIndex, mask);// 将元素放到这个位置soElement(buffer, offset, e); // StoreStore// 入队成功return true; // AWESOME :)
}
// lv=load valatile
// 读取producerLimit
protected final long lvProducerLimit()
{   // producerLimit本身就是volatile修饰的// 所以不用像下面的consumerIndex一样通过UNSAFE.getLongVolatile()一样来读取return producerLimit;
}
// 读取producerIndex
public final long lvProducerIndex()
{// producerIndex本身就用volatile修饰了return producerIndex;
}
// 读取consumerIndex
public final long lvConsumerIndex()
{   // 以volatile的形式加载consumerIndex// 此时,可以把consumerIndex想像成前面加了volatile// 会从内存读取最新的值return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
}
// so=save ordered
// 保存producerLimit
protected final void soProducerLimit(long newValue)
{// 这个方法会加StoreStore屏障// 会把最新值直接更新到主内存中,但其它线程不会立即可见// 其它线程需要使用volatile语义才能读取到最新值// 这相当于是一种延时更新的方法,比volatile语义的性能要高一些UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue);
}
// 修改数组对应偏移量的值
public static <E> void soElement(E[] buffer, long offset, E e)
{// 与上面同样的方法,比使用下标更新数组元素有两个优势// 1. 使用Unsafe操作内存更新更快// 2. 使用putOrderedObject会直接更新到主内存,而使用下标不会立马更新到主内存UNSAFE.putOrderedObject(buffer, offset, e);
}
// CAS更新producerIndex
protected final boolean casProducerIndex(long expect, long newValue)
{// CAS更新return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}

这段入队的方法看似简单,实则蕴含大量的底层知识和优化技巧,让我们来看几个问题:

为什么需要 producerLimit,拿 producerIndex 与 consumerIndex 直接比较行不行?
很多方法后面写了 LoadLoad、StoreStore,它们是什么意思?
Unsafe 的新方法 putOrderedObject () 和 getLongVolatile ()?

接着看看第二个问题:LoadLoad、StoreStore 是什么意思?

可以把 LoadLoad 看成是读屏障,表示每次都从主内存读取最新值,StoreStore 看成是写屏障,每次都把最新值写入到主内存。如果一个线程使用 StoreStore 屏障把最新值写入主内存,另一个线程只需要使用 LoadLoad 屏障就可以读取到最新值了,它们俩往往结合着来使用。

最后一个问题:Unsafe 的新方法 putOrderedObject () 和 getLongVolatile ()?
其实,在 Unsafe 中有五组相似的方法:

1 putOrderedXxx (),使用 StoreStore 屏障,会把最新值更新到主内存,但不会立即失效其它缓存行中的数据,是一种延时更新机制;
2 putXxxVolatile (),使用 StoreLoad 屏障,会把最新值更新到主内存,同时会把其它缓存行的数据失效,或者说会刷新其它缓存行的数据;
3 putXxx (obj, offset),不使用任何屏障,更新对象对应偏移量的值;
4 getXxxVolatile (),使用 LoadLoad 屏障,会从主内存获取最新值;
5 getXxx,不使用任何屏障,读取对象对应偏移量的值;

从性能方面来说的话,putOrderedXxx () 用得好的话,性能会比 putXxxVolatile () 高一些,但是,如果用的不好的话,可能会出现并发安全的问题,所以,个人请谨慎使用,即使使用了,也要做好并发安全的测试。
OK,基础知识也补齐了,如果还看不懂,不要紧,先跳过去,我们再来看看出队方法,等看完出队方法了,我们使用脑补法来模拟一下入队出队的实现。

出队 poll ()
同样地,在 System.out.println(“出队:” + QUEUE.poll()); 这行打一个断点,并跟踪进去:

// 出队
@Override
public E poll()
{// 读取consumerIndex的值,注意这里是lp不是lvfinal long cIndex = lpConsumerIndex();// 计算在数组中的偏移量final long offset = calcElementOffset(cIndex);// 存储元素的数组final E[] buffer = this.buffer;// 取元素,前面通过StoreStore写入的,这里通过LoadLoad取出来的就是最新值E e = lvElement(buffer, offset); // LoadLoadif (null == e){// 有一种例外,还记得上面入队的时候吗?// 是先更新了producerIndex的值,再把更新元素到数组中的。// 如果在两者之间,进行了消费,则此处是无法获取到元素的// 所以需要进入下面的判断// 判断consumerIndex是否等于producerIndex// 只要两则不相等,就可以再消费元素if (cIndex != lvProducerIndex()){// 使用死循环来取元素,直到取到为止do{e = lvElement(buffer, offset);}while (e == null);}else{// 如果两个索引相等了,说明没有元素了,返回nullreturn null;}}// 更新取出的位置元素为null,注意是sp,不是sospElement(buffer, offset, null);// 修改consumerIndex的索引为新值,使用StoreStore屏障,直接更新到主内存soConsumerIndex(cIndex + 1); // StoreStore// 返回出队的元素return e;
}
// lp=load plain,简单读取
protected final long lpConsumerIndex()
{return consumerIndex;
}
// sp=store plain,简单存储
public static <E> void spElement(E[] buffer, long offset, E e)
{UNSAFE.putObject(buffer, offset, e);
}

时刻要记住消费者只有一个,所以,消费端完全不需要使用任何锁或者 CAS 操作,但是,生产者端是有可能读取 consumerIndex 的值的,所以,使用 StoreStore 屏障修改它的值即可。

还有一种例外,是生产者端先更新 producerIndex,再更新数组元素,这里使用死循环不断读取直到读取到为止。

入队出队的代码都分析完毕了,可以看到,整体的逻辑非常少,我算了下,入队出队两者加一起的主体逻辑都不到 100 行,但是,里面蕴含了大量的底层知识,为了更好地理解这种队列,我决定使用脑补法来模拟一下入队出队的过程。

脑补入队出队过程(脑补法):

为了简单点,我们假设队列的长度为 4,一共入队 5 个元素,并出队 2 个元素,2 个生产者:

带 Atomic 的类,是表示在 Netty 无法使用 Unsafe 的情况下使用 Atomic 原子类来做替代方案。

Netty的队列有何不一样相关推荐

  1. 原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析

    原理剖析(第 012 篇)Netty之无锁队列MpscUnboundedArrayQueue原理分析 - 一.大致介绍 1.了解过netty原理的童鞋,其实应该知道工作线程组的每个子线程都维护了一个任 ...

  2. 原理剖析-Netty之无锁队列

    一.大致介绍 1.了解过netty原理的童鞋,其实应该知道工作线程组的每个子线程都维护了一个任务队列: 2.细心的童鞋会发现netty的队列是重写了队列的实现方法,覆盖了父类中的LinkedBlock ...

  3. Netty防止内存泄漏措施

    谨以此文献给李林锋即将新生的爱女. 1.  背景 1.1 直播平台内存泄漏问题 某直播平台,一些网红的直播间在业务高峰期,会有 10W+ 的粉丝接入,如果瞬间发生大量客户端连接掉线.或者一些客户端网络 ...

  4. 牛皮!阿里资深架构师耗费三年终于把Netty进阶之路PDF整理完了,读完我彻底跪了

    前言 Netty将Java NIO接口封装,提供了全异步编程方式,是各大Java项目的网络应用开发必备神器. 在本文中,将Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例 ...

  5. 发送队列积压导致内存泄漏

    导致Netty内存泄漏的原因很多,例如,使用内存池方式创建的对象忘记释放,或者系统处理压力过大导致发送队列积压. 尽管Netty 采用了NIO非阻塞通信,I/O处理往往不会成为业务瓶颈,但是如果客户端 ...

  6. 深入解析 Apache BookKeeper 系列:第四篇—背压

    本文由 StreamNative 组织翻译自<Apache BookKeeper Internals - Part 4 - Back Pressure>,作者 Jack Vanlightl ...

  7. Java后端程序员技术栈

    Java后端程序员技术栈 它可以是知识提纲,便于快速复习与查阅 它也可以是你的学习规划,帮助小白快速了解学Java要走的路(当然你也可以选择搭配我的学习路线一起享用!) 相关链接: <gitee ...

  8. Pulsar和Kafka基准测试:Pulsar性能精准解析(完整版)

    关于 Apache Pulsar Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息.存储.轻量化函数式计算为一体,采用计算与存储分离架构设计,支 ...

  9. Netty技术细节源码分析-MpscLinkedQueue队列原理分析

    本文的github地址:点此 该文所涉及的netty源码版本为4.1.6. MpscLinkedQueue是什么 在Netty的核心中的核心成员NioEventLoop中,其中任务队列的实现taskQ ...

最新文章

  1. 重载函数的调用匹配规则
  2. Tableau必知必会之用 Page 功能创建你的动态视图
  3. 使用 Visual Studio 对源代码文件进行哈希处理以确保文件完整性
  4. 一直在构建工作空间_智能工作空间让Dropbox拥有无限扩展潜力
  5. rust程序设计语言第二版_C语言程序设计(山东联盟青岛大学版)
  6. csu 1577 Dice Game (博弈)
  7. 单元格 编辑 获取_Excel批量导入图片,还能一键将图片固定到单元格!这是什么操作.........
  8. 苹果U盘格式化了怎么恢复
  9. system(“mode con cols=40 lines=15“)参数活起来
  10. MySQL练习题及答案
  11. 分数阶混沌系统李雅普指数和分岔图
  12. 奇怪的小鸭子也增加了
  13. python列表输出学生姓名学号链表_建立一个链表,记录学生的姓名,学号和成绩,
  14. static修饰的特点
  15. easyUI tree 自定义图标
  16. “平衡小车之家”家的STM32F103最小系统源代码分享
  17. 利用51单片机实现与RS485通讯,接收数据
  18. 【ZYNQ】Petalinux 编译工程报错
  19. 数据结构:一元多项式(线性表)
  20. 图像处理45-grabCut图像分割

热门文章

  1. 组件:参数验证、组件:事件传递
  2. BitMap-BitSet(JDK1.8)基本使用入门
  3. java环境配置 Windows10
  4. python开发之路目录
  5. SSM-springMvc配置文件
  6. 北京开源人linux运维实战
  7. JavaScript跨域问题分析与总结_直来直往_百度空间
  8. (转)The POM for 0.0.1-SNAPSHOT is missing, no dependency informat
  9. @Profile注解与@Conditional注解
  10. Enum枚举类|注解Annotation