高性能并发队列Disruptor使用详解,详细解析Disruptor框架的应用和基本原理
基本概念
- Disruptor是一个高性能的异步处理框架,是一个轻量的Java消息服务JMS, 能够在无锁的情况下实现队列的并发操作
- Disruptor使用环形数组实现了类似队列的功能,并且是一个有界队列.通常应用于生产者-消费者的场景
- Disruptor是一个观察者模式的实现
- Disruptor通过以下三种设计方案解决性能问题:
- 环形数组结构:
- 为了避免垃圾回收,使用数组代替链表
- 数组对处理器的缓存机制更加友好
- 元素位置定位:
- 数组长度为2^n^, 可以通过位运算,提升定位的速度
- 数组中元素下标采用递增的形式
- index采用long类型,不用担心索引index溢出问题
- 无锁设计:
- 每个生产者或者消费者线程,会首先申请可以操作的元素在数组中的位置,如果申请成功,直接在申请到的位置上写入数据或者读取数据
- 环形数组结构:
- Disruptor和BlockingQueue比较:
- BlockingQueue: FIFO队列.生产者Producer向队列中发布publish一个事件时,消费者Consumer能够获取到通知.如果队列中没有消费的事件,消费者就会被阻塞,直到生产者发布新的事件
- Disruptor可以比BlockingQueue做到更多:
- Disruptor队列中同一个事件可以有多个消费者,消费者之间既可以并行处理,也可以形成依赖图相互依赖,按照先后次序进行处理
- Disruptor可以预分配用于存储事件内容的内存空间
- Disruptor使用极度优化和无锁的设计实现极高性能的目标
- 通常情况下,如果存在两个独立的处理过程的线程时,就可以使用高性能的并发队列Disruptor来实现
- Disruptor的优点:
- 使用无锁的队列实现并发操作,性能非常高
- 所有访问者都记录自身序号的实现方式,允许多个生产者和多个消费者共享相同的数据结构
- 在每个对象,包括RingBuffer,WaitStrategy, 生产者Producer和消费者Consumer中都能跟踪序列号,使用了缓存行填充cache line padding, 这样就没有伪共享和非预期的竞争
Disruptor应用
定义事件
- Event: 事件 .Disruptor队列中进行交换的数据类型
定义事件工厂
- Event Factory: 定义事件Event实例化方法,用来实例化一个事件Event. 需要实现接口com.lmax.disruptor.EventFactory< T >
- Disruptor通过事件工厂EventFactory在RingBuffer中预创建事件Event的实例
- 一个事件实例Event类似于一个数据槽
- 生产者Producer发布Publish之前,先从Ringbuffer中获取一个事件Event实例
- 然后生产者Producer向事件Event实例中填充数据,然后再发布到RingBuffer中
- 最后由消费者Consumer获取事件Event实例并读取实例中的数据
定义事件处理实现
- 通过实现接口com.lmax.disruptor.EventHandler< T > 定义事件处理的具体实现
定义事件处理的线程池
- Disruptor通过java.util.concurrent.ExecutorService提供的线程来触发消费者Consumer的事件处理
指定等待策略
- Disruptor中使用策略模式定义消费者Consumer处理事件的等待策略,通过com.lmax.disruptor.WaitStrategy接口实现
- WaitStrategy等待策略有三种常用的实现: 每种策略具有不同的性能和优缺点.根据实际运行环境的CPU的硬件特点选择恰当的策略,并且使用特定的JVM配置启动参数,能够实现不同的性能提升
- BlockingWaitStrategy:
- 性能最低
- 对CPU的消耗最小
- 能够在不同的部署环境中提供更加一致的性能
- SleepingWaitStrategy:
- 性能以及对CPU的消耗和BlockingWaitStrategy差不多
- 对生产者线程影响最小
- 适合应用于异步日志等场景
- YieldingWaitStrategy:
- 性能最高
- 适合应用于低延迟的系统
- 在要求极高的性能并且事件处理线程个数小于CPU逻辑核心个数的场景中,推荐使用这个等待策略
- 比如CPU开启超线程的特性
- BlockingWaitStrategy:
启动Disruptor
EventFactory<Event> eventFactory = new EventFactory();
int ringBufferSize = 1024*1024;Disruptor<Event> disruptor = new Disruptor<Event>(eventFactory, ringBufferSize, executor, ProcedureType.SINGLE, blockingWaitStrategy);
EventHandler<Event> eventHandler = new EventHandle();
disruptor.handleEventsWith(eventHandler);
disruptor.start();
复制代码
发布事件
- Disruptor的事件Event的发布Publish过程是一个两阶段提交过程:
- 第一步: 先从RingBuffer获取下一个可以写入事件的序号
- 第二步: 获取对应的事件Event对象,将数据写入事件对象
- 第三步: 将事件提交到RingBuffer
- Disruptor中要求RingBuffer.publish()方法必须要被调用.也就是说,即使发生异常,也要执行publish()方法,这就要求调用者Producer在事件处理的实现上要判断携带的数据的正确性和完整性
关闭Disruptor
- disruptor.shutdown() : 关闭Disruptor. 方法会阻塞,直至所有的事件都得到处理
- executor.shutdown() : 关闭Disruptor使用的线程池. 如果线程池需要关闭,必须进行手动关闭 ,Disruptor在shutdown时不会自动关闭使用的线程池
Disruptor原理
核心概念
RingBuffer
- RingBuffer: 环形缓冲区
- RingBuffer从3.0开始,仅仅负责对通过Disruptor进行交换的事件数据进行存储和更新
- 在Disruptor的高级应用场景中 ,RingBuffer可以使用用户自定义的实现来替代
Sequence Disruptor
- 使用顺序递增的序号来编号管理通过Sequence Disruptor进行交换的事件数据,对事件的处理总是按着序号逐个递增进行处理
- 使用Sequence用于跟踪标识某个特定的事件处理者,包括RingBuffer和Consumer的处理进度
- 使用Sequence来标识进度可以防止不同的Sequence之间的CPU的缓存间的伪共享Flase Sharing问题
Sequencer
- Sequencer是Disruptor的核心
- Sequencer接口有两个实现类:
- SingleProducerSequencer
- MultiProducerSequencer
- 这是定义在生产者和消费者之间快速,正确传递数据的并发算法
Sequence Barrier
- 用于保持对RingBuffer的published Sequence和Consumer依赖的其余的Consumer的Sequence引用
- Sequence Barrier中定义了Consumer是否还有可处理的事件的逻辑
WaitStrategy
- WaitStrategy定义Consumer等待事件的策略
Event
- Disruptor中生产者Producer和消费者Consumer之间进行交换的数据叫做事件Event
- Event类型不是Disruptor定义的,而是由Disruptor的使用者来自定义指定
EventProcessor
- EventProcessor持有指定的消费者Consumer的Sequence, 并且提供用于调用事件处理实现的事件循环EventLoop
EventHandler
- Disruptor中定义的事件处理接口,由使用者实现,用于事件的具体处理,是消费者Consumer的真正实现
Producer
- Producer: 生产者. 泛指Disruptor发布事件的调用方.没有在Disruptor中定义特定的接口或者类型
内存预分配
- RingBuffer使用数组Object[] entries来存储元素:
- 初始化RingBuffer时,会将所有数组元素entries的指定为特定的事件Event参数,此时Event中的detail属性为null
- 生产者向RingBuffer写入消息时 ,RingBuffer不是直接将数组元素entries指向Event对象,而是先获取Event对象,更改Event对象中的detail属性
- 消费者在消费时,也是从RingBuffer中读取Event, 读取Event对象中的detail属性
- 由此可见,在生产和消费过程中 ,RingBuffer中的数组元素entries没有发生任何变化,没有产生临时对象,数组中的元素一直存活,直到RingBuffer消亡
- 通过以上方式,可以最小化JVM中的垃圾回收GC的频率,提升性能
private void fill(EventFactory<E> eventFactory) {for (int i = 0; i < bufferSize; i++) {// 使用工厂方法初始化数组中的entries元素entries[BUFFER_PAD + i] = eventFactory.newInstance(); }
}
复制代码
消除伪共享
- Disruptor中的伪共享: 如果两个相互独立的并发变量位于同一个缓存行时,在并发的情况下,会相互影响彼此的缓存有效性,进而影响并发操作的性能
- Disruptor中消除伪共享:
- Sequence.java中使用多个long变量填充,确保一个序号独占一个缓存行
private static class Padding {public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7;
}
复制代码
消除锁和CAS操作
- Disruptor中,通过联合使用SequenceBarrier和Sequence, 协调和管理消费者和生产者之间的处理关系,避免了锁和CAS操作
- Disruptor中的各个消费者和生产者持有自己的序号Sequence, 序号Sequence需要满足以下条件:
- 条件一: 消费者的序号Sequence的数值必须小于生产者的序号Sequence的数值
- 条件二: 消费者的序号Sequence的数值必须小于依赖关系中前置的消费者的序号Sequence的数值
- 条件三: 生产者的序号Sequence的数值不能大于消费者正在消费的序号Sequence的数值,防止生产者速度过快,将还没有来得及消费的事件消息覆盖
- 条件一和条件二在SequenceBarrier中的waitFor() 方法中实现:
/*** 等待给定的序号值可以供消费者使用* * @param sequence 消费者期望获取的下一个序号值* @return long 可供消费者使用的序号的值*/
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {checkALert();/*** 根据指定的waitStrategy策略,等待期望的下一序号值可供使用* 这里不一定能保证返回值availableSequence一定和给定的参数sequence的值相等,两者的大小关系取决于使用的等待策略waitStrategy* - YieldingWaitStrategy : 自旋100次后,会直接返回dependentSequence中最小的序号sequence,这是不能保证返回的值大于等于给定的序号值* - BlockingWaitStrategy : 阻塞等待给定的序号sequence值可用为止,可用不是返回的值就等于给定的序号值,而是返回的值大于等于给定的序号值*/long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);// 如果当前可用的序号值小于给定的序号值,就返回当前可用的序号值,此时调用者EventProcessor会继续等待waitif (availableSequence < sequence) {return sequence;}// 批处理return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
复制代码
- 条件三是针对生产者建立的SequenceBarrier,逻辑判定发生在生产者从RingBuffer获取下一个可用的entry时,RingBuffer会将获取下一个可用的entry委托给Sequencer处理:
@Override
public long next() {if (n < 1) {throw new IllegalArgumentException("n must be > 0");}long nextValue = this.nextValue;// 下一个序号值等于当前序号值加上期望获取的序号数量long nextSequence = nextValue + n;// 使用下一个序号值减掉RingBuffer中的总量值bufferSize,来判断是否会发生覆盖long wrapPoint = nextSequence - bufferSize;/** cachedValue就是缓存的消费者中的最小序号值* cachedValue不是当前最新的消费者中最小序号值,而是上一次方法调用时进入到下面if条件判断时,被赋值的消费者中最小序号值* * 这样做可以在判定是否出现覆盖的时候,不需要每次都调用getMinimumSequence计算消费者中的最小序号值,从而节省开销。只要确保* 当生产者的值大于了缓存cachedGatingSequence一个bufferSize时,重新获取一下getMinimumSequence()即可*/long cachedGatingSequence = this.cachedValue;/** wrapPoint > cachedGatingSequence : 当生产者已经超过上一次缓存的消费者中的最小序号值cachedGatingSequence一个bufferSize大小时,需要重新获取cachedGatingSequence,防止生产者一直生产,消费者没有来得及消费时,发生覆盖的情况* cachedGatingSequence > nextValue : 生产者和消费者的序号值都是顺序递增的,并且生产者的序号Sequence是先于消费者Sequence,这里是先于而不是大于。对于nextValue的值大于了LONG.MAXVALUE时,此时nextValue + 1就会变为负数,wrapPoint值也会变为负数,此时必然cachedGatingSequence > nextValue。 getMinimumSequence()获取的是消费者中最小序号值,但不代表是走在最后的一个消费者*/if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {cursor.setVolatile(nextValue);long minSequence;while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {// 生产者阻塞,等待消费者消费,直到不会发生覆盖的情况继续向下执行LockSupport.parkNanos(1L);}this.cacheValue = minSequence;}this.nextValue = nextSequence;return nextSequence;
}
复制代码
批处理效应
- 当出现生产者比消费者过快时,消费者可以通过批处理效应来追赶生产者进度
- 消费者一次性从RingBuffer中获取多个已经准好的数组事件元素进行消费处理,从而提高消费效率
/*** 等待给定的序号值可以供消费者使用* * @param sequence 消费者期望获取的下一个序号值* @return long 可供消费者使用的序号的值*/
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {checkALert();long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence) {return sequence;}/** 获取消费者可以消费的最大序列号,通过批处理来提升效率:* - 当availableSequence > sequence时,需要遍历序号sequence到序号availableSequence,获取到最前面一个准备就绪,可以进行消费的事件Event对应的序号sequence* - 最小值为sequence - 1 */return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
作者:攻城狮Chova
链接:https://juejin.cn/post/7085677820323561502
高性能并发队列Disruptor使用详解,详细解析Disruptor框架的应用和基本原理相关推荐
- P2P技术详解(一):NAT详解——详细原理、P2P简介(转)
这是一篇介绍NAT技术要点的精华文章,来自华3通信官方资料库,文中对NAT技术原理的介绍很全面也很权威,对网络应用的应用层开发人员而言有很高的参考价值. <P2P技术详解>系列文章 ➊ 本 ...
- 015. P2P技术详解(一):NAT详解——详细原理、P2P简介
http://www.52im.net/thread-50-1-1.html 这是一篇介绍NAT技术要点的精华文章,来自华3通信官方资料库,文中对NAT技术原理的介绍很全面也很权威,对网络应用的应用层 ...
- Java并发编程最佳实例详解系列
Java并发编程最佳实例详解系列: Java并发编程(一)线程定义.状态和属性 Java并发编程(一)线程定义.状态和属性 线程是指程序在执行过程中,能够执行程序代码的一个执行单元.在java语言中, ...
- C++11 并发指南三(Lock 详解)(转载)
multithreading 多线程 C++11 C++11多线程基本使用 C++11 并发指南三(Lock 详解) 在 <C++11 并发指南三(std::mutex 详解)>一文中我们 ...
- java 可见性_Java并发编程-volatile可见性详解
前言 要学习好Java的多线程,就一定得对volatile关键字的作用机制了熟于胸.最近博主看了大量关于volatile的相关博客,对其有了一点初步的理解和认识,下面通过自己的话叙述整理一遍. 有什么 ...
- AQS抽象队列同步器原理详解
系列文章目录 第一节 synchronized关键字详解-偏向锁.轻量级锁.偏向锁.重量级锁.自旋.锁粗化.锁消除 AQS抽象队列同步器原理详解 系列文章目录 前言 一.AQS特性 二.AQS原理 1 ...
- x264 代码重点详解 详细分析
eg mplayer x264 代码重点详解 详细分析 分类: ffmpeg 2012-02-06 09:19 4229人阅读 评论(1) 收藏 举报 h.264codecflv优化initializ ...
- Java并发编程之CyclicBarrier详解
简介 栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生.栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行.闭锁用于等待事件,而栅栏用于等待其他线程. CyclicBarrier ...
- python queue 查询是否在队列中_python队列Queue的详解
Queue Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递 基本FIFO队列 clas ...
- C++11 并发指南四(future 详解三 std::future std::shared_future)
上一讲<C++11 并发指南四(<future> 详解二 std::packaged_task 介绍)>主要介绍了 <future> 头文件中的 std::pack ...
最新文章
- django model filter_Django分表的两个方案
- 京东大型API网关实践之路
- 学python用ubuntu还是win10_Windows 10上使用Ubuntu的优点
- 4行代码AC——L1-026 I Love GPLT (5分)
- Python(5):循环
- mysql表空间被占用,同名表无法创建或导入
- 合振动的初相位推导_基于振动信号的机械设备故障诊断(一)
- LL1分析构造法_16条数学得分法,想提分快来看!
- Python type创建类
- cat /etc/redhat-release 查看centos版本
- StyleCop学习笔记——初识StyleCop
- 同态加密 应用案例 入门
- python操作数据库的两种方式
- Hillstone 防火墙流量命令
- pytorch(11)-- crnn 车牌端到端识别
- 计算机基础常见八股问题集合(含计算机网络,操作系统,计算机组成,数据结构与算法,数据库)
- c语言数组存在哪个地方,C语言数组考点归纳
- C语言|temp=a,a=b,b=temp;|同行语句可以用逗号隔开
- php源码后台密码被改了,帝国cms后台密码重置插件后台密码恢复找回密码工具
- 便签内容如何从旧手机转到新手机?
热门文章
- 矩形脉冲信号合成_矩形脉冲信号的分解和合成
- 服务器崩溃产生什么文件夹,我在我的src目录里有jiffy,但是当我运行时我的牛仔服务器崩溃了...
- 财务自由之路 笔记 第十二章-财务保障、财务安全
- 能耗监管系统在既有综合医院的建设与应用
- VS2013下串口数据char型转COleVariant问题
- 看5G时代,“一键喊话”的大喇叭如何奏响基层治理最强音
- java 流 压缩 开源_java压缩归档算法开源框架工具 compress
- 物理实验-单摆测重力加速度 (仿真实验大厅) 实验报告 重庆理工大学
- 施密特正交化 对矩阵QR分解
- java string转int 异常_Java中String转int型的方法以及错误处理