更详细的文章: Disruptor详解

Disruptor核心概念

Disruptor:本质是无锁并发框架,用了很多的CAS来解决多线程抢夺问题,是在内存中处理。

  1. RingBuffer(环形缓冲区): 基于数组的内存级别缓存,是创建sequence(序号)与定义WaitStrategy(拒绝策略)的入口
  2. Disruptor(总体执行入口):对于RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用
  3. Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式李管理进行交换的数据(时间/Event),一个Sequence可以跟踪标识某个时间的处理进度,同时还能消除伪共享
  4. Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencee有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间的快递、正确传递数据的并发算法
  5. SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑
  6. WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
  7. Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义
  8. EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者接口
  9. EventProcessor:这个是事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence

WaitStrategy策略

  1. BlockingWaitStrategy策略,常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是堵塞等待,使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景
  2. SleepingWaitStrategy策略,会在循环中不断等待数据。先进行自选等待如果不成功,则会使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会阐释比较高的平均延迟。经典的应用场景就是异步日志
    3.YieldingWaitStrategy策略,这个策略用于延时低的场合。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间,如果需要一个高性能的系统,并且对延时比较有严格的要求可以考虑这种策略
  3. BusySpinWaitStrategy策略,采用死循环,消费者线程会尽最大的努力监控缓冲区的变化,对延时非常苛刻的场景使用,Cpu核心数必须大于消费者线程数量,推荐在线程绑定到固定的CPU场景下使用

消费关系

        //消费消息的优先级    [ 示例 ] A -> (B,C [只会给其中的一个消费]) -> Ddisruptor.handleEventsWith(new OrderEventHandler()).thenHandleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler()).then(new OrderEventHandler())

实例

pom

 <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version></dependency>

Event

OrderEvent


import lombok.Data;/*** 消息载体(事件)* @author qubing* @date 2022/1/25 14:59*/
@Data
public class OrderEvent {private long value;private String name;}

OrderEventFactory

import com.lmax.disruptor.EventFactory;/*** 事件工厂* @author qubing* @date 2022/1/25 15:00*/
public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}
}

生产者

OrderEventProducer


import com.example.juc.disruptor.event.OrderEvent;
import com.lmax.disruptor.RingBuffer;/*** 消息(事件) 生产者* @author qubing* @date 2022/1/25 15:02*/
public class OrderEventProducer {//事件队列private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer){this.ringBuffer = ringBuffer;}public void onData(long value,String name){//获取事件队列的下一个槽long sequence = ringBuffer.next();try {//获取消息(事件)OrderEvent orderEvent = ringBuffer.get(sequence);//写入数据orderEvent.setValue(value);orderEvent.setName(name);}catch (Exception e){e.printStackTrace();}finally {System.err.println("生产者"+Thread.currentThread().getName()+"发送数据value:"+value+",name:"+name);//发布事件ringBuffer.publish(sequence);}}}

消费者

OrderEventHandler

import com.example.juc.disruptor.event.OrderEvent;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;/*** 消费者* @author qubing* @date 2022/1/25 15:08*/
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent orderEvent, long sequence, boolean endOfBatch) throws Exception {System.err.println("消费者"+Thread.currentThread().getName()+"获取数据value:"+orderEvent.getValue()+",name:"+orderEvent.getName());}@Overridepublic void onEvent(OrderEvent orderEvent) throws Exception {System.err.println("消费者"+Thread.currentThread().getName()+"获取数据value:"+orderEvent.getValue()+",name:"+orderEvent.getName());}
}

测试1

    public static void main(String[] args) {Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),//单生成者ProducerType.SINGLE,//等待策略new YieldingWaitStrategy());//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费这,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只能被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Fox"+i);}disruptor.shutdown();}

测试2

   public static void main(String[] args) {Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),//多生成者ProducerType.MULTI,//等待策略new YieldingWaitStrategy());//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费这,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只能被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();new Thread(() ->{//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Fox"+i);}},"producer1").start();new Thread(() ->{//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Monkey"+i);}},"producer2").start();// disruptor.shutdown();}

测试3

public static void main(String[] args) {Disruptor<OrderEvent> disruptor = new Disruptor<>(OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),//多生成者ProducerType.MULTI,//等待策略new YieldingWaitStrategy());//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费这,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只能被一个消费者消费//消费消息的优先级    [ 示例 ] A -> (B,C [只会给其中的一个消费]) -> Ddisruptor.handleEventsWith(new OrderEventHandler()).thenHandleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler()).then(new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();new Thread(() ->{//创建生产者OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);//发送消息for (int i = 0; i < 100; i++) {orderEventProducer.onData(i,"Fox"+i);}},"producer1").start();//        disruptor.shutdown();}

Disruptor笔记相关推荐

  1. disruptor笔记之六:常见场景

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <disruptor笔记>系列链接 快速入 ...

  2. disruptor笔记之二:Disruptor类分析

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <disruptor笔记>系列链接 快速入 ...

  3. Disruptor笔记(一)-预备知识

    Memory Barrier 内存障 .它是一个CPU指令.是的,再一次,我们在思考CPU级的东西以便得到我们需要的性能(Martin著名的MechanicalSympathy).基本上它是一个指令, ...

  4. 快递包裹自动化分拣系统_包裹识别系统的类型

    快递包裹自动化分拣系统 包裹识别码的类型 (Types of Parcel identifiers) There are several classes or parcel identificatio ...

  5. ai人工智能与it_保护您的IT业务必须了解的有关网络安全和人工智能的6件事

    ai人工智能与it With the year almost coming to a close, there is a need for reflection and tweaks in vario ...

  6. 如何成为一个更好的开发者

    Programming is mostly about learning a language, but how good you are at it is not just dependent on ...

  7. Disruptor本地线程队列_实现线程间通信---线程间通信工作笔记001

    Disruptor本地线程队列_实现线程间通信---线程间通信工作笔记001 看到同事用这个东西了,这个挺好用的说是,可以实现,本地线程间的通信,好像在c++和java中都可以用 现在没时间研究啊,暂 ...

  8. Disruptor 源码阅读笔记--转

    原文地址:http://coderbee.net/index.php/open-source/20130812/400 一.Disruptor 是什么? Disruptor 是一个高性能异步处理框架, ...

  9. Disruptor并发框架--学习笔记

    Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在J ...

最新文章

  1. 安装Oracle11g先决条件检查失败
  2. Java 理论与实践: 非阻塞算法简介——看吧,没有锁定!(转载)
  3. VISP视觉库识别AprilTag详细解读
  4. java + maven 实现发送短信验证码功能
  5. SpringBoot使用Easypoi导出excel示例
  6. asp sql ip地址排序_SQL必知必会读书笔记,30分钟入门SQL!
  7. VMware5.5的序列号
  8. 书写技术文档的模板技术调研文档书写规范
  9. 计算机设计大赛作品——冬奥可视化
  10. 信息系统软件配置、过程管理、开发工具(详细介绍)
  11. 条形码技术应用属于计算机系统的,条形码技术在现代物流系统中的应用
  12. oracle静默安装集群,Oracle RAC 静默安装实践
  13. 韩昊 20190919-1 每周例行报告
  14. win7休眠的开启与关闭方法
  15. 回顾一年的工作历程_回顾历程、总结经验、展望未来
  16. pxe自动装机利用tfp,http,nfs服务实现。
  17. Java实现mds降维_降维算法MDS
  18. 融会贯通面对对象编程思想
  19. 17 重定向(Redirect) vs 转发(Forward)
  20. c++小游戏杀手1.1.1版本

热门文章

  1. 基于JAVA疫情社区健康评估系统设计与实现 开题报告
  2. 利用google or-tools 求解逻辑难题:斑马问题
  3. vue使用element-ui的栅格布局的时候,有内容会被非overflow:hidden的内容遮住的处理方法
  4. Vue 使用jsPlumb 实现连线绘图
  5. vue屏幕长宽自适应
  6. DG(Data Guard)
  7. 美团后台开发笔试-数字字符
  8. 虚拟机soft lockup CPU死锁问题
  9. 水仙花数c之和语言程序,水仙花数C语言的
  10. 疫情数据可视化01---中国疫情时间序列数据整理(截至7月30号)