Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

Java内置队列
介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。

队列    有界性    锁    数据结构
ArrayBlockingQueue    bounded    加锁    arraylist
LinkedBlockingQueue    optionally-bounded    加锁    linkedlist
ConcurrentLinkedQueue    unbounded    无锁    linkedlist
LinkedTransferQueue    unbounded    无锁    linkedlist
PriorityBlockingQueue    unbounded    加锁    heap
DelayQueue    unbounded    加锁    heap
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。

Disruptor论文中讲述了一个实验:

这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
机器环境:2.4G 6核
运算: 64位的计数器累加5亿次
Method    Time (ms)
Single thread    300
Single thread with CAS    5,700
Single thread with lock    10,000
Single thread with volatile write    4,700
Two threads with CAS    30,000
Two threads with lock    224,000
CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。

单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。

在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。

综上可知,加锁的性能是最差的。

Disruptor的设计方案
Disruptor通过以下设计来解决队列速度慢的问题:

环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

一个生产者
写数据
生产者单线程写数据的流程比较简单:

申请写入m个元素;
若是有m个元素可以写入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
若是返回的正确,则生产者开始写入元素。

图5 单个生产者生产过程示意图

多个生产者
多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

下面分读数据和写数据两种情况介绍。

读数据
生产者多线程写入的情况会复杂很多:

申请读取到序号n;
若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
消费者读取元素。
如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。

读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。

然后,消费者读取下标从3到6共计4个元素。

图6 多个生产者情况下,消费者消费过程示意图

写数据
多个生产者写入的时候:

申请写入m个元素;
若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。
如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

图7 多个生产者情况下,生产者生产过程示意图

下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。

防止不同生产者对同一段空间写入的代码,如下所示:

public long tryNext(int n) throws InsufficientCapacityException
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }
 
    long current;
    long next;
 
    do
    {
        current = cursor.get();
        next = current + n;
 
        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));
 
    return next;
}
通过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。

消费者的流程与生产者非常类似,这儿就不多描述了。

总结
Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能。

在美团点评内部,很多高并发场景借鉴了Disruptor的设计,减少竞争的强度。其设计思想可以扩展到分布式场景,通过无锁设计,来提升服务性能。

代码样例
package com.vrv.linkdood_disruptor;
 
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
 
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;
 
/**
 *      <B>说    明<B/>:开始
 * 
 * @author 作者名:冯龙淼
 *            E-mail:fenglongmiao@vrvmail.com.cn
 * 
 * @version 版   本  号:1.0.<br/>
 *          创建时间:2018年1月22日 下午12:12:31
 */
public class DemoMain {  
    
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {  
        
        final int BUFFER_SIZE=1024;  
        final int THREAD_NUMBERS=4;  
        
        /* 
         * createSingleProducer创建一个单生产者的RingBuffer, 
         * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 
         * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 
         * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 
         */  
        final RingBuffer<TradeTransaction> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeTransaction>() {  
            @Override  
            public TradeTransaction newInstance() {  
                return new TradeTransaction();  
            } // PhasedBackoffWaitStrategy YieldingWaitStrategy
        }, BUFFER_SIZE,new YieldingWaitStrategy());  
        //创建线程池  
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        //创建SequenceBarrier  
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        //创建消息处理器  
        BatchEventProcessor<TradeTransaction> transProcessor = new BatchEventProcessor<TradeTransaction>(  
                ringBuffer, sequenceBarrier, new TradeTransactionInDBHandler());  
          
        //这一部的目的是让RingBuffer根据消费者的状态    如果只有一个消费者的情况可以省略  
        ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //把消息处理器提交到线程池  
        executors.submit(transProcessor);  
        //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类  
          
        Future<?> future=executors.submit(new Callable<Void>() {  
            @Override  
            public Void call() throws Exception {  
                long seq = 0;  
                for(int i=0;i<2005;i++){  
                    try{
                         seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
                         
                         ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据  如果此处不理解,想想RingBuffer的结构图  
                           
                    }finally{
                         ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
                    }
                   
                }  
                return null;  
            }  
        });  
        future.get();//等待生产者结束  
//        TimeUnit timeUnit = TimeUnit.MILLISECONDS;
//        future.get(100, timeUnit);
        Thread.sleep(5000);//等上5秒,等消费都处理完成  
        transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
        executors.shutdown();//终止线程  
    }  
    
}  
 
 
package com.vrv.linkdood_disruptor;
/**
 *      <B>说    明<B/>:数据实体类
 * 
 * @author 作者名:冯龙淼
 *            E-mail:fenglongmiao@vrvmail.com.cn
 * 
 * @version 版   本  号:1.0.<br/>
 *          创建时间:2018年1月22日 下午12:10:58
 */
//DEMO中使用的 消息全假定是一条交易  
public class TradeTransaction {  
    
   private String id;//交易ID  
   
   private double price;//交易金额  
     
   public TradeTransaction() {  
   }  
   public TradeTransaction(String id, double price) {  
       super();  
       this.id = id;  
       this.price = price;  
   }  
   public String getId() {  
       return id;  
   }  
   public void setId(String id) {  
       this.id = id;  
   }  
   public double getPrice() {  
       return price;  
   }  
   public void setPrice(double price) {  
       this.price = price;  
   }
    @Override
    public String toString() {
        return "TradeTransaction [id=" + id + ", price=" + price + "]";
    }  
 
}

package com.vrv.linkdood_disruptor;
 
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
 
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
 
/**
 *      <B>说    明<B/>:消费实现类
 * 
 * @author 作者名:冯龙淼
 *            E-mail:fenglongmiao@vrvmail.com.cn
 * 
 * @version 版   本  号:1.0.<br/>
 *          创建时间:2018年1月22日 下午12:11:33
 */
public class TradeTransactionInDBHandler implements EventHandler<TradeTransaction>,WorkHandler<TradeTransaction> {  
    
    /** 交易条数 */
    private AtomicLong count = new AtomicLong(0);
    
    @Override  
    public void onEvent(TradeTransaction event, long sequence,  
            boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(TradeTransaction event) throws Exception {  
        //这里做具体的消费逻辑  
        event.setId(UUID.randomUUID().toString());//简单生成下ID  
        System.out.println(event.getId()+"&& price:"+event.getPrice()+"count: "+count.getAndIncrement()); 
    }  
    
}

切记:一定要在设置值的地方加上

try{

}finally{

},

否则如果数据发布不成功,最后数据会逐渐填满ringbuffer,最后后面来的数据根本没有办法调用可用空间,导致方法阻塞,占用CPU和内存,无法释放资源,最后导致服务器死机

注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。

Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。

public class LongEvent
{
    private long value;
 
    public void set(long value)
    {
        this.value = value;
    }
}
static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
    @Override
    publicvoid translateTo(LongEvent event, long sequence, Long data) {
        event.set(data);
    }    
}
    
public static Translator TRANSLATOR = new Translator();
    
public staticvoid publishEvent2(Disruptor<LongEvent> disruptor) {
    // 发布事件;
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    long data = getEventDataxxxx();//获取要通过事件传递的业务数据;    
    ringBuffer.publishEvent(TRANSLATOR,data);
}
多个生产者

在构建Disruptor实例的时候,需要指定生产者是单生产者(ProducerType.SINGLE)还是多生产者(ProducerType.MULTI)

多个消费者

(类型1)   多个消费者每个消费者都有机会消费相同数据,使用handleEventsWith方法

class ComsumerHandler implements EventHandler<ResultTxt>{
    
    private int no;
 
    public  ComsumerHandler(int no) {
        this.no=no;
    }
 
    @Override
    public void onEvent(ResultTxt resultTxt, long sequence, boolean endOfBatch)
        throws Exception {
        System.out.println(no+" data commming......"+resultTxt.getBarCode());
    }
}
 
//设置多个消费者
disruptor.handleEventsWith(new ComsumerHandler(1),new ComsumerHandler(2));
     (类型2)  多个消费者,每个消费者消费不同数据。也就是说每个消费者竞争数据,竞争到消费,其他消费者没有机会。使用handleEventsWithWorkerPool方法

class ComsumerHandler implements WorkHandler<ResultTxt>{
 
    private int no;
 
    public  ComsumerHandler(int no) {
        this.no=no;
    }
 
    @Override
    public void onEvent(ResultTxt event)
        throws Exception {
        System.out.println(no+"  data commming......"+event.getBarCode());
    }
}
 
//多个消费者,每个消费者竞争消费不同数据
disruptor.handleEventsWithWorkerPool(new ComsumerHandler(1),new ComsumerHandler(2));

等待策略
生产者的等待策略
暂时只有休眠1ns。

LockSupport.parkNanos(1);
消费者的等待策略
名称    措施    适用场景
BlockingWaitStrategy    加锁    CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy    自旋    通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy    自旋 + yield + 自定义策略    CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy    自旋 + yield + sleep    性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy    加锁,有超时限制    CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy    自旋 + yield + 自旋    性能和CPU资源之间有很好的折中。延迟比较均匀

并发框架disruptor(高性能内存Queue)相关推荐

  1. 并发框架Disruptor(核心概念 入门 高性能原理-伪共享 CAS 环形数据 生产和消费模式 高级使用 )

    并发框架Disruptor 并发框架Disruptor Disruptor概述 背景 什么是Disruptor 为什么使用Disruptor Disruptor 的核心概念 Ring Buffer S ...

  2. java 无锁框架_高性能无锁并发框架 Disruptor,太强了!

    Java技术栈 www.javastack.cn 关注优质文章 Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操 ...

  3. 【java并发编程】无锁并发框架disruptor

    一.简介 Disruptor是一个高性能队列,研发的初衷是解决内部的内存队列的延迟问题,而不是分布式队列.基于Disruptor开发的系统单线程能支撑每秒600万订单. 使用场景:对延时要求很高的场景 ...

  4. 你需要知道的高性能并发框架Disruptor原理

    Disruptor的小史 现在要是不知道Disruptor真的已经很outer了,Disruptor是英国外汇交易公司LMAX开发的一款开源的高性能队列,LMAX Disruptor是一个高性能的线程 ...

  5. 高并发框架 Disruptor

    1.Disruptor介绍 Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS) ...

  6. java disruptor压测_Java并发框架Disruptor实现原理与源码分析(二) 缓存行填充与CAS操作...

    ##缓存行填充 关于缓存行填充在我个人的印象里面第一次看到是在Java的java.util.concurrent包中,因为当时很好奇其用法背后的逻辑,所以查了很多资料才明白到底是怎么回事*(也许事实上 ...

  7. Java 并发框架Disruptor(七)

    Disruptor VS BlockingQueue的压测对比: import java.util.concurrent.ArrayBlockingQueue;public class ArrayBl ...

  8. 并发框架Disruptor

    2019独角兽企业重金招聘Python工程师标准>>> 在看log日志的时候发先有人jar包中有一个奇怪的东西 <dependency><groupId>co ...

  9. 高性能并发队列Disruptor使用详解,详细解析Disruptor框架的应用和基本原理

    基本概念 Disruptor是一个高性能的异步处理框架,是一个轻量的Java消息服务JMS, 能够在无锁的情况下实现队列的并发操作 Disruptor使用环形数组实现了类似队列的功能,并且是一个有界队 ...

最新文章

  1. LTE/NR用户接入过程笔记
  2. 使用drbd进行磁盘扩容,小磁盘扩容大磁盘后大小未变的问题解决方法
  3. rust(28)-具名结构体
  4. mysql事务顺序重排_MySQL事务处理及字符集和校对顺序
  5. MongoDB学习笔记二—Shell操作
  6. 通过hashtable实现dic
  7. 全中!七大初学者易踩的坑!
  8. 秩为1的矩阵,向量,绩的联合使用
  9. 连续系统离散化_连续系统转化为离散系统之 z 变换
  10. vs2015 安装破解版Visual Assist X
  11. python10的负n次方_python中n次方怎么表示
  12. 计算机科学与技术的研究背景,计算机科学与技术发展背景
  13. Kubernetes网络插件(CNI)超过10Gbit/s网络的基准结果
  14. 服务器自建云存储,如何搭建私有云?私有云储存服务器NAS搭建方法(轻松搞定)...
  15. VR全景视频、图片播放器
  16. 西奥电梯服务器故障维修,电梯维保须知:西子奥的斯电梯故障分析及解决
  17. TCP/IP协议族的数据链路层基础(1)——MTU
  18. sipjs 保存mp4文件_微信视频号视频怎么下载,视频号视频怎么保存到手机
  19. Zadig 构建效率提升 40% 背后的实践思路
  20. linux 重启nginx命令

热门文章

  1. oracle hcm 发展,甲骨文发布Oracle HCM Cloud云服务 呈现三大亮点
  2. 气体管道管径及流量对照表_气体涡轮流量计的选型要点?
  3. xmind怎样画流程图_老师是怎样上网课的?
  4. influxDB框架 数据存储 TSM 数据操作等详解
  5. 深度学习笔记——生成模型
  6. 文本获取和搜索引擎中的反馈模型
  7. 关于servlet中出现GET方法不能应用于此url的解决办法
  8. sql操作mysql数据库_一些常用的操作MySQL数据库的sql语句
  9. 简述直方图和柱形图的区别_如何区分直方图与柱形图
  10. DocDokuPLM介绍