日常开发中,我们会经常跟日志系统打交道,日志系统算是典型的生产者消费者模型的系统。(多个不同的)生产者线程负责生产日志消息,这些系统扮演者生产者的角色,日志系统负责将消息队列里的日志上传到服务器,属于消费者角色。是一种多生产者单消费者模式的设计方式。如果生产者的生产速度大于LoggerThread 的消费速度,则BlockingDeque线程会阻塞生产者,直到LoggerThread 有能力处理新的消息。

不支持关闭的生产者-消费者日志服务(bug版本)

public class LogService {private final BlockingQueue<String> queue;private final LoggerThread loggerThread;public LogService() {//注意队列的容量为10queue = new LinkedBlockingQueue<>(10);loggerThread = new LoggerThread();}public void start() {loggerThread.start();}/*** 生产者生产消息,每个log的调用者都行相当于一个生产者** @param msg* @throws InterruptedException*/public void log(String msg) throws InterruptedException {queue.put(msg);}/*** 消费者线程消费消息,该线程扮演的是消费者这一角色。*/private class LoggerThread extends Thread {public  LoggerThread(){}public void run() {try {while (true) {//无限循环System.out.println(queue.take());}//end while} catch (InterruptedException e) {//在循环外响应中断} finally {System.out.println("log service is close");}}}}

上面代码看起来很正常,但是有如下缺点:
1、LoggerThread没有提供关闭的方法,当没有生产者生产消息的时候,那么LoggerThread在调用queue.take()的时候会一直处于阻塞状态,导致JVM没法正常关闭。比如下面的调用代码,就会发生LoggerThread无法关闭的情况。

 public static void main(String[] args) {LogService logService = new LogService();for(int i=0;i<100;i++) {final int index =i;new Thread(new Runnable() {@Overridepublic void run() {try {logService.log("生产者生产消息===="+index);} catch (InterruptedException e) {System.out.println("response interception");}}}).start();}logService.start();}

关闭LoggerThread很容易啊,只需要把LoggerThread的方法改成如下就可以了吧?

 public void run() {try {while (!queue.isEmpty()) {//判断队列是否为空,如果为空则关闭System.out.println(queue.take());}} catch (InterruptedException e) {System.out.println("response interception");} finally {System.out.println("log service is close="+queue.size());}}

上面的改动中,我们将while(true)改成了while(!queue.isEmpty()).可以吗?答案是完全不可以!在我们这个demo中,队列的大小是有限制的,如果消费者的消费速度大于生产者的速度,那么队列在某一瞬间就变成空了,也就是queue.isEmpty()为true,此时消费者关闭,但是生产者可以继续生产,则队列满的情况下生产者会阻塞。取消一个生产者-消费者模型的最可靠的方法是需要同时取消生产者和消费者,因为当单独的关闭消费者的时候,会阻塞生产者;当单独的关闭生产者的时候,会阻塞消费者。其关闭原则应该是:我们应该提供一个Stop方法,调用stop方法的时候表示日志服务已经关闭,此时生产者不可以再生产消息,当消费者消费完队列里的消息后才正式关闭日志服务,避免造成日志的丢失,

且看下面代码,我们为LogService提供了stop方法,并且修改了log方法,当isShutDown为true的时候,就不能在生产消息。然而这看似完美的操作却有个巨大漏洞,stop方法不是原子的,if(!isShutdown)也不是原子的,使得关闭方法并不可靠

  private boolean isShutdown=false;public void stop(){isShutdown=true;}public void log(String msg) throws InterruptedException {if(!isShutdown){queue.put(msg);}}

所以我们需要改造。使之成为原子的即可。我们改造如下

支持关闭的生产者-消费者日志服务(不安全版本)

使用synchronized关键字,将stop改造成原子的,需要注意的是,因为put方法本身就可以阻塞,所以我们不需要在消息加入队列的时候再去持有一个锁,所以我们将put方法放在了synchronized语句块的外面。

private boolean isShutDown = false;public void stop(){synchronized(this) { isShutDown=true;}}/*** 生产者生产消息,每个log的调用者都行相当于一个生产者** @param msg* @throws InterruptedException*/public void log(String msg) throws InterruptedException {synchronized(this) {if(isShutDown) {return;}}queue.put(msg);       }

经过这么以改进,当我们调用stop的时候,则生产者线程不会继续生产消息。我们可以将LoggerThread改造如下:

 public void run() {try {while (true) {if(isShutDown) {//如果关闭则退出break;}System.out.println(queue.take());}} catch (InterruptedException e) {//注意我们这里实在while循环外响应中断的。System.out.println("response interception");} finally {System.out.println("log service is close="+queue.size());}}

那么这样就没问题了吗?答案是问题大了去了。比如我们调用了stop方法,此时LoggerThread消费者正好在queue.take()阻塞着,那么if(isShutDown) {break}则永远不会执行,LoggerThread是无法退出的。正确的做法是我们需要再次对stop方法进行改造,改造如下:

    public void stop(){synchronized(this) { isShutDown=true;}loggerThread.interrupt();}

我们添加了loggerThread.interrupt();,此时take方法发生阻塞时会响应中断,因为我们在while循环之外响应中断的,则跳出while循环,执行打印如下:

  public static void main(String[] args) {LogService logService = new LogService();for(int i=0;i<90;i++) {final int index =i;Thread thread=new Thread(new Runnable() {@Overridepublic void run() {             try {               logService.log("生产者生产消息===="+index); } catch (InterruptedException e) {System.out.println("response interception");}               }       });thread.start();}logService.start();try {Thread.sleep(2000);} catch (InterruptedException e) {        }//两秒之后关闭日志系统logService.stop();}

执行打印如下:

response interception
log service is close=0

那么到此为止是不是就没问题了呢?当然仍然有问题,如果发生中断的时候,我们在while循环之外响应中断,消息队列中仍然有消息,我们就没法继续处理剩下的日志,那么日志服务的关闭势必造成日志的丢失,如果是比较关键的日志信息丢失了,我们都没地方哭去。

所以我们的日志服务系统仍然有优化的空间。

支持关闭的生产者-消费者日志服务(安全版本)

我们的目标是生产者生产多少消息,我们的日志系统就处理多少消息。当日志服务关闭的时候,生产者不能生产消息。消费者在处理完所有消息后,自行关闭。
为此我们继续改造,使用一个变量logCount,每调用一次log则logCount+1;注意logCount递增操作需要是原子的。
为此我们对log方法进行改造,改造如下:

    private int logCount=0;public void log(String msg) throws InterruptedException {synchronized(this) {if(isShutDown) {return;}//消息递增logCount++;}queue.put(msg);}

处理完了成产者,我们需要对消费者进行改造,注意LoggerThread之前的消费者捕获InterruptedException 时,是在while循环之外,我们现在需要放在循环之内。这样我们才能正确处理中断,确保队列里的消息处理完毕,改造后的LoggerThread如下:

到此为止,本篇博文就结束了,在读《Java并发编程实战》这一章节的时候,还有点疑问为什么这么写。直到写完了这篇博客,边写博客边敲程序进行验证,才算是彻底了解了其中的意义。果然写博客还是很有帮助的。起码这边博客对博主自己深入理解Java并发编程,有很大帮助。

完整的代码如下:

package log;import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;public class LogService {private final BlockingQueue<String> queue;private final LoggerThread loggerThread;public LogService() {queue = new LinkedBlockingQueue<>(10);loggerThread = new LoggerThread();}public void start() {loggerThread.start();}private boolean isShutDown = false;public void stop() {synchronized (this) {isShutDown = true;}loggerThread.interrupt();}private int logCount = 0;public void log(String msg) throws InterruptedException {synchronized (this) {if (isShutDown) {return;}logCount++;}queue.put(msg);}/*** 消费者线程消费消息,该线程扮演的是消费者这一角色。*/private class LoggerThread extends Thread {public LoggerThread() {}public void run() {try {while (true) {try {synchronized (LogService.this) {if (isShutDown && logCount == 0) {break;}}System.out.println(queue.take());synchronized (LogService.this) {logCount--;}} catch (InterruptedException e) {System.out.println("相应中断,while循环继续执行,除非满足退出条件");}}//end while} finally {//资源额释放System.out.println("log service is close=" + queue.size());}}}public static void main(String[] args) {LogService logService = new LogService();for (int i = 0; i < 90; i++) {final int index = i;Thread thread = new Thread(new Runnable() {@Overridepublic void run() {try {logService.log("生产者生产消息====" + index);} catch (InterruptedException e) {System.out.println("response interception");}}});thread.start();}logService.start();try {Thread.sleep(2000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}logService.stop();}
}

Java并发编程实战之基于生产者消费者模式的日志服务读书笔记相关推荐

  1. 并发编程(五)——生产者消费者模式

    2019独角兽企业重金招聘Python工程师标准>>> 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据 ...

  2. Java并发编程实战_不愧是领军人物!这种等级的“Java并发编程宝典”谁能撰写?...

    前言 大家都知道并发编程技术就是在同一个处理器上同时的去处理多个任务,充分的利用到处理器的每个核心,最大化的发挥处理器的峰值性能,这样就可以避免我们因为性能而产生的一些问题. 大厂的核心负载肯定是非常 ...

  3. Java并发编程实战~生产者-消费者模式

    前面我们在<Worker Thread 模式>中讲到,Worker Thread 模式类比的是工厂里车间工人的工作模式.但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就 ...

  4. Java并发编程实战————Executor框架与任务执行

    引言 本篇博客介绍通过"执行任务"的机制来设计应用程序时需要掌握的一些知识.所有的内容均提炼自<Java并发编程实战>中第六章的内容. 大多数并发应用程序都是围绕&qu ...

  5. 【极客时间】《Java并发编程实战》学习笔记

    目录: 开篇词 | 你为什么需要学习并发编程? 内容来源:开篇词 | 你为什么需要学习并发编程?-极客时间 例如,Java 里 synchronized.wait()/notify() 相关的知识很琐 ...

  6. 《Java 并发编程实战》--读书笔记

    Java 并发编程实战 注: 极客时间<Java 并发编程实战>–读书笔记 GitHub:https://github.com/ByrsH/Reading-notes/blob/maste ...

  7. java并发编程实战学习(3)--基础构建模块

    转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法.如果队列已经满了,那么put ...

  8. 视频教程-Java并发编程实战-Java

    Java并发编程实战 2018年以超过十倍的年业绩增长速度,从中高端IT技术在线教育行业中脱颖而出,成为在线教育领域一匹令人瞩目的黑马.咕泡学院以教学培养.职业规划为核心,旨在帮助学员提升技术技能,加 ...

  9. Java并发编程实战读书笔记

    Java并发编程 标签(空格分隔): 并发 多线程 基础 线程 在执行过程中,能够执行程序代码的一个执行单元,在Java语言中,线程有四种状态:运行,就绪,挂起,结束. 并发特性 原子性 一个操作不会 ...

  10. 《Java并发编程实战》读书笔记

    Subsections  线程安全(Thread safety) 锁(lock) 共享对象 对象组合 基础构建模块 任务执行 取消和关闭 线程池的使用 性能与可伸缩性 并发程序的测试 显示锁 原子变量 ...

最新文章

  1. 模板 - 二分图(包含全套常用定理性质)
  2. 着眼未来!2019未来杯高校AI挑战赛圆满落幕
  3. php音译汉字,PHP中的西里尔语音译
  4. Java 动态代理及 RPC 框架介绍
  5. 地图索引 R-tree
  6. mvc 职能划分_MVC架构的职责划分原则
  7. IEPNGFix:Unclickable children of element 解决办法
  8. php自动安装myqsl,php – 在自制的小牛上安装MySQL麻烦
  9. idea关于连接mysql数据库异常解决的方法
  10. C# 第三方控件 错误 LC-1
  11. excel打开2个独立窗口_喜欢用华为手机拍照,记得打开这2个开关,能让照片更加清晰...
  12. 极速办公(excel)字体如何改为斜体
  13. 掌阅科技净利大增却遇跌停?到底是有利好还是利空?
  14. 头豹研究院发布《2022年中国数据库产品策略解析报告》
  15. mysql8对系统的要求_linux-mysql8的安装步骤详解及需要注意的坑
  16. 一个年化收益30%的指数
  17. kafka broker监听多个IP地址
  18. Ftest(F检验,P值求取)
  19. 篇3:嵌入式系统和嵌入式操作系统
  20. Memory loss【记忆缺失】

热门文章

  1. linux下mysql中文乱码_linux下mysql中文乱码
  2. process 类 java_Process 执行命令行Java封装类详解
  3. 软件测试测试用例编写 不超过7步骤_软件测试(功能、接口、性能、自动化)详解...
  4. 51单片机C语言堆栈,《单片机C语言试题》(一)20101027
  5. wextend matlab,小波学习之三(多孔算法与MATLAB swt剖析)转载
  6. JavaScript cookie js cookie设置
  7. 算法笔记_面试题_16. 二叉树相关_模板及示例十几道
  8. C++_智能指针shared_ptr、unique_ptr、weak_ptr、auto_ptr总结
  9. 15-2 mysql的数据类型
  10. java类 (二):内部类