Disruptor VS BlockingQueue的压测对比:

import java.util.concurrent.ArrayBlockingQueue;public class ArrayBlockingQueue4Test {public static void main(String[] args) {final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);final long startTime = System.currentTimeMillis();//向容器中添加元素new Thread(new Runnable() {public void run() {long i = 0;while (i < Constants.EVENT_NUM_OHM) {Data data = new Data(i, "c" + i);try {queue.put(data);} catch (InterruptedException e) {e.printStackTrace();}i++;}}}).start();new Thread(new Runnable() {public void run() {int k = 0;while (k < Constants.EVENT_NUM_OHM) {try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}k++;}long endTime = System.currentTimeMillis();System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");}}).start();}
}

  

public interface Constants {int EVENT_NUM_OHM = 1000000;int EVENT_NUM_FM = 50000000;int EVENT_NUM_OM = 10000000;}

  

import java.util.concurrent.Executors;import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;public class DisruptorSingle4Test {public static void main(String[] args) {int ringBufferSize = 65536;final Disruptor<Data> disruptor = new Disruptor<Data>(new EventFactory<Data>() {public Data newInstance() {return new Data();}},ringBufferSize,Executors.newSingleThreadExecutor(),ProducerType.SINGLE, //new BlockingWaitStrategy()new YieldingWaitStrategy());DataConsumer consumer = new DataConsumer();//消费数据disruptor.handleEventsWith(consumer);disruptor.start();new Thread(new Runnable() {public void run() {RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {long seq = ringBuffer.next();Data data = ringBuffer.get(seq);data.setId(i);data.setName("c" + i);ringBuffer.publish(seq);}}}).start();}
}

  

import com.lmax.disruptor.EventHandler;public class DataConsumer implements EventHandler<Data> {private long startTime;private int i;public DataConsumer() {this.startTime = System.currentTimeMillis();}public void onEvent(Data data, long seq, boolean bool)throws Exception {i++;if (i == Constants.EVENT_NUM_OHM) {long endTime = System.currentTimeMillis();System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");}}}

  

import java.io.Serializable;public class Data implements Serializable {private static final long serialVersionUID = 2035546038986494352L;private Long id ;private String name;public Data() {}public Data(Long id, String name) {super();this.id = id;this.name = name;}public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}
}

  

BlockingQueue测试:

1.建立一个工厂Event类,用于创建Event类实例对象

2.需要有一个jian监听事件类,用于处理数据(Event类)

3.实例化Disruptor实例,配置一系列参数,编写DisDisruptor核心组件

4.编写生产者组件,向Disruptor容器中投递数据

pom.xml添加:

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><scope>3.3.2</scope>
</dependency>

  

public class OrderEvent {private long value; //订单的价格public long getValue() {return value;}public void setValue(long value) {this.value = value;}
}

  

import com.lmax.disruptor.EventFactory;public class OrderEventFactory implements EventFactory<OrderEvent>{public OrderEvent newInstance() {return new OrderEvent();       //这个方法就是为了返回空的数据对象(Event)}
}

  

public class OrderEventHandler implements EventHandler<OrderEvent>{public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(Integer.MAX_VALUE);System.err.println("消费者: " + event.getValue());}
}

  

import java.nio.ByteBuffer;import com.lmax.disruptor.RingBuffer;public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer data) {//1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号long sequence = ringBuffer.next();  //0 try {//2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"OrderEvent event = ringBuffer.get(sequence);//3 进行实际的赋值处理event.setValue(data.getLong(0));         } finally {//4 提交发布操作ringBuffer.publish(sequence);          }}
}

  

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;public class Main {public static void main(String[] args) {// 参数准备工作OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());/*** 1 eventFactory: 消息(event)工厂对象* 2 ringBufferSize: 容器的长度* 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler* 4 ProducerType: 单生产者 还是 多生产者* 5 waitStrategy: 等待策略*///1. 实例化disruptor对象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)disruptor.handleEventsWith(new OrderEventHandler());//3. 启动disruptordisruptor.start();//4. 获取实际存储数据的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for(long i = 0 ; i < 100; i ++){bb.putLong(0, i);producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
}

  

public final class BlockingWaitStrategy implements WaitStrategy
{private final Lock lock = new ReentrantLock();private final Condition processorNotifyCondition = lock.newCondition();@Overridepublic long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;if ((availableSequence = cursorSequence.get()) < sequence){lock.lock();try{while ((availableSequence = cursorSequence.get()) < sequence){barrier.checkAlert();processorNotifyCondition.await();}}finally{lock.unlock();}}while ((availableSequence = dependentSequence.get()) < sequence){barrier.checkAlert();}return availableSequence;}@Overridepublic void signalAllWhenBlocking(){lock.lock();try{processorNotifyCondition.signalAll();}finally{lock.unlock();}}
}

  

public final class SleepingWaitStrategy implements WaitStrategy
{private static final int DEFAULT_RETRIES = 200;private final int retries;public SleepingWaitStrategy(){this(DEFAULT_RETRIES);}public SleepingWaitStrategy(int retries){this.retries = retries;}@Overridepublic long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;int counter = retries;while ((availableSequence = dependentSequence.get()) < sequence){counter = applyWaitMethod(barrier, counter);}return availableSequence;}@Overridepublic void signalAllWhenBlocking(){}private int applyWaitMethod(final SequenceBarrier barrier, int counter)throws AlertException{barrier.checkAlert();if (counter > 100){--counter;}else if (counter > 0){--counter;Thread.yield();}else{LockSupport.parkNanos(1L);}return counter;}
}

  

public final class YieldingWaitStrategy implements WaitStrategy
{private static final int SPIN_TRIES = 100;@Overridepublic long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)throws AlertException, InterruptedException{long availableSequence;int counter = SPIN_TRIES;while ((availableSequence = dependentSequence.get()) < sequence){counter = applyWaitMethod(barrier, counter);}return availableSequence;}@Overridepublic void signalAllWhenBlocking(){}private int applyWaitMethod(final SequenceBarrier barrier, int counter)throws AlertException{barrier.checkAlert();if (0 == counter){Thread.yield();}else{--counter;}return counter;}
}

  

转载于:https://www.cnblogs.com/sunliyuan/p/10872380.html

Java 并发框架Disruptor(七)相关推荐

  1. 并发框架Disruptor(核心概念 入门 高性能原理-伪共享 CAS 环形数据 生产和消费模式 高级使用 )

    并发框架Disruptor 并发框架Disruptor Disruptor概述 背景 什么是Disruptor 为什么使用Disruptor Disruptor 的核心概念 Ring Buffer S ...

  2. java并发框架支持锁包括,tip/面试题_并发与多线程.md at master · 171437912/tip · GitHub...

    01. java用()机制实现了进程之间的同步执行 A. 监视器 B. 虚拟机 C. 多个CPU D. 异步调用 正解: A 解析: 监视器机制即锁机制 02. 线程安全的map在JDK 1.5及其更 ...

  3. java并发框架支持锁包括,jdk1.8锁

    JDK1.8有什么锁?_李广进的博客-CSDN博客 2020年4月23日 18.排他锁(不包含),X锁,若事务T对数据对象A加上x锁,则只允许T读取和修改A,其他任何事务都不能再对A加任何类型的锁,直 ...

  4. 深入理解Java并发框架AQS系列(四):共享锁(Shared Lock)

    深入理解Java并发框架AQS系列(一):线程 深入理解Java并发框架AQS系列(二):AQS框架简介及锁概念 深入理解Java并发框架AQS系列(三):独占锁(Exclusive Lock) 深入 ...

  5. java disruptor压测_Java并发框架Disruptor实现原理与源码分析(二) 缓存行填充与CAS操作...

    ##缓存行填充 关于缓存行填充在我个人的印象里面第一次看到是在Java的java.util.concurrent包中,因为当时很好奇其用法背后的逻辑,所以查了很多资料才明白到底是怎么回事*(也许事实上 ...

  6. java 无锁框架_高性能无锁并发框架 Disruptor,太强了!

    Java技术栈 www.javastack.cn 关注优质文章 Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操 ...

  7. java 并发框架源码_某网Java并发编程高阶技术-高性能并发框架源码解析与实战(云盘下载)...

    第1章 课程介绍(Java并发编程进阶课程) 什么是Disruptor?它一个高性能的异步处理框架,号称"单线程每秒可处理600W个订单"的神器,本课程目标:彻底精通一个如此优秀的 ...

  8. java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战

    Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...

  9. java queues 性能_鸟瞰 Java 并发框架

    (给ImportNew加星标,提高Java技能) 来自:唐尤华 https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-fram ...

最新文章

  1. iptables的conntrack表满了导致访问网站很慢
  2. linux系统时间相关
  3. Java序列化(Serialization)的理解
  4. java基础1--继承
  5. SpringBoot 使用Thymeleaf模板 没有提示
  6. Java集合之HashMap源码分析
  7. Java程序员必备:异常的十个关键知识点
  8. cookie无法读取bdstoken_Web自动化测试:对cookie的操作
  9. WCF服务部署到IIS上,然后通过web服务引用方式出现错误的解决办法
  10. 半导体物理学学习资源
  11. Flume错误:Flume:Class path contains multiple SLF4J bindings
  12. android killer 反编译工具,androidkiller反编译软件使用与踩坑并解决的过程
  13. Tp5.0对接腾讯云语音验证码
  14. 基于ThinkPHP的校园网上订餐系统设计与实现
  15. 存储连接应用服务器简单入门
  16. mysql实操《学生表》
  17. 等式约束优化(可行点)
  18. 《OKR:源于英特尔和谷歌的目标管理利器》读书笔记
  19. 计算机测试word总是零分,word excel做好后评分为什么是0分
  20. 计算机科学和电子信息学报,太赫兹科学与电子信息学报

热门文章

  1. java 比较源文件_Beyond Compare比较Java源代码文件的操作流程
  2. img写入工具_硬盘有坏道,得用靠谱的修复工具,这3个不会让你失望
  3. 第十二届 蓝桥杯 青少年C++组 10月比赛 第1题
  4. PHP给后台管理系统加安全防护机制的一些方案
  5. 【docker】第五节:docker常用命令总结
  6. 德芙网络营销策略ppt_德芙网络营销方案
  7. flutter listview 滚动到底部_??一个高颜值Flutter版WanAndroid客户端
  8. 【ES6(2015)】Set
  9. dedecms 系统迁移及问题
  10. java out.flush_java中基本输入输出流的解释(flush方法的使用)