生产者

package cn.lonecloud.procum.disruptor;import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer;import java.nio.ByteBuffer;/*** @author lonecloud* @version v1.0* @date 下午3:02 2018/5/7*/
public class Producer {//队列private final RingBuffer<Data> dataRingBuffer;public Producer(RingBuffer<Data> dataRingBuffer) {this.dataRingBuffer = dataRingBuffer;}/*** 插入数据* @param s*/public void pushData(String s) {//获取下一个位置long next = dataRingBuffer.next();try {//获取容器Data data = dataRingBuffer.get(next);//设置数据data.setData(s);} finally {//插入dataRingBuffer.publish(next);}}
}

  消费者

package cn.lonecloud.procum.disruptor;import cn.lonecloud.procum.Data;
import com.lmax.disruptor.WorkHandler;/*** @author lonecloud* @version v1.0* @date 下午3:01 2018/5/7*/
public class Customer implements WorkHandler<Data> {@Overridepublic void onEvent(Data data) throws Exception {System.out.println(Thread.currentThread().getName()+"---"+data.getData());}
}

  数据工厂

package cn.lonecloud.procum.disruptor;import cn.lonecloud.procum.Data;
import com.lmax.disruptor.EventFactory;/*** @author lonecloud* @version v1.0* @date 下午3:02 2018/5/7*/
public class DataFactory implements EventFactory<Data> {@Overridepublic Data newInstance() {return new Data();}
}

  主函数

package cn.lonecloud.procum.disruptor;import cn.lonecloud.procum.Data;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author lonecloud* @version v1.0* @date 下午3:09 2018/5/7*/
public class Main {public static void main(String[] args) throws InterruptedException {//创建线程池ExecutorService service = Executors.newCachedThreadPool();//创建数据工厂DataFactory dataFactory = new DataFactory();//设置缓冲区大小,必须为2的指数,否则会有异常int buffersize = 1024;Disruptor<Data> dataDisruptor = new Disruptor<Data>(dataFactory, buffersize,service);//创建消费者线程dataDisruptor.handleEventsWithWorkerPool(new Customer(),new Customer(),new Customer(),new Customer(),new Customer(),new Customer(),new Customer());//启动dataDisruptor.start();//获取其队列RingBuffer<Data> ringBuffer = dataDisruptor.getRingBuffer();for (int i = 0; i < 100; i++) {//创建生产者Producer producer = new Producer(ringBuffer);//设置内容producer.pushData(UUID.randomUUID().toString());//Thread.sleep(1000);}}
}

  其中策略有几种:

1. BlockingWaitStrategy:阻塞策略,最节省CPU,但是高并发条件下性能最糟糕

2 SleepingWaitStrategy:在循环中无限等待,处理数据会产生高延迟,对生产线程影响小,场景:异步日志

3. YieldingWaitStrategy:低延迟场合,使用必须保证剩余的消费者线程的逻辑CPU

4. BusySpinWaitStrategy:消费者线程会尽最大努力疯狂的监控缓冲区变化。

转载于:https://www.cnblogs.com/lonecloud/p/9002927.html

使用Disruptor实现生产者和消费者模型相关推荐

  1. linux进程间通信:system V 信号量 生产者和消费者模型编程案例

    生产者和消费者模型: 有若干个缓冲区,生产者不断向里填数据,消费者不断从中取数据 两者不冲突的前提: 缓冲区有若干个,且是固定大小,生产者和消费者各有若干个 生产者向缓冲区中填数据前需要判断缓冲区是否 ...

  2. python生产和消费模型_python queue和生产者和消费者模型

    queue队列 当必须安全地在多个线程之间交换信息时,队列在线程编程中特别有用. classqueue.Queue(maxsize=0) #先入先出classqueue.LifoQueue(maxsi ...

  3. Linux系统编程---17(条件变量及其函数,生产者消费者条件变量模型,生产者与消费者模型(线程安全队列),条件变量优点,信号量及其主要函数,信号量与条件变量的区别,)

    条件变量 条件变量本身不是锁!但它也可以造成线程阻塞.通常与互斥锁配合使用.给多线程提供一个会合的场所. 主要应用函数: pthread_cond_init 函数 pthread_cond_destr ...

  4. 计算机操作系统生产者和消费者模型的简单介绍

    同步互斥小口诀 画图理解题目 判断题目类型 分析进程数目 填写进程模板 补充基本代码(伪代码) 补充PV代码 检查调整代码 注意事项 代码是一步一步写出来的,代码是反复调整写出来的 60%是生产者和消 ...

  5. 并发无锁队列学习(单生产者单消费者模型)

    1.引言 本文介绍单生产者单消费者模型的队列.依据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种. 单生产者单消费者模型的队列操作过程是不须要进行加锁的.生产 ...

  6. Linux系统编程40:多线程之基于环形队列的生产者与消费者模型

    文章目录 (1)什么是信号量 (2)与信号量相关的操作 (3)基于环形队列的生产者与消费者模型-信号量(单消费者单生产者) (1)什么是信号量 前面的叙述中,我们通过锁保证了每次只有一个线程进入临界区 ...

  7. Linux系统编程39:多线程之基于阻塞队列生产者与消费者模型

    文章目录 (1)生产者与消费者模型概述 (2)生产者与消费者模型优点 (3)基于阻塞队列(blockingqueue)的生产者消费者模型(单消费者单生产者) (4)基于阻塞队列(blockingque ...

  8. Go语言编程:使用条件变量Cond和channel通道实现多个生产者和消费者模型

    如题,使用条件变量Cond和channel通道实现多个生产者和消费者模型.Go语言天生带有C语言的基因,很多东西和C与很像,但是用起来 绝对比C语言方便.今天用Go语言来实现下多消费者和生产者模型.如 ...

  9. Linux系统编程:使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型

    代码实现 如题,使用semaphore信号量和mutex互斥量实现多个生产者和消费者模型.本来是想只用信号量实现生产者消费者模型的,但是发现 只能在一个生产者和一个消费者之间,要在多个生产者和消费者模 ...

最新文章

  1. Spring Cloud构建微服务架构:分布式配置中心(加密解密)
  2. matlab在电力行业中的仿真技术-MATLAB基于EKF算法估计电动汽车蓄电池的SOC
  3. mysql保存中文异常Incorrect string value: '\xE4\xBD\xA0\xE5\xA5\xBD' for column'
  4. Vue中实现页面上点击按钮下载文件(exe)
  5. 浅谈a标签属性href的mailto更多用法
  6. c#开发中遇到System.AccessViolationException
  7. OpenCV cvReleaseImage把图像怎么样了?
  8. linux sublime3 插件安装插件,手动安装sublimeText3插件
  9. 技术分享连载(六十一)
  10. 学习C++的五十条忠告
  11. 湖南工业大学在线计算机作业答案,湖南工业大学《计算机组成原理》试题集,共7份,有部分答案...
  12. git-stash用法
  13. python字符串驻留机制_python的内存驻留机制(小数据池)
  14. 我心目中的支付宝架构
  15. 人工智能你必须掌握的32个算法(二)归并排序算法
  16. 禁止视频在手机移动端页面中全屏播放代码范例
  17. bolt界面引擎学习笔记一
  18. 推送之信鸽推送详解 Think Different
  19. 天猫精灵家居对接第三方设备(详细版)
  20. spark学习之资源调度

热门文章

  1. [以太坊源代码分析] V. 从钱包到客户端
  2. 在區塊鏈上建立可更新的智慧合約(二)
  3. 由Android 65K方法数限制引发的思考
  4. 【NOIP2013模拟】Vani和Cl2捉迷藏 题解代码
  5. python实现udp聊天室_python网络编程基础--socket的简介,以及使用socket来搭建一个简单的udp小程序...
  6. BZOJ-1005-明明的烦恼
  7. linux查看某端口进程占用,Linux下查看某端口占用进程
  8. 什么是java泛型_java泛型背后是什么
  9. r语言datarame删除行_R语言缺失值的处理:线性回归模型插补
  10. python中os.path.join()的循环用法_Python中.join()和os.path.join()两个函数的用法详解