**

前言

**

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。


以下这些解法,其实本质上都是实现了一个阻塞队列。为空,则消费者阻塞,满了,则生产者阻塞。

**

1.使用wait()和notify()实现

**

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

public static void testProductConsumeByWaitAndNotify() {final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);final Object lock = new Object();Runnable producer = new Runnable() {public void run() {for(int i=0;i<30;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;//队列未满,一直往里放消息synchronized (lock) {while (size == queue.size()) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.offer(msg);lock.notifyAll();}System.out.println(msg+" 已发送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}synchronized (lock) {while (queue.size() == 0) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}String msg = queue.poll();System.out.println(msg+"已消费");lock.notifyAll();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

2.可重入锁ReentrantLock的实现

**

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对lock的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。
可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。

ReentrantLock的Condition:

//阻塞当前线程,直到收到通知或者被中断(将当前线程加入到当前Condition对象的等待队列里)
//Block until signalled or interrupted
public final void await() throws InterruptedException;/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
* 把在当前Condition对象的等待队列里的等待最久的线程,转移到当前Lock的等待队列里
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
*         returns {@code false}
*/
public final void signal() ;

ReentrantLock实现生产消费模型:

public static void testProductConsumeByLock() {final Lock lock = new ReentrantLock();final Condition empty = lock.newCondition();final Condition full = lock.newCondition();final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);Runnable producer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}for(int i=0;i<20;i++) {lock.lock();try {if(queue.size() == size) {try {full.await();} catch (InterruptedException e) {e.printStackTrace();}}String msg = "生产消息:"+i;queue.add(msg);System.out.println(msg);empty.signal();} finally {lock.unlock();}}}};Runnable consumer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}while (true) {lock.lock();try {if(queue.isEmpty()) {try {empty.await();} catch (InterruptedException e) {e.printStackTrace();}}else {String msg = queue.remove();System.out.println(msg + "已消费");full.signal();}} finally {lock.unlock();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

3.阻塞队列BlockingQueue实现

**

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作

因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
从上可知,阻塞队列是线程安全的。

下面是BlockingQueue接口的一些方法:

操作 抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
检查 element(o) peek(o)

这四类方法分别对应的是:

  1. ThrowsException:如果操作不能马上进行,则抛出异常
  2. SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  3. Blocks:如果操作不能马上进行,操作会被阻塞
  4. TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false

下面来看由阻塞队列实现的生产消费模型,这里我们使用take()和put()方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象

/*** 生产者消费者* 使用阻塞队列实现* @throws InterruptedException */public static void testProductConsumeByBlockingQueue() throws InterruptedException {//因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
//      final BlockingQueue<String> queue = new SynchronousQueue<String>(true);//使用有界阻塞队列final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);Runnable producer = new Runnable() {public void run() {for(int i=0;i<100;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;try {queue.put(msg);} catch (InterruptedException e1) {e1.printStackTrace();}System.out.println(msg+" 已发送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}String msg = null;try {msg = queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg+"已消费");}}};new Thread(producer).start();new Thread(consumer).start();}

**

4.信号量Semaphore的实现

**
信号量可以控制访问相应资源的线程的数量,从而实现生产消费模型

import java.util.concurrent.Semaphore;public class BySemaphore {int count = 0;final Semaphore put = new Semaphore(5);// 初始令牌个数final Semaphore get = new Semaphore(0);final Semaphore mutex = new Semaphore(1);   //该信号量相当于锁public static void main(String[] args) {BySemaphore bySemaphore = new BySemaphore();new Thread(bySemaphore.new Producer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Producer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}try {put.acquire();// 注意顺序mutex.acquire();count++;System.out.println("生产者" + Thread.currentThread().getName()+ "已生产完成,商品数量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();get.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (InterruptedException e1) {e1.printStackTrace();}try {get.acquire();// 注意顺序mutex.acquire();count--;System.out.println("消费者" + Thread.currentThread().getName()+ "已消费,剩余商品数量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();put.release();}}}}
}

**

5.使用消息队列

**

这个是取巧的办法,直接使用现成的消息中间件服务(如RocketMq、RabbitMq、Kafka等),分分钟搞定。手动微笑~~

Java实现生产消费模型的5种方式相关推荐

  1. java 线程“生产/消费”模型1

    /*资源类*/ public class ShareValue {private int total;public ShareValue(int total){this.total=total;}// ...

  2. java线程“生产/消费”模型2

    /* 资源类 */ class ShareValue {private int total;//判断对象是否为空private boolean isEmpty=true;//判断对象是否已满priva ...

  3. JAVA解决生产消费者_Java常用三种方式解决生产者消费者问题(详细)

    package test; /** * Synchronized 版本解决生产者消费者 * wait() / notify()方法 */ import java.util.LinkedList; im ...

  4. 生产-消费模型之阻塞队列的源码分析

    文章目录 前言 阻塞队列API 存放元素 boolean add(E e) boolean offer(E e) boolean offer(E e, long timeout, TimeUnit u ...

  5. java kafka设置偏移量_kafka实战宝典:手动修改消费偏移量的两种方式

    kafka实战宝典:手动修改消费偏移量的两种方式 工作中遇到过消费端报错的问题:包括数据Invalid Message和Failed_to_UNcompress等报错信息,导致消费端的iterator ...

  6. Java异步非阻塞编程的几种方式

    简介: Java异步非阻塞编程的几种方式 一. 从一个同步的Http调用说起 一个很简单的业务逻辑,其他后端服务提供了一个接口,我们需要通过接口调用,获取到响应的数据. 逆地理接口:通过经纬度获取这个 ...

  7. 用java自己实现代码阻塞的几种方式

    用java自己实现代码阻塞的几种方式 假如有一个场景,当代码获取的变量不为期待值的时候需要等待变量变为期待值再往下执行,最开始可能会考虑通过死循环+线程睡眠来实现,但是这样子毕竟不太合理.可以通过以下 ...

  8. 【java】Java运行时动态生成类几种方式

    1.概述 转载:Java运行时动态生成类几种方式 这里发现自己不知道的,原来Java 还能自己编译自己,学到了. 最近一个项目中利用规则引擎,提供用户拖拽式的灵活定义规则.这就要求根据数据库数据动态生 ...

  9. java的如何创建js_[Java教程]JS创建事件的三种方式(实例)

    [Java教程]JS创建事件的三种方式(实例) 0 2016-05-11 14:00:16 1.普通的定义方式 οnclick="Sfont=prompt('请在文本框中输入红色','红色' ...

最新文章

  1. shiro的简单入门使用
  2. JS运行机制(浏览器内核)
  3. 使用vivado进行逻辑开发时,进行到Generate Bitstream时报错
  4. Java 8状态更新
  5. python中的字体英文名_获取中文字体的英文名字
  6. java 1是flase_Java这段代码为什么会返回 false?传入的是字符串[1,1]
  7. 【AD】如何删除AD20右下角Title
  8. 找一份高薪的AI工作有多难?
  9. git安装 苹果笔记本_自己挖的坑自己填,无光驱安装苹果笔记本双系统
  10. 上周热点回顾(11.11-11.17)
  11. 超市登录系统 java_超市订单管理系统,登录功能实现
  12. 关于C#GB2312编码问题
  13. JavaScript和TypeScript学习心得
  14. 一个更加强大的查壳工具, 更新版本
  15. Elastic: ILM与rollover的关系
  16. 华中科技大学计算机与网络,华中科技大学计算机通信与网络实验报告-基于NS2的协议分析实验...
  17. 冻结训练和解冻训练的区别
  18. MM,这是我第一次给你写的Blog,用一首《那一夜》开始吧
  19. HTML - 伪元素
  20. 一季度理赔报告显示,恶性肿瘤仍为主要风险,年金险、终身寿险备受青睐 | 美通社头条...

热门文章

  1. sql 之like 和通配符%,_(mysql)
  2. 小鱼便签_同样是写便签,这样更酷
  3. python opencv 图像切割_【OpenCV+Python】图像的基本操作与算术运算
  4. 铜线越长发电量越多,6千米长的铜线能让电灯泡发光吗?
  5. 这是哪里来的小妖精!!!
  6. 高糊马赛克秒变高清,表情帝:这还是我吗?
  7. 大道至简,大数据的小窍门
  8. 主成分分析和因子分析十大不同点
  9. python删除列表中的重复值_如何从 Python 列表中删除重复项
  10. 密西根州立大学计算机qs分数,2020年QS世界大学排名密歇根州立大学排名第144