目录

一、什么是Disruptor

二、Disruptor的设计方案

三、Disruptor实现特征

四、Disruptor实现生产者-消费者模式

4.1 依赖

4.2 声明Event

4.3 创建EventFactory

4.4 消费者

4.5 生产者

4.6 调用

五、Disryptor的核心概念

5.1 RingBuffer

5.1.1 什么是RingBuffer

5.1.2 优点

5.1.3 底层实现

5.2 Sequence

5.3 Sequencer

5.4 SequenceBarrier

5.5 WaitStrategy

5.6 Event

5.7 EventProcessor

5.8 EventHandler

5.9 Producer

5.10 WorkProcessor

5.11 WorkerPool

5.12 LifecycleAware

六、分类讨论

6.1 一个生产者

6.2 多个生产者

6.2.1 读数据

6.2.2 写数据

七、等待策略

7.1 生产者的等待策略

7.2 消费者的等待策略


一、什么是Disruptor

可以简单理解为一种高效的"生产者-消费者"模型,性能远远高于传统的BlockingQueue容器。在JDK的多线程与并发库一文中, 提到了BlockingQueue实现了生产者-消费者模型。BlockingQueue是基于锁实现的, 而锁的效率通常较低。没有使用CAS机制实现的生产者-消费者。Disruptor使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取;在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue。

二、Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

元素位置定位:数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

无锁设计:每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

三、Disruptor实现特征

实现低延迟的关键细节就是在Disruptor中利用无锁的算法,所有内存的可见性和正确性都是利用内存屏障或者CAS操作。使用CAS来保证多线程安全,与大部分并发队列使用的锁相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不必像锁一样需要操作系统提供支持,所以每次调用不需要在用户态与内核态之间切换,也不需要上下文切换。

只有一个用例中锁是必须的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的实现方法就是使用Condition实现消费者在新事件到来前等待。许多低延迟系统使用忙等待去避免Condition的抖动,然而在系统忙等待的操作中,性能可能会显著降低,尤其是在CPU资源严重受限的情况下,例如虚拟环境下的WEB服务器。

四、Disruptor实现生产者-消费者模式

4.1 依赖

        <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version></dependency>

4.2 声明Event

声明一个event来包含需要传递的数据

package com.thread.disruptor;/*** @Author: 98050* @Time: 2018-12-19 16:19* @Feature: 生产者与消费者传递的数据*/
public class LongEvent {private Long value;public Long getValue() {return value;}public void setValue(Long value) {this.value = value;}
}

4.3 创建EventFactory

通过EventFactory来实例化Event对象:

package com.thread.disruptor;import com.lmax.disruptor.EventFactory;/*** @Author: 98050* @Time: 2018-12-19 16:22* @Feature: 实例化LongEvent*/
public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance() {return new LongEvent();}
}

4.4 消费者

事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端:

package com.thread.disruptor;import com.lmax.disruptor.EventHandler;/*** @Author: 98050* @Time: 2018-12-19 16:23* @Feature:  相当于消费者,获取生产者推送过来的消息*/
public class LongEventHandler implements EventHandler<LongEvent> {public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {System.out.println("消费者:"+longEvent.getValue());}
}

4.5 生产者

package com.thread.disruptor;import com.lmax.disruptor.RingBuffer;import java.nio.ByteBuffer;/*** @Author: 98050* @Time: 2018-12-19 16:27* @Feature:*/
public class LongEventProducer {public final RingBuffer<LongEvent> ringBuffer;public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void onData(ByteBuffer byteBuffer){//1.获取ringBuffer的下标位置long sequence = ringBuffer.next();Long data = null;//2.取出ringBuffer中的空位置LongEvent longEvent = ringBuffer.get(sequence);//3.然后赋值data = byteBuffer.getLong(0);longEvent.setValue(data);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("生产者准备发送数据");//4.发送数据ringBuffer.publish(sequence);}}
}

4.6 调用

package com.thread.disruptor;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @Author: 98050* @Time: 2018-12-19 16:34* @Feature:*/
public class DisruptorMain {public static void main(String[] args) {//1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理ExecutorService executorService = Executors.newCachedThreadPool();//2.创建Event工厂EventFactory<LongEvent> eventEventFactory = new LongEventFactory();//3.设置ringBuffer大小int ringBufferSize = 1024 * 1024;//4.创建Disruptor,单生产者模式,消费者等待策略为YieldingWaitStrategyDisruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventEventFactory, ringBufferSize, executorService, ProducerType.SINGLE,new YieldingWaitStrategy());//5.注册消费者disruptor.handleEventsWith(new LongEventHandler());//可以配置多个消费者,一个生产者 默认重复消费,配置分组//6.启动Disruptordisruptor.start();//7.创建RingBuffer容器RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();//8.创建生产者LongEventProducer producer = new LongEventProducer(ringBuffer);//9.指定缓冲区的大小ByteBuffer byteBuffer = ByteBuffer.allocate(8);for (int i = 0; i < 100; i++) {//10.将i放在第0个位置byteBuffer.putLong(0,i);producer.onData(byteBuffer);}disruptor.shutdown();executorService.shutdown();}
}

五、Disryptor的核心概念

5.1 RingBuffer

被看做Disruptor最主要的组件,然而从3.0开始RingBuffer仅仅负责存储和更新再Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。

5.1.1 什么是RingBuffer

相当于一个循环队列(首尾相接的环),可以把它用做在不同上下文(线程)间传递数据的buffer。RingBuffer拥有一个序号,这个序号指向数组中下一个可用的元素。要找到数组中当前序号指向的元素,可以通过mod操作。

5.1.2 优点

之所以采用RingBuffer这种数据结构,是因为它在可靠消息传递方面有很好的性能。这就够了,不过它还有一些其他的优点。

首先,因为它是数组,所以要比链表快,而且有一个容易预测的访问模式。(数组内元素的内存地址的连续性存储的)。这是对CPU缓存友好的—也就是说,在硬件级别,数组中的元素是会被预加载的,因此在RingBuffer当中,cpu无需时不时去主存加载数组中的下一个元素。(因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行)

其次,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。

5.1.3 底层实现

RingBuffer是一个首尾相连的环形数组,所谓首尾相连,是指当RingBuffer上的指针越过数组是上界后,继续从数组头开始遍历。因此,RingBuffer中至少有一个指针,来表示RingBuffer中的操作位置。另外,指针的自增操作需要做并发控制,Disruptor使用CAS的乐观并发控制来保证指针自增的原子性。

Disruptor中的RingBuffer上只有一个指针,表示当前RingBuffer上消息写到了哪里,此外,每个消费者会维护一个sequence表示自己在RingBuffer上读到哪里,从这个角度讲,Disruptor中的RingBuffer上实际有消费者数+1个指针。由于我们要实现的是一个单消息单消费的阻塞队列,只要维护一个读指针(对应消费者)和一个写指针(对应生产者)即可,无论哪个指针,每次读写操作后都自增一次,一旦越界,即从数组头开始继续读写。

5.2 Sequence

Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每一个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值得运转,因此Sequence支持多种当前为AtomicLong类的特性。

5.3 Sequencer

这是Disruptor真正的核心,此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

5.4 SequenceBarrier

由Sequencer生成,并且包含了已经发布的Sequence的引用,这些Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者消费的Event的逻辑。用来权衡当消费者无法从RingBuffer里面获取事件时的处理策略。(例如:当生产者太慢,消费者太快,会导致消费者获取不到新的事件会根据该策略进行处理,默认会堵塞)

5.5 WaitStrategy

决定一个消费者将如何等待生产者将Event置入Disruptor的策略。用来权衡当生产者无法将新的事件放进RingBuffer时的处理策略。(例如:当生产者太快,消费者太慢,会导致生产者获取不到新的事件槽来插入新事件,则会根据该策略进行处理,默认会堵塞)

5.6 Event

从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的。

5.7 EventProcessor

主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。

5.8 EventHandler

由用户实现并且代表了Disruptor中的一个消费者的接口。

5.9 Producer

由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。

5.10 WorkProcessor

确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。

5.11 WorkerPool

一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker之间移交

5.12 LifecycleAware

当BatchEventProcessor启动和停止时,于实现这个接口用于接收通知。

六、分类讨论

6.1 一个生产者

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。

6.2 多个生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

6.2.1 读数据

生产者多线程写入的情况会复杂很多:

  1. 申请读取到序号n;
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  3. 消费者读取元素。

如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。

读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。

然后,消费者读取下标从3到6共计4个元素。

6.2.2 写数据

多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

防止不同生产者对同一段空间写入的代码,如下所示:

public long tryNext(int n) throws InsufficientCapacityException
{if (n < 1){throw new IllegalArgumentException("n must be > 0");}long current;long next;do{current = cursor.get();next = current + n;if (!hasAvailableCapacity(gatingSequences, n, current)){throw InsufficientCapacityException.INSTANCE;}}while (!cursor.compareAndSet(current, next));return next;
}

通过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。——自旋锁!!!!

七、等待策略

7.1 生产者的等待策略

休眠一秒:

LockSupport.parkNanos(1);

7.2 消费者的等待策略

名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

并发框架——Distruptor相关推荐

  1. java并发框架支持锁包括,tip/面试题_并发与多线程.md at master · 171437912/tip · GitHub...

    01. java用()机制实现了进程之间的同步执行 A. 监视器 B. 虚拟机 C. 多个CPU D. 异步调用 正解: A 解析: 监视器机制即锁机制 02. 线程安全的map在JDK 1.5及其更 ...

  2. java的并发框架_java并发框架有哪些

    展开全部 Java并发框架java.util.concurrent是JDK5中引入到标准库中的(采用的32313133353236313431303231363533e78988e69d8331333 ...

  3. Java 并发框架全览,这个牛逼!

    来自:唐尤华 https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1 1. 为什么要写这篇文章 几年前 ...

  4. 来,带你鸟瞰 Java 中的并发框架!

    来自 ImportNew,作者:唐尤华 https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1 1. ...

  5. Disruptor并发框架--学习笔记

    Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在J ...

  6. 互联网java常用框架_来,带你鸟瞰 Java 中4款常用的并发框架!

    1. 为什么要写这篇文章 几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库. 但是,当深入实现细节时,我们想起了一位智者曾经说过 ...

  7. java并发框架支持锁包括,jdk1.8锁

    JDK1.8有什么锁?_李广进的博客-CSDN博客 2020年4月23日 18.排他锁(不包含),X锁,若事务T对数据对象A加上x锁,则只允许T读取和修改A,其他任何事务都不能再对A加任何类型的锁,直 ...

  8. Python Gevent – 高性能的 Python 并发框架

    From:http://www.xuebuyuan.com/1604603.html Gevent 指南(英文):http://sdiehl.github.io/gevent-tutorial Gev ...

  9. java 无锁框架_高性能无锁并发框架 Disruptor,太强了!

    Java技术栈 www.javastack.cn 关注优质文章 Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操 ...

  10. object转class_从零并发框架(三)异步转同步注解+字节码增强代理实现

    序言 上一节我们学习了异步查询转同步的 7 种实现方式,今天我们就来学习一下,如何对其进行封装,使其成为一个更加便于使用的工具. 思维导图如下: 异步转同步字节码增强 拓展阅读 java 手写并发框架 ...

最新文章

  1. .net core项目启动时报_未处理Socket异常(以一种访问权限不允许的方式做了一个访问套接字的尝试。)...
  2. 直接获取submission结果
  3. 是vans_硬核复刻,就服VANS棋盘格
  4. [渝粤教育] 西南科技大学 高速公路 在线考试复习资料
  5. 同学, 你的板砖呢?
  6. 使用CSVDE批量导入命令/出口AD用户
  7. day27-python并发编程之多进程
  8. 特斯拉被踢出致命车祸调查组:提前披露信息,涉嫌把责任推向车主
  9. JS中定时器的返回数值ID值
  10. 蓝天采集系统的安装和遇到的问题及解决方案
  11. 约瑟夫环问题(动态链表操作)n个学生围成一圈,每m个出队,输出所有出队的序列
  12. Python开发手册
  13. 第十一届“认证杯”数学中国数学建模国际赛(小美赛) (2022 CERTIFICATE AUTHORITY CUP INTERNATIONAL MATHEMATICAL CONTEST IN MOD
  14. 数字化转型投入大、效果差,永洪BI如何帮助企业迈出数据应用第一步
  15. PHP 每日学习函数之 floatval 函数
  16. catia今天突然打不开了_catia打不开的解答
  17. pmap anon 内存泄露
  18. linux 重定向 2 gt gt,Linux命令- echo、grep 、重定向、1gt;amp;2、2gt;amp;1的介绍
  19. java 数组形式字符串_java 数组格式字符串转化为字符串
  20. 伯特兰·阿瑟·威廉·罗素

热门文章

  1. 【资源分享新方式】基于IPv6+Windows的共享文件夹,从此告别第三方云盘
  2. 计算机网络中计算机资源管理器,计算机基础知识:资源管理器的使用
  3. Qt 菜单栏、工具栏、状态栏、浮动窗口、核心部件
  4. 【掩码机制】解决LSTM中特征长度不一致问题
  5. webpack 处理网页小图标favicon
  6. 新股发行制度五年改革历程
  7. 从零学习算法竞赛3:aabb问题
  8. related、relative和relevant表示相关意思时的区别
  9. jquery的odd和even
  10. 上海计算机一级和四六级,大学英语六级比四级难多少?985学长含泪告诉你!