使用LinkedBlockingQueue来实现生产者消费者的例子
工作中,经常有将文件中的数据导入数据库的表中,或者将数据库表中的记录保存到文件中。为了提高程序的处理速度,可以设置读线程和写线程,这些线程通过消息队列进行数据交互。本例就是使用了LinkedBlockingQueue来模仿生产者线程和消费者线程进行数据生产和消费。
为了方便,这些不同的类被写在了一个类中,实际使用的时候,可以单独拆开,举一反三地使用。
以下是例子:
LinkedBlockingQueueDemo.java
import java.util.Date; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit;public class LinkedBlockingQueueDemo {// 生产者线程数量private final static int providerThreadAmount = 5;// 记录每一个生产者线程是否处理完毕的标记private static boolean[] providerDoneFlag = new boolean[providerThreadAmount];// 整个所有的生产者线程全部结束的标记private static boolean done = false;// 一个线程安全的队列,用于生产者和消费者异步地信息交互private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>();static class ProviderThread extends Thread {private Thread thread;private String threadName;private int threadNo;public ProviderThread(String threadName2, int threadNo) {this.threadName = threadName2;this.threadNo = threadNo;}public void start() {if (thread == null) {thread = new Thread(this, threadName);}thread.start();System.out.println((new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());}@Overridepublic void run() {int rows = 0;for (int i = 0; i < 100; i++) {String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName());// offer不会去阻塞线程,put会//linkedBlockingQeque.offer(string); linkedBlockingQeque.put(string);rows++;/** try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch* (InterruptedException e) { e.printStackTrace(); }*/}// 本线程处理完毕的标记LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true;System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t"+ Thread.currentThread().getName());}}static class ConsumerThread implements Runnable {private Thread thread;private String threadName;public ConsumerThread(String threadName2) {this.threadName = threadName2;}public void start() {if (thread == null) {thread = new Thread(this, threadName);}thread.start();System.out.println((new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());}@Overridepublic void run() {int rows = 0;// 生产者线程没有结束,或者消息队列中有元素的时候,去队列中取数据while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) {try {//在甘肃电信的实际应用中发现,当数据的处理量达到千万级的时候,带参数的poll会将主机的几百个G的内存耗尽,jvm会提示申请内存失败,并将进程退出。网上说,这是这个方法的一个bug。//String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS);String string = linkedBlockingQeque.poll();if (string == null) {continue;}rows++;System.out.println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is "+ string + "\t" + Thread.currentThread().getName());/** try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch* (InterruptedException e) { e.printStackTrace(); }*/} catch (InterruptedException e) {e.printStackTrace();}}System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t"+ Thread.currentThread().getName());}}public static synchronized void setDone(boolean flag) {LinkedBlockingQueueDemo.done = flag;}public static synchronized boolean getDone() {return LinkedBlockingQueueDemo.done;}public static void main(String[] args) {System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName());System.out.println((new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode());// 启动若干生产者线程for (int i = 0; i < providerThreadAmount; i++) {String threadName = String.format("%s-%d", "ProviderThread", i);ProviderThread providerThread = new ProviderThread(threadName, i);providerThread.start();}// 启动若干个消费者线程for (int i = 0; i < 10; i++) {String threadName = String.format("%s-%d", "ConsumerThread", i);ConsumerThread consumerThread = new ConsumerThread(threadName);consumerThread.start();}// 循环检测生产者线程是否处理完毕do {for (boolean b : providerDoneFlag) {if (b == false) {/** try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) +* " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque.* size() + "\t" + Thread.currentThread().getName()); } catch* (InterruptedException e) { e.printStackTrace(); }*/// 只要有一个生产者线程没有结束,则整个生产者线程检测认为没有结束break;}LinkedBlockingQueueDemo.setDone(true);}// 生产者线程全部结束的时候,跳出检测if (LinkedBlockingQueueDemo.getDone() == true) {break;}} while (true);System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName());} }
结果略。
转载于:https://www.cnblogs.com/babyha/p/9765846.html
使用LinkedBlockingQueue来实现生产者消费者的例子相关推荐
- Linux 生产者消费者简单例子学习
#if 0Linux实现生产者消费者模型1. 防止虚假唤醒2. 唤醒线程的时机很重要,否则会导致线程多次访问锁,影响性能 #endif#include <unistd.h> #includ ...
- java consumed_Java设计模式—生产者消费者模式(阻塞队列实现)
生产者消费者模式是并发.多线程编程中经典的 真实世界中的生产者消费者模式 生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系.比如一个人正在准备食物(生产者),而另一个人正在吃(消费者) ...
- python多线程实现生产者消费者_用Python实现多线程“生产者-消费者”模型的简单例子...
用 Python 实现多线程"生产者 - 消费者"模型的简单例子 生产者消费者问题是一个著名的线程同步问题, 该问题描述如下: 有一个生产者在生产产品, 这些产品将提供给若干个消费 ...
- 【Java】线程通信的例子:用两个线程打印 1-100;生产者消费者问题
wait()和sleep()的区别 面试题:sleep() 和 wait()的异同? 相同点:一旦执行方法,都可以使得当前的线程进入阻塞状态. 不同点: 1)两个方法声明的位置不同:Thread类中声 ...
- LinkedBlockingQueue 实现生产者消费者模型
并发编程栏目代码 GitHub package 地址: 点击打开链接 博客并发编程栏目 : 点击打开链接 实现 LinkedBlockingQueue是一个基于已链接节点的.范围任意的blocking ...
- 用阻塞队列LinkedBlockingQueue实现生产者消费者先进先出
LinkedBlockingQueue是一个基于已链接节点的.范围任意的blocking queue的实现. 由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生 ...
- 多线程的实际应用-生产者与消费者的例子
多线程的实际应用-生产者与消费者的例子 最近在恶补java多线程的相关知识,正好<java高手真经>课后有一道类似的题目,就拿过来编了一下,在纠结了一阵后,终于写完了,虽然中途瞄了好几眼前 ...
- java实现的PV操作经典例子:读者写者、贪睡的理发师、生产者消费者。
其中读者写者和贪睡的理发师使用的Semaphore类:生产者消费者使用的是管程. 读者写者 class Semaphore {int value;public Semaphore(int v){thi ...
- 几个java实现的PV操作经典例子:读者写者、贪睡的理发师、生产者消费者
其中读者写者和贪睡的理发师使用的Semaphore类:生产者消费者使用的是管程. 读者写者: class Semaphore {int value;public Semaphore(int v){th ...
最新文章
- 软件测试人员必备Linux命令(初、中、高级)
- VirtualProtect VirtualLock VirtualUnlock
- EMQ源码之--EMQ的启动
- 基于SharePoint 的企业信息平台架构
- 手机界面常见的的九宫格
- windows c++ cjson 使用_cJSON源码剖析
- mysql优化 my.cnf_MySQL性能调优my.cnf详解
- 机器学习-KMeans聚类 K值以及初始类簇中心点的选取
- 实战-Android开机时间优化
- 学校计算机和网络保密管理规定,计算机信息系统安全保密管理规定
- 上传JSPX文件绕过网站后缀名检查
- android打开系统文件怎么打开方式,Android调用系统应用打开任意文件
- ATF:Gicv源码文件系列-gic_common.h
- java 工作业绩_个人年度工作总结报告java
- 概率算法-均匀分布产生正态分布
- 李白号称诗仙,为何七律连有些二流诗人都敌不过?
- [转载]我的时间管理与方法论
- 6. 彤哥说netty系列之Java NIO核心组件之Buffer
- 研究ffmepg中有感(NDK 与 JNI 的关系)
- (附源码)SSM医院预约挂号系统JAVA计算机毕业设计项目