Disruptor 源码

https://github.com/LMAX-Exchange/disruptor/blob/master/README.md
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

Disruptor 与 RingBuffer 的关系

  • Disruptor 的存储部分实现了 RingBuffer。
  • Disruptor 提供了方法供 Producer 和 Consumer 线程来通过 ringbuffer 传输数据。

RingBuffer 的本质

  • 固定大小的
  • 先入先出的 (FIFO)
  • Producer-Consumer 模型的
  • 循环使用的一段内存
  • 由于进程周期内,可不用重新释放和分配空间

本质就是一个可重用的 FIFO 队列

(图片来自:https://blog.csdn.net/qq51931373/article/details/46652029)

Disruptor 适用场景

  • Producer-Consumer 场景,一生产者多消费者,多生产者多消费者(线程安全)
  • 线程之间交换数据
  • 轻量化的消息队列
  • 对队列性能要求高:Disruptor 的速度比 LinkedBlockingQueue 提高了七倍(无锁设计)
  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图)
  • 典型场景:Canal,从一个 mysql 实例读取 binlog,放到 Disruptor,下游可有多个并发消费者

核心概念

(图自官网:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction)

Ring Buffer

  • com/lmax/disruptor/RingBuffer.java
  • Disruptor 的核心存储,环形缓冲区。

Sequence

  • com/lmax/disruptor/Sequence.java
  • 每个Consumer (EventProcessor) 和 Disruptor 本身各保有一个 Sequence。
  • 用来追踪 ringbuffer 和每个 Consumer 的进度。
tracking the progress of the ring buffer and event processors
  • 并发相关代码主要依赖 Sequence 值的改变。
  • Sequence 的核心是一个 protected volatile long value;
  • 可理解 Sequence 为一个加强版的 AtomicLong。在后者基础上增加了防止伪共享的代码。
    (关于伪共享:https://www.cnblogs.com/blastbao/p/8290332.html)
  • Sequence 如何避免伪共享
    简单地说:就是通过 Padding 的方式,将一个 Sequence 在内存中的大小和一个 cache line 对齐,避免伪共享,提高性能。
Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field. 

Sequencer

  • com/lmax/disruptor/MultiProducerSequencer.java
  • com/lmax/disruptor/SingleProducerSequencer.java
  • Disruptor 的核心组件
  • 协调 Producer 和 Consumer 对同一段 ringBuffer 的使用
  • 在生产者和消费者之间快速、正确地传递数据的并发算法

Sequence Barrier

  • 由 Sequencer 产生
  • Sequence Barrier 包含 “决定 Consumer 是否有数据可供消费” 的逻辑
// com/lmax/disruptor/BatchEventProcessor.javaprivate void processEvents(){T event = null;long nextSequence = sequence.get() + 1L;while (true){try{// 调用 sequenceBarrier.waitFor(nextSequence) 来确定当前可消费的数据位点final long availableSequence = sequenceBarrier.waitFor(nextSequence);if (batchStartAware != null && availableSequence >= nextSequence){batchStartAware.onBatchStart(availableSequence - nextSequence + 1);}while (nextSequence <= availableSequence){event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);nextSequence++;}sequence.set(availableSequence);}catch (final TimeoutException e){notifyTimeout(sequence.get());}catch (final AlertException ex){if (running.get() != RUNNING){break;}}catch (final Throwable ex){exceptionHandler.handleEventException(ex, nextSequence, event);sequence.set(nextSequence);nextSequence++;}}}// com/lmax/disruptor/ProcessingSequenceBarrier.javapublic long waitFor(final long sequence)throws AlertException, InterruptedException, TimeoutException{checkAlert();// 可见 SequenceBarrier 的核心是用 waitStrategy 去 waitFor 数据// 下面的变量,// sequence: 记录 consumer 消费位置的// cursorSequence: 记录 ringBuffer 生产位置的// dependentSequence: 记录当前 consumer 依赖的其他 consumer 的消费位置的(如果当前 consumer 只从 ringBuffer 读取数据,而不依赖于其他 consumer,那么 dependentSequence 就和 cursorSequence 是同一个,参考 ProcessingSequenceBarrier 的构造函数)long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence, availableSequence);}

Wait Strategy

  • com/lmax/disruptor/BlockingWaitStrategy.java
  • com/lmax/disruptor/BusySpinWaitStrategy.java
// 当 Producer 往 ringBuffer 写入了新数据之后,是怎么通知 Consumer 的呢?
// 这个逻辑就在 waitStrategy 里。
// 以 BlockingWaitStrategy 为例
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;if (cursorSequence.get() < sequence){synchronized (mutex){while (cursorSequence.get() < sequence){barrier.checkAlert();// 如果没有数据,调用 mutex (就是一个普通 Object )的wait,这里代码阻塞。直到有 notify/ notifyAll 被调用时,代码继续执行。// 通过 wait 方法阻塞一个线程时,这个线程会放弃 CPU 时间片。// 那么 notify / notifyAll 被谁调用呢?答案,BlockingWaitStrategy  有 signalAllWhenBlocking 方法调用 notifyAll,这个方法在 Producer 调用 ringBuffer 的 publish 时被调用。mutex.wait();}}}while ((availableSequence = dependentSequence.get()) < sequence){barrier.checkAlert();ThreadHints.onSpinWait();}return availableSequence;}public void signalAllWhenBlocking(){synchronized (mutex){mutex.notifyAll();}}

Event

  • Producer 传输数据给 Consumer 的单位

EventProcessor

  • com/lmax/disruptor/BatchEventProcessorTest.java
  • com/lmax/disruptor/BatchEventProcessor.java
  • 处理 Disruptor 产生数据的主要事件循环,持有 Consumer 的 SequenceEventHandler
The main event loop for handling events from the Disruptor and has ownership of consumer's Sequence
  • 实际就是 Consumer 的主循环。Consumer 只需要注入 eventHandler,BatchEventProcessor 就会调用 eventHandler.onEvent() 来处理 Producer 写入到 ringbuffer 的数据。
// com/lmax/disruptor/BatchEventProcessor.java
private void processEvents(){T event = null;long nextSequence = sequence.get() + 1L;while (true){try{final long availableSequence = sequenceBarrier.waitFor(nextSequence);if (batchStartAware != null && availableSequence >= nextSequence){batchStartAware.onBatchStart(availableSequence - nextSequence + 1);}// 获取数据,并处理while (nextSequence <= availableSequence){event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);nextSequence++;}sequence.set(availableSequence);}......}}

EventHandler

  • Disruptor 只定义了接口。
  • 由 Consumer 实现,并注入到 EventProcessor。

Producer

  • 生产者

Disruptor 为什么快而且线程安全

官方文档:http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

http://ifeve.com/dissecting-disruptor-whats-so-special/

http://ifeve.com/locks-are-bad/      锁为什么慢,以及 Disruptor 如何避免

http://ifeve.com/disruptor-cacheline-padding/         cache-line-padding

http://ifeve.com/disruptor-memory-barrier/           disruptor-memory-barrier

简单说:

  • 它是数组,所以要比链表快(添加删除更简单,耗费内存更小),且可以利用 CPU 缓存来预加载
  • 数组对象本身一直存在,避免了大对象的垃圾回收(当然元素本身还是要回收的)
  • 在需要确保线程安全的地方,用 CAS 取代锁。
  • 没有竞争 = 没有锁 = 非常快。
  • 所有 Consumer 都记录自己的序号(Sequence),允许多个 Producer 与多个 Consumer 共享 ringbuffer。
  • 在每个对象中都能跟踪 Sequence(ring buffer,claim Strategy,生产者和消费者),加上 Sequence 的 cache line padding,就意味着没有为伪共享和非预期的竞争。

个人觉得最重要的设计就是:

  • 每个 Consumer 持有一个 Sequence,各 Consumer 消费独立。
  • Producer 根据所有 Consumer 的 Sequence 位置决定是否能写入到 ringbuffer,以及写入到何位置。
  • 各 Producer 在并发写时,通过 CAS 避免锁。(可参考下面的代码分析)

关键问题

  • 问题1:多个 Producer 如何协调把数据写入到 ringBuffer

  • 问题2:ringbuffer 如何根据各 consumer 消费速度告知各 Producer 现在是否能写入数据

// 我本地起了 2 个 Producer。LongEventProducer producer = new LongEventProducer(ringBuffer);LongEventProducer producer2 = new LongEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);producer.onData(bb);producer2.onData(bb);Thread.sleep(1000);}// com/lmax/disruptor/shicaiExample/LongEventProducer.java
public void onData(ByteBuffer bb){// 抢占 ringBuffer 的最新空位,以便把自己的数据写入。long sequence = ringBuffer.next();  // Grab the next sequencetry{LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor// for the sequenceevent.set(bb.getLong(0));  // Fill with data}finally{ringBuffer.publish(sequence);}}// 那么 ringBuffer.next() 是如何在多个 producer 之间协调的呢?
// com/lmax/disruptor/RingBuffer.java* Increment and return the next sequence for the ring buffer.  Calls of this* method should ensure that they always publish the sequence afterward.  E.g.public long next(){// 这个 sequencer 是 com/lmax/disruptor/MultiProducerSequencer.javareturn sequencer.next();}// com/lmax/disruptor/MultiProducerSequencer.java 这个就是 disruptor 的核心,Sequencerpublic long next(int n){if (n < 1 || n > bufferSize){throw new IllegalArgumentException("n must be > 0 and < bufferSize");}long current;long next;do{// cursor 就是一个 sequence,多个 Producer 公用一个 sequence 进行写控制current = cursor.get();          next = current + n;// 判断 ringBuffer 是否满了long wrapPoint = next - bufferSize;           long cachedGatingSequence = gatingSequenceCache.get();if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){// gatingSequences 是什么?// 实际就是所有 consumer 的 seuqence 的集合。创建 consumer 时,通过 updateGatingSequencesForNextInChain 函数把它注册到  MultiProducerSequencer 的。// 这里做的,是从各个 consumer 的 sequence 中,找到最小的哪个(就是消费最慢的那个)。long gatingSequence = Util.getMinimumSequence(gatingSequences, current);// 如果 buffer 已经满了,就一直自旋等待 consumer 消费。当 consumer 消费后,gatingSequence 就更新,从而 gatingSequenceCache 更新,从而从新判断 wrapPoint > cachedGatingSequence, 从而有可能 Producer 获得 buffer 中可写入的位置。// 为什么有一个 gatingSequenceCache, 这个只是为了减少 getMinimuSequence 的次数,真实逻辑和没有这个 cache 一样。// 回答了问题 2。if (wrapPoint > gatingSequence){LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?continue;}gatingSequenceCache.set(gatingSequence);}// 通过 CAS 来确保多个 Producer 能够正确写入,且不冲突。// 回答了问题 1。else if (cursor.compareAndSet(current, next))      {break;}}while (true);return next;}// GatingSequence 是由各个 Consumer 在启动 EventProcessor 时添加的public final void addGatingSequences(Sequence... gatingSequences){SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);}// MultiProducerSequencer 的 publish 做了什么?public void publish(final long sequence){setAvailable(sequence);          // 设置被 publish 的位置为 not availablewaitStrategy.signalAllWhenBlocking();          // 通知所有 consumer }
  • Consumer 启动 EventProcessor 时 addGatingSequences

  • 问题3:Consumer 是怎么注册的?

// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());// com/lmax/disruptor/dsl/Disruptor.java
* <p>Set up event handlers to handle events from the ring buffer. These handlers will process events* as soon as they become available, in parallel.</p>public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){return createEventProcessors(new Sequence[0], handlers);}EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers){checkNotStarted();final Sequence[] processorSequences = new Sequence[eventHandlers.length];final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){final EventHandler<? super T> eventHandler = eventHandlers[i];final BatchEventProcessor<T> batchEventProcessor =new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);if (exceptionHandler != null){batchEventProcessor.setExceptionHandler(exceptionHandler);}// 把 consumer 加入 consumer 库中。其实就是一个 观察者模式,ringBuffer 有数据后,通知各个 consumer 线程。consumerRepository.add(batchEventProcessor, eventHandler, barrier);         processorSequences[i] = batchEventProcessor.getSequence();}// 两个 sequence,一个是 barrierSequence, 一个是 processorSequenceupdateGatingSequencesForNextInChain(barrierSequences, processorSequences);return new EventHandlerGroup<>(this, consumerRepository, processorSequences);}SequenceBarrier 是用来控制 consumer 读取进度的。
* Wait for the given sequence to be available for consumption.
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
* Get the current cursor value that can be read.
long getCursor();// 再往里看。sequence
// com/lmax/disruptor/ProcessingSequenceBarrier.java@Overridepublic long waitFor(final long sequence)throws AlertException, InterruptedException, TimeoutException{checkAlert();long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence, availableSequence);}@Overridepublic long getCursor(){// 返回当前位置,dependentSequence 是一个 Sequence 对象return dependentSequence.get();}// ringBuffer 有了新数据时,disruptor 怎么通知各 consumer 的?
* <p>Starts the event processors and returns the fully configured ring buffer.</p>
public RingBuffer<T> start(){checkOnlyStartedOnce();for (final ConsumerInfo consumerInfo : consumerRepository){// 为每个 consumer 启动一个线程,线程逻辑由 BatchEventProcessor 控制// BatchEventProcessor 有一个 eventHandler 字段,那就是我们自己写处理代码。consumerInfo.start(executor);        }return ringBuffer;}

Demo

  • Event
public class LongEvent {private long value;public void set(long value){this.value = value;}public long getValue() {return value;}
}
  • Customer
public class LongEventHandler implements EventHandler<LongEvent>
{public void onEvent(LongEvent event, long sequence, boolean endOfBatch){try {Thread.sleep(5000);} catch (Exception e) {}System.out.println("Event: " + event.getValue());}
}
  • Producer
public class LongEventProducer
{private final RingBuffer<LongEvent> ringBuffer;public LongEventProducer(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}public void onData(ByteBuffer bb){long sequence = ringBuffer.next();  // Grab the next sequencetry{LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor// for the sequenceevent.set(bb.getLong(0));  // Fill with data}finally{ringBuffer.publish(sequence);}}
}
  • Main
public class LongEventMain
{public static void main(String[] args) throws Exception{// The factory for the eventLongEventFactory factory = new LongEventFactory();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 2;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);// Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());disruptor.handleEventsWith(new LongEventHandler());// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();LongEventProducer producer = new LongEventProducer(ringBuffer);LongEventProducer producer2 = new LongEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);producer.onData(bb);producer2.onData(bb);Thread.sleep(1000);}}
}

Disruptor RingBuffer 原理相关推荐

  1. disruptor 框架使用以及ringbuffer原理解析

    Disruptor 概述 子主题 1 从功能上来看,Disruptor 是实现了"队列"的功能,而且是一个有界队列.那么它的应用场景自然就是"生产者-消费者"模 ...

  2. java disruptor压测_Java并发框架Disruptor实现原理与源码分析(二) 缓存行填充与CAS操作...

    ##缓存行填充 关于缓存行填充在我个人的印象里面第一次看到是在Java的java.util.concurrent包中,因为当时很好奇其用法背后的逻辑,所以查了很多资料才明白到底是怎么回事*(也许事实上 ...

  3. disruptor RingBuffer初始化与生产者事件产生

    在Disruptor中,为了防止伪共享导致的性能降低,所有元素都会在前后尽量填充64个字节以保证在cpu以64字节缓存数据的时候,在缓存行中,都只会有自己所需要的数据,不会导致缓冲行的更新影响到别的c ...

  4. Disruptor学习笔记:基本使用、核心概念和原理

    Disruptor是英国外汇交易公司LMAX开发的一个高性能队列(系统内部的内存队列),研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级).基于Disruptor开 ...

  5. disruptor原理详解

    一.Disruptor 1.Disruptor基本原理 在多线程开发中,我们常常遇到这样一种场景:一些线程接受用户请求,另外一些线程处理这些请求.比如日志处理中的日志输入和告警.这种典型的生产者消费者 ...

  6. 你需要知道的高性能并发框架Disruptor原理

    Disruptor的小史 现在要是不知道Disruptor真的已经很outer了,Disruptor是英国外汇交易公司LMAX开发的一款开源的高性能队列,LMAX Disruptor是一个高性能的线程 ...

  7. 多线程与高并发(九):单机压测工具JMH,单机最快MQ - Disruptor原理解析

    单机压测工具JMH JMH Java准测试工具套件 什么是JMH 官网 http://openjdk.java.net/projects/code-tools/jmh/ 创建JMH测试 1.创建Mav ...

  8. Disruptor并发框架,核心组件RingBuffer

    1.1 Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是 ...

  9. coinex02// 撮合引擎 RingBuffer Disruptor的构建与使用

    目录 0. 课程视频地址 0.1 撮合引擎课程 0.1 RocketMQ安装 0.3 RocketMQ搭建成功后登录 1. docker 配置rocketmq 2 逻辑树 : 构建RingBuffer ...

最新文章

  1. Python学习之While语句小游戏
  2. 零基础学Python:作用域详解
  3. 字符串对比 (STl强制转换字符串)
  4. Linux服务器下运行SpringBoot HelloWorldDemo(Mac篇)
  5. node sqlite 插入数据_使用 Sequelize 操作 Sqlite3 数据库
  6. 微软私有云分享(R2)16PowerShell查看虚拟机信息
  7. 经典排序算法之基数排序(C语言版)
  8. hp1015驱动64位_惠普1015打印机驱动下载
  9. 安卓系统刷机怎么刷机_手机怎么刷机
  10. linux程序员实用教程,Linux教程合集(Linux程序员必备)
  11. 软件开发模型2:增量模型/螺旋模型/敏捷模型
  12. STM32驱动SPI FLASH(W25Q64)
  13. python报错跳过继续执行_python如何设置报错跳过
  14. 计算机网络 『内部网关协议IGP中的路由信息协议RIP』
  15. linux gdb中c(continue)的使用总结
  16. TVS二极管、环流二极管、稳压(齐纳)二极管、肖特基二极管、开关二极管的分类说明
  17. CTF练习-TU-CTF-2016 pwn BBYS-first-elf-25 记录
  18. 我们常说祝你一切顺利,实际上,顺利的状态是危险的,因为顺利意味着怠惰和懒于思考,是会让人失去奋斗的意志,丧失竞争力。
  19. Mac删除文件不经过废纸篓直接删除,提示“将立即删除此项目。您不能撤销此操作”的方案
  20. 【购房必备知识】成都落户政策调研(主要介绍——研究生落户)

热门文章

  1. 【代码1】应用眼中的操作系统;系统调用
  2. iOS音视频实现边下载边播放
  3. Educational Codeforces Round 117 (Rated for Div. 2)题解(A~D)
  4. 用java生成word文档(转载)
  5. 本来面目——大教堂、集市,与作坊
  6. 关于文件上传失败的问题
  7. 一秒批量修改文件扩展名(后缀名)
  8. shell wait 等待命令
  9. 泰山OFFICE技术讲座:全网首发:中文字体,字号就是中文字符的宽度
  10. C# TreeView CheckBox 代码挑勾选中