我们先来看看什么是生产者消费者模式,生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,又或是多个生产者对应多个消费者时,大家可能会手忙脚乱。如何才能让大家更好地配合呢?这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。

使用生产者消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。而中间的 3 和 4 分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。

那么什么时候阻塞线程需要被唤醒呢?有两种情况。第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。

如何用 BlockingQueue 实现生产者消费者模式
我们接下来看如何用 wait/notify/Condition/BlockingQueue 实现生产者消费者模式,先从最简单的 BlockingQueue 开始讲起:

复制代码
public static void main(String[] args) {
 
  BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
 
 Runnable producer = () -> {
    while (true) {
          queue.put(new Object());
  }
   };
 
new Thread(producer).start();
new Thread(producer).start();
 
Runnable consumer = () -> {
      while (true) {
           queue.take();
}
   };
new Thread(consumer).start();
new Thread(consumer).start();
}
如代码所示,首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue,命名为 queue 并将它的容量设置为 10;其次,创建一个简单的生产者,while(true) 循环体中的queue.put() 负责往队列添加数据;然后,创建两个生产者线程并启动;同样消费者也非常简单,while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。为了代码简洁并突出设计思想,代码里省略了 try/catch 检测,我们不纠结一些语法细节。以上便是利用 BlockingQueue 实现生产者消费者模式的代码。虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

如何用 Condition 实现生产者消费者模式
BlockingQueue 实现生产者消费者模式看似简单,背后却暗藏玄机,我们在掌握这种方法的基础上仍需要掌握更复杂的实现方法。我们接下来看如何在掌握了 BlockingQueue 的基础上利用 Condition 实现生产者消费者模式,它们背后的实现原理非常相似,相当于我们自己实现一个简易版的 BlockingQueue:

复制代码
public class MyBlockingQueueForCondition {
 
   private Queue queue;
   private int max = 16;
   private ReentrantLock lock = new ReentrantLock();
   private Condition notEmpty = lock.newCondition();
   private Condition notFull = lock.newCondition();
 
 
   public MyBlockingQueueForCondition(int size) {
       this.max = size;
       queue = new LinkedList();
   }
 
   public void put(Object o) throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == max) {
               notFull.await();
           }
           queue.add(o);
           notEmpty.signalAll();
       } finally {
           lock.unlock();
       }
   }
 
   public Object take() throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == 0) {
               notEmpty.await();
           }
           Object item = queue.remove();
           notFull.signalAll();
           return item;
       } finally {
           lock.unlock();
       }
   }
}
如代码所示,首先,定义了一个队列变量 queue 并设置最大容量为 16;其次,定义了一个 ReentrantLock 类型的 Lock 锁,并在 Lock 锁的基础上创建两个 Condition,一个是 notEmpty,另一个是 notFull,分别代表队列没有空和没有满的条件;最后,声明了 put 和 take 这两个核心方法。

因为生产者消费者模式通常是面对多线程的场景,需要一定的同步措施保障线程安全,所以在 put 方法中先将 Lock 锁上,然后,在 while 的条件里检测 queue 是不是已经满了,如果已经满了,则调用 notFull 的 await() 阻塞生产者线程并释放 Lock,如果没有满,则往队列放入数据并利用 notEmpty.signalAll() 通知正在等待的所有消费者并唤醒它们。最后在 finally 中利用 lock.unlock() 方法解锁,把 unlock 方法放在 finally 中是一个基本原则,否则可能会产生无法释放锁的情况。

下面再来看 take 方法,take 方法实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后在 finally 中解锁。

这里需要注意,我们在 take() 方法中使用 while( queue.size() == 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )。为什么呢?大家思考这样一种情况,因为生产者消费者往往是多线程的,我们假设有两个消费者,第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。

如何用 wait/notify 实现生产者消费者模式
最后我们再来看看使用 wait/notify 实现生产者消费者模式的方法,实际上实现原理和Condition 是非常类似的,它们是兄弟关系:

复制代码
class MyBlockingQueue {
 
   private int maxSize;
   private LinkedList<Object> storage;
 
   public MyBlockingQueue(int size) {
       this.maxSize = size;
       storage = new LinkedList<>();
   }
 
   public synchronized void put() throws InterruptedException {
       while (storage.size() == maxSize) {
           wait();
       }
       storage.add(new Object());
       notifyAll();
   }
 
   public synchronized void take() throws InterruptedException {
       while (storage.size() == 0) {
           wait();
       }
       System.out.println(storage.remove());
       notifyAll();
   }
}
如代码所示,最主要的部分仍是 take 与 put 方法,我们先来看 put 方法,put 方法被 synchronized 保护,while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样,take 方法也被 synchronized 修饰,while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。使用这个 MyBlockingQueue 实现的生产者消费者代码如下:

复制代码
/**
* 描述:     wait形式实现生产者消费者模式
*/
public class WaitStyle {
 
   public static void main(String[] args) {
       MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
       Producer producer = new Producer(myBlockingQueue);
       Consumer consumer = new Consumer(myBlockingQueue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
}
 
class Producer implements Runnable {
 
   private MyBlockingQueue storage;
 
   public Producer(MyBlockingQueue storage) {
       this.storage = storage;
   }
 
   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.put();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}
 
class Consumer implements Runnable {
 
   private MyBlockingQueue storage;
 
   public Consumer(MyBlockingQueue storage) {
       this.storage = storage;
   }
 
   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.take();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}
以上就是三种实现生产者消费者模式的讲解,其中,第一种 BlockingQueue 模式实现比较简单,但其背后的实现原理在第二种、第三种实现方法中得以体现,第二种、第三种实现方法本质上是我们自己实现了 BlockingQueue 的一些核心逻辑,供生产者与消费者使用。

引用:https://kaiwu.lagou.com/course/courseInfo.htm?courseId=16#/detail/pc?id=243

Java多线程学习三:有哪几种实现生产者消费者模式的方法相关推荐

  1. Java多线程学习三十五: CyclicBarrier 和 CountDownLatch 有什么不同

    CyclicBarrier 和 CountDownLatch 有什么不同? CyclicBarrier作用 CyclicBarrier 和 CountDownLatch 确实有一定的相似性,它们都能阻 ...

  2. java多线程系类:基础篇:10生产者消费者的问题

    概要 本章,会对"生产/消费者问题"进行讨论.涉及到的内容包括: 1. 生产/消费者模型 2. 生产/消费者实现 转载请注明出处:http://www.cnblogs.com/sk ...

  3. 多线程在单例中的应用,生产者消费者模式(线程的通信)

    单例的实现方式:懒汉式和饿汉式 其中,懒汉式是线程不安全的,当有多条线程同时访问单例对象时,则会出现多线程临界资源问题 单例实现步骤: 1 私有化构造方法 2 在类中创建对象 3 通过公开的方法返回这 ...

  4. Java多线程学习三十八:你知道什么是 CAS 吗

    CAS 简介 CAS 其实是我们面试中的常客,因为它是原子类的底层原理,同时也是乐观锁的原理,所以当你去面试的时候,经常会遇到这样的问题"你知道哪些类型的锁"?你可能会回答&quo ...

  5. Java多线程学习三十七:volatile 的作用是什么?与 synchronized 有什么异同

    volatile 是什么 首先我们就来介绍一下 volatile,它是 Java 中的一个关键字,是一种同步机制.当某个变量是共享变量,且这个变量是被 volatile 修饰的,那么在修改了这个变量的 ...

  6. Java多线程学习三十三:Future 的主要功能是什么?

    Future 类 Future 的作用 Future 最主要的作用是,比如当做一定运算的时候,运算过程可能比较耗时,有时会去查数据库,或是繁重的计算,比如压缩.加密等,在这种情况下,如果我们一直在原地 ...

  7. Java多线程学习三十:ThreadLocal 适合用在哪些实际生产的场景中

    我们在学习一个工具之前,首先应该知道这个工具的作用,能带来哪些好处,而不是一上来就闷头进入工具的 API.用法等,否则就算我们把某个工具的用法学会了,也不知道应该在什么场景下使用.所以,我们先来看看究 ...

  8. Java多线程学习三十九:CAS 有什么缺点?

    CAS 有哪几个主要的缺点. 首先,CAS 最大的缺点就是 ABA 问题. 决定 CAS 是否进行 swap 的判断标准是"当前的值和预期的值是否一致",如果一致,就认为在此期间这 ...

  9. Java多线程学习三十六:主内存和工作内存的关系

    CPU 有多级缓存,导致读的数据过期 由于 CPU 的处理速度很快,相比之下,内存的速度就显得很慢,所以为了提高 CPU 的整体运行效率,减少空闲时间,在 CPU 和内存之间会有 cache 层,也就 ...

最新文章

  1. 披上AI战衣的中国APP,正在让印度节节败退
  2. HTML5音乐播放器(四):播放列表与播放方式
  3. P1582 倒水(二进制)
  4. 依赖注入与对象间关系
  5. 计算机网络基础+重点知识点
  6. UJAM Virtual Guitarist SILK for mac(尼龙弦原声吉他)
  7. Hadoop 2.2.0 在centos6.2 64位下的安装--分布式模式
  8. TouchVG 支持 CocoaPods 了!
  9. 极域电子教室卸载、忘记密码解决方案
  10. YARN队列优先级分配策略
  11. 如何让ipad成为电脑的扩展屏
  12. 广告机解决方案/安防监控网络方案/医疗方案
  13. 变量命名20201106-01
  14. 【大数据】什么是数据集成?(SeaTunnel 集成工具介绍)
  15. 服务器常用状态码及其含义
  16. Git 切换分支的命令
  17. 卡诺图和Apple Watch的第一次亲密接触
  18. win10如何手动强制关联默认文件打开方式应用
  19. 英雄联盟手游正式上线啦
  20. 关于黎曼猜想论文开头部分引用的欧拉公式

热门文章

  1. 家长又放心了一些!教育类App不能再干这些事了
  2. OPPO Reno造乐节落地重庆 华语乐坛十大金曲榜单公布
  3. 拳王公社:从0-1只需掌握这3个重点​,网创再也不缺精准流量
  4. 支付宝最不想看到的:当“集五福”变成赚钱生意 有人日入千元!
  5. __FILE__, __LINE__, __FUNCTION__
  6. rm ,rm -rf , rm -f,rm -r 以及rm 命令的其他参数命令
  7. php算法求出一个数可以被分解成多少个_面试时写不出排序算法?看这篇就够了(下)...
  8. 嵌入式Linux入门9:Linux系统使用
  9. cad lisp 两侧偏移并删除_CAD做钣金件展开的原理你知道吗?
  10. 【算法】赫夫曼编码 解码