做题的时候遇到了生产者消费者问题,这个问题可以说是线程学习的经典题目了,就忍不住研究了一波。它描述是有一块缓冲区(队列实现)作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。在Java中这个数组线程阻塞的问题,多个用户同时发送多个请求,怎么保证不发生线程死锁,是我们要考虑的问题。

生产者消费者模式说明:

1.生产者只在仓库未满时进行生产,仓库满时生产者进程被阻塞;

2.消费者只在仓库非空时进行消费,仓库为空时消费者进程被阻塞;

3.当消费者发现仓库为空时会通知生产者生产;

3.当生产者发现仓库满时会通知消费者消费;

实现的关键:

我们知道在JAVA环境中,线程Thread有如下几个状态:

1.新建状态

2.就绪状态

3.运行状态

4.阻塞状态

5.死亡状态

生产者消费者问题就是要控制线程的阻塞状态,保证生产者和消费者进程在一定条件下,一直稳定运行,不出现没有商品但是消费者还是一直购买,商品满了但是生产者还是不断生产导致浪费的情况。

我们考虑线程常用的Sychronized、RetrenLock还有阻塞队列来实现。

(1)Object的wait() / notify()方法

wait(): wait()方法可以让线程进入等待状态,当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。

notify():notify随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态。当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码实现:

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;/*** 生产者消费者模式:使用Object.wait() / notify()方法实现*/
public class ProducerConsumer {private static final int CAPACITY = 5;
//申请一个容量最大的仓库public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;//队列作为仓库String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){
//while(condition)为自旋锁,为防止该线程没有收到notify()调用也从wait()中返回
//(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全
//地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行
//了,自旋锁当终止条件满足时,才会停止自旋,这里设置了一直执行,直到程序手动停
//止。synchronized(queue){//给队列加锁,保证线程安全while(queue.size() == maxSize){//当队列是满的时候,生产者线程等待,由消费者线程进行操作try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}//队列不为空的时候,生产者被唤醒进行操作System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//因此如果想在一个满的队列中加入一个新项,调用 add() 方法就会抛出一//个 unchecked 异常,而调用 offer() 方法会返回 falsequeue.notifyAll();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){synchronized(queue){while(queue.isEmpty()){try {//队列为空,说明没有生产者生产的商品,消费者进行等待System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();//如果队列元素为空,调用remove() 的行为与 Collection 接口的版本相似会抛出异常,这里是模拟消费者取走商品的过程// 但是新的 poll() 方法在用空集合调用时只是返回 null。因此新的方法更适合容易出现异常条件的情况。System.out.println("[" + name + "] Consuming value : " + x);queue.notifyAll();//唤醒所有队列,消费者和生产者根据队列情况进行操作try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}
}

2. 使用Lock和Condition的await() / signal()方法

Condition接口的await()和signal()是用来做同步的两种方法,它们的功能基本上和Object的wait()/ nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

代码实现:

import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生产者消费者模式:使用Lock和Condition实现*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//队列满的条件private static final Condition emptyCondition = lock.newCondition();//队列空的条件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//这里可以和wait()进行对比,两种控制线程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁,Lock不同于Sychronized,需要手动释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");//队列为空满足条件,消费者线程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}

(3)BlockingQueue阻塞队列方法

我们采用一个阻塞队列来实现。

通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

我们这里使用LinkedBlockingQueue,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()/ signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。

  • put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
  • take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

代码实现:

import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生产者消费者模式:使用Lock和Condition实现*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//队列满的条件private static final Condition emptyCondition = lock.newCondition();//队列空的条件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生产者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//获得锁lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//这里可以和wait()进行对比,两种控制线程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);
//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁,Lock不同于Sychronized,需要手动释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消费者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
//队列为空满足条件,消费者线程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//唤醒其他所有生产者、消费者fullCondition.signalAll();emptyCondition.signalAll();//释放锁lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}

小结:三种实现形式,其实理念都是相同的,都是控制阻塞状态,根据条件去控制线程的运行状态和阻塞状态。生产者消费者模式 为信息传输开辟了一个崭新的概念,因为它的优先级最高,所以即使网络发生堵塞时它也会最先通过,最大程度的保证了设备的安全。也有缺点,就是在网络中的个数是有限制的。生产者消费者模式在设置时比较简单,使用方便安全,在将来的自动化行业必定会大大被人们所认同。

参考资料:

https://blog.csdn.net/u010983881/article/details/78554671#commentBox

生产者消费者模型java实现相关推荐

  1. 生产者消费者模型 java

    生产者消费者 面包类 容器类 让面包同步 存放面包 取出面包 生产类 线程 消费类 线程 测试类 效果 多存多取的时候 问题 可能出现 解决多存多取 效果

  2. 多线程生产者消费者模型

    1. 基础知识: 1. 什么是生产者-消费者模式: 比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的 ...

  3. java多线程抽奖_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码...

    导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...

  4. 如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. wait, notify 和 noti ...

  5. 分布与并行计算—生产者消费者模型队列(Java)

    在生产者-消费者模型中,在原有代码基础上,把队列独立为1个类实现,通过公布接口,由生产者和消费者调用. public class Consumer implements Runnable {int n ...

  6. pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

    生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题.有如下几个常见的实现方法: 1. wait()/notify() 2. lock & condition 3. Blockin ...

  7. 【Java 并发编程】多线程、线程同步、死锁、线程间通信(生产者消费者模型)、可重入锁、线程池

    并发编程(Concurrent Programming) 进程(Process).线程(Thread).线程的串行 多线程 多线程的原理 多线程的优缺点 Java并发编程 默认线程 开启新线程 `Ru ...

  8. java 生产者消费者_Java多线程:线程间通信—生产者消费者模型

    一.背景 && 定义 多线程环境下,只要有并发问题,就要保证数据的安全性,一般指的是通过 synchronized 来进行同步. 另一个问题是, 多个线程之间如何协作呢 ? 我们看一个 ...

  9. Java生产者 消费者模型的一种实现

    本文主要介绍java中生产者/消费者模式的实现,对java线程锁机制的一次深入理解. 生产者/消费者模型 生产者/消费者模型要保证,同一个资源在同一时间节点下只能被最多一个线程访问,这个在java中用 ...

最新文章

  1. elment-ui文件上传详解
  2. matlab幂法的瑞利商加速,瑞利商加速定理14.PPT
  3. SAP Spartacus支持的语言和货币单位的数据源
  4. Java小魔女芭芭拉_沉迷蘑菇不可自拔,黏土人《小魔女学园》苏西·曼芭芭拉 图赏...
  5. 抑郁症是不可告人的病吗?
  6. Dialog里加入下拉框Java_android 自定义dialog弹出框,带单选多选下拉
  7. python接口自动化4-绕过验证码登录(cookie) (转载)
  8. 网页与服务器数据库数据交互,网页与ACCESS数据库如何实现数据交互?
  9. 使用FFmpeg 编解码 FLV的HEVC(H265)格式的视频
  10. python查询12306余票_Python实现查询12306火车票信息
  11. Web前端满屋花案例框架
  12. 【知识点】eval() 的用法
  13. 百度笔试题——首相的密道
  14. 【复杂网络系列】模块度(Modularity )的计算方法
  15. java $ 怎样用_jsp中$是什么意思?怎么用?
  16. 2020CCPC 绵阳 7-4 Defuse the Bombs(二分)
  17. win10更改固定IP出现意外无法更改
  18. springboot+敬老院管理系统 毕业设计-附源码261535
  19. html5地图编辑器,XTranslator Map Editor(地图编辑器)
  20. 计算机中整数的表示和整数运算

热门文章

  1. SSLOJ 1317.灵魂分流药剂
  2. 从SEO优化角度打造移动端网站的移动建站指南
  3. 2013年部分节假日安排
  4. QGIS离线GeoJSON数据,使用Cesium加载并根据楼层高度拉伸(weixin公众号【图说GIS】)
  5. Cryptarithmetic Problem ‘ODD+ODD == EVEN’;map()函数,reduce()
  6. Verilog实现移位寄存器
  7. css中的BFC、IFC、GFC、FFC
  8. B2C大点名:国内B2C网站收集(更新至2009年6月2日)
  9. 工业智能网关BL110应用之60:如何实现智能楼宇控制BACnet 接入阿里云平台
  10. 如何通过数据分析找到热销产品?