disruptor实现细节及源码分析
disruptor实现细节及源码分析
一、 背景介绍
Disruptor它是一个开源的并发框架,并获得 2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。
说明:下文所有内容基于disruptor3.34版本。
二、 应用场景
在消费者--生产者模式中或发布订阅模式中使用。
具有以下特点:
无锁的设计及CAS式的原子访问。
预分配存储空间,避免垃圾回收带来的资源消耗。
三、 核心对象
RingBuffer:环形的一个数据结构,对象初始化时,会使用事件Event进行填充。Buffer的大小必须是2的幂次方,方便移位操作。
Event:无指定具体接口,用户自己实现,可以携带任何业务数据。
EventFactory:产生事件Event的工厂,由用户自己实现。
EventTranslator:事件发布的回调接口,由用户实现,负责将业务参数设置到事件中。
Sequencer:序列产生器,也是协调生产者和消费者及实现高并发的核心。有MultiProducerSequencer 和 SingleProducerSequencer两个实现类。
SequenceBarrier:拥有RingBuffer的发布事件Sequence引用和消费者依赖的Sequence引用。决定消费者消费可消费的Sequence。
EventHandler:事件的处理者,由用户自己实现。
EventProcessor:事件的处理器,单独在一个线程中运行。
WorkHandler:事件的处理者,由用户自己实现。
WorkProcessor:事件的处理器,单独在一个线程中运行。
WorkerPool:一组WorkProcessor的处理。
WaitStrategy:在消费者比生产者快时,消费者处理器的等待策略。
四、 简单示例
定义业务数据类:
public class MyData {private long value;public MyData(long value){this.value = value;}public long getValue() {return value;}public void setValue(long value) {this.value = value;}public String toString(){StringBuffer sb = new StringBuffer();sb.append("value=").append(value);return sb.toString();} }
定义事件类:
public class MyEvent {private MyData data;public MyData getData() {return data;}public void setData(MyData data) {this.data = data;}}
定义事件处理类:
public class MyEventHandler implements EventHandler<MyEvent>{@Overridepublic void onEvent(MyEvent event, long sequence, boolean endOfBatch)throws Exception {System.out.println("事件处理:"+event.getData());}}
定义事件工厂类:
public class MyFactory implements EventFactory<MyEvent>{@Overridepublic MyEvent newInstance() {return new MyEvent();}}
定义事件发布辅助类:
public class MyEventTranslatorOneArg implements EventTranslatorOneArg<MyEvent,MyData>{@Overridepublic void translateTo(MyEvent event, long sequence, MyData data) {System.out.println("发布事件:"+data);event.setData(data);}}
主类:
public class MyMainTest {public static void main(String[]args){Disruptor<MyEvent> disruptor = newDisruptor<MyEvent>(new MyFactory(),//事件工厂128, //必须为2的幂次方new ThreadFactory(){//线程工厂@Overridepublic Thread newThread(Runnable runnable) {returnnew Thread(runnable);}},ProducerType.SINGLE,//指定生产者为一个或多个newYieldingWaitStrategy());//等待策略//指定处理器disruptor.handleEventsWith(newMyEventHandler());disruptor.start();//发布事件MyEventTranslatorOneArg translator = newMyEventTranslatorOneArg();for(int i = 0; i < 10; i++){disruptor.publishEvent(translator, new MyData(i));}disruptor.shutdown();} }
五、 实现原理及源码分析
RingBuffer的实现:
封装了一个对象数组,RingBuffer实例化时,用Event填充。生产者和消费者通过对序列(long的原子操作封装)取模计算获取对象数组中Event。
public E get(long sequence){return elementAt(sequence); }protected final E elementAt(long sequence){return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}
单个生产者的实现:
保存有所有消费者当前消费的前一个序列值,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。发布事件时则先发布,后指定当前游标为发布的序列值。
public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}//当前生产者发布的的最大序列long nextValue = this.nextValue;long nextSequence = nextValue + n;//要发布的最大序列long wrapPoint = nextSequence - bufferSize;//覆盖点long cachedGatingSequence = this.cachedValue;//消费者中处理序列最小的前一个序列//缓存已满 或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > nextValue){long minSequence;//等待直到有可用的缓存while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,nextValue))){LockSupport.parkNanos(1L);// TODO: Use waitStrategy to spin?}this.cachedValue = minSequence;}//更新当前生产者发布的的最大序列this.nextValue = nextSequence;return nextSequence;}
多个生产者的实现:
保存有所有消费者当前消费的前一个序列值,并维护一个和RingBuffer一样大小的数组,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则先发布,后指定当前游标为发布的序列值,如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。一个生产者获取可发布的序列后,立即更新当前游标。发布事件时生产者每发布一个序列,则记录到数组指定位置。
public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}long current;long next;do{//当前游标current = cursor.get();//要发布的游标next = current + n;//覆盖点long wrapPoint = next - bufferSize;//消费者中处理序列最小的前一个序列long cachedGatingSequence = gatingSequenceCache.get();//缓存已满 或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > current){long gatingSequence = Util.getMinimumSequence(gatingSequences, current);if (wrapPoint > gatingSequence){LockSupport.parkNanos(1);// TODO, should we spin based on the waitstrategy?//缓存满时,继续再次尝试continue;}//更新当前生产者发布的的最大序列gatingSequenceCache.set(gatingSequence);}elseif (cursor.compareAndSet(current, next)){//成功获取到发布序列并设置当前游标成功时跳出循环break;}}while (true);return next;}
消费者的实现:
消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。
public long waitFor(finallong sequence)throws AlertException,InterruptedException, TimeoutException{checkAlert();//获取最大的可消费的序列,依赖等待策略long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence,availableSequence);}
一个生产者:
public long getHighestPublishedSequence(long lowerBound, long availableSequence){// 返回最大序列availableSequencereturn availableSequence; }
多个生产者:
public boolean isAvailable(long sequence){int index = calculateIndex(sequence);int flag = calculateAvailabilityFlag(sequence);long bufferAddress = (index * SCALE) + BASE;//相应位置上的值相等,说明已经发布该序列returnUNSAFE.getIntVolatile(availableBuffer,bufferAddress) == flag;}@Overridepublic long getHighestPublishedSequence(long lowerBound, long availableSequence){//从数组中找出未发布序列,即由小到大连续的发布序列for (long sequence = lowerBound; sequence <= availableSequence;sequence++){if (!isAvailable(sequence)){//返回未发布序列的前一个序列return sequence - 1;}}return availableSequence;}
等待策略:
消费者在缓存中没有可以消费的事件时,采取的等待策略:
BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒
BusySpinWaitStrategy:线程一直自旋等待,比较耗CPU。
LiteBlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,比BlockingWaitStrategy要轻,某些情况下可以减少阻塞的次数。
PhasedBackoffWaitStrategy:根据指定的时间段参数和指定的等待策略决定采用哪种等待策略。
SleepingWaitStrategy:可通过参数设置,使线程通过Thread.yield()主动放弃执行,通过线程调度器重新调度;或一直自旋等待。
TimeoutBlockingWaitStrategy:通过参数设置阻塞时间,如果超时则抛出异常。
YieldingWaitStrategy: 通过Thread.yield()主动放弃执行,通过线程调度器重新调度。
转载于:https://blog.51cto.com/11246272/1745472
disruptor实现细节及源码分析相关推荐
- java disruptor压测_Java并发框架Disruptor实现原理与源码分析(二) 缓存行填充与CAS操作...
##缓存行填充 关于缓存行填充在我个人的印象里面第一次看到是在Java的java.util.concurrent包中,因为当时很好奇其用法背后的逻辑,所以查了很多资料才明白到底是怎么回事*(也许事实上 ...
- Vuex 2.0 源码分析
作者:滴滴公共前端团队 - 黄轶 大家好,我叫黄轶,来自滴滴公共前端团队,我们团队最近写了一本书 --<Vue.js 权威指南>,内容丰富,由浅入深.不过有一些同学反馈说缺少 Vuex 的 ...
- kazoo源码分析:服务器交互的实现细节
kazoo源码分析 kazoo-2.6.1 kazoo客户端与服务器概述 上文start概述中,只是简单的概述了kazoo客户端初始化之后,调用了start方法,本文继续详细的了解相关的细节. kaz ...
- 【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )
文章目录 一.线程池执行任务细节分析 二.线程池执行 execute 源码分析 一.线程池执行任务细节分析 线程池执行细节分析 : 核心线程数 101010 , 最大小成熟 202020 , 非核心线 ...
- 单例模式在JDK 应用的源码分析||单例模式注意事项和细节说明
单例模式在JDK 应用的源码分析 单例模式在JDK 应用的源码分析 1) 我们JDK中,java.lang.Runtime就是经典的单例模式(饿汉式) 2) 代码分析+Debug源码+代码说明 单例模 ...
- SpringBoot-web开发(四): SpringMVC的拓展、接管(源码分析)
[SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) SpringBoot-web开发(二): 页面和图标定制(源码分析) SpringBo ...
- Spark源码分析之七:Task运行(一)
在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...
- Journey源码分析三:模板编译
2019独角兽企业重金招聘Python工程师标准>>> 在Journey源码分析二:整体启动流程中提到了模板编译,这里详细说下启动流程 看下templates.Generate()源 ...
- Java并发基础:了解无锁CAS就从源码分析
CAS的全称为Compare And Swap,直译就是比较交换.是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,其实现方式是基于硬件平台的汇编指令,在i ...
最新文章
- python字符串find函数实现_python中实现查找字符串的find函数
- Leetcode PHP题解--D57 762. Prime Number of Set Bits in Binary Representation
- java和C操作数组的一个小区别
- 在SQL Server 2005中解决死锁(转)
- 2017.10.25水题大作战题解
- 洛克人红色思考型机器人叫什么_如何让机器人“好好说话”?
- POJ 2117 Electricity 割点 Tarjan算法
- 【Computer Organization笔记24】光盘,FLASH MEMORY,本单元总结
- 关于使用DFS,BFS的一些思考总结
- keil uvision4完整破解版下载
- 如何将docker部署的wekan迁移另一台服务器
- iOS Instrument
- fastjson解析json文本
- 球半篮球比分,西篮甲:沙萨基 VS 华伦西亚 5月31日
- 高三学生早恋怎么处理?家长该怎么做?
- 松果出行 x StarRocks:实时数仓新范式的实践之路
- Scheduled定时任务的使用
- mp4box工具包下载
- python绘制柱状图横向显示_Python实现绘制双柱状图并显示数值功能示例
- Ansys Zemax | 眼科镜片设计