import java.util.concurrent.atomic.AtomicInteger;public class Trade {  private String id;//ID  private String name;private double price;//金额  private AtomicInteger count = new AtomicInteger(0);public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count = count;} }  

import java.util.UUID;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  @Override  public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  this.onEvent(event);  }  @Override  public void onEvent(Trade event) throws Exception {  //这里做具体的消费逻辑  event.setId(UUID.randomUUID().toString());//简单生成下ID  System.out.println(event.getId());  }
}  

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;public class Main1 {  public static void main(String[] args) throws Exception {  int BUFFER_SIZE=1024;  int THREAD_NUMBERS=4;  /* * createSingleProducer创建一个单生产者的RingBuffer, * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。 * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 */  final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  @Override  public Trade newInstance() {  return new Trade();  }  }, BUFFER_SIZE, new YieldingWaitStrategy());  //创建线程池  ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  //创建SequenceBarrier  SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  //创建消息处理器  BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  ringBuffer, sequenceBarrier, new TradeHandler());  //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence());  //把消息处理器提交到线程池  executors.submit(transProcessor);  //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  Future<?> future= executors.submit(new Callable<Void>() {  @Override  public Void call() throws Exception {  long seq;  for(int i=0;i<10;i++){  seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据 ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  }  return null;  }  }); future.get();//等待生产者结束  Thread.sleep(1000);//等上1秒,等消费都处理完成  transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  executors.shutdown();//终止线程  }
}  

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;public class Main2 {  public static void main(String[] args) throws InterruptedException {  int BUFFER_SIZE=1024;  int THREAD_NUMBERS=4;  EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  public Trade newInstance() {  return new Trade();  }  };  RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  WorkHandler<Trade> handler = new TradeHandler();  WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  workerPool.start(executor);  //下面这个生产8个数据for(int i=0;i<8;i++){  long seq=ringBuffer.next();  ringBuffer.get(seq).setPrice(Math.random()*9999);  ringBuffer.publish(seq);  }  Thread.sleep(1000);  workerPool.halt();  executor.shutdown();  }
}

Disruptor并发框架-2相关推荐

  1. Disruptor并发框架--学习笔记

    Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在J ...

  2. Disruptor并发框架,核心组件RingBuffer

    1.1 Disruptor并发框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是 ...

  3. Disruptor并发框架-1

    //http://ifeve.com/disruptor-getting-started/ public class LongEvent { private long value;public lon ...

  4. 并发框架Disruptor(核心概念 入门 高性能原理-伪共享 CAS 环形数据 生产和消费模式 高级使用 )

    并发框架Disruptor 并发框架Disruptor Disruptor概述 背景 什么是Disruptor 为什么使用Disruptor Disruptor 的核心概念 Ring Buffer S ...

  5. java 无锁框架_高性能无锁并发框架 Disruptor,太强了!

    Java技术栈 www.javastack.cn 关注优质文章 Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操 ...

  6. Java 并发框架全览,这个牛逼!

    来自:唐尤华 https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1 1. 为什么要写这篇文章 几年前 ...

  7. 来,带你鸟瞰 Java 中的并发框架!

    来自 ImportNew,作者:唐尤华 https://dzone.com/articles/a-birds-eye-view-on-java-concurrency-frameworks-1 1. ...

  8. 互联网java常用框架_来,带你鸟瞰 Java 中4款常用的并发框架!

    1. 为什么要写这篇文章 几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库. 但是,当深入实现细节时,我们想起了一位智者曾经说过 ...

  9. java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战

    Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...

最新文章

  1. 弹出显示多条的message对话框
  2. matlab 自再现模,平行平面腔自再现模FoxLi数值迭代解法及MATLAB实现
  3. MySQL索引设计原则
  4. formula 返回list_python正则实现计算器功能
  5. php 开发一个聊天系统,ajax+php 实现一个简单的在线聊天室功能(附带源码)
  6. php检测是否存在敏感词,如何用PHP+Ajax判断是否有敏感词汇
  7. IOS5开发-http get/post调用mvc4 webapi互操作(图片上传)
  8. 快速幂或者矩阵快速幂
  9. 焦点图,带数字显示,支持常见浏览器
  10. steamvr自定义按键_SteamVR SDK更新:带来运动平滑、自定义控制器键等多项功能
  11. emmx文件用什么软件打开电脑_fbx文件是什么_电脑fbx文件用什么软件打开
  12. python基础资料(Learn|Codecademy好用的工具)
  13. SQL 的各种 join 用法
  14. 自己对香港一卡通的总结
  15. 百度网盘:未知错误播放失败1000
  16. 今日头条自媒体怎么提高推荐量
  17. BIG6——解决问题时收集、利用资料的思路
  18. 在公众号中通过链接下载APP时,如何不通过应用宝,直接跳浏览器下载?
  19. 使用nuget 打包并上传 nuget.org
  20. BT 面板控制命令 宝塔 Linux 常用命令收集整理

热门文章

  1. 【翻译】【CGWORLD】怪物猎人携带版3rd制作介绍
  2. 文档 笔记 我全都要
  3. 独家解读 | 滴滴机器学习平台架构演进之路
  4. 上传文件按钮美化,上传文件前后状态控制
  5. no nlsxbe in java.library.path
  6. 点击模型:达观数据提升算法精度的利器
  7. Linux SendMail服务启动慢总结
  8. javascript高程3 学习笔记(二)
  9. Zend API: array_init
  10. python 字典程序_Python 字典(Dictionary)操作详解