什么是Disruptor?

Disruptor是一个开源的JAVA框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。

Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。

我们可以用下面的这几个图形象的理解一下该框架:

简单来说,可以这样理解,有一个环形仓库,用于存放生产者生产出来的数据,生产者生成了数据马上放到环状上的某个节点【槽】上,然后生产者的指针指向下一个空槽,消费者呢,同样有一个指针,指向某个放置叾数据的槽不断从里面获取数据,这样整个过程就流通起来了,其实这个原理在redis的底层实现上也有类似的应用,有兴趣的伙伴可以去查查相关资料,

同时,Disruptor里面有一个很有用的操作,就是可以人为的涉及生产者和消费者的处理顺序,即线程执行顺序,也叫编排顺序,

用上面这两张图表示,我们可以使用Disruptor 设定不同消费者的执行顺序,这在一些复杂特殊的业务场景中很有必要,说到这儿是不是觉得很像大数据里面的一个实时数据处理框架storm的处理逻辑呢?

基本上理论知识就普普及到这里了,下面我们来用Disruptor 做一个简单的业务逻辑处理,一个线程产生10000个订单,有3个消费者线程,一个消费者线程模拟订单入库逻辑,一个模拟发送消息给用户,这两个线程全部处理完毕再执行增值业务逻辑,模型图如下:

下面直接上代码,

<dependencies><!-- https://mvnrepository.com/artifact/com.lmax/disruptor --><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.1</version></dependency></dependencies>

1、创建一个订单实体对象,作为流通在环形内存中的对象,

package com.congge.model;public class TradeTransaction {private String id;//交易订单号private double price;public TradeTransaction() {}public TradeTransaction(String id, double price) {super();this.id = id;this.price = price;}public String getId() {return id;}public void setId(String id) {this.id = id;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}}

2、处理订单入库线程,

package com.congge.model;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;import java.util.UUID;/*** 处理商品入库线程*/
public class TradeTransactionInDBHandler implements EventHandler<TradeTransaction>, WorkHandler<TradeTransaction> {public void onEvent(TradeTransaction tradeTransaction, long l, boolean b) throws Exception {this.onEvent(tradeTransaction);}public void onEvent(TradeTransaction event) throws Exception {String threadName = Thread.currentThread().getName();String id = event.getId();System.out.println("线程:" + threadName + "处理订单业务入库服务  ------> :  商品ID是 :" + id);}
}

3、处理发送消息线程,

package com.congge.model;import com.lmax.disruptor.EventHandler;/*** 处理订单发送短信业务*/
public class TradeTransactionJMSNotifyHandler implements EventHandler<TradeTransaction> {public void onEvent(TradeTransaction tradeTransaction, long l, boolean b) throws Exception {String threadName = Thread.currentThread().getName();String id = tradeTransaction.getId();System.out.println("线程:" + threadName + "处理订单业务发送短信通知服务:  商品ID是 :" + id);}
}

4、处理增值服务线程,

package com.congge.model;import com.lmax.disruptor.EventHandler;/*** 处理增值业务的线程*/
public class TradeTransactionVasHandler implements EventHandler<TradeTransaction> {public void onEvent(TradeTransaction tradeTransaction, long l, boolean b) throws Exception {String threadName = Thread.currentThread().getName();String id = tradeTransaction.getId();System.out.println("线程:" + threadName + "处理订单增值服务: 处理的 商品ID是 :" + id);}
}

5、生产者线程,模拟产生10000个订单,

package com.congge.model;import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;import java.util.Random;
import java.util.concurrent.CountDownLatch;/*** 模拟生产者,产生订单线程*/
public class TradeTransactionPublisher implements Runnable {Disruptor<TradeTransaction> disruptor;private CountDownLatch latch;private static int LOOP = 10000;   //模拟一万次交易的发生public TradeTransactionPublisher(CountDownLatch latch,Disruptor<TradeTransaction> disruptor) {this.disruptor=disruptor;this.latch=latch;}public void run() {TradeTransactionEventTranslator tradeTransloator = new TradeTransactionEventTranslator();for(int i=0;i<LOOP;i++){disruptor.publishEvent(tradeTransloator);}latch.countDown();}
}class TradeTransactionEventTranslator implements EventTranslator<TradeTransaction> {private Random random = new Random();public void translateTo(TradeTransaction event, long sequence) {this.generateTradeTransaction(event);}//生成订单流水号逻辑private TradeTransaction generateTradeTransaction(TradeTransaction trade) {trade.setId(random.nextInt() * 9999+"");return trade;}
}

6、测试类:

package com.congge.model;import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class TestDemo3 {public static void main(String[] args) throws Exception {long beginTime=System.currentTimeMillis();int bufferSize=1024;ExecutorService executor= Executors.newFixedThreadPool(4);Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() {public TradeTransaction newInstance() {return new TradeTransaction();}}, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());//使用disruptor创建消费者组C1,C2EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasHandler(),new TradeTransactionInDBHandler());TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler();//声明在C1,C2之后执行JMS消息发送操作 也就是流程走到C3handlerGroup.then(jmsConsumer);disruptor.start();//启动CountDownLatch latch=new CountDownLatch(1);//生产者准备executor.submit(new TradeTransactionPublisher(latch, disruptor));latch.await();//等待生产者结束disruptor.shutdown();executor.shutdown();System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));}
}

运行一下,让我们看一下效果:

可以看到,这个过程的执行非常快,10秒多的时间全部处理完,当然,实际中,我们的每个业务逻辑处理线程里面还有大量的其他业务需要处理,但可以肯定的是,使用Disruptor来处理,可以充分使用CPU的性能,这里做一个小小的提示也是我们实际业务处理中可能面临的问题,就是当你的生产者线程和消费者线程处理逻辑的速度存在一定的时差怎么处理呢?可以考虑的方式是在生产者生产完毕数据后等待一小段时间,留给消费者处理,至于这个时间是多大,可以根据业务的处理时间进行粗略的计算。

Disruptor 的简单使用就说到这里,大家有兴趣可以进行更深入的研究,谢谢观看!

disruptor模拟高速处理大规模订单类业务场景相关推荐

  1. 君子签:打破知识付费资源对接窘境,为3类业务场景提供专业解决方案

    据有关行业数据报告显示,中国知识付费用户规模保持平稳增长态势,2020年中国知识付费行业用户规模达4.18亿人,今年有望突破4.77亿人.近年来,知识付费行业开始以平台化的模式迅速发展,知识付费平台面 ...

  2. 当S8遇上边缘计算:谈阿里云ENS对直播业务场景的支撑

    近日,英雄联盟S8全球总决赛落下帷幕,中国战队IG零封FNC夺得冠军.这场比赛引起了国内网友的超高关注度,也给直播平台带来了不小的技术挑战.虎牙直播平台结合阿里云边缘节点技术方案,保障了总决赛当日70 ...

  3. Redis实现朋友圈,微博等Feed流功能,实现Feed流微服务(业务场景、实现思路和环境搭建)

    文章目录 业务场景 Feed流相关概念 Feed流特征 Feed流分类 实现思路 环境搭建 数据库表结构 新建Feeds功能微服务ms-feeds 配置类 RedisTemplateConfigura ...

  4. ATM高层定义了4类业务,压缩视频信号的传送属于______。B

    ATM高层定义了4类业务,压缩视频信号的传送属于______.B A.CBR B.VBR C.UBR D.ABR [分析] ATM高层定义了如下4类业务: 固定比特率(CBR)业务,用于模拟铜线和光纤 ...

  5. 基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-架构篇

    简介: 本文简要介绍了基于 MySQL 结合 Tablestore 的大规模订单系统方案.这种方案支持大数据存储.高性能数据检索.SQL搜索.实时与全量数据分析,且部署简单.运维成本低. 作者 | 弘 ...

  6. java订单类_基于Java创建一个订单类代码实例

    这篇文章主要介绍了基于Java创建一个订单类代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 需求描述 定义一个类,描述订单信息 订单id 订 ...

  7. 前端证券项目_证监会公告[2018]6号 证券公司投资银行类业务内部控制指引

    证监会公告 [2018]6 号 证券公司投资银行类业务内部控制指 引 中国证券监督管理委员会公告 [2018]6 号现公布 <证券公 司投资银行类业务内部控制指引> ,自 2018 年 7 ...

  8. java订单类_使用Java创建一个订单类代码实例

    这篇文章主要简介了使用Java创建一个订单类代码实例,文中通过示例代码简介的非常具体,对大家的学习或者工作具有一定的参考学习网上卖,需要的朋友可以学习下 需求描述 定义一个类,描述订单信息 订单id ...

  9. 内购订单进行二次处理_物流图表复杂业务场景下的订单管理系统搭建

    来源:非红物流 销售流程中的订单管理是业务管理的重点.公司内部需要多部门协同.多资源整合才能顺利地完成订单交付:公司外部需要实时有效的共享订单状态,响应客户的诉求,提升客户的满意度.基于这个目标,订单 ...

最新文章

  1. 【C++快读快输详解(快速读入数字,快速输出数字)】
  2. 有关jquery checkbox获取checked的问题
  3. 2020-11-27
  4. android的帧布局,七、Android帧布局FrameLayout和霓虹灯效果
  5. 昆明理工大学复试计算机试题,2012年昆明理工大学计算机考研复试试题(.PDF
  6. 解决VS2010自带的C/C++编译器CL找不到mspdb100.dll的问题
  7. 第三方服务-极光推送
  8. 【一天一个C++小知识】016:c++11中的lambda表达式
  9. 第二把数独游戏 代码
  10. [导入]+ADO.NET读书笔记系列 一+
  11. google datastudio 使用教程
  12. 别再问我exe反编译成Python脚本了!
  13. 在计算机操作中粘贴的快捷键是什么,电脑复制粘贴的快捷键是什么
  14. matlab求广义逆及线性方程组的解
  15. 12306 崩了,90% 的人都用过这三款抢票工具
  16. 计算机程序ui设计员工资,ui设计师工资一般多少,发展前景怎么样
  17. 老树开新花:DLL劫持漏洞新玩法
  18. 玩转华为数据中心交换机系列 | 配置LACP模式的跨设备聚合(单机)
  19. 使用腾讯轻量云搭建个人邮箱系统
  20. 微信小程序元素节点滚到某位置后固定

热门文章

  1. IOS9 微信sdk升级指南
  2. Kafka Consumer端的一些解惑
  3. oracle developer 连接 mysql
  4. OECP社区正式上线
  5. Greenplum segment级问题的排查 - gp_session_role=utility (含csvlog日志格式解读)
  6. “针对即席工作负荷进行优化”如何影响你的计划缓存
  7. 编译Nginx提示gzip module requires the zlib library
  8. linux枯燥命令行下的“有趣命令”
  9. ubuntu下链接open***
  10. 关系型数据库和非关系型数据库的区别