Disruptor的消费者实现是WorkerProcessor类,都实现了EventProcessor接口,这个接口继承了Runnable接口,因此,每个消费者实则就是一条线程,具体的业务逻辑都实现自run()方法,一群不重复消费的并行消费者集合被称为一个WorkerPool。

private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

在一个WorkerPool中,该消费者集合的消费进度通过workSequence来进行共享。

for (int i = 0; i < numWorkers; i++)
{workProcessors[i] = new WorkProcessor<>(ringBuffer,sequenceBarrier,workHandlers[i],exceptionHandler,workSequence);
}

在WorkerPool的构造方法中,将会依次构造消费者实例到其数组中等待执行,具体的消费者线程启动在disruptor的start()方法被调用的时候,这里的sequenceBarrier用来控制消费者的消费进度,在消费者没有消息消费的时候根据的等待策略执行相应的操作。

那么具体的消费者具体逻辑在WorkerProcessor的run()方法。

核心逻辑在run()的一段循环中。

while (true)
{try{// if previous sequence was processed - fetch the next sequence and set// that we have successfully processed the previous sequence// typically, this will be true// this prevents the sequence getting too far forward if an exception// is thrown from the WorkHandlerif (processedSequence){processedSequence = false;do{nextSequence = workSequence.get() + 1L;sequence.set(nextSequence - 1L);}while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));}if (cachedAvailableSequence >= nextSequence){event = ringBuffer.get(nextSequence);workHandler.onEvent(event);processedSequence = true;}else{cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);}}

消费者线程在启动的时候,首先会从workSequence中尝试通过cas得到当前消费进度的下一个序列号,在这里通过cas在整个WorkerPool中每条线程都能得到自己当前唯一的消费序列号。

在完成消费序列号的获取后,将会通过等待策略的waitFor()方法等待,直到当前Disruptor的环形队列中的消息序列号到达从workSequence获取到的WorkerPool中唯一的序列号为止,以BlockingWaitingStrategy为例子,每次生产者生产消息之后都会唤醒一次等待的线程,将会判断当前的消息序列号是否小于当前消费者线程的所持序列号,如果是,则继续等待,否则开始根据当前唤醒时的消息序列号来从环形队列中获取消息进行消费。消费之后,继续通过cas得到下一个唯一的序列号,循环上述的方式等待执行下一个消息。

disruptor消费者模型相关推荐

  1. pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

    生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题.有如下几个常见的实现方法: 1. wait()/notify() 2. lock & condition 3. Blockin ...

  2. linux进程间通信:system V 信号量 生产者和消费者模型编程案例

    生产者和消费者模型: 有若干个缓冲区,生产者不断向里填数据,消费者不断从中取数据 两者不冲突的前提: 缓冲区有若干个,且是固定大小,生产者和消费者各有若干个 生产者向缓冲区中填数据前需要判断缓冲区是否 ...

  3. python生产和消费模型_python queue和生产者和消费者模型

    queue队列 当必须安全地在多个线程之间交换信息时,队列在线程编程中特别有用. classqueue.Queue(maxsize=0) #先入先出classqueue.LifoQueue(maxsi ...

  4. 用三个线程实现生产者消费者模型,其中一个线程作为生产者,二个线程作为消费者,生产者随机生产一个时间戳或者字符串,消费者消费这个时间戳,并不能重复消费,并将其打印出来

    题目要求: 用三个线程实现生产者消费者模型,其中一个线程作为生产者,二个线程作为消费者,生产者随机生产一个时间戳或者字符串,消费者消费这个时间戳,并不能重复消费,并将其打印出来.(这是一道百度面试的算 ...

  5. 生产者/消费者模型详解(基于Java)

    title: 生产者消费者模型 tags: 多线程 synchronized 锁 wait() notify() 生产者/消费者模型原理以及代码实现 一.生产者/消费者模型原理 所谓的生产者消费者模型 ...

  6. python 生产消费者_python之生产者消费者模型实现详解

    代码及注释如下 #Auther Bob #--*--conding:utf-8 --*-- #生产者消费者模型,这里的例子是这样的,有一个厨师在做包子,有一个顾客在吃包子,有一个服务员在储存包子,这个 ...

  7. java多线程抽奖_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码...

    导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...

  8. 进程 互斥锁、队列与管道、生产者消费者模型

    目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重 ...

  9. 11.python并发入门(part8 基于线程队列实现生产者消费者模型)

    一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据 ...

最新文章

  1. [转载]用数据说话 Pytorch详解NLLLoss和CrossEntropyLoss
  2. 这段代码你一定看不懂!不信?那你说说是干嘛的?
  3. 网站优化之如何提升快照的更新时间?
  4. .net 网页播放器
  5. 最全高考分数线出炉!!查了分后,这届学生为了过线真是太拼了.......
  6. 你会通过Docker部署war包吗
  7. CentOS7 正确安装mysql(亲测)
  8. Python 数据科学手册 5.2 Scikit-Learn 简介
  9. Open3d之内部形状描述子ISS
  10. redis 基础知识
  11. figsize, dpi参数
  12. 软件工程导论学习总结
  13. 解决百度网盘下载限速 速度慢问题
  14. ALV能否实现自动小计
  15. Android 布局文件添加edittext报:The following classes could not be found错误
  16. 学习笔记 JavaScript 动画
  17. Keras深度学习实战(39)——音乐音频分类
  18. D90四种对焦点模式
  19. 阿拉丁神灯奖出炉,顺舟智能基于CBOX云盒网关的智慧路灯再上榜
  20. 2021年轻人头发报告:秃头男女,恋爱告急

热门文章

  1. 没有理想的人不会伤心,不想做程序猿,没目标,不知道自己还能干嘛呢
  2. 诗与远方:无题(二十三)
  3. Python学习笔记之文件
  4. HDOJ 2602-Bone Collector(0/1背包模板、打印方案及滚动数组解法)
  5. Java 总结equals()方法
  6. lora信号测试小助手_433m无线收发模块LoRaF30如何进行距离测试
  7. shell的建立与执行实验报告_实验七 Shell脚本运行的优化
  8. C# USB视频人脸检测
  9. 在windows server 2008 R2上运行disk cleanup
  10. android 开发赚钱