Disruptor笔记
更详细的文章: Disruptor详解
Disruptor核心概念
Disruptor:本质是无锁并发框架,用了很多的CAS来解决多线程抢夺问题,是在内存中处理。
- RingBuffer(环形缓冲区): 基于数组的内存级别缓存,是创建sequence(序号)与定义WaitStrategy(拒绝策略)的入口
- Disruptor(总体执行入口):对于RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用
- Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式李管理进行交换的数据(时间/Event),一个Sequence可以跟踪标识某个时间的处理进度,同时还能消除伪共享
- Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencee有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间的快递、正确传递数据的并发算法
- SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑
- WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
- Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义
- EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者接口
- EventProcessor:这个是事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence
WaitStrategy策略
- BlockingWaitStrategy策略,常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是堵塞等待,使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景
- SleepingWaitStrategy策略,会在循环中不断等待数据。先进行自选等待如果不成功,则会使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会阐释比较高的平均延迟。经典的应用场景就是异步日志
3.YieldingWaitStrategy策略,这个策略用于延时低的场合。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间,如果需要一个高性能的系统,并且对延时比较有严格的要求可以考虑这种策略- BusySpinWaitStrategy策略,采用死循环,消费者线程会尽最大的努力监控缓冲区的变化,对延时非常苛刻的场景使用,Cpu核心数必须大于消费者线程数量,推荐在线程绑定到固定的CPU场景下使用
消费关系
//消费消息的优先级 [ 示例 ] A -> (B,C [只会给其中的一个消费]) -> Ddisruptor.handleEventsWith(new OrderEventHandler()).thenHandleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler()).then(new OrderEventHandler())
实例
pom
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version></dependency>
Event
OrderEvent
import lombok.Data;/*** 消息载体(事件)* @author qubing* @date 2022/1/25 14:59*/
@Data
public class OrderEvent {private long value;private String name;}
OrderEventFactory
import com.lmax.disruptor.EventFactory;/*** 事件工厂* @author qubing* @date 2022/1/25 15:00*/
public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}
}
生产者
OrderEventProducer
import com.example.juc.disruptor.event.OrderEvent;
import com.lmax.disruptor.RingBuffer;/*** 消息(事件) 生产者* @author qubing* @date 2022/1/25 15:02*/
public class OrderEventProducer {//事件队列private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer){this.ringBuffer = ringBuffer;}public void onData(long value,String name){//获取事件队列的下一个槽long sequence = ringBuffer.next();try {//获取消息(事件)OrderEvent orderEvent = ringBuffer.get(sequence);//写入数据orderEvent.setValue(value);orderEvent.setName(name);}catch (Exception e){e.printStackTrace();}finally {System.err.println("生产者"+Thread.currentThread().getName()+"发送数据value:"+value+",name:"+name);//发布事件ringBuffer.publish(sequence);}}}
消费者
OrderEventHandler
import com.example.juc.disruptor.event.OrderEvent;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;/*** 消费者* @author qubing* @date 2022/1/25 15:08*/
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent orderEvent, long sequence, boolean endOfBatch) throws Exception {System.err.println("消费者"+Thread.currentThread().getName()+"获取数据value:"+orderEvent.getValue()+",name:"+orderEvent.getName());}@Overridepublic void onEvent(OrderEvent orderEvent) throws Exception {System.err.println("消费者"+Thread.currentThread().getName()+"获取数据value:"+orderEvent.getValue()+",name:"+orderEvent.getName());}
}
测试1
public static void main(String[] args) {Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),//单生成者ProducerType.SINGLE,//等待策略new YieldingWaitStrategy());//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费这,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只能被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Fox"+i);}disruptor.shutdown();}
测试2
public static void main(String[] args) {Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),//多生成者ProducerType.MULTI,//等待策略new YieldingWaitStrategy());//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费这,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只能被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();new Thread(() ->{//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Fox"+i);}},"producer1").start();new Thread(() ->{//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Monkey"+i);}},"producer2").start();// disruptor.shutdown();}
测试3
public static void main(String[] args) {Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),//多生成者ProducerType.MULTI,//等待策略new YieldingWaitStrategy());//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费这,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只能被一个消费者消费//消费消息的优先级 [ 示例 ] A -> (B,C [只会给其中的一个消费]) -> Ddisruptor.handleEventsWith(new OrderEventHandler()).thenHandleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler()).then(new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();new Thread(() ->{//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Fox"+i);}},"producer1").start();// disruptor.shutdown();}
Disruptor笔记相关推荐
- disruptor笔记之六:常见场景
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <disruptor笔记>系列链接 快速入 ...
- disruptor笔记之二:Disruptor类分析
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <disruptor笔记>系列链接 快速入 ...
- Disruptor笔记(一)-预备知识
Memory Barrier 内存障 .它是一个CPU指令.是的,再一次,我们在思考CPU级的东西以便得到我们需要的性能(Martin著名的MechanicalSympathy).基本上它是一个指令, ...
- 快递包裹自动化分拣系统_包裹识别系统的类型
快递包裹自动化分拣系统 包裹识别码的类型 (Types of Parcel identifiers) There are several classes or parcel identificatio ...
- ai人工智能与it_保护您的IT业务必须了解的有关网络安全和人工智能的6件事
ai人工智能与it With the year almost coming to a close, there is a need for reflection and tweaks in vario ...
- 如何成为一个更好的开发者
Programming is mostly about learning a language, but how good you are at it is not just dependent on ...
- Disruptor本地线程队列_实现线程间通信---线程间通信工作笔记001
Disruptor本地线程队列_实现线程间通信---线程间通信工作笔记001 看到同事用这个东西了,这个挺好用的说是,可以实现,本地线程间的通信,好像在c++和java中都可以用 现在没时间研究啊,暂 ...
- Disruptor 源码阅读笔记--转
原文地址:http://coderbee.net/index.php/open-source/20130812/400 一.Disruptor 是什么? Disruptor 是一个高性能异步处理框架, ...
- Disruptor并发框架--学习笔记
Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在J ...
最新文章
- 安装Oracle11g先决条件检查失败
- Java 理论与实践: 非阻塞算法简介——看吧,没有锁定!(转载)
- VISP视觉库识别AprilTag详细解读
- java + maven 实现发送短信验证码功能
- SpringBoot使用Easypoi导出excel示例
- asp sql ip地址排序_SQL必知必会读书笔记,30分钟入门SQL!
- VMware5.5的序列号
- 书写技术文档的模板技术调研文档书写规范
- 计算机设计大赛作品——冬奥可视化
- 信息系统软件配置、过程管理、开发工具(详细介绍)
- 条形码技术应用属于计算机系统的,条形码技术在现代物流系统中的应用
- oracle静默安装集群,Oracle RAC 静默安装实践
- 韩昊 20190919-1 每周例行报告
- win7休眠的开启与关闭方法
- 回顾一年的工作历程_回顾历程、总结经验、展望未来
- pxe自动装机利用tfp,http,nfs服务实现。
- Java实现mds降维_降维算法MDS
- 融会贯通面对对象编程思想
- 17 重定向(Redirect) vs 转发(Forward)
- c++小游戏杀手1.1.1版本