pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)
生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:
1. wait()/notify()
2. lock & condition
3. BlockingQueue
下面来逐一分析。
1. wait()/notify()
第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。
1 public class WaitNotifyBroker implements Broker{2
3 private finalObject[] items;4
5 private inttakeIndex;6 private intputIndex;7 private intcount;8
9 public WaitNotifyBroker(intcapacity) {10 this.items = newObject[capacity];11 }12
13 @SuppressWarnings("unchecked")14 @Override15 publicT take() {16 T tmpObj = null;17 try{18 synchronized(items) {19 while (0 ==count) {20 items.wait();21 }22 tmpObj =(T) items[takeIndex];23 if (++takeIndex ==items.length) {24 takeIndex = 0;25 }26 count--;27 items.notify();28 }29 } catch(InterruptedException e) {30 e.printStackTrace();31 }32
33 returntmpObj;34 }35
36 @Override37 public voidput(T obj) {38 try{39 synchronized(items) {40 while (items.length ==count) {41 items.wait();42 }43
44 items[putIndex] =obj;45 if (++putIndex ==items.length) {46 putIndex = 0;47 }48 count++;49 items.notify();50 }51 } catch(InterruptedException e) {52 e.printStackTrace();53 }54
55 }56
57 }
这里利用Array构造一个Buffer去存取数据,并利用count, putIndex和takeIndex来保证First-In-First-Out。
如果利用LinkedList来代替Array,相对来说会稍微简单些。
LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。
2. lock & condition
lock & condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。
在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:
1 public class LockConditionBroker implements Broker{2
3 private finalReentrantLock lock;4 private finalCondition notFull;5 private finalCondition notEmpty;6 private final intcapacity;7 private LinkedListitems;8
9 public LockConditionBroker(intcapacity) {10 this.lock = newReentrantLock();11 this.notFull =lock.newCondition();12 this.notEmpty =lock.newCondition();13 this.capacity =capacity;14
15 items = new LinkedList();16 }17
18 @Override19 publicT take() {20 T tmpObj = null;21 lock.lock();22 try{23 while (items.size() == 0) {24 notEmpty.await();25 }26
27 tmpObj =items.poll();28 notFull.signalAll();29
30 } catch(InterruptedException e) {31 e.printStackTrace();32 } finally{33 lock.unlock();34 }35 returntmpObj;36 }37
38 @Override39 public voidput(T obj) {40 lock.lock();41 try{42 while (items.size() ==capacity) {43 notFull.await();44 }45
46 items.offer(obj);47 notEmpty.signalAll();48
49 } catch(InterruptedException e) {50 e.printStackTrace();51 } finally{52 lock.unlock();53 }54
55 }56 }
3. BlockingQueue
最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。
实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。
1 public class BlockingQueueBroker implements Broker{2
3 private final BlockingQueuequeue;4
5 publicBlockingQueueBroker() {6 this.queue = new LinkedBlockingQueue();7 }8
9 @Override10 publicT take() {11 try{12 returnqueue.take();13 } catch(InterruptedException e) {14 e.printStackTrace();15 }16
17 return null;18 }19
20 @Override21 public voidput(T obj) {22 try{23 queue.put(obj);24 } catch(InterruptedException e) {25 e.printStackTrace();26 }27 }28
29 }
我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。
接下来,就是一个1P2C的例子:
1 public interface Broker{2
3 T take();4
5 voidput(T obj);6
7 }8
9
10 public class Producer implementsRunnable {11
12 private final Brokerbroker;13 private finalString name;14
15 public Producer(Brokerbroker, String name) {16 this.broker =broker;17 this.name =name;18 }19
20 @Override21 public voidrun() {22 try{23 for (int i = 0; i < 5; i++) {24 broker.put(i);25 System.out.format("%s produced: %s%n", name, i);26 Thread.sleep(1000);27 }28 broker.put(-1);29 System.out.println("produced termination signal");30 } catch(InterruptedException e) {31 e.printStackTrace();32 return;33 }34
35 }36
37 }38
39
40 public class Consumer implementsRunnable {41
42 private final Brokerbroker;43 private finalString name;44
45 public Consumer(Brokerbroker, String name) {46 this.broker =broker;47 this.name =name;48 }49
50 @Override51 public voidrun() {52 try{53 for (Integer message = broker.take(); message != -1; message =broker.take()) {54 System.out.format("%s consumed: %s%n", name, message);55 Thread.sleep(1000);56 }57 System.out.println("received termination signal");58 } catch(InterruptedException e) {59 e.printStackTrace();60 return;61 }62
63 }64
65 }66
67
68 public classMain {69
70 public static voidmain(String[] args) {71 Broker broker = new WaitNotifyBroker(5);72 //Broker broker = new LockConditionBroker(5);73 //Broker broker = new BlockingQueueBroker();
74
75 new Thread(new Producer(broker, "prod 1")).start();76 new Thread(new Consumer(broker, "cons 1")).start();77 new Thread(new Consumer(broker, "cons 2")).start();78
79 }80
81 }
除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等
本文完。
参考:
《Java 7 Concurrency Cookbook》
pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)相关推荐
- java 山洞过火车 java_Java多线程之生产者消费者模型
[ 线程间的通讯 wait()在对象上等待,等待通知(在等待过程中释放对象锁.等待必须在同步块内.这个对象就是同步锁)<让线程进入阻塞状态,将线程放入等待池中> notify()通知在这个 ...
- 进程 互斥锁、队列与管道、生产者消费者模型
目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重 ...
- Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型...
一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例 ...
- 互斥锁、共享内存方式以及生产者消费者模型
守护进程 1.守护进程的概念 进程指的是一个正在运行的程序,守护进程也是一个普通进程 意思就是一个进程可以守护另一个进程 import time from multiprocessing import ...
- 并发编程——进程——生产者消费者模型
一.生产者消费者模型介绍 为什么要使用生产者消费者模型 生产者指的是生产数据的任务,消费者指的是处理数据的任务. 在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者 ...
- 如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. wait, notify 和 noti ...
- 生产者消费者_【线程通信】生产者消费者模型
1生产者消费者模型介绍 生产者消费者模型,是每一个学习多线程的的人都需要知道的模型; 大致情况就是:有两个线程,一个负责生产产品,一个消费产品,两者公用同一块内存区域,也就是产品放在了同一块内存上面, ...
- python queue 生产者 消费者_【python】-- 队列(Queue)、生产者消费者模型
队列(Queue) 在多个线程之间安全的交换数据信息,队列在多线程编程中特别有用 队列的好处: 提高双方的效率,你只需要把数据放到队列中,中间去干别的事情. 完成了程序的解耦性,两者关系依赖性没有不大 ...
- 生产者消费者模型、信号量、线程池以及单例模式的实现
生产者消费者模型!!---对典型的应用场景设计的解决方案 生产者与消费者模型应用场景:有线程不断的生产数据,有线程不断的处理数据. 数据的生产与数据的处理:放在同一个线程中完成,因为执行流只有一个,那 ...
最新文章
- protobuf和socket通信简单实例
- Sqlserver 数据库安全
- 线性回归csv数据集_用mxnet的gluon线性回归训练只有两个特征的数据集
- php代码 intval( ),php intval的测试代码发现问题_PHP教程
- 奇安信代码卫士帮助微软修复严重漏洞,获官方致谢和奖金
- “流量注入”***模式的探讨
- 简述RHEL7新特性(二)
- 跟我一起写 Makefile ---转
- rbf神经网络_黄小龙,陈阳舟:高阶非线性不确定多智能体系统自适应RBF神经网络协同控制...
- 如何设置vs2005的环境变量
- Matplotlib入门详细教程
- (ffmpeg3.3.x更新纪要)雷霄骅《最简单的基于FFMPEG+SDL的视频播放器》
- android imageview方法,Android入门之ImageView的使用方法
- mmall 项目实战(一)项目初始化
- Git:Terminal is dumb, but EDITOR unset
- 免费的桌面主题按钮 V1.0
- 小米8手机相册中的图片怎么识别文字?
- Linux鸟哥的私房菜(硬件)
- 视频剪辑工具:剪映专业版 for Mac
- 论项目管理与可行性分析的重要性
热门文章
- python -day16-模块介绍
- js的eval代码快速解密
- 数据库学习--wildfly配置postgreSQL数据源
- H2 database 操作操作内存表
- 服务器Context、虚拟主机配置(管理、配置)
- 设计模式你怎么看?--代理模式
- __declspec(novtable)有什么作用
- AndroidStudio_安卓原生开发_自己设计android端_到springboot端的请求验证Token系统---Android原生开发工作笔记158
- 编译人脸识别的时候出现问题
- 9W人脸清洗的问题--20170208