工作中,经常有将文件中的数据导入数据库的表中,或者将数据库表中的记录保存到文件中。为了提高程序的处理速度,可以设置读线程和写线程,这些线程通过消息队列进行数据交互。本例就是使用了LinkedBlockingQueue来模仿生产者线程和消费者线程进行数据生产和消费。
为了方便,这些不同的类被写在了一个类中,实际使用的时候,可以单独拆开,举一反三地使用。

以下是例子:

LinkedBlockingQueueDemo.java

import java.util.Date;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public class LinkedBlockingQueueDemo {// 生产者线程数量private final static int providerThreadAmount = 5;// 记录每一个生产者线程是否处理完毕的标记private static boolean[] providerDoneFlag = new boolean[providerThreadAmount];// 整个所有的生产者线程全部结束的标记private static boolean done = false;// 一个线程安全的队列,用于生产者和消费者异步地信息交互private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>();static class ProviderThread extends Thread {private Thread thread;private String threadName;private int threadNo;public ProviderThread(String threadName2, int threadNo) {this.threadName = threadName2;this.threadNo = threadNo;}public void start() {if (thread == null) {thread = new Thread(this, threadName);}thread.start();System.out.println((new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());}@Overridepublic void run() {int rows = 0;for (int i = 0; i < 100; i++) {String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName());// offer不会去阻塞线程,put会//linkedBlockingQeque.offer(string);
                linkedBlockingQeque.put(string);rows++;/** try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch* (InterruptedException e) { e.printStackTrace(); }*/}// 本线程处理完毕的标记LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true;System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t"+ Thread.currentThread().getName());}}static class ConsumerThread implements Runnable {private Thread thread;private String threadName;public ConsumerThread(String threadName2) {this.threadName = threadName2;}public void start() {if (thread == null) {thread = new Thread(this, threadName);}thread.start();System.out.println((new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());}@Overridepublic void run() {int rows = 0;// 生产者线程没有结束,或者消息队列中有元素的时候,去队列中取数据while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) {try {//在甘肃电信的实际应用中发现,当数据的处理量达到千万级的时候,带参数的poll会将主机的几百个G的内存耗尽,jvm会提示申请内存失败,并将进程退出。网上说,这是这个方法的一个bug。//String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS);String string = linkedBlockingQeque.poll();if (string == null) {continue;}rows++;System.out.println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is "+ string + "\t" + Thread.currentThread().getName());/** try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch* (InterruptedException e) { e.printStackTrace(); }*/} catch (InterruptedException e) {e.printStackTrace();}}System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t"+ Thread.currentThread().getName());}}public static synchronized void setDone(boolean flag) {LinkedBlockingQueueDemo.done = flag;}public static synchronized boolean getDone() {return LinkedBlockingQueueDemo.done;}public static void main(String[] args) {System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName());System.out.println((new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode());// 启动若干生产者线程for (int i = 0; i < providerThreadAmount; i++) {String threadName = String.format("%s-%d", "ProviderThread", i);ProviderThread providerThread = new ProviderThread(threadName, i);providerThread.start();}// 启动若干个消费者线程for (int i = 0; i < 10; i++) {String threadName = String.format("%s-%d", "ConsumerThread", i);ConsumerThread consumerThread = new ConsumerThread(threadName);consumerThread.start();}// 循环检测生产者线程是否处理完毕do {for (boolean b : providerDoneFlag) {if (b == false) {/** try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) +* " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque.* size() + "\t" + Thread.currentThread().getName()); } catch* (InterruptedException e) { e.printStackTrace(); }*/// 只要有一个生产者线程没有结束,则整个生产者线程检测认为没有结束break;}LinkedBlockingQueueDemo.setDone(true);}// 生产者线程全部结束的时候,跳出检测if (LinkedBlockingQueueDemo.getDone() == true) {break;}} while (true);System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName());}
}

结果略。

转载于:https://www.cnblogs.com/babyha/p/9765846.html

使用LinkedBlockingQueue来实现生产者消费者的例子相关推荐

  1. Linux 生产者消费者简单例子学习

    #if 0Linux实现生产者消费者模型1. 防止虚假唤醒2. 唤醒线程的时机很重要,否则会导致线程多次访问锁,影响性能 #endif#include <unistd.h> #includ ...

  2. java consumed_Java设计模式—生产者消费者模式(阻塞队列实现)

    生产者消费者模式是并发.多线程编程中经典的 真实世界中的生产者消费者模式 生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系.比如一个人正在准备食物(生产者),而另一个人正在吃(消费者) ...

  3. python多线程实现生产者消费者_用Python实现多线程“生产者-消费者”模型的简单例子...

    用 Python 实现多线程"生产者 - 消费者"模型的简单例子 生产者消费者问题是一个著名的线程同步问题, 该问题描述如下: 有一个生产者在生产产品, 这些产品将提供给若干个消费 ...

  4. 【Java】线程通信的例子:用两个线程打印 1-100;生产者消费者问题

    wait()和sleep()的区别 面试题:sleep() 和 wait()的异同? 相同点:一旦执行方法,都可以使得当前的线程进入阻塞状态. 不同点: 1)两个方法声明的位置不同:Thread类中声 ...

  5. LinkedBlockingQueue 实现生产者消费者模型

    并发编程栏目代码 GitHub package 地址: 点击打开链接 博客并发编程栏目 : 点击打开链接 实现 LinkedBlockingQueue是一个基于已链接节点的.范围任意的blocking ...

  6. 用阻塞队列LinkedBlockingQueue实现生产者消费者先进先出

    LinkedBlockingQueue是一个基于已链接节点的.范围任意的blocking queue的实现. 由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生 ...

  7. 多线程的实际应用-生产者与消费者的例子

    多线程的实际应用-生产者与消费者的例子 最近在恶补java多线程的相关知识,正好<java高手真经>课后有一道类似的题目,就拿过来编了一下,在纠结了一阵后,终于写完了,虽然中途瞄了好几眼前 ...

  8. java实现的PV操作经典例子:读者写者、贪睡的理发师、生产者消费者。

    其中读者写者和贪睡的理发师使用的Semaphore类:生产者消费者使用的是管程. 读者写者 class Semaphore {int value;public Semaphore(int v){thi ...

  9. 几个java实现的PV操作经典例子:读者写者、贪睡的理发师、生产者消费者

    其中读者写者和贪睡的理发师使用的Semaphore类:生产者消费者使用的是管程. 读者写者: class Semaphore {int value;public Semaphore(int v){th ...

最新文章

  1. 软件测试人员必备Linux命令(初、中、高级)
  2. VirtualProtect VirtualLock VirtualUnlock
  3. EMQ源码之--EMQ的启动
  4. 基于SharePoint 的企业信息平台架构
  5. 手机界面常见的的九宫格
  6. windows c++ cjson 使用_cJSON源码剖析
  7. mysql优化 my.cnf_MySQL性能调优my.cnf详解
  8. 机器学习-KMeans聚类 K值以及初始类簇中心点的选取
  9. 实战-Android开机时间优化
  10. 学校计算机和网络保密管理规定,计算机信息系统安全保密管理规定
  11. 上传JSPX文件绕过网站后缀名检查
  12. android打开系统文件怎么打开方式,Android调用系统应用打开任意文件
  13. ATF:Gicv源码文件系列-gic_common.h
  14. java 工作业绩_个人年度工作总结报告java
  15. 概率算法-均匀分布产生正态分布
  16. 李白号称诗仙,为何七律连有些二流诗人都敌不过?
  17. [转载]我的时间管理与方法论
  18. 6. 彤哥说netty系列之Java NIO核心组件之Buffer
  19. 研究ffmepg中有感(NDK 与 JNI 的关系)
  20. (附源码)SSM医院预约挂号系统JAVA计算机毕业设计项目

热门文章

  1. python入门--函数
  2. Html百分比设宽偏差大,absolute和relative元素 设置百分比宽高的差异
  3. 单词搜索—leetcode79
  4. python测试用例管理模块_Python的单元测试模块如何检测测试用例?
  5. 张朝阳:我什么都有,但我就是很痛苦
  6. Objecttive-C 创建多线程
  7. C++ Primer 第八章 标准IO库
  8. 手写体识别(数据挖掘入门与实践-实验11)
  9. C语言 链表实现学生管理系统(含文件读写操作)
  10. Java 实现三次 for 循环计算水仙花数