前置知识

此处以Java描述该问题,需要Java并发的知识,这里附上一些不错的教程:

  • Java Multitasking

Youtube上的一个教程,非常实战,能快速找到感觉。

  • The Java™ Tutorials - Concurrency

Oracle的并发官方教程,很全面,但不深入。

  • Java多线程系列目录(共43篇)

国人的Java并发系列教程,很全面,但有地方深入到源码,有的浅停在使用方法。

  • Java 7 并发编程指南中文版

免费的翻译书,没有看,据说不错。

问题描述

生产者-消费者(Producer-Consumer Problem)以下简称为PC问题。其描述以下问题:

  • 多个生产者生产产品,多个消费者消费产品,两者间有一个大小固定的缓冲区;
  • 生产者、消费者不可同时访问缓冲区;
  • 生产者不可向满缓冲区放产品;
  • 消费者不可从空缓冲区取产品。

信号量解法

public class PCSemaphore {private final static int BUFFER_SIZE = 10;public static void main(String[] args) {Semaphore mutex = new Semaphore(1);Semaphore full = new Semaphore(0);Semaphore empty = new Semaphore(BUFFER_SIZE);Queue<Integer> buffer = new LinkedList<>();Producer producer = new Producer(mutex, empty, full, buffer);Consumer consumer = new Consumer(mutex, empty, full, buffer);// 可以初始化多个生产者、消费者new Thread(producer, "p1").start();new Thread(producer, "p2").start();new Thread(consumer, "c1").start();new Thread(consumer, "c2").start();new Thread(consumer, "c3").start();}
}class Producer implements Runnable {private Semaphore mutex, empty, full;private Queue<Integer> buffer;private Integer counter = 0;public Producer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {this.mutex = mutex;this.empty = empty;this.full = full;this.buffer = buffer;}@Overridepublic void run() {while (true) {try {empty.acquire();mutex.acquire();} catch (InterruptedException e) {e.printStackTrace();}buffer.offer(counter++);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}full.release();mutex.release();}}
}class Consumer implements Runnable {private Semaphore mutex, empty, full;private Queue<Integer> buffer;public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {this.mutex = mutex;this.empty = empty;this.full = full;this.buffer = buffer;}@Overridepublic void run() {String threadName = Thread.currentThread().getName();while (true) {try {full.acquire();mutex.acquire();} catch (InterruptedException e) {e.printStackTrace();}Integer product = buffer.poll();int left = buffer.size();System.out.printf("%s consumed %d left %d%n", threadName, product, left);try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}empty.release();mutex.release();}}
}

教科书的解法,问题有三:

  1. 不可交换消费者中的full和mutex信号量的请求顺序,若我们交换他们则会有:

    同样地不可交换生产者中的empty和mutex信号量的请求顺序。
    即必须遵守先资源信号量,再互斥信号量的请求顺序。
    教科书中使用AND型信号量解决该问题,但这对程序员来说算是一个Dirty Solution,为什么呢,见问题2。

  2. 将进程控制部分和业务逻辑放在一起,这种代码看着混乱不堪。

  3. 效率低,Java中是不会用信号量来实现这东西,信号量在Java中常用来限制对一个共享资源的最大并发访问数。

注意,因为这里写的写的是一个小示例,所以我们没有考虑产品的生产、使用和入队、出队(放入、拿出缓存区的过程)。但实际中我们只需要将产品的入队和出队进行互斥即可,产品的生产和使用可并发执行,甚至在每个生产者、消费者内部可建立单独的缓冲区,暂存生产出来但还不能放到公共缓冲区的产品,直到可以放入公共缓冲区。

wait() & notify()

public class PCWaitNotify {public final static int BUFFER_SIZE = 10;public static void main(String[] args) {Object mutex = new Object();AtomicInteger counter = new AtomicInteger(0);Queue<Integer> buffer = new LinkedList<>();Producer producer = new Producer(buffer, counter, mutex);Consumer consumer = new Consumer(buffer, mutex);new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();}
}class Producer implements Runnable {private Random rand = new Random();private Queue<Integer> buffer;private AtomicInteger counter; // 支持原子操作的基本类型包装类private Object mutex;public Producer(Queue<Integer> buffer, AtomicInteger counter, Object mutex) {this.buffer = buffer;this.counter = counter;this.mutex = mutex;}@Overridepublic void run() {while (true) {synchronized (mutex) {while (buffer.size() == PCWaitNotify.BUFFER_SIZE) {try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();}}buffer.offer(counter.incrementAndGet());mutex.notify();}try {Thread.sleep(rand.nextInt(800));} catch (InterruptedException e) {e.printStackTrace();}}}
}class Consumer implements Runnable {private Random rand = new Random();private Queue<Integer> buffer;private Object mutex;public Consumer(Queue<Integer> buffer, Object mutex) {this.buffer = buffer;this.mutex = mutex;}@Overridepublic void run() {while (true) {synchronized (mutex) {while (buffer.size() == 0) {try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("consumed " + buffer.poll() + " left " + buffer.size());mutex.notify();}try {Thread.sleep(rand.nextInt(500));} catch (InterruptedException e) {e.printStackTrace();}}}
}

以Java的底层并发API,wait()和notify()实现,效率虽高,但进程控制部分和业务逻辑同样混在一起,没有完全解决问题。

BlockingQueue

public class PCBlockingQueue {private final static int BUFFER_SIZE = 10;public static void main(String[] args) {BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(BUFFER_SIZE);AtomicInteger counter = new AtomicInteger(0);Producer producer = new Producer(buffer, counter);Consumer consumer = new Consumer(buffer);new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();}
}class Producer implements Runnable {private Random rand = new Random();private AtomicInteger counter;private BlockingQueue<Integer> buffer;public Producer(BlockingQueue<Integer> buffer, AtomicInteger counter) {this.buffer = buffer;this.counter = counter;}@Overridepublic void run() {while (true) {try {Thread.sleep(rand.nextInt(800));Integer product = counter.incrementAndGet();buffer.put(product);} catch (InterruptedException e) {e.printStackTrace();}}}
}class Consumer implements Runnable {private Random rand = new Random();private BlockingQueue<Integer> buffer;public Consumer(BlockingQueue<Integer> buffer) {this.buffer = buffer;}@Overridepublic void run() {while (true) {try {Thread.sleep(rand.nextInt(600));Integer product = buffer.take(); // 队列空时,会阻塞,直到有新元素,并将新元素返回。System.out.println("consumed " + product + " left " + buffer.size());} catch (InterruptedException e) {e.printStackTrace();}}}
}

你觉得像Java这样的企业级语言会不考虑到代码的功能分离问题吗(SRP原则)?Java中早就提供了一堆线程安全的数据结构,这里用了BlockingQueue,其他线程安全类参考java.util.concurrent包。

转载于:https://www.cnblogs.com/sequix/p/8776716.html

经典并发问题:生产者-消费者相关推荐

  1. 线程通信的经典问题:生产者消费者问题

    package ThreadTest2; // 线程通信的经典问题:生产者消费者//店员(资源) class Clerk{private int productCount = 0 ;public sy ...

  2. python进阶09并发之五生产者消费者

    原创博客地址:python进阶09并发之五生产者消费者 这也是实际项目中使用较多的一种并发模式,用Queue(JoinableQueue)实现,是Python中最常用的方式(这里的queue特指mul ...

  3. java多线程抽奖_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码...

    导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...

  4. Java多线程之并发协作生产者消费者设计模式

    转载请注明出处:blog.csdn.net/linglongxin- 两个线程一个生产者个一个消费者 需求情景 两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个 涉及问题 同步问题 ...

  5. [19/04/11-星期四] 多线程_并发协作(生产者/消费者模式_2种解决方案(管程法和信号灯法))...

    一.概念 多线程环境下,我们经常需要多个线程的并发和协作.这个时候,就需要了解一个重要的多线程并发协作模型"生产者/消费者模式". Ø 什么是生产者? 生产者指的是负责生产数据的模 ...

  6. Linux 系统应用编程——多线程经典问题(生产者-消费者)

    "生产者--消费者"问题是Linux多线程编程中的经典问题,主要是利用信号量处理线程间的同步和互斥问题. "生产者--消费者"问题描述如下: 有一个有限缓冲区( ...

  7. mysql服务器多线程模型_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码 - 陈彦斌 - 博客园...

    导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...

  8. 2.3.6 操作系统之进程同步与互斥经典问题(生产者-消费者问题、多生产者-多消费者问题、吸烟者问题、读者-写者问题、哲学家进餐问题)

    文章目录 0.前言 1.生产者-消费者问题 (1)问题描述 (2)问题分析 (3)如何实现? (4)实现互斥的P操作一定要在实现同步的P操作之后 (5)知识回顾与重要考点 2.多生产者-多消费者问题 ...

  9. Java 并发(生产者/消费者 模式)

    >生产者/消费者 模式角色:生产者,消费者都是线程,两者中间是容器,容器内部是产品. 要求: 容器 里面要定义容量 容器 往里面添加(满时等待) 或者 从里面删除(空时等待) ,都要是阻塞的(等 ...

  10. 经典进程同步问题——生产者消费者问题

    问题描述 "生产者-消费者"问题 (producer/consumer problem) 是最著名的进程同步问题. 该问题描述了共享固定大小缓冲区的两个线程--即所谓的" ...

最新文章

  1. ihtml2document能不能根据id获取dom_使用DOM进行XML文件的解析
  2. docker添加新的环境变量_Docker环境变量
  3. 机器学习与计算机视觉(FPGA的图像处理方法)
  4. jmeter获得Response Headers,Response Body里的值
  5. 了解OutOfMemoryError异常 - 深入Java虚拟机读后总结
  6. 【foobar 2000】如何在手机、电脑上播放局域网内另一台电脑上存储的音乐?FTP服务器、UPnP/DLNA协议、构建Music server、创建音乐服务器
  7. 微信调试弹出报错信息
  8. Sql server中 如何用sql语句创建视图
  9. viper4android md,【xposed】微信主题模块(MDWechat)v3.5.0
  10. 移动apn接入点哪个快_千兆交换机和快速以太网交换机哪个更好呢?
  11. mysql 错误 1548_mysql报错1548-Cannot load from mysql.proc. The table is probably corrupted
  12. 汇编语言里 eax, ebx, ecx, edx, esi, edi, ebp, esp这些都是什么意思啊?
  13. 重磅|云迹科技获金茂资本、携程集团、光控众盈,海银资本联合投资...
  14. awd的批量脚本 pwn_北极星杯 awd复现
  15. STEP7主站与远程I/O组网_过路老熊_新浪博客
  16. 天猫,淘宝,京东收货信息中,自动识别手机号、姓名、省市区
  17. 启赟金融 CTO 马连浩:跨境支付系统架构
  18. 妻子决定你未来事业的高度,男的看一看,女的学一学
  19. 高分辨率光学遥感图像水体分类综述2022.03
  20. 用moment.js对UTC和北京时间互转,格式化。

热门文章

  1. Centos 7 Puppet之foreman介绍安装测试
  2. MySQL5.7 踩坑实录
  3. Oozie分布式任务的工作流——Sqoop篇
  4. 基于 opencv图像去噪
  5. C#数组和集合专题2(Array)
  6. x201 温度过高 反应慢 硬盘搜索时更慢更热 为什么呢?
  7. Python连接MySQL的实例代码
  8. 炼丹手册——学习率设置
  9. Aix下运行java程序_AIX上部署java项目
  10. linux浮动ip添加 手动,在Linux 双机下自己手动实现浮动ip技术