Disruptor内存消息队列

最近在做一个有关使用内存消息队列到功能,比如将日志信息或点击统计信息持久化等操作,开始想着用java到内存队列作为缓冲区,后来在网上搜到Disruptor这个东西,神乎其神到,就简单了解了一下,做了一个demo,感觉还不错,可以用用,有关概念可以自行搜索,下面就简单介绍一下开发过程。

新建测试工程

新建maven工程listen-disruptor

工程目录结构如下

pom.xml文件内容

 
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  3. <modelVersion>4.0.0</modelVersion>

  4. <groupId>listen</groupId>

  5. <artifactId>listen-disruptor</artifactId>

  6. <packaging>war</packaging>

  7. <version>0.0.1-SNAPSHOT</version>

  8. <name>listen-disruptor Maven Webapp</name>

  9. <url>http://maven.apache.org</url>

  10. <dependencies>

  11. <dependency>

  12. <groupId>junit</groupId>

  13. <artifactId>junit</artifactId>

  14. <version>3.8.1</version>

  15. <scope>test</scope>

  16. </dependency>

  17. <!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->

  18. <dependency>

  19. <groupId>com.lmax</groupId>

  20. <artifactId>disruptor</artifactId>

  21. <version>3.3.6</version>

  22. </dependency>

  23. <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->

  24. <dependency>

  25. <groupId>com.alibaba</groupId>

  26. <artifactId>fastjson</artifactId>

  27. <version>1.2.20</version>

  28. </dependency>

  29. </dependencies>

  30. <build>

  31. <finalName>listen-disruptor</finalName>

  32. </build>

  33. </project>

在Disruptor中,所有到生产和消费是基于事件驱动到,即事件对象。

编码

新建事件对象类LogEvent

 
  1. package com.listen.disruptor;

  2. import java.io.Serializable;

  3. import java.util.Date;

  4. import com.alibaba.fastjson.JSONObject;

  5. /**

  6. * 事件对象(日志事件)

  7. * @author Administrator

  8. *

  9. */

  10. public class LogEvent implements Serializable {

  11. private long logId;

  12. private String content;

  13. private Date date;

  14. public LogEvent(){

  15. }

  16. public LogEvent(long logId, String content, Date date){

  17. this.logId = logId;

  18. this.content = content;

  19. this.date = date;

  20. }

  21. public long getLogId() {

  22. return logId;

  23. }

  24. public void setLogId(long logId) {

  25. this.logId = logId;

  26. }

  27. public String getContent() {

  28. return content;

  29. }

  30. public void setContent(String content) {

  31. this.content = content;

  32. }

  33. public Date getDate() {

  34. return date;

  35. }

  36. public void setDate(Date date) {

  37. this.date = date;

  38. }

  39. public String toString(){

  40. return JSONObject.toJSONString(this);

  41. }

  42. }

事件工厂类,用来初始化预分配空到事件对象

 
  1. package com.listen.disruptor;

  2. import com.lmax.disruptor.EventFactory;

  3. /**

  4. * 事件生成工厂(用来初始化预分配事件对象)

  5. * @author Administrator

  6. *

  7. */

  8. public class LogEventFactory implements EventFactory<LogEvent> {

  9. public LogEvent newInstance() {

  10. return new LogEvent();

  11. }

  12. }

新建事件到消费者LogEventConsumer,这里演示只是做简单到打印显示

 
  1. package com.listen.disruptor;

  2. import com.lmax.disruptor.EventHandler;

  3. /**

  4. * 事件到消费者

  5. * @author Administrator

  6. *

  7. */

  8. public class LogEventConsumer implements EventHandler<LogEvent> {

  9. public void onEvent(LogEvent logEvent, long seq, boolean bool) throws Exception {

  10. System.out.println("seq:" + seq + ",bool:" + bool + ",logEvent:" + logEvent.toString());

  11. }

  12. }

新建事件的生产者LogEventProducer

 
  1. package com.listen.disruptor;

  2. import java.util.Date;

  3. import com.lmax.disruptor.RingBuffer;

  4. public class LogEventProducer {

  5. private final RingBuffer<LogEvent> ringBuffer;

  6. public LogEventProducer(RingBuffer<LogEvent> ringBuffer){

  7. this.ringBuffer = ringBuffer;

  8. }

  9. public void onData(long logId, String content, Date date){

  10. //ringBuffer类似一个队列,next就是下一个槽

  11. long seq = ringBuffer.next();

  12. try{

  13. //用seq索引取出一个空到事件用于填充

  14. LogEvent logEvent = ringBuffer.get(seq);

  15. logEvent.setLogId(logId);

  16. logEvent.setContent(content);

  17. logEvent.setDate(date);

  18. }

  19. finally{

  20. //最终发布事件,很重要

  21. ringBuffer.publish(seq);

  22. }

  23. }

  24. }

至此有关消费和生产到代码已经码完,但是目前推荐另一种生产者到写法,两种都可以使用

新建LogEventProducerWithTranslator生产者

 
  1. package com.listen.disruptor;

  2. import java.util.Date;

  3. import com.lmax.disruptor.EventTranslatorVararg;

  4. import com.lmax.disruptor.RingBuffer;

  5. /**

  6. * 使用translator方式到事件生产者发布事件

  7. * @author Administrator

  8. *

  9. */

  10. public class LogEventProducerWithTranslator {

  11. private final static EventTranslatorVararg<LogEvent> translator = new EventTranslatorVararg<LogEvent>() {

  12. public void translateTo(LogEvent logEvent, long seq, Object... objs) {

  13. logEvent.setLogId((Long) objs[0]);

  14. logEvent.setContent((String) objs[1]);

  15. logEvent.setDate((Date) objs[2]);

  16. }

  17. };

  18. private final RingBuffer<LogEvent> ringBuffer;

  19. public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer){

  20. this.ringBuffer = ringBuffer;

  21. }

  22. public void onData(long logId, String content, Date date){

  23. this.ringBuffer.publishEvent(translator, logId, content, date);

  24. }

  25. }

测试类LogEventMain

 
  1. package com.listen.disruptor;

  2. import java.util.Date;

  3. import java.util.concurrent.Executor;

  4. import java.util.concurrent.Executors;

  5. import java.util.concurrent.ThreadFactory;

  6. import com.lmax.disruptor.RingBuffer;

  7. import com.lmax.disruptor.dsl.Disruptor;

  8. public class LogEventMain {

  9. public static void main(String[] args) {

  10. // producer();

  11. producerWithTranslator();

  12. }

  13. public static void producer(){

  14. Executor executor = Executors.newCachedThreadPool();

  15. LogEventFactory factory = new LogEventFactory();

  16. int ringBufferSize = 1024;

  17. Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, executor);

  18. disruptor.handleEventsWith(new LogEventConsumer());

  19. disruptor.start();

  20. RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();

  21. LogEventProducer producer = new LogEventProducer(ringBuffer);

  22. for(int i = 0; true; i++){

  23. producer.onData(i, "c" + i, new Date());

  24. }

  25. }

  26. public static void producerWithTranslator(){

  27. LogEventFactory factory = new LogEventFactory();

  28. int ringBufferSize = 1024;

  29. ThreadFactory threadFactory = new ThreadFactory() {

  30. public Thread newThread(Runnable r) {

  31. return new Thread(r);

  32. }

  33. };

  34. Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, threadFactory);

  35. disruptor.handleEventsWith(new LogEventConsumer());

  36. disruptor.start();

  37. RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();

  38. LogEventProducerWithTranslator producer2 = new LogEventProducerWithTranslator(ringBuffer);

  39. for(int i = 0; true; i++){

  40. producer2.onData(i, "c" + i, new Date());

  41. }

  42. }

  43. }

测试类分别写类两种生产者到测试,其中LogEventProducerWithTranslator生产者中到translator变量还有其他几种写法,适用于其他到场景,用法都是类似到,可以自行研究一下

测试

测试结果如下

 
  1. seq:57804,bool:false,logEvent:{"content":"c57804","date":1477899755475,"logId":57804}

  2. seq:57805,bool:false,logEvent:{"content":"c57805","date":1477899755475,"logId":57805}

  3. seq:57806,bool:false,logEvent:{"content":"c57806","date":1477899755475,"logId":57806}

  4. seq:57807,bool:false,logEvent:{"content":"c57807","date":1477899755475,"logId":57807}

  5. seq:57808,bool:false,logEvent:{"content":"c57808","date":1477899755475,"logId":57808}

  6. seq:57809,bool:false,logEvent:{"content":"c57809","date":1477899755475,"logId":57809}

  7. seq:57810,bool:false,logEvent:{"content":"c57810","date":1477899755475,"logId":57810}

  8. seq:57811,bool:false,logEvent:{"content":"c57811","date":1477899755475,"logId":57811}

  9. seq:57812,bool:false,logEvent:{"content":"c57812","date":1477899755475,"logId":57812}

  10. seq:57813,bool:false,logEvent:{"content":"c57813","date":1477899755475,"logId":57813}

  11. seq:57814,bool:false,logEvent:{"content":"c57814","date":1477899755475,"logId":57814}

  12. seq:57815,bool:false,logEvent:{"content":"c57815","date":1477899755475,"logId":57815}

  13. seq:57816,bool:false,logEvent:{"content":"c57816","date":1477899755475,"logId":57816}

  14. seq:57817,bool:false,logEvent:{"content":"c57817","date":1477899755475,"logId":57817}

  15. seq:57818,bool:false,logEvent:{"content":"c57818","date":1477899755475,"logId":57818}

  16. seq:57819,bool:false,logEvent:{"content":"c57819","date":1477899755475,"logId":57819}

  17. seq:57820,bool:false,logEvent:{"content":"c57820","date":1477899755475,"logId":57820}

  18. seq:57821,bool:false,logEvent:{"content":"c57821","date":1477899755475,"logId":57821}

  19. seq:57822,bool:false,logEvent:{"content":"c57822","date":1477899755475,"logId":57822}

  20. seq:57823,bool:false,logEvent:{"content":"c57823","date":1477899755475,"logId":57823}

  21. seq:57824,bool:false,logEvent:{"content":"c57824","date":1477899755475,"logId":57824}

  22. seq:57825,bool:false,logEvent:{"content":"c57825","date":1477899755475,"logId":57825}

性能测试我也没有研究,毕竟也没有专业到测试机器,只是在自己到电脑上跑一下,完全满足业务需要。

码云地址:https://git.oschina.net/git.listen/listen-disruptor

Disruptor内存消息队列简单使用相关推荐

  1. Disruptor内存消息队列的资料整理

    什么是 Disruptor? Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS ...

  2. php 内存队列,memcache构建简单的内存消息队列_PHP教程

    本文章来给各位同学介绍使用memcache构建简单的内存消息队列,用一个比较不错的实例来给大家介绍,希望此方法对大家有帮助哦. memcache功能太简单了,只能 set get 和delete, 只 ...

  3. thinkphp5 redis消息队列简单教程

    thinkphp5 redis消息队列简单教程 1.1 安装 thinkphp-queue composer install thinkphp-queue 1.2 搭建消息队列的存储环境 使用 Red ...

  4. C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(一)

    首先我们简单了解一下什么堆.栈.队列. 堆是在程序运行时,而不是在程序编译时,申请某个大小的内存空间.即动态分配内存,对其访问和对一般内存的访问没有区别. 栈就是一个容器,后放进去的先拿出来,它下面本 ...

  5. C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(一) 时间 2019-06-03 14:09:00 博客园

    首先我们简单了解一下什么堆.栈.队列. 堆是在程序运行时,而不是在程序编译时,申请某个大小的内存空间.即动态分配内存,对其访问和对一般内存的访问没有区别. 栈就是一个容器,后放进去的先拿出来,它下面本 ...

  6. 【Linux】Linux进程间通信——共享内存/消息队列/守护进程

    文章目录 进程间通信--共享内存/守护进程 一, 共享内存 1. 共享内存概念 2. 共享内存使用 1. 共享内存使用步骤 2. 共享内存操作函数 3. 共享内存常用操作命令 4. 共享内存使用示例: ...

  7. 消息队列-简单介绍Java消息队列,什么是消息队列,作用以及常见消息队列

    天天说队列, 项目请求数据不能及时处理时,就一言不合通过队列啊, 心中那个是妈卖批,那么到底什么队列呢,队列有到底运用于哪些运用场景呢; 先说说应用场景吧, 不知道有啥作用,看多了含义,原理什么的还是 ...

  8. RabbitMQ消息队列简单异步邮件发送和订单异步处理实战【应用解耦】【异步削峰】

    介绍

  9. 【Linux学习】进程间通信——system V(共享内存 | 消息队列 | 信号量)

最新文章

  1. 学术 科研 论文写作 生物信息学
  2. 如何打赢一场唯快不破的比赛,看看他们的绝招
  3. VTK修炼之道27:图像基本操作_三维图像切片交互提取(回调函数、观察者-命令模式)
  4. linux oracle12c dbca,Linux下Oracle 12c R2图形化安装笔记
  5. Codeforces Round #547 (Div. 3)
  6. jetty NoSuchFieldError: MAX_INACTIVE_MINUTES
  7. 使用PLSQL 远程连接oracle数据库
  8. 为什么数据库连接很消耗资源?
  9. 华为交换机STP的配置实例
  10. 如何在Excel中快速删除空白行
  11. 大数据下无隐私APP为何要用户摄像头麦克风通讯录等全权限才服务(公号回复“无隐私APP”下载PDF彩标典藏版资料,欢迎转发赞赏)
  12. windows10睡眠问题完美解决,设置睡眠时间不管用怎么办?
  13. 【贪吃蛇C语言版源代码(推荐使用Dev-C++)——附运行截图】
  14. 电子科技大学格拉斯哥学院基础实践——共享单车的调查
  15. 【Python可视化展示】-多维数据可视化分析
  16. JavaScript slice( )、splice( )、split( )
  17. Vue搭建可视化界面
  18. 1087: Time
  19. 软考高级 真题 2013年下半年 信息系统项目管理师 案例分析
  20. 纳豆红曲的功效与作用是什么?

热门文章

  1. 独立站现在好不好做?个人适合做跨境电商独立站吗?
  2. 强化学习在推荐混排中的应用
  3. 花书+吴恩达深度学习(十)卷积神经网络 CNN 之卷积层
  4. 「小程序JAVA实战」微信小程序工程结构了解(五)
  5. Fedora升级后Python虚拟环境中的pip报错
  6. Android的滑动分析
  7. 第五:Pytest之收集用例规则与运行指定用例
  8. python axes函数_matplotlib中的axes.flat做什么?
  9. stm32g474教程_(完整版)STM32F103通用教程
  10. android sim卡分析,Android 判断SIM卡属于哪个移动运营商详解及实例