首先,我们还是编写一个测试例子吧,根据测试例子去分析代码更为直观:

public static void main(String[] args){EventFactory<EventObject> myEventFactory = new EventObjectFactory();//1ThreadFactory threadFactory = new StubThreadFactory();//2int ringBufferSize = 4;//3Disruptor<EventObject> disruptor = new Disruptor<EventObject>(myEventFactory,ringBufferSize,threadFactory, ProducerType.SINGLE,new BlockingWaitStrategy());//4EventHandler<EventObject> b = new ConsumerB();//5EventHandler<EventObject> c = new ConsumerC();//6EventHandler<EventObject> d = new ConsumerD();//7SequenceBarrier sequenceBarrier = disruptor.handleEventsWith(b,c).asSequenceBarrier();//8BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier,d);//9disruptor.handleEventsWith(processord);//10RingBuffer<EventObject> ringBuffer = disruptor.start();//11for(int i=0; i<10; i++) {//12long sequence = ringBuffer.next();//13try {EventObject myEvent = ringBuffer.get(sequence);//14myEvent.setValue(i);//15} finally {ringBuffer.publish(sequence);//16}SingleProducerSequencer singleProducerSequencer = (SingleProducerSequencer)(disruptor.getRingBuffer().getSequencer());System.out.println("producer:"+singleProducerSequencer.getProducerSequence()+"---consumer:"+singleProducerSequencer.getGatingSequences());try{Thread.sleep(100);}catch (Exception e){}}disruptor.shutdown();}

我们的disruptor版本为3.3.5,它是基于事件来进行处理任务的,下面我们逐行分析上面的代码,

第1行,EventFactory用于产生EventObject实例,是Disruptor构造函数的一个入参

第2行,threadFactory用于disruptor构造函数

第3行,ringbuffer大小

第4行,disruptor构造函数,

public Disruptor(final EventFactory<T> eventFactory,final int ringBufferSize,final ThreadFactory threadFactory,final ProducerType producerType,final WaitStrategy waitStrategy){this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),new BasicExecutor(threadFactory));}private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor){this.ringBuffer = ringBuffer;this.executor = executor;}

这里我们主要看下RingBuffer是如何创建的,这里我们的例子是模拟单个生产者的情况,所以看下单生产者下RingBuffer如何创建的

public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,int bufferSize,WaitStrategy waitStrategy){SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);return new RingBuffer<E>(factory, sequencer);}

首先看下SingleProducerSequener,构造函数的入参有两个,最终构造函数会进入到AbstractSequencer的构造器上。AbstractSequencer是一个抽象类,这里我们重点看下其成员,

private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");protected final int bufferSize;protected final WaitStrategy waitStrategy;protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);//代表着生产者对应的Sequence,初始的时候为-1,放入一个元素有这个值为0protected volatile Sequence[] gatingSequences = new Sequence[0];//代表着消费者对应的Sequence,有多少个消费者就有多少个对应的Sequence

可以看到一个SingleProducerSequencer里面包含了cursor和gatingSequences,我理解cursor就是生产者生产的元素该放入到哪个位置,gatingSequences就表示消费者消费到哪个位置,SEQUENCE_UPDATER用于对gatingSequences进行原子更新,这几个成员非常重要,可以在下文中看到其具体用处.这里我们需要看下Sequence类究竟长啥样:

class LhsPadding
{protected long p1, p2, p3, p4, p5, p6, p7;
}class Value extends LhsPadding
{protected volatile long value;
}class RhsPadding extends Value
{protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
{static final long INITIAL_VALUE = -1L;private static final Unsafe UNSAFE;private static final long VALUE_OFFSET;static{UNSAFE = Util.getUnsafe();try{VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));}catch (final Exception e){throw new RuntimeException(e);}}
}

这里在Value字段前后也做了填充,也是为了处理伪共享问题,其他的相关set操作都是使用的Unsafe类.

接下来我们再看下这个构造函数中干了啥,

public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy){if (bufferSize < 1){throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1){throw new IllegalArgumentException("bufferSize must be a power of 2");}this.bufferSize = bufferSize;this.waitStrategy = waitStrategy;}

可以看到bufferSize要求是2的次方,为什么要有这个限制呢?这是因为普通的取余操作是比较耗性能的,限制为2的次方后,可以通过按位操作达到同样的效果.至此,SingleProducerSequencer的构造函数我们看完了,接下来看下RingBuffer的构造函数,最终是调用RingBufferFields的构造函数

RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();if (bufferSize < 1){throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1){throw new IllegalArgumentException("bufferSize must be a power of 2");}this.indexMask = bufferSize - 1;/*** 申请的数组 entries 实际大小为 bufferSize + 2 * BUFFER_PAD,BUFFER_PAD 个数组元素占用 128 字节,也就是说在数组前后各加了 128 字节的填充,这主要是为了防止伪共享。*/this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];fill(eventFactory);}
private void fill(EventFactory<E> eventFactory){for (int i = 0; i < bufferSize; i++){entries[BUFFER_PAD + i] = eventFactory.newInstance();}}

这里可以看到entries是一个数组,看上面的注释解释了为啥大小是bufferSize+2*BUFFER_PAD,所以实际使用中,entries[32...63]才是被EventObject填充过。

第5-7行实例化了三个消费者

第8行,首先看下handleEventWith这个方法

public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){return createEventProcessors(new Sequence[0], handlers);}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers){checkNotStarted();final Sequence[] processorSequences = new Sequence[eventHandlers.length];final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){final EventHandler<? super T> eventHandler = eventHandlers[i];final BatchEventProcessor<T> batchEventProcessor =new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);if (exceptionHandler != null){batchEventProcessor.setExceptionHandler(exceptionHandler);}consumerRepository.add(batchEventProcessor, eventHandler, barrier);processorSequences[i] = batchEventProcessor.getSequence();}if (processorSequences.length > 0){ringBuffer.addGatingSequences(processorSequences);for (final Sequence barrierSequence : barrierSequences){ringBuffer.removeGatingSequence(barrierSequence);}consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);}

上面的代码我们主要关注几行重要的代码:

final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences),注意这里的barrierSequences是个空数组,实现如下:

public SequenceBarrier newBarrier(Sequence... sequencesToTrack){return sequencer.newBarrier(sequencesToTrack);//这里的sequencer就是前面实例化SingleProducerSequencer的一个对象}
public ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,final Sequence cursorSequence,final Sequence[] dependentSequences){this.sequencer = sequencer;//SingleProducerSequencerthis.waitStrategy = waitStrategy;this.cursorSequence = cursorSequence;//生产者对应的Sequenceif (0 == dependentSequences.length){dependentSequence = cursorSequence;//与生产者相同的sequence}else{dependentSequence = new FixedSequenceGroup(dependentSequences);//被依赖的sequence,}}

SequenceBarrier其实就是一个ProcessingSequenceBarrier实例,可以看到此时的dependentSequence就是生产者的sequence,即生产者一旦生产了数据,立马就可以进行消费。

final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler),我们来看下这个构造函数:

//成员变量
private final AtomicBoolean running = new AtomicBoolean(false);private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();private final DataProvider<T> dataProvider;private final SequenceBarrier sequenceBarrier;private final EventHandler<? super T> eventHandler;private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);//消费者初始化的Sequenceprivate final TimeoutHandler timeoutHandler;
//构造函数
public BatchEventProcessor(final DataProvider<T> dataProvider,//就是我们的RingBufferfinal SequenceBarrier sequenceBarrier,//刚产生的ProcessingSequenceBarrierfinal EventHandler<? super T> eventHandler){this.dataProvider = dataProvider;//this.sequenceBarrier = sequenceBarrier;this.eventHandler = eventHandler;if (eventHandler instanceof SequenceReportingEventHandler){((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);}timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;}

从这里我们可以看到,每个消费者都持有一个Sequence。

ringBuffer.addGatingSequences(processorSequences),这里也是一个比较关键的地方,processorSequences保存了消费者的Sequence,我们来看下实现,

public void addGatingSequences(Sequence... gatingSequences){sequencer.addGatingSequences(gatingSequences);//SingleProducerSequencer}
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

还记得上面所说的SEQUENCE_UPDATER吧,这里就用上了,接下来我们进入这个方法:

static <T> void addSequences(final T holder,final AtomicReferenceFieldUpdater<T, Sequence[]> updater,final Cursored cursor,final Sequence... sequencesToAdd){long cursorSequence;Sequence[] updatedSequences;Sequence[] currentSequences;do{currentSequences = updater.get(holder);updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);cursorSequence = cursor.getCursor();int index = currentSequences.length;for (Sequence sequence : sequencesToAdd){sequence.set(cursorSequence);updatedSequences[index++] = sequence;}}while (!updater.compareAndSet(holder, currentSequences, updatedSequences));cursorSequence = cursor.getCursor();for (Sequence sequence : sequencesToAdd){sequence.set(cursorSequence);}}

它的作用其实就是将消费者的Sequence保存到对应的变量gatingSeqeuces这个数组中。

回到我们的测试程序,我们再看下asSequenceBarrier方法:

public SequenceBarrier asSequenceBarrier(){return disruptor.getRingBuffer().newBarrier(sequences);}

同样是调用RingBuffer的newBarrier方法,与之前不同的是这里有dependentSequence,即消费者使用这个barrier时,需要等到dependentSequence对一你个的消费者消费后才能消费。

第9行,新建了一个实际的消费处理者BatchEventProcessor,

第10行,参见上面的handleEventsWith函数,这里会将新的消费者的sequence保存到SingleProducerSequencer的gatingSequences中

第11行,启动消费者,其处理逻辑主要是在BatchEventProcessor的run方法中,

public void run(){if (!running.compareAndSet(false, true))//将running标志设置为true{throw new IllegalStateException("Thread is already running");}sequenceBarrier.clearAlert();//设置ProcessingSequenceBarrier.alerted=falsenotifyStart();//这里暂时没有用上T event = null;long nextSequence = sequence.get() + 1L;//这里的sequence是每个消费者所持有的,初始值为-1,这里的nextSequence就是下一个要消费的位置try{while (true){try{final long availableSequence = sequenceBarrier.waitFor(nextSequence);while (nextSequence <= availableSequence){event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);nextSequence++;}sequence.set(availableSequence);}catch (final TimeoutException e){notifyTimeout(sequence.get());}catch (final AlertException ex){if (!running.get()){break;}}catch (final Throwable ex){exceptionHandler.handleEventException(ex, nextSequence, event);sequence.set(nextSequence);nextSequence++;}}}finally{notifyShutdown();running.set(false);}}

这里我们首先看下final long availableSequence = sequenceBarrier.waitFor(nextSequence)这行,看下waitfor方法的实现:

public long waitFor(final long sequence)throws AlertException, InterruptedException, TimeoutException{checkAlert();long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence, availableSequence);}

这里,我们的等待策略是BlockingWaitStrategy,看下其waitfor的实现:

public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;if (cursorSequence.get() < sequence){lock.lock();try{while (cursorSequence.get() < sequence){barrier.checkAlert();processorNotifyCondition.await();}}finally{lock.unlock();}}while ((availableSequence = dependentSequence.get()) < sequence){barrier.checkAlert();}return availableSequence;}

消费者启动后,生产者还没有生产数据,所以此时的cursorSequence的值为-1,而我们需要消费的位置为0,当生产者Sequence小于消费者的sequence时,此时会进入等待,当生产者有数据来时,会将其唤醒。当被唤醒后,还需要看下依赖的sequence是否消费过了,否则还是要等待。

接下来就是开始消费,这里我们需要看下RingBuffer的get方法,

protected final E elementAt(long sequence){return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}

这里的entries就是那个数组,REF_ARRAY_BASE就是实际元素的起始地址,这里我们可以看出RingBuffer底层本质上是一个循环数组(队列)!

接下来设置当前消费者消费到哪个位置了。再回到测试程序,我们看下生产者,

第13行,获取下一个位置,最终是调用SingleProducerSequencer.next方法:

public long next(int n){if (n < 1){throw new IllegalArgumentException("n must be > 0");}long nextValue = this.nextValue;long nextSequence = nextValue + n;long wrapPoint = nextSequence - bufferSize;long cachedGatingSequence = this.cachedValue;if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue){cursor.setVolatile(nextValue);  // StoreLoad fencelong minSequence;while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))){waitStrategy.signalAllWhenBlocking();LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?}this.cachedValue = minSequence;}this.nextValue = nextSequence;return nextSequence;}

这里我们的n为1,表示获取下一个位置。wrapPoint和cachedGatingSequence用于判断当前队列的状况,比如队列是否是满的。举个例子,我们的RingBuffer大小为4,所以第一圈结束后,这个cachedValue=3,第二圈结束后这个值为7,以此类推。当队列中没有空的位置给生产者时,会调用park方法将生产者阻塞1纳秒。

第16行,看下publish方法,

public void publish(long sequence){cursor.set(sequence);waitStrategy.signalAllWhenBlocking();}

设置当前生产者的sequence,表示这个位置已经产生了元素,可以进行消费了,然后唤醒等待的消费者。

至此,相关的代码已分析完毕,我们来总结下:

1,RingBuffer和Sequence在实现上都是借助于juc并发工具CAS,LockSupport等以及内存操作Unsafe类来实现高效操作

2,RingBuffer本质上是一个循环队列,采用了防伪共享的操作,利用Sequence实现了无锁的高效访问,它持有生产者和消费者的sequence

3,disruptor把cpu的有关缓存行,伪共享,内存屏障等等体现的很好

4,生产者通过Sequencer的大部分功能来使用序列。

5,消费者者通过SequenceBarrier来使用序列

6,从gc上看,RingBuffer通过填充eventObject的方式(底层是个数组,实际要操作的成员在eventObject中),极大的避免了gc带来的性能影响,这点相对于链表实现有极大的优势。

7,并发编程网上也有disruptor的介绍,不过光看介绍是很难理解其原理的,还是得看源码

上面的情况是单个生产者多个消费者这种一对多的方式,实际使用中当然这种情况占了大多数,但也不排除多对多的方式,这种情况下大致与一对多这种方式相同,不同的是在生产者sequence上会产生竞争,这个竞争通过cas方式来实现的,具体的可对照着源代码看,这里就不再分析了。

disruptor源码阅读与分析---RingBuffer与Sequence相关推荐

  1. strings.Builder 源码阅读与分析

    strings.Builder源码阅读与分析 背景之字符串拼接 在 Go 语言中,对于字符串的拼接处理有很多种方法,那么那种方法才是效率最高的呢? str := []string{"aa&q ...

  2. Disruptor 源码阅读笔记--转

    原文地址:http://coderbee.net/index.php/open-source/20130812/400 一.Disruptor 是什么? Disruptor 是一个高性能异步处理框架, ...

  3. LightGBM源码阅读+理论分析(处理特征类别,缺省值的实现细节)

    前言 关于LightGBM,网上已经介绍的很多了,笔者也零零散散的看了一些,有些写的真的很好,但是最终总觉的还是不够清晰,一些细节还是懵懵懂懂,大多数只是将原论文翻译了一下,可是某些技术具体是怎么做的 ...

  4. 如何模拟一个XMLHttpRequest请求用于单元测试——nise源码阅读与分析

    概述 在我们进行单元测试的过程中,如果我们需要对一些HTTP接口进行相关的业务测试,那么我们就需要来模拟HTTP请求的发送与响应,否则我们就无法完成测试的闭环. 目前,有许许多多的测试框架都提供了模拟 ...

  5. 分布式文件系统KFS源码阅读与分析(四):RPC实现机制(KfsClient端)

    上一篇博客介绍了KFS中RPC实现机制MetaServer端的实现,下面接着介绍一下KfsClient端的实现框架. KfsClient是为应用程序暴露的接口类,它是在应用程序代码和KFS Serve ...

  6. Zookeeper源码阅读(一)Jute和传输协议

    前言 最近开始了Zookeeper的源码阅读和分析,也从现在开始把之前和现在学习到的一些Zookeeper的源码知识和我的一些理解放到博客上.不得不说这是自己第一次去完整的看一个开源项目的完整源码,从 ...

  7. 源码阅读分析 View的Touch事件分发

    其实 Android 事件分发机制在早几年一直都困扰着我,那时候处理事件分发的自定义 View 脑子都是一片白,老感觉处理不好.后来自己看了 android 源码,也阅读了很多大牛的文章才算彻底明白, ...

  8. 代码分析:NASM源码阅读笔记

    NASM源码阅读笔记 NASM(Netwide Assembler)的使用文档和代码间的注释相当齐全,这给阅读源码 提供了很大的方便.按作者的说法,这是一个模块化的,可重用的x86汇编器, 而且能够被 ...

  9. NJ4X源码阅读分析笔记系列(一)——项目整体分析

    NJ4X源码阅读分析笔记系列(一)--项目整体分析 NJ4X是什么 参见NJ4X的官网:http://www.nj4x.com/ Java and .Net interfaces to support ...

最新文章

  1. 【双11狂欢的背后】微服务注册中心如何承载大型系统的千万级访问?
  2. ASP.NET页面通过URL传递参数(一)(转载)
  3. 一元二次方程python脚本_Python实现求解一元二次方程的方法示例
  4. nosuchelementexception 是什么异常_有甲状腺结节的人为什么越来越多?
  5. 别“躺”着了,赶紧把「复盘」做起来
  6. 腾讯云服务器性能测试心得经验总结
  7. ubuntu防火墙操作
  8. 学习人工智能的头四个月
  9. 常用html标签 —— 链接的颜色
  10. Codeforces Round #499 (Div. 2): F. Mars rover(DFS)
  11. idp 苹果开发账号续费
  12. 36 岁程序员应聘被公司领导直接拒绝;B 站面试官回应北邮校招中的不当言论
  13. plsql设置代码提示和自动补全
  14. cnn卷积神经网络_【CNN】一文带你了解卷积神经网络CNN的发展史
  15. oracle下载jdk需要注册怎么办? jdk8下载
  16. Citrix桌面虚拟化基础搭建教程(持续更新)
  17. Orcle 12c 新特性--- 支持PDB OMF
  18. Win10 蓝牙已配对但无法连接的问题
  19. SPM软件的参考资料链接
  20. arduino开发学习日记 3.15

热门文章

  1. Imperva waf简介
  2. 服务器群发消息,群发消息怎么发
  3. 正负数原码、反码、补码以及位运算
  4. 安卓recovery菜单中英文对照
  5. 使用cv2在图片上绘制点
  6. 区块链软件开发公司 区块链带给信贷行业的优势
  7. 派克比例方向控制阀放大器
  8. 【Windows11系统更新后蓝牙没了】
  9. form表单字段默认值
  10. springboot 单元测试使用 @value读取不到值, yml的两个坑