Disruptor并发框架-2
import java.util.concurrent.atomic.AtomicInteger;public class Trade { private String id;//ID private String name;private double price;//金额 private AtomicInteger count = new AtomicInteger(0);public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count = count;} }
import java.util.UUID;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //这里做具体的消费逻辑 event.setId(UUID.randomUUID().toString());//简单生成下ID System.out.println(event.getId()); }
}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;public class Main1 { public static void main(String[] args) throws Exception { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; /* * createSingleProducer创建一个单生产者的RingBuffer, * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。 * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 */ final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //创建线程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //创建SequenceBarrier SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //创建消息处理器 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( ringBuffer, sequenceBarrier, new TradeHandler()); //这一步的目的就是把消费者的位置信息引用注入到生产者 如果只有一个消费者的情况可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence()); //把消息处理器提交到线程池 executors.submit(transProcessor); //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for(int i=0;i<10;i++){ seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块 ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据 ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 } return null; } }); future.get();//等待生产者结束 Thread.sleep(1000);//等上1秒,等消费都处理完成 transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) executors.shutdown();//终止线程 }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;public class Main2 { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; EventFactory<Trade> eventFactory = new EventFactory<Trade>() { public Trade newInstance() { return new Trade(); } }; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS); WorkHandler<Trade> handler = new TradeHandler(); WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler); workerPool.start(executor); //下面这个生产8个数据for(int i=0;i<8;i++){ long seq=ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executor.shutdown(); }
}
Disruptor并发框架-2相关推荐
- Disruptor并发框架--学习笔记
Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在J ...
- Disruptor并发框架,核心组件RingBuffer
1.1 Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是 ...
- Disruptor并发框架-1
//http://ifeve.com/disruptor-getting-started/ public class LongEvent { private long value;public lon ...
- 并发框架Disruptor(核心概念 入门 高性能原理-伪共享 CAS 环形数据 生产和消费模式 高级使用 )
并发框架Disruptor 并发框架Disruptor Disruptor概述 背景 什么是Disruptor 为什么使用Disruptor Disruptor 的核心概念 Ring Buffer S ...
- java 无锁框架_高性能无锁并发框架 Disruptor,太强了!
Java技术栈 www.javastack.cn 关注优质文章 Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操 ...
- Java 并发框架全览,这个牛逼!
来自:唐尤华 https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1 1. 为什么要写这篇文章 几年前 ...
- 来,带你鸟瞰 Java 中的并发框架!
来自 ImportNew,作者:唐尤华 https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1 1. ...
- 互联网java常用框架_来,带你鸟瞰 Java 中4款常用的并发框架!
1. 为什么要写这篇文章 几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库. 但是,当深入实现细节时,我们想起了一位智者曾经说过 ...
- java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战
Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...
最新文章
- 弹出显示多条的message对话框
- matlab 自再现模,平行平面腔自再现模FoxLi数值迭代解法及MATLAB实现
- MySQL索引设计原则
- formula 返回list_python正则实现计算器功能
- php 开发一个聊天系统,ajax+php 实现一个简单的在线聊天室功能(附带源码)
- php检测是否存在敏感词,如何用PHP+Ajax判断是否有敏感词汇
- IOS5开发-http get/post调用mvc4 webapi互操作(图片上传)
- 快速幂或者矩阵快速幂
- 焦点图,带数字显示,支持常见浏览器
- steamvr自定义按键_SteamVR SDK更新:带来运动平滑、自定义控制器键等多项功能
- emmx文件用什么软件打开电脑_fbx文件是什么_电脑fbx文件用什么软件打开
- python基础资料(Learn|Codecademy好用的工具)
- SQL 的各种 join 用法
- 自己对香港一卡通的总结
- 百度网盘:未知错误播放失败1000
- 今日头条自媒体怎么提高推荐量
- BIG6——解决问题时收集、利用资料的思路
- 在公众号中通过链接下载APP时,如何不通过应用宝,直接跳浏览器下载?
- 使用nuget 打包并上传 nuget.org
- BT 面板控制命令 宝塔 Linux 常用命令收集整理