生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:

1. wait()/notify()

2. lock & condition

3. BlockingQueue

下面来逐一分析。

1. wait()/notify()

第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。

1 public class WaitNotifyBroker implements Broker{2

3 private finalObject[] items;4

5 private inttakeIndex;6 private intputIndex;7 private intcount;8

9 public WaitNotifyBroker(intcapacity) {10 this.items = newObject[capacity];11 }12

13 @SuppressWarnings("unchecked")14 @Override15 publicT take() {16 T tmpObj = null;17 try{18 synchronized(items) {19 while (0 ==count) {20 items.wait();21 }22 tmpObj =(T) items[takeIndex];23 if (++takeIndex ==items.length) {24 takeIndex = 0;25 }26 count--;27 items.notify();28 }29 } catch(InterruptedException e) {30 e.printStackTrace();31 }32

33 returntmpObj;34 }35

36 @Override37 public voidput(T obj) {38 try{39 synchronized(items) {40 while (items.length ==count) {41 items.wait();42 }43

44 items[putIndex] =obj;45 if (++putIndex ==items.length) {46 putIndex = 0;47 }48 count++;49 items.notify();50 }51 } catch(InterruptedException e) {52 e.printStackTrace();53 }54

55 }56

57 }

这里利用Array构造一个Buffer去存取数据,并利用count, putIndex和takeIndex来保证First-In-First-Out。

如果利用LinkedList来代替Array,相对来说会稍微简单些。

LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。

2. lock & condition

lock & condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。

在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:

1 public class LockConditionBroker implements Broker{2

3 private finalReentrantLock lock;4 private finalCondition notFull;5 private finalCondition notEmpty;6 private final intcapacity;7 private LinkedListitems;8

9 public LockConditionBroker(intcapacity) {10 this.lock = newReentrantLock();11 this.notFull =lock.newCondition();12 this.notEmpty =lock.newCondition();13 this.capacity =capacity;14

15 items = new LinkedList();16 }17

18 @Override19 publicT take() {20 T tmpObj = null;21 lock.lock();22 try{23 while (items.size() == 0) {24 notEmpty.await();25 }26

27 tmpObj =items.poll();28 notFull.signalAll();29

30 } catch(InterruptedException e) {31 e.printStackTrace();32 } finally{33 lock.unlock();34 }35 returntmpObj;36 }37

38 @Override39 public voidput(T obj) {40 lock.lock();41 try{42 while (items.size() ==capacity) {43 notFull.await();44 }45

46 items.offer(obj);47 notEmpty.signalAll();48

49 } catch(InterruptedException e) {50 e.printStackTrace();51 } finally{52 lock.unlock();53 }54

55 }56 }

3. BlockingQueue

最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。

实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。

1 public class BlockingQueueBroker implements Broker{2

3 private final BlockingQueuequeue;4

5 publicBlockingQueueBroker() {6 this.queue = new LinkedBlockingQueue();7 }8

9 @Override10 publicT take() {11 try{12 returnqueue.take();13 } catch(InterruptedException e) {14 e.printStackTrace();15 }16

17 return null;18 }19

20 @Override21 public voidput(T obj) {22 try{23 queue.put(obj);24 } catch(InterruptedException e) {25 e.printStackTrace();26 }27 }28

29 }

我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。

接下来,就是一个1P2C的例子:

1 public interface Broker{2

3 T take();4

5 voidput(T obj);6

7 }8

9

10 public class Producer implementsRunnable {11

12 private final Brokerbroker;13 private finalString name;14

15 public Producer(Brokerbroker, String name) {16 this.broker =broker;17 this.name =name;18 }19

20 @Override21 public voidrun() {22 try{23 for (int i = 0; i < 5; i++) {24 broker.put(i);25 System.out.format("%s produced: %s%n", name, i);26 Thread.sleep(1000);27 }28 broker.put(-1);29 System.out.println("produced termination signal");30 } catch(InterruptedException e) {31 e.printStackTrace();32 return;33 }34

35 }36

37 }38

39

40 public class Consumer implementsRunnable {41

42 private final Brokerbroker;43 private finalString name;44

45 public Consumer(Brokerbroker, String name) {46 this.broker =broker;47 this.name =name;48 }49

50 @Override51 public voidrun() {52 try{53 for (Integer message = broker.take(); message != -1; message =broker.take()) {54 System.out.format("%s consumed: %s%n", name, message);55 Thread.sleep(1000);56 }57 System.out.println("received termination signal");58 } catch(InterruptedException e) {59 e.printStackTrace();60 return;61 }62

63 }64

65 }66

67

68 public classMain {69

70 public static voidmain(String[] args) {71 Broker broker = new WaitNotifyBroker(5);72 //Broker broker = new LockConditionBroker(5);73 //Broker broker = new BlockingQueueBroker();

74

75 new Thread(new Producer(broker, "prod 1")).start();76 new Thread(new Consumer(broker, "cons 1")).start();77 new Thread(new Consumer(broker, "cons 2")).start();78

79 }80

81 }

除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等

本文完。

参考:

《Java 7 Concurrency Cookbook》

pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)相关推荐

  1. java 山洞过火车 java_Java多线程之生产者消费者模型

    [ 线程间的通讯 wait()在对象上等待,等待通知(在等待过程中释放对象锁.等待必须在同步块内.这个对象就是同步锁)<让线程进入阻塞状态,将线程放入等待池中> notify()通知在这个 ...

  2. 进程 互斥锁、队列与管道、生产者消费者模型

    目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重 ...

  3. Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型...

    一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例 ...

  4. 互斥锁、共享内存方式以及生产者消费者模型

    守护进程 1.守护进程的概念 进程指的是一个正在运行的程序,守护进程也是一个普通进程 意思就是一个进程可以守护另一个进程 import time from multiprocessing import ...

  5. 并发编程——进程——生产者消费者模型

    一.生产者消费者模型介绍 为什么要使用生产者消费者模型 生产者指的是生产数据的任务,消费者指的是处理数据的任务. 在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者 ...

  6. 如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. wait, notify 和 noti ...

  7. 生产者消费者_【线程通信】生产者消费者模型

    1生产者消费者模型介绍 生产者消费者模型,是每一个学习多线程的的人都需要知道的模型; 大致情况就是:有两个线程,一个负责生产产品,一个消费产品,两者公用同一块内存区域,也就是产品放在了同一块内存上面, ...

  8. python queue 生产者 消费者_【python】-- 队列(Queue)、生产者消费者模型

    队列(Queue) 在多个线程之间安全的交换数据信息,队列在多线程编程中特别有用 队列的好处: 提高双方的效率,你只需要把数据放到队列中,中间去干别的事情. 完成了程序的解耦性,两者关系依赖性没有不大 ...

  9. 生产者消费者模型、信号量、线程池以及单例模式的实现

    生产者消费者模型!!---对典型的应用场景设计的解决方案 生产者与消费者模型应用场景:有线程不断的生产数据,有线程不断的处理数据. 数据的生产与数据的处理:放在同一个线程中完成,因为执行流只有一个,那 ...

最新文章

  1. protobuf和socket通信简单实例
  2. Sqlserver 数据库安全
  3. 线性回归csv数据集_用mxnet的gluon线性回归训练只有两个特征的数据集
  4. php代码 intval( ),php intval的测试代码发现问题_PHP教程
  5. 奇安信代码卫士帮助微软修复严重漏洞,获官方致谢和奖金
  6. “流量注入”***模式的探讨
  7. 简述RHEL7新特性(二)
  8. 跟我一起写 Makefile ---转
  9. rbf神经网络_黄小龙,陈阳舟:高阶非线性不确定多智能体系统自适应RBF神经网络协同控制...
  10. 如何设置vs2005的环境变量
  11. Matplotlib入门详细教程
  12. (ffmpeg3.3.x更新纪要)雷霄骅《最简单的基于FFMPEG+SDL的视频播放器》
  13. android imageview方法,Android入门之ImageView的使用方法
  14. mmall 项目实战(一)项目初始化
  15. Git:Terminal is dumb, but EDITOR unset
  16. 免费的桌面主题按钮 V1.0
  17. 小米8手机相册中的图片怎么识别文字?
  18. Linux鸟哥的私房菜(硬件)
  19. 视频剪辑工具:剪映专业版 for Mac
  20. 论项目管理与可行性分析的重要性

热门文章

  1. python -day16-模块介绍
  2. js的eval代码快速解密
  3. 数据库学习--wildfly配置postgreSQL数据源
  4. H2 database 操作操作内存表
  5. 服务器Context、虚拟主机配置(管理、配置)
  6. 设计模式你怎么看?--代理模式
  7. __declspec(novtable)有什么作用
  8. AndroidStudio_安卓原生开发_自己设计android端_到springboot端的请求验证Token系统---Android原生开发工作笔记158
  9. 编译人脸识别的时候出现问题
  10. 9W人脸清洗的问题--20170208