disruptor源码阅读与分析---RingBuffer与Sequence
首先,我们还是编写一个测试例子吧,根据测试例子去分析代码更为直观:
public static void main(String[] args){EventFactory<EventObject> myEventFactory = new EventObjectFactory();//1ThreadFactory threadFactory = new StubThreadFactory();//2int ringBufferSize = 4;//3Disruptor<EventObject> disruptor = new Disruptor<EventObject>(myEventFactory,ringBufferSize,threadFactory, ProducerType.SINGLE,new BlockingWaitStrategy());//4EventHandler<EventObject> b = new ConsumerB();//5EventHandler<EventObject> c = new ConsumerC();//6EventHandler<EventObject> d = new ConsumerD();//7SequenceBarrier sequenceBarrier = disruptor.handleEventsWith(b,c).asSequenceBarrier();//8BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier,d);//9disruptor.handleEventsWith(processord);//10RingBuffer<EventObject> ringBuffer = disruptor.start();//11for(int i=0; i<10; i++) {//12long sequence = ringBuffer.next();//13try {EventObject myEvent = ringBuffer.get(sequence);//14myEvent.setValue(i);//15} finally {ringBuffer.publish(sequence);//16}SingleProducerSequencer singleProducerSequencer = (SingleProducerSequencer)(disruptor.getRingBuffer().getSequencer());System.out.println("producer:"+singleProducerSequencer.getProducerSequence()+"---consumer:"+singleProducerSequencer.getGatingSequences());try{Thread.sleep(100);}catch (Exception e){}}disruptor.shutdown();}
我们的disruptor版本为3.3.5,它是基于事件来进行处理任务的,下面我们逐行分析上面的代码,
第1行,EventFactory用于产生EventObject实例,是Disruptor构造函数的一个入参
第2行,threadFactory用于disruptor构造函数
第3行,ringbuffer大小
第4行,disruptor构造函数,
public Disruptor(final EventFactory<T> eventFactory,final int ringBufferSize,final ThreadFactory threadFactory,final ProducerType producerType,final WaitStrategy waitStrategy){this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),new BasicExecutor(threadFactory));}private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor){this.ringBuffer = ringBuffer;this.executor = executor;}
这里我们主要看下RingBuffer是如何创建的,这里我们的例子是模拟单个生产者的情况,所以看下单生产者下RingBuffer如何创建的
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,int bufferSize,WaitStrategy waitStrategy){SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);return new RingBuffer<E>(factory, sequencer);}
首先看下SingleProducerSequener,构造函数的入参有两个,最终构造函数会进入到AbstractSequencer的构造器上。AbstractSequencer是一个抽象类,这里我们重点看下其成员,
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");protected final int bufferSize;protected final WaitStrategy waitStrategy;protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);//代表着生产者对应的Sequence,初始的时候为-1,放入一个元素有这个值为0protected volatile Sequence[] gatingSequences = new Sequence[0];//代表着消费者对应的Sequence,有多少个消费者就有多少个对应的Sequence
可以看到一个SingleProducerSequencer里面包含了cursor和gatingSequences,我理解cursor就是生产者生产的元素该放入到哪个位置,gatingSequences就表示消费者消费到哪个位置,SEQUENCE_UPDATER用于对gatingSequences进行原子更新,这几个成员非常重要,可以在下文中看到其具体用处.这里我们需要看下Sequence类究竟长啥样:
class LhsPadding
{protected long p1, p2, p3, p4, p5, p6, p7;
}class Value extends LhsPadding
{protected volatile long value;
}class RhsPadding extends Value
{protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
{static final long INITIAL_VALUE = -1L;private static final Unsafe UNSAFE;private static final long VALUE_OFFSET;static{UNSAFE = Util.getUnsafe();try{VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));}catch (final Exception e){throw new RuntimeException(e);}}
}
这里在Value字段前后也做了填充,也是为了处理伪共享问题,其他的相关set操作都是使用的Unsafe类.
接下来我们再看下这个构造函数中干了啥,
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy){if (bufferSize < 1){throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1){throw new IllegalArgumentException("bufferSize must be a power of 2");}this.bufferSize = bufferSize;this.waitStrategy = waitStrategy;}
可以看到bufferSize要求是2的次方,为什么要有这个限制呢?这是因为普通的取余操作是比较耗性能的,限制为2的次方后,可以通过按位操作达到同样的效果.至此,SingleProducerSequencer的构造函数我们看完了,接下来看下RingBuffer的构造函数,最终是调用RingBufferFields的构造函数
RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();if (bufferSize < 1){throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount(bufferSize) != 1){throw new IllegalArgumentException("bufferSize must be a power of 2");}this.indexMask = bufferSize - 1;/*** 申请的数组 entries 实际大小为 bufferSize + 2 * BUFFER_PAD,BUFFER_PAD 个数组元素占用 128 字节,也就是说在数组前后各加了 128 字节的填充,这主要是为了防止伪共享。*/this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];fill(eventFactory);}
private void fill(EventFactory<E> eventFactory){for (int i = 0; i < bufferSize; i++){entries[BUFFER_PAD + i] = eventFactory.newInstance();}}
这里可以看到entries是一个数组,看上面的注释解释了为啥大小是bufferSize+2*BUFFER_PAD,所以实际使用中,entries[32...63]才是被EventObject填充过。
第5-7行实例化了三个消费者
第8行,首先看下handleEventWith这个方法
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){return createEventProcessors(new Sequence[0], handlers);}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers){checkNotStarted();final Sequence[] processorSequences = new Sequence[eventHandlers.length];final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){final EventHandler<? super T> eventHandler = eventHandlers[i];final BatchEventProcessor<T> batchEventProcessor =new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);if (exceptionHandler != null){batchEventProcessor.setExceptionHandler(exceptionHandler);}consumerRepository.add(batchEventProcessor, eventHandler, barrier);processorSequences[i] = batchEventProcessor.getSequence();}if (processorSequences.length > 0){ringBuffer.addGatingSequences(processorSequences);for (final Sequence barrierSequence : barrierSequences){ringBuffer.removeGatingSequence(barrierSequence);}consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);}
上面的代码我们主要关注几行重要的代码:
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences),注意这里的barrierSequences是个空数组,实现如下:
public SequenceBarrier newBarrier(Sequence... sequencesToTrack){return sequencer.newBarrier(sequencesToTrack);//这里的sequencer就是前面实例化SingleProducerSequencer的一个对象}
public ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,final Sequence cursorSequence,final Sequence[] dependentSequences){this.sequencer = sequencer;//SingleProducerSequencerthis.waitStrategy = waitStrategy;this.cursorSequence = cursorSequence;//生产者对应的Sequenceif (0 == dependentSequences.length){dependentSequence = cursorSequence;//与生产者相同的sequence}else{dependentSequence = new FixedSequenceGroup(dependentSequences);//被依赖的sequence,}}
SequenceBarrier其实就是一个ProcessingSequenceBarrier实例,可以看到此时的dependentSequence就是生产者的sequence,即生产者一旦生产了数据,立马就可以进行消费。
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler),我们来看下这个构造函数:
//成员变量
private final AtomicBoolean running = new AtomicBoolean(false);private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();private final DataProvider<T> dataProvider;private final SequenceBarrier sequenceBarrier;private final EventHandler<? super T> eventHandler;private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);//消费者初始化的Sequenceprivate final TimeoutHandler timeoutHandler;
//构造函数
public BatchEventProcessor(final DataProvider<T> dataProvider,//就是我们的RingBufferfinal SequenceBarrier sequenceBarrier,//刚产生的ProcessingSequenceBarrierfinal EventHandler<? super T> eventHandler){this.dataProvider = dataProvider;//this.sequenceBarrier = sequenceBarrier;this.eventHandler = eventHandler;if (eventHandler instanceof SequenceReportingEventHandler){((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);}timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;}
从这里我们可以看到,每个消费者都持有一个Sequence。
ringBuffer.addGatingSequences(processorSequences),这里也是一个比较关键的地方,processorSequences保存了消费者的Sequence,我们来看下实现,
public void addGatingSequences(Sequence... gatingSequences){sequencer.addGatingSequences(gatingSequences);//SingleProducerSequencer}
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
还记得上面所说的SEQUENCE_UPDATER吧,这里就用上了,接下来我们进入这个方法:
static <T> void addSequences(final T holder,final AtomicReferenceFieldUpdater<T, Sequence[]> updater,final Cursored cursor,final Sequence... sequencesToAdd){long cursorSequence;Sequence[] updatedSequences;Sequence[] currentSequences;do{currentSequences = updater.get(holder);updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);cursorSequence = cursor.getCursor();int index = currentSequences.length;for (Sequence sequence : sequencesToAdd){sequence.set(cursorSequence);updatedSequences[index++] = sequence;}}while (!updater.compareAndSet(holder, currentSequences, updatedSequences));cursorSequence = cursor.getCursor();for (Sequence sequence : sequencesToAdd){sequence.set(cursorSequence);}}
它的作用其实就是将消费者的Sequence保存到对应的变量gatingSeqeuces这个数组中。
回到我们的测试程序,我们再看下asSequenceBarrier方法:
public SequenceBarrier asSequenceBarrier(){return disruptor.getRingBuffer().newBarrier(sequences);}
同样是调用RingBuffer的newBarrier方法,与之前不同的是这里有dependentSequence,即消费者使用这个barrier时,需要等到dependentSequence对一你个的消费者消费后才能消费。
第9行,新建了一个实际的消费处理者BatchEventProcessor,
第10行,参见上面的handleEventsWith函数,这里会将新的消费者的sequence保存到SingleProducerSequencer的gatingSequences中
第11行,启动消费者,其处理逻辑主要是在BatchEventProcessor的run方法中,
public void run(){if (!running.compareAndSet(false, true))//将running标志设置为true{throw new IllegalStateException("Thread is already running");}sequenceBarrier.clearAlert();//设置ProcessingSequenceBarrier.alerted=falsenotifyStart();//这里暂时没有用上T event = null;long nextSequence = sequence.get() + 1L;//这里的sequence是每个消费者所持有的,初始值为-1,这里的nextSequence就是下一个要消费的位置try{while (true){try{final long availableSequence = sequenceBarrier.waitFor(nextSequence);while (nextSequence <= availableSequence){event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);nextSequence++;}sequence.set(availableSequence);}catch (final TimeoutException e){notifyTimeout(sequence.get());}catch (final AlertException ex){if (!running.get()){break;}}catch (final Throwable ex){exceptionHandler.handleEventException(ex, nextSequence, event);sequence.set(nextSequence);nextSequence++;}}}finally{notifyShutdown();running.set(false);}}
这里我们首先看下final long availableSequence = sequenceBarrier.waitFor(nextSequence)这行,看下waitfor方法的实现:
public long waitFor(final long sequence)throws AlertException, InterruptedException, TimeoutException{checkAlert();long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence, availableSequence);}
这里,我们的等待策略是BlockingWaitStrategy,看下其waitfor的实现:
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;if (cursorSequence.get() < sequence){lock.lock();try{while (cursorSequence.get() < sequence){barrier.checkAlert();processorNotifyCondition.await();}}finally{lock.unlock();}}while ((availableSequence = dependentSequence.get()) < sequence){barrier.checkAlert();}return availableSequence;}
消费者启动后,生产者还没有生产数据,所以此时的cursorSequence的值为-1,而我们需要消费的位置为0,当生产者Sequence小于消费者的sequence时,此时会进入等待,当生产者有数据来时,会将其唤醒。当被唤醒后,还需要看下依赖的sequence是否消费过了,否则还是要等待。
接下来就是开始消费,这里我们需要看下RingBuffer的get方法,
protected final E elementAt(long sequence){return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}
这里的entries就是那个数组,REF_ARRAY_BASE就是实际元素的起始地址,这里我们可以看出RingBuffer底层本质上是一个循环数组(队列)!
接下来设置当前消费者消费到哪个位置了。再回到测试程序,我们看下生产者,
第13行,获取下一个位置,最终是调用SingleProducerSequencer.next方法:
public long next(int n){if (n < 1){throw new IllegalArgumentException("n must be > 0");}long nextValue = this.nextValue;long nextSequence = nextValue + n;long wrapPoint = nextSequence - bufferSize;long cachedGatingSequence = this.cachedValue;if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue){cursor.setVolatile(nextValue); // StoreLoad fencelong minSequence;while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))){waitStrategy.signalAllWhenBlocking();LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?}this.cachedValue = minSequence;}this.nextValue = nextSequence;return nextSequence;}
这里我们的n为1,表示获取下一个位置。wrapPoint和cachedGatingSequence用于判断当前队列的状况,比如队列是否是满的。举个例子,我们的RingBuffer大小为4,所以第一圈结束后,这个cachedValue=3,第二圈结束后这个值为7,以此类推。当队列中没有空的位置给生产者时,会调用park方法将生产者阻塞1纳秒。
第16行,看下publish方法,
public void publish(long sequence){cursor.set(sequence);waitStrategy.signalAllWhenBlocking();}
设置当前生产者的sequence,表示这个位置已经产生了元素,可以进行消费了,然后唤醒等待的消费者。
至此,相关的代码已分析完毕,我们来总结下:
1,RingBuffer和Sequence在实现上都是借助于juc并发工具CAS,LockSupport等以及内存操作Unsafe类来实现高效操作
2,RingBuffer本质上是一个循环队列,采用了防伪共享的操作,利用Sequence实现了无锁的高效访问,它持有生产者和消费者的sequence
3,disruptor把cpu的有关缓存行,伪共享,内存屏障等等体现的很好
4,生产者通过Sequencer的大部分功能来使用序列。
5,消费者者通过SequenceBarrier来使用序列
6,从gc上看,RingBuffer通过填充eventObject的方式(底层是个数组,实际要操作的成员在eventObject中),极大的避免了gc带来的性能影响,这点相对于链表实现有极大的优势。
7,并发编程网上也有disruptor的介绍,不过光看介绍是很难理解其原理的,还是得看源码
上面的情况是单个生产者多个消费者这种一对多的方式,实际使用中当然这种情况占了大多数,但也不排除多对多的方式,这种情况下大致与一对多这种方式相同,不同的是在生产者sequence上会产生竞争,这个竞争通过cas方式来实现的,具体的可对照着源代码看,这里就不再分析了。
disruptor源码阅读与分析---RingBuffer与Sequence相关推荐
- strings.Builder 源码阅读与分析
strings.Builder源码阅读与分析 背景之字符串拼接 在 Go 语言中,对于字符串的拼接处理有很多种方法,那么那种方法才是效率最高的呢? str := []string{"aa&q ...
- Disruptor 源码阅读笔记--转
原文地址:http://coderbee.net/index.php/open-source/20130812/400 一.Disruptor 是什么? Disruptor 是一个高性能异步处理框架, ...
- LightGBM源码阅读+理论分析(处理特征类别,缺省值的实现细节)
前言 关于LightGBM,网上已经介绍的很多了,笔者也零零散散的看了一些,有些写的真的很好,但是最终总觉的还是不够清晰,一些细节还是懵懵懂懂,大多数只是将原论文翻译了一下,可是某些技术具体是怎么做的 ...
- 如何模拟一个XMLHttpRequest请求用于单元测试——nise源码阅读与分析
概述 在我们进行单元测试的过程中,如果我们需要对一些HTTP接口进行相关的业务测试,那么我们就需要来模拟HTTP请求的发送与响应,否则我们就无法完成测试的闭环. 目前,有许许多多的测试框架都提供了模拟 ...
- 分布式文件系统KFS源码阅读与分析(四):RPC实现机制(KfsClient端)
上一篇博客介绍了KFS中RPC实现机制MetaServer端的实现,下面接着介绍一下KfsClient端的实现框架. KfsClient是为应用程序暴露的接口类,它是在应用程序代码和KFS Serve ...
- Zookeeper源码阅读(一)Jute和传输协议
前言 最近开始了Zookeeper的源码阅读和分析,也从现在开始把之前和现在学习到的一些Zookeeper的源码知识和我的一些理解放到博客上.不得不说这是自己第一次去完整的看一个开源项目的完整源码,从 ...
- 源码阅读分析 View的Touch事件分发
其实 Android 事件分发机制在早几年一直都困扰着我,那时候处理事件分发的自定义 View 脑子都是一片白,老感觉处理不好.后来自己看了 android 源码,也阅读了很多大牛的文章才算彻底明白, ...
- 代码分析:NASM源码阅读笔记
NASM源码阅读笔记 NASM(Netwide Assembler)的使用文档和代码间的注释相当齐全,这给阅读源码 提供了很大的方便.按作者的说法,这是一个模块化的,可重用的x86汇编器, 而且能够被 ...
- NJ4X源码阅读分析笔记系列(一)——项目整体分析
NJ4X源码阅读分析笔记系列(一)--项目整体分析 NJ4X是什么 参见NJ4X的官网:http://www.nj4x.com/ Java and .Net interfaces to support ...
最新文章
- 【双11狂欢的背后】微服务注册中心如何承载大型系统的千万级访问?
- ASP.NET页面通过URL传递参数(一)(转载)
- 一元二次方程python脚本_Python实现求解一元二次方程的方法示例
- nosuchelementexception 是什么异常_有甲状腺结节的人为什么越来越多?
- 别“躺”着了,赶紧把「复盘」做起来
- 腾讯云服务器性能测试心得经验总结
- ubuntu防火墙操作
- 学习人工智能的头四个月
- 常用html标签 —— 链接的颜色
- Codeforces Round #499 (Div. 2): F. Mars rover(DFS)
- idp 苹果开发账号续费
- 36 岁程序员应聘被公司领导直接拒绝;B 站面试官回应北邮校招中的不当言论
- plsql设置代码提示和自动补全
- cnn卷积神经网络_【CNN】一文带你了解卷积神经网络CNN的发展史
- oracle下载jdk需要注册怎么办? jdk8下载
- Citrix桌面虚拟化基础搭建教程(持续更新)
- Orcle 12c 新特性--- 支持PDB OMF
- Win10 蓝牙已配对但无法连接的问题
- SPM软件的参考资料链接
- arduino开发学习日记 3.15