Disruptor内存消息队列简单使用
Disruptor内存消息队列
最近在做一个有关使用内存消息队列到功能,比如将日志信息或点击统计信息持久化等操作,开始想着用java到内存队列作为缓冲区,后来在网上搜到Disruptor这个东西,神乎其神到,就简单了解了一下,做了一个demo,感觉还不错,可以用用,有关概念可以自行搜索,下面就简单介绍一下开发过程。
新建测试工程
新建maven工程listen-disruptor
工程目录结构如下
pom.xml文件内容
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>listen</groupId>
<artifactId>listen-disruptor</artifactId>
<packaging>war</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>listen-disruptor Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.20</version>
</dependency>
</dependencies>
<build>
<finalName>listen-disruptor</finalName>
</build>
</project>
在Disruptor中,所有到生产和消费是基于事件驱动到,即事件对象。
编码
新建事件对象类LogEvent
package com.listen.disruptor;
import java.io.Serializable;
import java.util.Date;
import com.alibaba.fastjson.JSONObject;
/**
* 事件对象(日志事件)
* @author Administrator
*
*/
public class LogEvent implements Serializable {
private long logId;
private String content;
private Date date;
public LogEvent(){
}
public LogEvent(long logId, String content, Date date){
this.logId = logId;
this.content = content;
this.date = date;
}
public long getLogId() {
return logId;
}
public void setLogId(long logId) {
this.logId = logId;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
public String toString(){
return JSONObject.toJSONString(this);
}
}
事件工厂类,用来初始化预分配空到事件对象
package com.listen.disruptor;
import com.lmax.disruptor.EventFactory;
/**
* 事件生成工厂(用来初始化预分配事件对象)
* @author Administrator
*
*/
public class LogEventFactory implements EventFactory<LogEvent> {
public LogEvent newInstance() {
return new LogEvent();
}
}
新建事件到消费者LogEventConsumer,这里演示只是做简单到打印显示
package com.listen.disruptor;
import com.lmax.disruptor.EventHandler;
/**
* 事件到消费者
* @author Administrator
*
*/
public class LogEventConsumer implements EventHandler<LogEvent> {
public void onEvent(LogEvent logEvent, long seq, boolean bool) throws Exception {
System.out.println("seq:" + seq + ",bool:" + bool + ",logEvent:" + logEvent.toString());
}
}
新建事件的生产者LogEventProducer
package com.listen.disruptor;
import java.util.Date;
import com.lmax.disruptor.RingBuffer;
public class LogEventProducer {
private final RingBuffer<LogEvent> ringBuffer;
public LogEventProducer(RingBuffer<LogEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void onData(long logId, String content, Date date){
//ringBuffer类似一个队列,next就是下一个槽
long seq = ringBuffer.next();
try{
//用seq索引取出一个空到事件用于填充
LogEvent logEvent = ringBuffer.get(seq);
logEvent.setLogId(logId);
logEvent.setContent(content);
logEvent.setDate(date);
}
finally{
//最终发布事件,很重要
ringBuffer.publish(seq);
}
}
}
至此有关消费和生产到代码已经码完,但是目前推荐另一种生产者到写法,两种都可以使用
新建LogEventProducerWithTranslator生产者
package com.listen.disruptor;
import java.util.Date;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;
/**
* 使用translator方式到事件生产者发布事件
* @author Administrator
*
*/
public class LogEventProducerWithTranslator {
private final static EventTranslatorVararg<LogEvent> translator = new EventTranslatorVararg<LogEvent>() {
public void translateTo(LogEvent logEvent, long seq, Object... objs) {
logEvent.setLogId((Long) objs[0]);
logEvent.setContent((String) objs[1]);
logEvent.setDate((Date) objs[2]);
}
};
private final RingBuffer<LogEvent> ringBuffer;
public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void onData(long logId, String content, Date date){
this.ringBuffer.publishEvent(translator, logId, content, date);
}
}
测试类LogEventMain
package com.listen.disruptor;
import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
public class LogEventMain {
public static void main(String[] args) {
// producer();
producerWithTranslator();
}
public static void producer(){
Executor executor = Executors.newCachedThreadPool();
LogEventFactory factory = new LogEventFactory();
int ringBufferSize = 1024;
Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, executor);
disruptor.handleEventsWith(new LogEventConsumer());
disruptor.start();
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
LogEventProducer producer = new LogEventProducer(ringBuffer);
for(int i = 0; true; i++){
producer.onData(i, "c" + i, new Date());
}
}
public static void producerWithTranslator(){
LogEventFactory factory = new LogEventFactory();
int ringBufferSize = 1024;
ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r);
}
};
Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, threadFactory);
disruptor.handleEventsWith(new LogEventConsumer());
disruptor.start();
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
LogEventProducerWithTranslator producer2 = new LogEventProducerWithTranslator(ringBuffer);
for(int i = 0; true; i++){
producer2.onData(i, "c" + i, new Date());
}
}
}
测试类分别写类两种生产者到测试,其中LogEventProducerWithTranslator生产者中到translator变量还有其他几种写法,适用于其他到场景,用法都是类似到,可以自行研究一下
测试
测试结果如下
seq:57804,bool:false,logEvent:{"content":"c57804","date":1477899755475,"logId":57804}
seq:57805,bool:false,logEvent:{"content":"c57805","date":1477899755475,"logId":57805}
seq:57806,bool:false,logEvent:{"content":"c57806","date":1477899755475,"logId":57806}
seq:57807,bool:false,logEvent:{"content":"c57807","date":1477899755475,"logId":57807}
seq:57808,bool:false,logEvent:{"content":"c57808","date":1477899755475,"logId":57808}
seq:57809,bool:false,logEvent:{"content":"c57809","date":1477899755475,"logId":57809}
seq:57810,bool:false,logEvent:{"content":"c57810","date":1477899755475,"logId":57810}
seq:57811,bool:false,logEvent:{"content":"c57811","date":1477899755475,"logId":57811}
seq:57812,bool:false,logEvent:{"content":"c57812","date":1477899755475,"logId":57812}
seq:57813,bool:false,logEvent:{"content":"c57813","date":1477899755475,"logId":57813}
seq:57814,bool:false,logEvent:{"content":"c57814","date":1477899755475,"logId":57814}
seq:57815,bool:false,logEvent:{"content":"c57815","date":1477899755475,"logId":57815}
seq:57816,bool:false,logEvent:{"content":"c57816","date":1477899755475,"logId":57816}
seq:57817,bool:false,logEvent:{"content":"c57817","date":1477899755475,"logId":57817}
seq:57818,bool:false,logEvent:{"content":"c57818","date":1477899755475,"logId":57818}
seq:57819,bool:false,logEvent:{"content":"c57819","date":1477899755475,"logId":57819}
seq:57820,bool:false,logEvent:{"content":"c57820","date":1477899755475,"logId":57820}
seq:57821,bool:false,logEvent:{"content":"c57821","date":1477899755475,"logId":57821}
seq:57822,bool:false,logEvent:{"content":"c57822","date":1477899755475,"logId":57822}
seq:57823,bool:false,logEvent:{"content":"c57823","date":1477899755475,"logId":57823}
seq:57824,bool:false,logEvent:{"content":"c57824","date":1477899755475,"logId":57824}
seq:57825,bool:false,logEvent:{"content":"c57825","date":1477899755475,"logId":57825}
性能测试我也没有研究,毕竟也没有专业到测试机器,只是在自己到电脑上跑一下,完全满足业务需要。
码云地址:https://git.oschina.net/git.listen/listen-disruptor
Disruptor内存消息队列简单使用相关推荐
- Disruptor内存消息队列的资料整理
什么是 Disruptor? Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS ...
- php 内存队列,memcache构建简单的内存消息队列_PHP教程
本文章来给各位同学介绍使用memcache构建简单的内存消息队列,用一个比较不错的实例来给大家介绍,希望此方法对大家有帮助哦. memcache功能太简单了,只能 set get 和delete, 只 ...
- thinkphp5 redis消息队列简单教程
thinkphp5 redis消息队列简单教程 1.1 安装 thinkphp-queue composer install thinkphp-queue 1.2 搭建消息队列的存储环境 使用 Red ...
- C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(一)
首先我们简单了解一下什么堆.栈.队列. 堆是在程序运行时,而不是在程序编译时,申请某个大小的内存空间.即动态分配内存,对其访问和对一般内存的访问没有区别. 栈就是一个容器,后放进去的先拿出来,它下面本 ...
- C# Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(一) 时间 2019-06-03 14:09:00 博客园
首先我们简单了解一下什么堆.栈.队列. 堆是在程序运行时,而不是在程序编译时,申请某个大小的内存空间.即动态分配内存,对其访问和对一般内存的访问没有区别. 栈就是一个容器,后放进去的先拿出来,它下面本 ...
- 【Linux】Linux进程间通信——共享内存/消息队列/守护进程
文章目录 进程间通信--共享内存/守护进程 一, 共享内存 1. 共享内存概念 2. 共享内存使用 1. 共享内存使用步骤 2. 共享内存操作函数 3. 共享内存常用操作命令 4. 共享内存使用示例: ...
- 消息队列-简单介绍Java消息队列,什么是消息队列,作用以及常见消息队列
天天说队列, 项目请求数据不能及时处理时,就一言不合通过队列啊, 心中那个是妈卖批,那么到底什么队列呢,队列有到底运用于哪些运用场景呢; 先说说应用场景吧, 不知道有啥作用,看多了含义,原理什么的还是 ...
- RabbitMQ消息队列简单异步邮件发送和订单异步处理实战【应用解耦】【异步削峰】
介绍
- 【Linux学习】进程间通信——system V(共享内存 | 消息队列 | 信号量)
最新文章
- 学术 科研 论文写作 生物信息学
- 如何打赢一场唯快不破的比赛,看看他们的绝招
- VTK修炼之道27:图像基本操作_三维图像切片交互提取(回调函数、观察者-命令模式)
- linux oracle12c dbca,Linux下Oracle 12c R2图形化安装笔记
- Codeforces Round #547 (Div. 3)
- jetty NoSuchFieldError: MAX_INACTIVE_MINUTES
- 使用PLSQL 远程连接oracle数据库
- 为什么数据库连接很消耗资源?
- 华为交换机STP的配置实例
- 如何在Excel中快速删除空白行
- 大数据下无隐私APP为何要用户摄像头麦克风通讯录等全权限才服务(公号回复“无隐私APP”下载PDF彩标典藏版资料,欢迎转发赞赏)
- windows10睡眠问题完美解决,设置睡眠时间不管用怎么办?
- 【贪吃蛇C语言版源代码(推荐使用Dev-C++)——附运行截图】
- 电子科技大学格拉斯哥学院基础实践——共享单车的调查
- 【Python可视化展示】-多维数据可视化分析
- JavaScript slice( )、splice( )、split( )
- Vue搭建可视化界面
- 1087: Time
- 软考高级 真题 2013年下半年 信息系统项目管理师 案例分析
- 纳豆红曲的功效与作用是什么?
热门文章
- 独立站现在好不好做?个人适合做跨境电商独立站吗?
- 强化学习在推荐混排中的应用
- 花书+吴恩达深度学习(十)卷积神经网络 CNN 之卷积层
- 「小程序JAVA实战」微信小程序工程结构了解(五)
- Fedora升级后Python虚拟环境中的pip报错
- Android的滑动分析
- 第五:Pytest之收集用例规则与运行指定用例
- python axes函数_matplotlib中的axes.flat做什么?
- stm32g474教程_(完整版)STM32F103通用教程
- android sim卡分析,Android 判断SIM卡属于哪个移动运营商详解及实例