前言

理解Disruptor的最佳方式是,将其与一些容易理解和目的相似的东西比较。这里的参照物就是java里的阻塞队列(BlockingQueue)。

与BlockingQueue的异同:
同:目的相同,都是为了在同一进程的线程间传输数据。
异:对消费者多播事件;预分配事件内存;可选无锁。

核心概念

  • Ring Buffer : 曾经的核心。自从3.0以上,环形缓冲器只作为Disruptor存储和更新数据(事件)的容器。对于一些高级用法,可以完全替换为用户提供的容器。
  • Sequence:Disruptor使用Sequence作为一种确定特定组件位置的方法。每个消费者(EventProcessor)都维护一个Sequence,Disruptor自己也是一样。大部分并发代码以来这些Sequence值的移动,因此Sequence支持AtomicLong的当前许多特性。事实上,两者唯一的区别是Sequence包含了附加功能来防止Sequence和其他值的伪共享。
  • Sequencer:Disruptor的真正核心。此接口的两个实现(单生产者和多生产者)都实现了用于在生产者和消费者间快速准确传递数据的并发算法。
  • Sequence Barrier:由Sequencer产生,持有Sequencer的主要发布Sequence和任意独立消费者的Sequence的索引。它包含判断是否有可供消费者处理的可用事件的逻辑。
  • Wait Strategy:等待策略决定了一个消费者如何等待生产者发布到Disruptor的事件。
  • Event:生产者传递给消费者的数据单元。用户自定义。
  • EventProcessor:处理Disruptor事件的主要循环,拥有消费者的Sequence。有一个BatchEventProcessor包含了一个事件循环的高效实现,会在事件可用时回调用户提供的EventHandler接口实现。
  • EventHandler:用户实现接口,代表Disruptor的一个消费者。
  • Producer:用户调用Disruptor进行入队的代码。在框架中没有代码表示。

多播事件

这是queue和Disruptor最大的行为区别。队列中的一个事件只能被一个消费者消费,而Disruptor中的时间会发布给所有消费者。这是由于Disruptor意图处理同一数据的独立并行处理操作(译注:类似JMS的topic模式)。比如LMAX中同一数据需要进行记录日志、复制和业务逻辑操作。当然,在Disruptor中同时并行处理不同事件可以使用WorkerPool(译注:类似JMS的queue模式中的多消费者实现)。但需要注意的是,由于这种特性并非是Disruptor的首要工作,所以使用WorkerPool可能并不是最高效的做法。
查看上图,三个消费者JournalConsumer、ReplicationConsumer和ApplicationConsumer将会以相同顺序接收Disruptor所有可用消息。这实现了这些消费者的并行工作。

消费者依赖图

为了支持并发处理的真实世界应用,很有必要支持消费者间的协调工作。回顾上图,在日志记录和复制消费者完成工作前,有必要阻止业务逻辑消费者的进一步工作。我们称这个概念为gating,更准确的说是这种行为的超集称为gating。Gating发生在两个地方:第一用来保证生产者不能超过消费者。这通过调用RingBuffer.addGatingConsumers()把相关消费者添加到Disruptor实现。第二,先前提到的情况是通过从必须首先完成其处理的组件构造包含序列的SequenceBarrier来实现的。
回顾图1,有三个消费者监听RingBuffer的事件。在这个例子中,有一个依赖图。ApplicationConsumer依赖JournalConsumer和ReplicationConsumer。这意味着JournalConsumer和ReplicationConsumer可以相互并行运行。依赖关系可以从ApplicationConsumer的SequenceBarrier和JournalConsumer及ReplicationConsumer的Sequence观察到。同时引起注意的是Sequencer和下游消费者的关系。它的一个角色是保证发布不会环绕RingBuffer。为了做到这点,下游消费者的Sequence不能小于RingBuffer的Sequence。然而,使用依赖图会发生一个有趣的优化。由于ApplicationConsumer Sequence保证小于等于JournalConsumer和ReplicationConsumer(由依赖关系保证),Sequencer只需要观察ApplicationConsumer的Sequence。从广义上来说,Sequencer只需要关注依赖树种叶子节点的消费者Sequence。

事件预分配

Disruptor的一个目标是可以用于低延迟环境中。在低延迟系统中,有必要减少或消除内存分配。在Java系统中,目标是减少由于垃圾回收造成的停顿次数(在低延迟的C/C++系统中,重内存分配会由于内存分配器的征用也可能导致问题)。
为了支持这个目标,用户可以预分配Disruptor中事件的存储。用户提供的EventFactory会在Disruptor中RingBuffer每个条目构建时调用。当发布新数据到Disruptor时,有API供用户调用来持有构建出的对象,这样可以调用对象的方法或更新对象属性。在正确实现下,Disruptor保证这些操作操作是并发安全的。

可选的无锁

对低延迟的渴望造就的另一个实现细节是无锁算法在Disruptor中的大量使用。所有内存可见性和正确性保证使用内存屏障和/或CAS操作实现。真正使用锁的场景只有一个,那就是使用BlockingWatiStrategy。这样做只为了使用Condition让消费线程可以在等待新事件到达前进行park操作。许多低延迟系统使用忙等待(busy-wait)来避免使用Condition可能导致的抖动,然而一些系统的忙等待操作会导致性能的急剧下降,尤其是CPU资源被严重制约时。比方说在虚拟环境下的web服务器。

入门指南

基本的事件生产和消费

从简单的事件开始:

public class LongEvent
{private long value;public void set(long value){this.value = value;}
}

为了让Disruptor能够预分配事件,我们需要提供一个EventFactory完成构建:

import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent>
{public LongEvent newInstance(){return new LongEvent();}
}

事件定义好后,需要创建消费者处理这些事件。这里只做简单的打印:

import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent>
{public void onEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println("Event: " + event);}
}

我们还需要一个事件的生产源,举个例子,假定数据是来自某种I/O设备,如网络或文件的字节缓冲(ByteBuffer)。

import com.lmax.disruptor.RingBuffer;public class LongEventProducer
{private final RingBuffer<LongEvent> ringBuffer;public LongEventProducer(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}public void onData(ByteBuffer bb){long sequence = ringBuffer.next();  // Grab the next sequencetry{LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor// for the sequenceevent.set(bb.getLong(0));  // Fill with data}finally{ringBuffer.publish(sequence);}}
}

可以发现,相比使用简单的queue,事件的发布更具有相关性。这是由于需要事件预分配。事件发布需要(最低)2阶段方式,先声明环形缓冲器中的槽位,再发布可用数据。同时也需要把发布过程使用try/finally块包裹起来。如果声明了环形缓冲的一个槽位(通过调用RingBuffer.next())然后必须发布这个序列。如果没有这么做,会导致Disruptor的状态损坏(corruption)。特别地,在多生产者的情况下,这将会导致消费者阻塞,只能通过重启解决。

使用3.x版本的Translator

Disruptor3.0提供了一种富Lambda风格的API,旨在帮助开发者屏蔽直接操作RingBuffer的复杂性,所以3.0以上版本发布消息更好的办法是通过事件发布者(Event Publisher)或事件翻译器(Event Translator)API。如下

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;public class LongEventProducerWithTranslator
{private final RingBuffer<LongEvent> ringBuffer;public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =new EventTranslatorOneArg<LongEvent, ByteBuffer>(){public void translateTo(LongEvent event, long sequence, ByteBuffer bb){event.set(bb.getLong(0));}};public void onData(ByteBuffer bb){ringBuffer.publishEvent(TRANSLATOR, bb);}
}

这种方法另一个好处是翻译器代码可以放到一个单独的类中,以便于更容易进行单元测试。Disruptor提供了一些用于翻译器的不同的接口(EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg,等)。这样做的原因是,允许翻译器表示为静态类,或以非捕获lambda表达式(使用java8时)作为翻译方法参数,通过调用RingBuffer上的翻译器进行传递。
最后一步是把上面这些步骤统一到一起。可以手工把这些组件都组装到一起,但还是有点复杂,所以引入了DSL来简化构建。尽管通过DSL的方式不能使用有些复杂选项,但这种方式还是适合绝大多数场景。

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;public class LongEventMain
{public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// The factory for the eventLongEventFactory factory = new LongEventFactory();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();LongEventProducer producer = new LongEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);producer.onData(bb);Thread.sleep(1000);}}
}

使用Java8

Disruptor API的设计影响之一是Java 8将依靠功能接口的概念作为Java Lambdas的类型声明。 Disruptor API中的大多数接口定义符合功能接口的要求,因此可以使用Lambda而不是自定义类,这样可以减少所需的重复代码(boiler place)。

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;public class LongEventMain
{public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000);}}
}

注意有一些类(如handler,translator)不再需要了。还要注意用于publishEvent()的lambda是如何引用传入的参数的。如果使用如下代码代替:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{bb.putLong(0, l);ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));Thread.sleep(1000);
}

这会创建一个capturing lambda,意味着需要实例化一个对象来持有ByteBuffer bb变量,通过调用publishEvent()来传递lambda。这样会创建额外不必须的垃圾,所以如果需要低GC压力就需要传递参数给lambda。
使用这种方法引用可以代替匿名的lamdba,以这种方式重写这个例子是可能的。

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;public class LongEventMain
{public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println(event);}public static void translate(LongEvent event, long sequence, ByteBuffer buffer){event.set(buffer.getLong(0));}public static void main(String[] args) throws Exception{// Executor that will be used to construct new threads for consumersExecutor executor = Executors.newCachedThreadPool();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);// Connect the handlerdisruptor.handleEventsWith(LongEventMain::handleEvent);// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent(LongEventMain::translate, bb);Thread.sleep(1000);}}
}

基本调优选项

使用上述的方法可以在最广泛的部署场景中工作正常。然而,如果你能够确定Disruptor将要运行的硬件和软件环境,就可以调整参数提升性能。主要有以下两种调优方式:单vs.多生产者和替换等待策略。

单vs.多生产者

提高并发系统性能的最佳方法之一就是遵守单作者原则(Single Writer Principle https://mechanical-sympathy.blogspot.tw/2011/09/single-writer-principle.html,这适用于Disruptor。如果你的情况是只有一个线程会在Disruptor中发布事件,那就可以利用此功能获得额外的性能提升。

public class LongEventMain
{public static void main(String[] args) throws Exception{//.....// Construct the Disruptor with a SingleProducerSequencerDisruptor<LongEvent> disruptor = new Disruptor(factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), executor);//.....}
}

OneToOne 性能测试(https://github.com/LMAX-Exchange/disruptor/blob/master/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedThroughputTest.java)可以说明这种技术能够提升多少性能。以下测试使用i7 Sandy Bridge MacBook Air。
多生产者

Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec

单生产者

Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec

替换等待策略

BlockingWaitStategy

Disruptor默认的等待策略是BlockingWaitStategy。在BlockingWaitStategy内部使用一个典型的锁和条件(a typical lock and condition)变量处理线程唤醒。BlockingWaitStategy是可用等待策略中最慢的,但也是在CPU使用上最保守的,同时也将在最广泛的部署选项中提供最一致的行为。然而,再说一次,了解部署系统可以获得额外的性能提升。

SleepingWaitStrategy

类似BlockingWaitStategy,SleepingWaitStrategy也试图在CPU使用上保持保守,这通过一个忙等待(busy wait loop)循环实现,但在循环中间会调用LockSupport.parkNanos(1)。在一个典型的Linux系统,这样会暂停线程大概60µs(译注1µs=1000ns)。但它的好处是生产者线程除了增加响应的计数器外,不需要采取任何行动,而且不需要给条件变量发信号的成本(cost of signalling a condition variable)。然而,生产者和消费者转移事件的平均延迟会增加。这种方式最好工作在不需要低延迟,但对生产者线程影响最小的情况下。一个常见的使用场景是异步日志。

YieldingWaitStrategy

可用于低延迟系统的两种等待策略其中之一,这种策略通过消耗CPU时钟周期来达到优化延迟的目的。这种策略使用忙循环(busy spin)等待正确的序号到达。在循环内部,Thread.yield()将被调用,来允许其他排队中的线程运行。当需要很高的性能,而且事件处理者EventHandler的线程数少于CPU逻辑核心数时(比如使用超线程时),推荐使用这种策略。

BusySpinWaitStrategy

这种策略有最高的性能,但也有最高的部署边境限制。这种等待策略应该只用于事件处理者线程小于CPU物理核心数。

清除环形缓冲的对象

使用Disruptor传输数据时,对象的存活周期有可能比预期更长。为了避免发生这种情况,有必要在事件处理完毕后做清理。如果有一个事件处理器,在这个事件处理器中做清理就足够了。如果有一个事件处理链,那就可能会在链尾需要一个特定的处理器来清理这个对象。

class ObjectEvent<T>
{T val;void clear(){val = null;}
}public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch){// Failing to call clear here will result in the// object associated with the event to live until// it is overwritten once the ring buffer has wrapped// around to the beginning.event.clear();}
}public static void main(String[] args)
{Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(() -> ObjectEvent<String>(), bufferSize, executor);disruptor.handleEventsWith(new ProcessingEventHandler()).then(new ClearingObjectHandler());
}

在美团呆了7年的架构师带你解读Disruptor系列并发框架相关推荐

  1. BATJ大数据架构师带你领略实时计算框架Flink的魅力!

    你是不是经常体验或看到以下这些场景? "小张,你看能不能做个监控大屏实时查看促销活动销售额(GMV)?" "小王,我们现在搞促销活动能不能实时统计销量 Top3 啊?&q ...

  2. 【Java从零到架构师第1季】【并发 Concurrent 03】线程间通信_ReentrantLock_线程池

    持续学习&持续更新中- 守破离 [Java从零到架构师第1季][并发 Concurrent 03]线程间通信_ReentrantLock_线程池 线程间通信 线程间通信-示例 可重入锁Reen ...

  3. K3s初探:Rancher架构师带你尝鲜史上最轻量Kubernetes发行版

    发布不到两天,GitHub上Star数已近3000,这个业界大热的.史上最轻量的开源Kubernetes发行版,你试过了没? Rancher资深架构师来教你走出尝鲜第一步!使用教程在此! 前 言 昨天 ...

  4. 北大毕业 15 年经验架构师,重磅解读 5G 时代的计算平台

    整理 | 顾钧 出品 | CSDN(ID:CSDNnews) 5G,物联网,边缘计算,万物互联.这些名词越来越频繁的出现在人们的视野中,话题热度也是不断升高.甚至隐隐有超过云计算的势头. 一个很重要的 ...

  5. 一线互联网架构师设计思想解读开源框架!全套教学资料

    前言 这段时间也一直在学习Netty相关知识,因为涉及知识点比较多,也走了不少弯路.目前网上关于Netty学习资料玲琅满目,不知如何下手,其实大家都是一样的,学习方法和技巧都是总结出来的,我们在没有找 ...

  6. 软考·系统架构师论文——论软件的高并发设计

    文章目录 说明 摘要 过渡 项目背景 论点理论 论点实践 结尾 说明 1.[摘要 300~330字] ① 项目介绍:时间.项目名.项目主要功能简述.作者角色及工作内容 ② 项目技术简介:正文理论/分论 ...

  7. 明日直播| NLPCC workshop百度架构师带你快速上手飞桨NLP

    点击左上方蓝字关注我们 国际自然语言处理和中文计算会议(NLPCC)10月16日正式开幕,云集自然语言处理和语言计算领域的研究和创新成果,是自然语言处理领域年度盛会. 本届会议中,主办方特邀飞桨承办线 ...

  8. 【架构师必知必会系列】系统架构设计需要知道的5大精要(5 System Design fundamentals)...

    无论是在大厂还是初创公司,技术产品经理 (TPM)都需要具备系统设计的基础知识.从历史上看,系统设计基础知识通常是软件工程师在面试时的要求,而 TPM 不受此期望的约束.然而,现在趋势正在发生变化.作 ...

  9. 架构师进阶之路——1、持久化框架(一)

    目录 一.为什么要用MyBatis 1.Mybatis简介 2.持久化框架对比 二.Mybatis架构原理 1.架构设计 2.主要构件及其相互关系 1)Mybatis主要构件描述 2)Mybatis主 ...

最新文章

  1. fastReport 随记
  2. MOSS字段编辑权限控制方案--发布源码
  3. Android 通过局域网udp广播自动建立socket连接
  4. 【MySQL】MySQL 8 IDEA连接本地MySQL报错 Host DESKTOP-MISSMJIJ is not allowed to connect to this serv
  5. Java Socket缓冲区
  6. 码农翻身之编程语言的巅峰
  7. 幸好权健AI还没落地!一个腕表顶中医,18个关键点就能刷脸
  8. setTimeout和setInteval
  9. android 用代码模拟滑动,Android开发之使用150行代码实现滑动返回效果
  10. 设计模式之Prototype(原型)
  11. 在JFrame窗口上绘制文字,用PrintJob打印出来
  12. Linux管理与应用(张美平著)- 绪论知识点
  13. 世嘉MD游戏开发【十三】:音乐和音效
  14. 清华大学四连冠,南科大获得最高性能奖!国际大学生超算竞赛SC21结果出炉
  15. pycharm 2018永久破解激活补丁 附安装教程
  16. 源来是你-Vol.38 | 浪潮开务数据库招人辣!准备好加入幸福感爆棚的KW家族了么?...
  17. Git史诗级入门教程
  18. oppo怎么广告接入_oppo信息流广告投放操作指南
  19. lilypond笔记 -- Chopin Prelude G major
  20. 电子科技大学软件工程860考研上岸初试经验分享

热门文章

  1. C/C++动态内存tcy
  2. python pynput鼠标键盘监控(详细)第2部键盘监控tcy
  3. 感知世界的最新利器、毫米波技术的继任者——超宽带(UWB)雷达技术
  4. 驼峰 下划线 String
  5. 跟燕十八学习PHP-第二十天-讲解数据库概念
  6. fastjson2 介绍及使用
  7. Golang::括号作用域
  8. VLAD算法简介 图像检索
  9. 华为荣耀6--usb共享网络 设置
  10. PTA天梯20+深度优先搜索及动态规划