生产者和消费者问题是线程模型中的经典问题,生产者和消费者在同一时间段共用同一个存储空间,这个存储空间是一个缓冲区的仓库,生产者可以将产品放入仓库,消费者可以从仓库中取出产品。

生产者/消费者模型是基于等待/通知机制,主要关注以下几点:

  1. 生产者生产的时候消费者不能消费
  2. 消费者消费的时候生产者不能生产
  3. 缓冲区空时消费者不能消费
  4. 缓冲区满时生产者不能生产

主要优点:

  • 解耦。因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这一点很容易想到,这样生产者和消费者的代码发生变化,都不会对对方产生影响,这样其实就把生产者和消费者之间的强耦合解开,变为了生产者和缓冲区/消费者和缓冲区之间的弱耦合
  • 通过平衡生产者和消费者的处理能力来提高整体处理数据的速度,这是生产者/消费者模型最重要的一个优点。如果消费者直接从生产者这里拿数据,如果生产者生产的速度很慢,但消费者消费的速度很快,那消费者就得占用CPU的时间片白白等在那边。有了生产者/消费者模型,生产者和消费者就是两个独立的并发体,生产者把生产出来的数据往缓冲区一丢就好了,不必管消费者;消费者也是,从缓冲区去拿数据就好了,也不必管生产者,缓冲区满了就不生产,缓冲区空了就不消费,使生产者/消费者的处理能力达到一个动态的平衡

实现生产者和消费者的5种方式

wait()和notify()方法的实现:

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

package org.example;import java.sql.SQLOutput;public class Test {private static Integer count = 0;private static final Integer Total = 10;private static String flag = "agree";public static void main(String[] args) {Test test = new Test();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();}public class Producer implements Runnable{@Overridepublic void run() {for (int i = 0;i < 5; i++){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (flag) {while (count == Total) {try {flag.wait();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName()+"生产者共有" + count);flag.notifyAll();}}}}public class Consumer implements Runnable{@Overridepublic void run() {for(int i = 0; i < 5; i++){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (flag){while(count == 0){try {flag.wait();} catch (InterruptedException e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);flag.notifyAll();}}}}
}

运行结果:

Thread-6生产者共有1
Thread-1消费者消费,目前总共有0
Thread-0生产者共有1
Thread-4生产者共有2
Thread-5消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-2生产者共有1
Thread-3消费者消费,目前总共有0
Thread-4生产者共有1
Thread-6生产者共有2
Thread-7消费者消费,目前总共有1
Thread-0生产者共有2
Thread-1消费者消费,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-2生产者共有1
Thread-3消费者消费,目前总共有0
Thread-4生产者共有1
Thread-5消费者消费,目前总共有0
Thread-2生产者共有1
Thread-3消费者消费,目前总共有0
Thread-6生产者共有1
Thread-7消费者消费,目前总共有0
Thread-0生产者共有1
Thread-1消费者消费,目前总共有0
Thread-6生产者共有1
Thread-3消费者消费,目前总共有0
Thread-4生产者共有1
Thread-0生产者共有2
Thread-1消费者消费,目前总共有1
Thread-2生产者共有2
Thread-5消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-0生产者共有1
Thread-3消费者消费,目前总共有0
Thread-6生产者共有1
Thread-7消费者消费,目前总共有0
Thread-4生产者共有1
Thread-2生产者共有2
Thread-5消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0Process finished with exit code 0

可重入锁ReentrantLock的实现

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。
可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*** 生产者和消费者,ReentrantLock的实现*/
public class Test{private static Integer count = 0;private static final Integer FULL = 10;//创建一个锁对象private Lock lock = new ReentrantLock();//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();public static void main(String[] args) {Test test2 = new Test();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}//获取锁lock.lock();try {while (count == FULL) {try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);//唤醒消费者notEmpty.signal();} finally {//释放锁lock.unlock();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}lock.lock();try {while (count == 0) {try {notEmpty.await();} catch (Exception e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);notFull.signal();} finally {lock.unlock();}}}}
}

 运行结果

Thread-2生产者生产,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-4生产者生产,目前总共有3
Thread-5消费者消费,目前总共有2
Thread-1消费者消费,目前总共有1
Thread-0生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-4生产者生产,目前总共有1
Thread-3消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-4生产者生产,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-4生产者生产,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-1消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-3消费者消费,目前总共有0
Thread-4生产者生产,目前总共有1
Thread-0生产者生产,目前总共有2
Thread-2生产者生产,目前总共有3
Thread-1消费者消费,目前总共有2
Thread-7消费者消费,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-4生产者生产,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-7消费者消费,目前总共有1
Thread-0生产者生产,目前总共有2
Thread-5消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-5消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-4生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-5消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-6生产者生产,目前总共有1
Thread-3消费者消费,目前总共有0
Thread-4生产者生产,目前总共有1
Thread-1消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-5消费者消费,目前总共有0
Thread-2生产者生产,目前总共有1
Thread-7消费者消费,目前总共有0
Thread-4生产者生产,目前总共有1
Thread-6生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2生产者生产,目前总共有2
Thread-0生产者生产,目前总共有3
Thread-5消费者消费,目前总共有2
Thread-7消费者消费,目前总共有1
Thread-1消费者消费,目前总共有0Process finished with exit code 0

阻塞队列BlockingQueue的实现

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作
    因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
    从上可知,阻塞队列是线程安全的。
    下面是BlockingQueue接口的一些方法:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/*** 使用BlockingQueue实现生产者消费者模型*/
public class Test3 {private static Integer count = 0;//创建一个阻塞队列final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);public static void main(String[] args) {Test3 test3 = new Test3();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}try {blockingQueue.put(1);count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {blockingQueue.take();count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}
}

信号量Semaphore的实现

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,在操作系统中是一个非常重要的问题,可以用来解决哲学家就餐问题。Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用acquire()方法获得一个许可,当许可不足时会被阻塞,release()添加一个许可。在下列代码中,还加入了另外一个mutex信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行

import java.util.concurrent.Semaphore;
/*** 使用semaphore信号量实现*/
public class Test4 {private static Integer count = 0;//创建三个信号量final Semaphore notFull = new Semaphore(10);final Semaphore notEmpty = new Semaphore(0);final Semaphore mutex = new Semaphore(1);public static void main(String[] args) {Test4 test4 = new Test4();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}try {notFull.acquire();mutex.acquire();count++;System.out.println(Thread.currentThread().getName()+ "生产者生产,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notEmpty.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {notEmpty.acquire();mutex.acquire();count--;System.out.println(Thread.currentThread().getName()+ "消费者消费,目前总共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notFull.release();}}}}
}

管道输入输出流PipedInputStream和PipedOutputStream实现

在java的io包下,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。
它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。
使用方法:先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯,但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的,多个生成者和多个消费者者之间则不行

/*** 使用管道实现生产者消费者模型*/
public class Test5 {final PipedInputStream pis = new PipedInputStream();final PipedOutputStream pos = new PipedOutputStream();{try {pis.connect(pos);} catch (IOException e) {e.printStackTrace();}}class Producer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = (int) (Math.random() * 255);System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字,该数字为: " + num);pos.write(num);pos.flush();} } catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = pis.read();System.out.println("消费者消费了一个数字,该数字为:" + num);}} catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {Test5 test5 = new Test5();new Thread(test5.new Producer()).start();new Thread(test5.new Consumer()).start();}
}

参考

《Thinking In Java》

Java实现生产者和消费者的5种方式

Java中生产者和消费者总结相关推荐

  1. Java中生产者与消费者问题的演变

    想要了解更多关于Java生产者消费者问题的演变吗?那就看看这篇文章吧,我们分别用旧方法和新方法来处理这个问题. 生产者消费者问题是一个典型的多进程同步问题. 对于大多数人来说,这个问题可能是我们在学校 ...

  2. Java多线程-生产者与消费者

    Java多线程生产者与消费者,准确说应该是"生产者-消费者-仓储"模型,使用了仓储,使得生产者消费者模型就显得更有说服力. 对于此模型,应该明确一下几点: 1.生产者仅仅在仓储未满 ...

  3. kafka中生产者和消费者的分区问题

    本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...

  4. java多线程生产者与消费者问题_Java多线程详解之四:生产者消费者问题

    一.问题描述 生产者消费者问题(Producer-Consumer problem),也称有限缓冲区问题(Bounded-buffer promblem),是一个多线程同步问题的经典案例.对于一个固定 ...

  5. java多线程生产者与消费者案例_多线程操作实例——生产者与消费者

    面对多线程学习生产者与消费者是最基本的实例 对于java后端开发的人员必须要掌握,还有考研考试计算机操作系统的同鞋. 下面是三个实例对于生产者与消费者的的例子,层层递进,逐步解决问题. 问题:生产者- ...

  6. Disruptor框架中生产者、消费者的各种复杂依赖场景下的使用总结-我见过最好的Disruptor

    更多高并发知识请访问 www.itkc8.com 非常感谢 https://www.cnblogs.com/pku-liuqiang/p/8544700.html Disruptor是一个优秀的并发框 ...

  7. java实现生产者和消费者 类比消息中间件

    一.对生产者消费者的理解 生产者消费者模式是并发.多线程编程中经典的设计模式. 简单来看,就是一个类负责生产,一个类负责消费.举例来说,一个变量,生产者不断增加这个变量,消费者不断减少这个变量.在互联 ...

  8. day20Java-Thread-多线程中生产者和消费者

    博客 Java-(高级) 文章目录 多线程生产者和消费者 多线程生产者消费者代码版本1 多线程生产者消费者代码版本2-同步解决问题 多线程生产者消费者代码版本3-等待唤醒机制解决问题 多线程生产者消费 ...

  9. java多线程生产者与消费者问题_java多线程实现生产者与消费者问题

    生产者与消费者多线程实现,首先的问题就是同步,就是关于临界资源的访问 我们首先来定义一个临界资源类,这里设为Q class Q { int z=4; } 这个int型的z就是我假设的临界资源的个数 然 ...

最新文章

  1. 网络优化常见专业术语详解
  2. POJ 3347 Kadj Squares(复杂的线段相交问题)
  3. java中的void是什么?有什么作用?
  4. SpringMVC流程图示
  5. datatable.js 服务端分页+fixColumns列固定
  6. 使用Puppeteer进行数据抓取(一)——安装和使用
  7. 发送带有接缝的活动邀请
  8. rails4 ajax 例子,Ajax和Rails 4:创建实例变量并更新视图而不刷新
  9. c语言标识符的文法表示,第三章文法和语法[lly]3.ppt
  10. pl sql mysql 版本_pl sql developer连oracle哪个版本的数据库都可以吗
  11. 技能树 Web前端/php/JavaWeb/数据库
  12. mysql基础语法之(全文索引)
  13. Camtasia实用技巧之智能聚焦
  14. 帆软复选框选中并打印(按某种格式打印)数据分析、报填可用
  15. Caffe学习:Layers
  16. 本两个Build工作总结
  17. 支持android 9的框架,【测评】安卓9 xposed框架 riru edxposed与太极·magisk对比
  18. 通达信手机版分时图指标大全_通达信精选指标——挣开眼就买卖版指标详解
  19. seo和sem的区别与联系
  20. delphi 安装控件时提示系统找不到指定的模块的解决

热门文章

  1. SpringCloud[04]Ribbon负载均衡服务调用
  2. 淘宝视频的跨模态检索
  3. LibUSB-Win32程序介绍
  4. cmd命令行进入D盘的方法
  5. java 过滤http请求头_JAVAWEB开发实现对请求头、请求参数的过滤
  6. CRNN中英文字符识别
  7. 有源电力滤波器并联三相apf matlab simulink仿真 谐波检测谐波补偿
  8. OnePiece 之 Asp.Net 菜鸟也来做开发(二)
  9. 1718 Cos的多项式
  10. [DB] From Leng,Oracle 数据库报ora-653 ora-01654错误解决办法