单一的生产者,消费者有多个,使用WorkerPool来管理多个消费者;

RingBuffer在生产Sequencer中记录一个cursor,追踪生产者生产到的最新位置,通过WorkSequence和sequence记录整个workpool消费的位置和每个WorkProcessor消费到位置,来协调生产和消费程序

1、定义事件

package com.ljq.disruptor;import java.io.Serializable;/*** 交易事件数据* * @author Administrator**/
@SuppressWarnings("serial")
public class TradeEvent implements Serializable {private String id; // 订单IDprivate String name;private double price; // 金额public TradeEvent() {}public TradeEvent(String id) {super();this.id = id;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}@Overridepublic String toString() {return "Trade [id=" + id + ", name=" + name + ", price=" + price + "]";}}

2、TradeEvent事件消费者

package com.ljq.disruptor;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;public class TradeEventHandler implements EventHandler<TradeEvent>, WorkHandler<TradeEvent> {@Overridepublic void onEvent(TradeEvent event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}/*** WorkProcessor多线程排队领event然后再执行,不同线程执行不同的event。但是多了个排队领event的过程,这个是为了减少对生产者队列查询的压力*/@Overridepublic void onEvent(TradeEvent event) throws Exception {// 具体的消费逻辑System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event);}
}

3、EventProcessor消费者-生产者启动类

package com.ljq.disruptor;import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;public class EventProcessorMain {public static void main(String[] args) throws Exception {  long beginTime = System.currentTimeMillis();// 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能int bufferSize = 1024;//固定线程数int nThreads = 4;EventFactory<TradeEvent> eventFactory = new EventFactory<TradeEvent>() {  @Override  public TradeEvent newInstance() {  return new TradeEvent(UUID.randomUUID().toString());}  };//RingBuffer. createSingleProducer创建一个单生产者的RingBuffer//第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 //第二个参数是RingBuffer的大小,它必须是2的整数倍,目的是为了将求模运算转为&运算提高效率//第三个参数是RingBuffer的生产在没有可用区块的时候(可能是消费者太慢了)的等待策略 final RingBuffer<TradeEvent> ringBuffer = RingBuffer.createSingleProducer(eventFactory, bufferSize, new YieldingWaitStrategy());  //SequenceBarrier, 协调消费者与生产者, 消费者链的先后顺序. 阻塞后面的消费者(没有Event可消费时)SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  //创建消费者事件处理器, 多线程并发执行,不同线程执行不同的event BatchEventProcessor<TradeEvent> transProcessor = new BatchEventProcessor<TradeEvent>(ringBuffer, sequenceBarrier, new TradeEventHandler());  //把消费者的消费进度情况注册给RingBuffer结构(生产者),如果只有一个消费者的情况可以省略
        ringBuffer.addGatingSequences(transProcessor.getSequence());  //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程ExecutorService executors = Executors.newFixedThreadPool(nThreads);  //把消费者提交到线程池,说明EventProcessor实现了callable接口
        executors.submit(transProcessor);  // 生产者,这里新建线程不是必要的Future<?> future= executors.submit(new Callable<Void>() {  @Override  public Void call() throws Exception {  long seq;  for (int i = 0; i < 100000; i++) {seq = ringBuffer.next();ringBuffer.get(seq).setPrice(i);ringBuffer.publish(seq);} return null;  }  }); future.get();//等待生产者结束
        Thread.sleep(1000); //等上1秒,等消费都处理完成transProcessor.halt(); //通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
        executors.shutdown(); System.out.println(String.format("总共耗时%s毫秒", (System.currentTimeMillis() - beginTime)));}
}

4、WorkerPool消费者-生产者启动类

package com.ljq.disruptor;import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;public class WorkPoolMain {public static void main(String[] args) throws InterruptedException {// 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能int bufferSize = 1024;//固定线程数int nThreads = 4;//RingBuffer. createSingleProducer创建一个单生产者的RingBufferRingBuffer<TradeEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeEvent>() {public TradeEvent newInstance() {return new TradeEvent(UUID.randomUUID().toString());}}, bufferSize);SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();WorkerPool<TradeEvent> workerPool = new WorkerPool<TradeEvent>(ringBuffer, sequenceBarrier,new IgnoreExceptionHandler(), new TradeEventHandler());//创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程ExecutorService executors = Executors.newFixedThreadPool(nThreads);workerPool.start(executors);// 生产10个数据for (int i = 0; i < 80000; i++) {long seq = ringBuffer.next();ringBuffer.get(seq).setPrice(i);ringBuffer.publish(seq);}Thread.sleep(1000); //等上1秒,等消费都处理完成workerPool.halt(); //通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
        executors.shutdown(); }
}

EventProcessor与WorkPool用法--可处理多消费者相关推荐

  1. Disruptor框架中生产者、消费者的各种复杂依赖场景下的使用总结-我见过最好的Disruptor

    更多高并发知识请访问 www.itkc8.com 非常感谢 https://www.cnblogs.com/pku-liuqiang/p/8544700.html Disruptor是一个优秀的并发框 ...

  2. 咕泡java架构师二期网盘_咕泡学院java架构师VIP3期

    致学者:不论你在什么时候开始,重要的是开始之后请不要停止.: a# T0 c4 D+ B% g' J5 d 并发编程 + activemq + 实战案例并发编程基础篇 第一天* Y' t4 v, u1 ...

  3. 咕泡学院java课程怎么样_咕泡学院java高级架构

    一:并发编程 + activemq + 实战案例并发编程基础篇 第一天===[caier-20] 1.课程大纲简要介绍 2.线程基础概念.线程安全概念.多个线程多个锁概念 3.对象锁的同步和异步 4. ...

  4. java架构师成长之路_java架构师成长之路

    一:并发编程 + activemq + 实战案例并发编程基础篇 第一天 1.课程大纲简要介绍 2.线程基础概念.线程安全概念.多个线程多个锁概念 3.对象锁的同步和异步 4.脏读概念.脏读业务场景.S ...

  5. 中国会计科目的中英文对照

    中国会计科目的中英文对照 中国会计科目的中英文对照 一.资产类 Assets 流动资产 Current assets 货币资金 Cash and cash equivalents 1001 现金 Ca ...

  6. secKill项目 --- 总结 + 推荐阅读顺序 + 源码地址

    先上源码地址:https://github.com/HermanCho/seckill 项目总结 本项目基于慕课网的秒杀项目,源代码的参考很多来源于以下博客 商城秒杀系统 改进的地方:(主要是前两个) ...

  7. 信贷基本词汇英汉对照[突然发现写软件也要被迫学企业管理的一些相关知识....]...

    a payment or serious payments 一次或多次付款 abatement 扣减 absolute and unconditional payments 绝对和无条件付款 acce ...

  8. python爬虫课程笔记-续

    selenium的内容真杂,代码超多,周日搞了半天才把2小时的课程搞完 from selenium import webdriver from selenium.webdriver.common.by ...

  9. 星巴克其实是一家数据科技公司!?

    星巴克并不仅仅是一家简单地向全世界各地销售冷热饮品的公司.它同时也汇集了来自客户和消费者每周1亿多笔的海量数据.面对这么庞大的数据,星巴克是如何使用这些数据的呢?人工智能和物联网(IoT)在其中又扮演 ...

最新文章

  1. 两台计算机通过传统电话网络,计算机网络的复习题.doc
  2. 「杂谈」计算机视觉人脸图像的十几个大的应用方向,你懂了几分?
  3. Ado.Net事物处理
  4. 1、HTML 初步认识
  5. arcgis 圈选获取图层下点位_ArcGIS小技巧——提取面要素的质心点
  6. java json 返回null,[] Spring4 MVC 返回json格式时候 设置不返回null值属性的有关问题...
  7. Python基础(八)--迭代,生成器,装饰器与元类
  8. php绘制时钟刻度,怎么用canvas写钟表刻度的时钟和分钟
  9. 使用setuptools和cython打包python程序的时候遇到:Microsoft visual c++ 14.0 is required问题解决办法
  10. Go语言中的字符和字符串
  11. SpringCloud的Hystrix(二) 某消费者应用(如:ui、网关)访问的多个微服务的断路监控...
  12. 面对10倍需求只用 40% 成本,这是一种怎样的体验?
  13. 摄影测量——EPS三维测图软件正射影像+三维模型后期成果处理(附软件安装包+学习视频)
  14. [CM311-1A]-全网最全 Android 用户管理及用户应用权限
  15. 【资源挖掘】免费遥感影像文件下载
  16. 回调函数原理及应用实例
  17. linux开启wifi热点命令,deepin Linux 开启wifi热点
  18. 简述计算机组装的具体流程,自己动手组装电脑详细步骤【图文】
  19. 更安全的ftp服务器Pure-FTP搭建(4)
  20. Java集合框架笔记,足够深,足够全面

热门文章

  1. UA PHYS515A 电磁理论V 电磁波与辐射9 简单辐射系统
  2. UA MATH567 高维统计III 随机矩阵10 亚高斯矩阵的应用:协方差估计与聚类问题的样本量需求计算
  3. win32 创建进程三种方式简单示例 - 使用CFree
  4. Win32 API 窗口版本转换度分秒为小数
  5. C#数字证书编程总结
  6. 为什么从前那些.NET开发者都不写单元测试呢?
  7. Impala性能优化
  8. @清晰掉 GNU C __attribute__
  9. asp.net基础中Get和Post的区别。
  10. Leetcode OJ: Remove Duplicates from Sorted Array I/II