disruptor实现细节及源码分析

一、     背景介绍

Disruptor它是一个开源的并发框架,并获得 2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。

说明:下文所有内容基于disruptor3.34版本。

二、     应用场景

在消费者--生产者模式中或发布订阅模式中使用。

具有以下特点:

  1. 无锁的设计及CAS式的原子访问。

  2. 预分配存储空间,避免垃圾回收带来的资源消耗。

    三、     核心对象

RingBuffer:环形的一个数据结构,对象初始化时,会使用事件Event进行填充。Buffer的大小必须是2的幂次方,方便移位操作。

Event:无指定具体接口,用户自己实现,可以携带任何业务数据。

EventFactory:产生事件Event的工厂,由用户自己实现。

EventTranslator:事件发布的回调接口,由用户实现,负责将业务参数设置到事件中。

Sequencer:序列产生器,也是协调生产者和消费者及实现高并发的核心。有MultiProducerSequencer 和 SingleProducerSequencer两个实现类。

SequenceBarrier:拥有RingBuffer的发布事件Sequence引用和消费者依赖的Sequence引用。决定消费者消费可消费的Sequence。

EventHandler:事件的处理者,由用户自己实现。

EventProcessor:事件的处理器,单独在一个线程中运行。

WorkHandler:事件的处理者,由用户自己实现。

WorkProcessor:事件的处理器,单独在一个线程中运行。

WorkerPool:一组WorkProcessor的处理。

WaitStrategy:在消费者比生产者快时,消费者处理器的等待策略。

四、     简单示例

  • 定义业务数据类:

public class MyData {private long value;public MyData(long value){this.value = value;}public long getValue() {return value;}public void setValue(long value) {this.value = value;}public String toString(){StringBuffer sb = new StringBuffer();sb.append("value=").append(value);return sb.toString();}
}
  • 定义事件类:

public class MyEvent {private MyData data;public MyData getData() {return data;}public void setData(MyData data) {this.data = data;}}
  • 定义事件处理类:

public class MyEventHandler implements EventHandler<MyEvent>{@Overridepublic void onEvent(MyEvent event, long sequence, boolean endOfBatch)throws Exception {System.out.println("事件处理:"+event.getData());}}
  • 定义事件工厂类:

public class MyFactory implements EventFactory<MyEvent>{@Overridepublic MyEvent newInstance() {return new MyEvent();}}
  • 定义事件发布辅助类:

public class MyEventTranslatorOneArg implements EventTranslatorOneArg<MyEvent,MyData>{@Overridepublic void translateTo(MyEvent event, long sequence, MyData data) {System.out.println("发布事件:"+data);event.setData(data);}}
  • 主类:

public class MyMainTest {public static void main(String[]args){Disruptor<MyEvent> disruptor = newDisruptor<MyEvent>(new MyFactory(),//事件工厂128, //必须为2的幂次方new ThreadFactory(){//线程工厂@Overridepublic Thread newThread(Runnable runnable) {returnnew Thread(runnable);}},ProducerType.SINGLE,//指定生产者为一个或多个newYieldingWaitStrategy());//等待策略//指定处理器disruptor.handleEventsWith(newMyEventHandler());disruptor.start();//发布事件MyEventTranslatorOneArg translator = newMyEventTranslatorOneArg();for(int i = 0; i < 10; i++){disruptor.publishEvent(translator, new MyData(i));}disruptor.shutdown();}
}

五、     实现原理及源码分析

  • RingBuffer的实现:

封装了一个对象数组,RingBuffer实例化时,用Event填充。生产者和消费者通过对序列(long的原子操作封装)取模计算获取对象数组中Event。

public E get(long sequence){return elementAt(sequence);
}protected final E elementAt(long sequence){return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}
  • 单个生产者的实现:

保存有所有消费者当前消费的前一个序列值,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。发布事件时则先发布,后指定当前游标为发布的序列值。

public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}//当前生产者发布的的最大序列long nextValue = this.nextValue;long nextSequence = nextValue + n;//要发布的最大序列long wrapPoint = nextSequence - bufferSize;//覆盖点long cachedGatingSequence = this.cachedValue;//消费者中处理序列最小的前一个序列//缓存已满  或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > nextValue){long minSequence;//等待直到有可用的缓存while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,nextValue))){LockSupport.parkNanos(1L);// TODO: Use waitStrategy to spin?}this.cachedValue = minSequence;}//更新当前生产者发布的的最大序列this.nextValue = nextSequence;return nextSequence;}
  • 多个生产者的实现:

保存有所有消费者当前消费的前一个序列值,并维护一个和RingBuffer一样大小的数组,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则先发布,后指定当前游标为发布的序列值,如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。一个生产者获取可发布的序列后,立即更新当前游标。发布事件时生产者每发布一个序列,则记录到数组指定位置。

public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}long current;long next;do{//当前游标current = cursor.get();//要发布的游标next = current + n;//覆盖点long wrapPoint = next - bufferSize;//消费者中处理序列最小的前一个序列long cachedGatingSequence = gatingSequenceCache.get();//缓存已满  或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > current){long gatingSequence = Util.getMinimumSequence(gatingSequences, current);if (wrapPoint > gatingSequence){LockSupport.parkNanos(1);// TODO, should we spin based on the waitstrategy?//缓存满时,继续再次尝试continue;}//更新当前生产者发布的的最大序列gatingSequenceCache.set(gatingSequence);}elseif (cursor.compareAndSet(current, next)){//成功获取到发布序列并设置当前游标成功时跳出循环break;}}while (true);return next;}
  • 消费者的实现:

消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。

public long waitFor(finallong sequence)throws AlertException,InterruptedException, TimeoutException{checkAlert();//获取最大的可消费的序列,依赖等待策略long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence,availableSequence);}

一个生产者:

public long getHighestPublishedSequence(long lowerBound, long availableSequence){// 返回最大序列availableSequencereturn availableSequence;
}

多个生产者:

public boolean isAvailable(long sequence){int index = calculateIndex(sequence);int flag = calculateAvailabilityFlag(sequence);long bufferAddress = (index * SCALE) + BASE;//相应位置上的值相等,说明已经发布该序列returnUNSAFE.getIntVolatile(availableBuffer,bufferAddress) == flag;}@Overridepublic long getHighestPublishedSequence(long lowerBound, long availableSequence){//从数组中找出未发布序列,即由小到大连续的发布序列for (long sequence = lowerBound; sequence <= availableSequence;sequence++){if (!isAvailable(sequence)){//返回未发布序列的前一个序列return sequence - 1;}}return availableSequence;}
  • 等待策略:

消费者在缓存中没有可以消费的事件时,采取的等待策略:

BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒

BusySpinWaitStrategy:线程一直自旋等待,比较耗CPU。

LiteBlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,比BlockingWaitStrategy要轻,某些情况下可以减少阻塞的次数。

PhasedBackoffWaitStrategy:根据指定的时间段参数和指定的等待策略决定采用哪种等待策略。

SleepingWaitStrategy:可通过参数设置,使线程通过Thread.yield()主动放弃执行,通过线程调度器重新调度;或一直自旋等待。

TimeoutBlockingWaitStrategy:通过参数设置阻塞时间,如果超时则抛出异常。

YieldingWaitStrategy: 通过Thread.yield()主动放弃执行,通过线程调度器重新调度。

转载于:https://blog.51cto.com/11246272/1745472

disruptor实现细节及源码分析相关推荐

  1. java disruptor压测_Java并发框架Disruptor实现原理与源码分析(二) 缓存行填充与CAS操作...

    ##缓存行填充 关于缓存行填充在我个人的印象里面第一次看到是在Java的java.util.concurrent包中,因为当时很好奇其用法背后的逻辑,所以查了很多资料才明白到底是怎么回事*(也许事实上 ...

  2. Vuex 2.0 源码分析

    作者:滴滴公共前端团队 - 黄轶 大家好,我叫黄轶,来自滴滴公共前端团队,我们团队最近写了一本书 --<Vue.js 权威指南>,内容丰富,由浅入深.不过有一些同学反馈说缺少 Vuex 的 ...

  3. kazoo源码分析:服务器交互的实现细节

    kazoo源码分析 kazoo-2.6.1 kazoo客户端与服务器概述 上文start概述中,只是简单的概述了kazoo客户端初始化之后,调用了start方法,本文继续详细的了解相关的细节. kaz ...

  4. 【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )

    文章目录 一.线程池执行任务细节分析 二.线程池执行 execute 源码分析 一.线程池执行任务细节分析 线程池执行细节分析 : 核心线程数 101010 , 最大小成熟 202020 , 非核心线 ...

  5. 单例模式在JDK 应用的源码分析||单例模式注意事项和细节说明

    单例模式在JDK 应用的源码分析 单例模式在JDK 应用的源码分析 1) 我们JDK中,java.lang.Runtime就是经典的单例模式(饿汉式) 2) 代码分析+Debug源码+代码说明 单例模 ...

  6. SpringBoot-web开发(四): SpringMVC的拓展、接管(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) SpringBoot-web开发(二): 页面和图标定制(源码分析) SpringBo ...

  7. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  8. Journey源码分析三:模板编译

    2019独角兽企业重金招聘Python工程师标准>>> 在Journey源码分析二:整体启动流程中提到了模板编译,这里详细说下启动流程 看下templates.Generate()源 ...

  9. Java并发基础:了解无锁CAS就从源码分析

    CAS的全称为Compare And Swap,直译就是比较交换.是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,其实现方式是基于硬件平台的汇编指令,在i ...

最新文章

  1. python字符串find函数实现_python中实现查找字符串的find函数
  2. Leetcode PHP题解--D57 762. Prime Number of Set Bits in Binary Representation
  3. java和C操作数组的一个小区别
  4. 在SQL Server 2005中解决死锁(转)
  5. 2017.10.25水题大作战题解
  6. 洛克人红色思考型机器人叫什么_如何让机器人“好好说话”?
  7. POJ 2117 Electricity 割点 Tarjan算法
  8. 【Computer Organization笔记24】光盘,FLASH MEMORY,本单元总结
  9. 关于使用DFS,BFS的一些思考总结
  10. keil uvision4完整破解版下载
  11. 如何将docker部署的wekan迁移另一台服务器
  12. iOS Instrument
  13. fastjson解析json文本
  14. 球半篮球比分,西篮甲:沙萨基 VS 华伦西亚 5月31日
  15. 高三学生早恋怎么处理?家长该怎么做?
  16. 松果出行 x StarRocks:实时数仓新范式的实践之路
  17. Scheduled定时任务的使用
  18. mp4box工具包下载
  19. python绘制柱状图横向显示_Python实现绘制双柱状图并显示数值功能示例
  20. Ansys Zemax | 眼科镜片设计

热门文章

  1. bond-vlan-bridge
  2. spring YML属性提示
  3. 借鉴开源框架自研日志收集系统
  4. “突破•重塑”2017年数据中心设施讨论
  5. 百分点宣布完成C轮融资2500万美元 将进一步开放云平台应用
  6. 英国政府发布5G政策文件
  7. 突然想起来,前天是感恩节。
  8. python里unexpected eof while parsing_使用Python编程时的10个注意事项
  9. jupyterlab debugger+显示图片
  10. Angular cli 发布自定义组件