disruptor 介绍
一、背景
1.来源
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内部的内存队列的延迟问题,而不是分布式队列。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。
2.应用背景和介绍
据目前资料显示:应用Disruptor的知名项目有如下的一些:Storm, Camel, Log4j2,还有目前的美团点评技术团队也有很多不少的应用,或者说有一些借鉴了它的设计机制。
Disruptor是一个高性能的线程间异步通信的框架,即在同一个JVM进程中的多线程间消息传递。
二、传统队列问题
队列 | 有界性 | 锁 | 结构 | 队列类型 |
---|---|---|---|---|
ArrayBlockingQueue | 有界 | 加锁 | 数组 | 阻塞 |
LinkedBlockingQueue | 可选 | 加锁 | 链表 | 阻塞 |
ConcurrentLinkedQueue | 无界 | 无锁 | 链表 | 非阻塞 |
LinkedTransferQueue | 无界 | 无锁 | 链表 | 阻塞 |
PriorityBlockingQueue | 无界 | 加锁 | 堆 | 阻塞 |
DelayQueue | 无界 | 加锁 | 堆 | 阻塞 |
1.伪共享概念
共享
缓存行
一个缓存对应的缓存行的结构图
伪共享
2.ArrayBlockingQueue 的伪共享问题
刚我们已经讲了伪共享的问题,那么ArrayBlockingQueue的这个伪共享问题存在于哪里呢,分析下核心的部分源码
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//获取当前对象锁lock.lockInterruptibly();try {while (count == items.length)//阻塞并释放锁,等待notFull.signal()通知notFull.await();//将数据放入数组enqueue(e);} finally {lock.unlock();}}
private void enqueue(E x) {final Object[] items = this.items;//putIndex 就是入队的下标items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁lock.lockInterruptibly();try {while (count == 0)//阻塞并释放对象锁,并等待notEmpty.signal()通知notEmpty.await();//在数据不为空的情况下return dequeue();} finally {lock.unlock();}}
private E dequeue() {final Object[] items = this.items;//takeIndex 是出队的下标E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;
}
其中最核心的三个成员变量为
putIndex:入队下标
takeIndex:出队下标
count:队列中元素的数量
而三个成员的位置如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
三、高性能原理
刚说了上面队列的两个性能问题:一个是加锁,一个是伪共享,那么disruptor是怎么解决这两个问题的,以及除了解决这两个问题之外,还引入了其他什么先进的东西提升性能的。
这里简单列举下:
- 引入环形的数组结构:数组元素不会被回收,避免频繁的GC,
- 无锁的设计:采用CAS无锁方式,保证线程的安全性
- 属性填充:通过添加额外的无用信息,避免伪共享问题
- 元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行自增
1.环形数组结构
其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。
2.生产和消费模式
多生产者——生产
假设现在又两个生产者,开始写数据,通过CAS竞争,w1得到的34的空间,w2得到了78的空间,其中6是代表已被写入或者没有被消费的数据。
多生产者——消费
绿色代表已经写OK的数据
- 1
假设三个生产者在写中,还没有置位AvailableBuffer,那么消费者可获取的消费下标只能获取到6,然后等生产者都写OK后,通知到消费者,消费者继续重复上面的步骤。如下图
消费者常见的等待
BusySpinWaitStrategy: 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
BlockingWaitStrategy: 使用锁和条件变量。CPU资源的占用少,延迟大,默认等待策略。
SleepingWaitStrategy: 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
YieldingWaitStrategy: 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
PhasedBackoffWaitStrategy: 上面多种策略的综合,CPU资源的占用少,延迟大
3.牛逼的下标指针
class LhsPadding{//缓存行补齐, 提升cache缓存命中率protected long p1, p2, p3, p4, p5, p6, p7;
}class Value extends LhsPadding{protected volatile long value;
}class RhsPadding extends Value{//缓存行补齐, 提升cache缓存命中率protected long p9, p10, p11, p12, p13, p14, p15;
}public class Sequence extends RhsPadding{...
}
四、用法
用法很简单,一共三个角色:生产者,消费者,disruptor对象
1.简单用法
disruptor对象
disruptor 就两个构造方法
public Disruptor(final EventFactory<T> eventFactory, // 数据实体构造工厂final int ringBufferSize, // 队列大小,必须是2的次方final ThreadFactory threadFactory, // 线程工厂final ProducerType producerType, // 生产者类型,单个生产者还是多个final WaitStrategy waitStrategy){ // 消费者等待策略...
}public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory){...
}
生产处理
生产者这里没有固定的对象,只是需要获取放置数据的位置,然后进行publish
public void send(String data){RingBuffer<MsgEvent> ringBuffer = this.disruptor.getRingBuffer();//获取下一个放置数据的位置long next = ringBuffer.next();try{MsgEvent event = ringBuffer.get(next);event.setValue(data);}finally {//发布事件ringBuffer.publish(next);}
}
消费处理
消费处理可以有如下几种
- 1
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){...
}
public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors){...
}
public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories){...
}
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers){...
}
简单用例
//消费者
public class MsgConsumer implements EventHandler<MsgEvent>{private String name;public MsgConsumer(String name){this.name = name;}@Overridepublic void onEvent(MsgEvent msgEvent, long l, boolean b) throws Exception {System.out.println(this.name+" -> 接收到信息: "+msgEvent.getValue());}
}//生产者处理
public class MsgProducer {private Disruptor disruptor;public MsgProducer(Disruptor disruptor){this.disruptor = disruptor;}public void send(String data){RingBuffer<MsgEvent> ringBuffer = this.disruptor.getRingBuffer();long next = ringBuffer.next();try{MsgEvent event = ringBuffer.get(next);event.setValue(data);}finally {ringBuffer.publish(next);}}public void send(List<String> dataList){dataList.stream().forEach(data -> this.send(data));}
}//触发测试
public class DisruptorDemo {@Testpublic void test(){Disruptor<MsgEvent> disruptor = new Disruptor<>(MsgEvent::new, 1024, Executors.defaultThreadFactory());//定义消费者MsgConsumer msg1 = new MsgConsumer("1");MsgConsumer msg2 = new MsgConsumer("2");MsgConsumer msg3 = new MsgConsumer("3");//绑定配置关系disruptor.handleEventsWith(msg1, msg2, msg3);disruptor.start();// 定义要发送的数据MsgProducer msgProducer = new MsgProducer(disruptor);msgProducer.send(Arrays.asList("nihao","hah"));}
}
输出(消费没有固定顺序):
1 -> 接收到信息: nihao
3 -> 接收到信息: nihao
3 -> 接收到信息: hah
2 -> 接收到信息: nihao
2 -> 接收到信息: hah
1 -> 接收到信息: hah
2.其他用法
上面主要介绍了多消费统一消费,但是在生产者模型中是有很多种,如下,一对一,一对多,多对多,多对一
生产者配置
其中生产模式中的单生产者模式和多生产模式,这里主要是通过一个枚举:ProduceType来区分,建议,多个生产者用多生产者模式,性能会好点。
消费者配置
统一消费:每个消费者都消费一份生产者生产的数据
分组消费:每个生产这生产的数据只被消费一次
统一消费像上面简单用法中运用即可,对于分组消费,用函数 handleEventsWithWorkerPool 即可
/*** 分组处理 handleEventWithWorkerPool*/
@Test
public void test1(){Disruptor<MsgEvent> disruptor = new Disruptor(MsgEvent::new, 1024, Executors.defaultThreadFactory());disruptor.handleEventsWithWorkerPool(new MyWorkHandler("work1"), new MyWorkHandler("work2"));disruptor.start();MsgProducer msgProducer = new MsgProducer(disruptor);msgProducer.send(Arrays.asList("aaa","bbb"));
}
输出:
work1 : MsgEvent(value=bbb)
work2 : MsgEvent(value=aaa)
work1 : MsgEvent(value=cc)
work2 : MsgEvent(value=dd)
消费顺序配置
1.消费者的顺序消费
/*** 测试顺序消费* 每一条消息的消费者1和3消费完毕后,消费者2再进行消费*/
@Test
public void test2(){MsgConsumer msg1 = new MsgConsumer("1");MsgConsumer msg2 = new MsgConsumer("2");MsgConsumer msg3 = new MsgConsumer("3");Disruptor<MsgEvent> disruptor = new Disruptor(MsgEvent::new, 1024, Executors.defaultThreadFactory());disruptor.handleEventsWith(msg1, msg3).then(msg2);disruptor.start();MsgProducer msgProducer = new MsgProducer(disruptor);msgProducer.send(Arrays.asList("aaa", "bbb", "ccc", "ddd"));
}
输出(里面的是根据每一条消息的消费者顺序):
1 -> 接收到信息: aaa
3 -> 接收到信息: aaa
1 -> 接收到信息: bbb
1 -> 接收到信息: ccc
2 -> 接收到信息: aaa
3 -> 接收到信息: bbb
3 -> 接收到信息: ccc
3 -> 接收到信息: ddd
1 -> 接收到信息: ddd
2 -> 接收到信息: bbb
2 -> 接收到信息: ccc
2 -> 接收到信息: ddd
2.消费分为多个支线,而且也有消费顺序问题
/*** 测试多支线消费* 消费者1和消费者3一个支线,消费者2和消费者4一个支线,消费者3和消费者4消费完毕后,消费者5再进行消费*/
@Test
public void test3(){MsgConsumer msg1 = new MsgConsumer("1");MsgConsumer msg2 = new MsgConsumer("2");MsgConsumer msg3 = new MsgConsumer("3");MsgConsumer msg4 = new MsgConsumer("4");MsgConsumer msg5 = new MsgConsumer("5");//支线:消费者1和消费者3disruptor.handleEventsWith(msg1, msg3);//支线:消费者2和消费者4disruptor.handleEventsWith(msg2, msg4);//消费者3和消费者4执行完之后,指向消费者5disruptor.after(msg3, msg4).handleEventsWith(msg5);disruptor.start();MsgProducer msgProducer = new MsgProducer(disruptor);msgProducer.send(Arrays.asList("aaa", "bbb", "ccc", "ddd"));
}
1 -> 接收到信息: aaa
2 -> 接收到信息: aaa
2 -> 接收到信息: bbb
3 -> 接收到信息: aaa
3 -> 接收到信息: bbb
4 -> 接收到信息: aaa
4 -> 接收到信息: bbb
5 -> 接收到信息: aaa
1 -> 接收到信息: bbb
5 -> 接收到信息: bbb
五、常见问题
1.disruptor应该如何用才能发挥最大功效?
disruptor原本就是事件驱动的设计,其整个架构跟普通的多线程很不一样。比如一种用法,将disruptor作为业务处理,中间带I/O处理,这种玩法比多线程还慢;相反,如果将disruptor做业务处理,需要I/O时采用nio异步调用,不阻塞disruptor消费者线程,等到I/O异步调用回来后在回调方法中将后续处理重新塞到disruptor队列中,可以看出来,这是典型的事件处理架构,确实能在时间上占据优势,加上ringBuffer固有的几项性能优化,能让disruptor发挥最大功效。
2.如果buffer常常是满的怎么办?
一种是把buffer变大,另一种是从源头解决producer和consumer速度差异太大问题,比如试着把producer分流,或者用多个disruptor,使每个disruptor的load变小。
3. 什么时候使用disruptor?
如果对延迟的需求很高,可以考虑使用。
六、参考:
disruptor 介绍相关推荐
- Disruptor介绍使用
Disruptor介绍使用 1,什么是 Disruptor? (1)Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能的并发框架.可以认为是线程间通信的高效低延时的内存消息组件,它最大 ...
- Disruptor介绍 -- 初识Disruptor框架
Disruptor简介: LMAX Disruptor是一个高性能的线程间消息库.它源于LMAX对并发性,性能和非阻塞算法的研究,如今构成了Exchange基础架构的核心部分. Disruptor它是 ...
- 高并发框架 Disruptor
1.Disruptor介绍 Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS) ...
- LMAX Disruptor用户手册-4.0.0.RC2-最好的入门文章
LMAX Disruptor 用户手册 原文链接 LMAX Disruptor是一个高性能线程通信库.它起源于LMAX对高并发,高性能,无锁算法的研究,如今已成长为Exchange基础架构的核心部分. ...
- springboot + Disruptor 实现特快高并发处理!!
01.背景 工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录. 02 ...
- 陆金所 CAT 优化实践
1 背景 CAT 介绍 CAT (Central Application Tracking)是一个实时监控系统,由美团点评开发并开源,定位于后端应用监控.应用集成客户端的方式上报中间件和业务数据,支持 ...
- Java并发与多线程
1.多线程优点 资源利用率更好:文件读写操作 程序设计在某些情况下更简单: 程序响应更快:端口监听操作 2.多线程的代价 设计更复杂:多线程共享数据时尤其需要注意 上下文切换的开销: CPU 会在一个 ...
- 深入浅出生产者-消费者模式
笔者也建立的自己的公众号啦,平时会分享一些编程知识,欢迎各位大佬支持~ 扫码或微信搜索北风IT之路关注 生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案.也经常有面 ...
- 并发编程之Disruptor框架介绍和高阶运用
1. Disruptor是什么 1.1 技术背景 LMAX是在英国注册并受到FCA监管(监管号码为509778)的外汇黄金交易所, LMAX架构是LMAX内部研发并应用到交易系统的一种技术.它 ...
最新文章
- iOS开发-照片选择
- 阿里云 MSE 云原生网关助力斯凯奇轻松应对双 11 大促
- 数据仓库之 ETL漫谈
- 非正常关闭vi编辑器时会生成一个.swp文件
- java 正規表示 group_经验分享|Java+百度AI实现人脸识别
- Ionic2 下处理 Android 设备下返回按钮的事件
- 入行网络攻城狮的自述
- 尚德计算机科学与技术网课,计算机科学与技术
- 【读书笔记】【未】杀死一只知更鸟
- 模糊神经网络应用实例,神经网络与模糊控制
- c语言速算24课程设计,C语言速算24数据结构课程设计.doc
- Strust2 success sucess
- Python+Vue计算机毕业设计网上美妆购物商城8k7w5(源码+程序+LW+部署)
- js隐藏显示div页面方法
- TextGrabber重大更新,识别文字并实时离线翻译,支持中文
- 黑客攻防web安全实战详解笔记
- 行业洞察 | Web3、AI4Science、机器人,热门赛道全解析...AI商业化受阻,拐点在何方?...
- CTF小白新手导航(基础建议)
- Codeforces Round #702 (Div. 3)补题
- ZCash的零知识证明
热门文章
- gsettings-desktop-schemas : 破坏: mutter (< 3.31.4) 但是 3.28.4-0ubuntu18.04.2 正要被安装解决方案
- VC6、BC5、G2.9标准分配器一览
- LintCode 375. 克隆二叉树(深复制)
- c语言条件语句示例_PHP中的条件语句和示例
- Java ObjectOutputStream writeFields()方法与示例
- 实训09.09:简单的彩票系统(机选多注)
- FreeRTOS时间管理
- java内存模型 创建类_JVM内存模型及String对象内存分配
- 35. 搜索插入位置 golang
- C语言模拟实现标准库函数之strstr()