秒杀多线程第十篇 生产者消费者问题 (续)
使用java 和semaphore实现的 ,多个生产者和多个消费者的问题。
1.使用Semaphore,Semaphore的大小设定为BUFFER_LENGTH。也就是同时最多有这么多线程来操作缓冲区。2个semaphore, empty和exist。
默认开始缓冲区为空
1)StoreEmpty 在开始时,所有的都可用。
2)StoreHas 在开始时都是锁定的,也就是没有空余的可以acquire,直到生产者放入数据以后,就可以。
2.生产者逻辑:
1)等待缓冲区有空间
2)同步放入数据到缓存区
3)通知缓冲区存在数据
4)所有数据都已生产,通知其他生产线程停止。
3.消费者逻辑
1)等待缓冲区有数据
2)同步取出数据
3)通知缓冲区有空间
4)所有数据已消费,通知其他消费线程停止
main class:
package com.multithread.prosumer;import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;import com.multithread.main.ExampleInterface;public class ProsumerExample extends ExampleInterface {static final int BUFFER_LENGTH = 10;static final int CUSTOMER_SIZE = 4;static final int PRODUCTOR_SIZE = 3;public static final int TOTAL_PRODUCTORS = 200;public Queue<Integer> g_produtor = new LinkedList<Integer>();public volatile int mProductor = 0;public Object objlock = new Object();Semaphore StoreEmpty = new Semaphore(BUFFER_LENGTH);//等待缓冲区数据Semaphore StoreHas = new Semaphore(BUFFER_LENGTH); public CountDownLatch mLatchDown = new CountDownLatch(PRODUCTOR_SIZE+CUSTOMER_SIZE);public CountDownLatch mLatchStart = new CountDownLatch(PRODUCTOR_SIZE+CUSTOMER_SIZE);public boolean bStopCustomFlag = false;public boolean bStopProductorFlag = false;@Overridepublic void startDemo() {try {g_produtor.clear(); bStopCustomFlag = false;initEmptySingal();initExistSingal();Executor mEcecutor = Executors.newFixedThreadPool(PRODUCTOR_SIZE+CUSTOMER_SIZE);for(int i=1;i<=PRODUCTOR_SIZE;i++){mEcecutor.execute(new ProducerThread(this,"生产者"+i));}for(int j=1;j<=CUSTOMER_SIZE;j++){char c =(char)(j+'A'-1);mEcecutor.execute(new CustomerThread(this,"消费者"+c));}mLatchStart.await();System.out.println("所有操作线程已启动...");mLatchDown.await();} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}catch(Exception e){e.printStackTrace(); }System.out.println("所有线程操作结束");}/** if true ,go back, if false, wait here* */public void waitEmpty(String name){try { // System.out.println("[waitEmpty]"+name+"等待缓冲区,有空余地方:"+StoreEmpty.availablePermits()); StoreEmpty.acquire(); // System.out.println("[waitEmpty]"+name+"等待缓冲区,有空余地方结束 剩余空间:"+StoreEmpty.availablePermits());} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}public void singalEmpty(String name){StoreEmpty.release(); // System.out.println("[singalEmpty]"+name+"缓冲区释放空余地方,剩余空间:"+StoreEmpty.availablePermits()); }public void waitExist(String name){try { // System.out.println("[waitExist]"+name+"等待缓冲区,数据存放空间:"+StoreHas.availablePermits()); StoreHas.acquire(); // System.out.println("[waitExist]"+name+"缓冲区有数据放入,缓冲区数据个数:"+StoreHas.availablePermits());} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}public void singalExist(String name){StoreHas.release(); // System.out.println("[singalExist]"+name+"将数据放入缓冲区:"+StoreHas.availablePermits()); }public void initEmptySingal(){//init,all is empty; // try { // StoreEmpty.acquire(BUFFER_LENGTH-1); // } catch (InterruptedException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } }public void initExistSingal(){//init,nothing is existtry { // System.out.println("释放所有缓冲区数据,消费线程全部等待:"+StoreHas.availablePermits()); StoreHas.acquire(StoreHas.availablePermits());} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}public void releaseExistSingal(){ // System.out.println("[releaseExistSingle]等待缓冲区有数据放入:释放所有"+StoreHas.availablePermits()); StoreHas.release(BUFFER_LENGTH);}public void releaseEmptySingal(){StoreEmpty.release(BUFFER_LENGTH);} }
package com.multithread.prosumer;public class ProducerThread extends Thread {ProsumerExample mProsumer = null;String name = null;boolean flag = true;public ProducerThread(ProsumerExample pe,String name){mProsumer = pe;this.name = name;}@Overridepublic void run() {System.out.println(name+"操作開始");mProsumer.mLatchStart.countDown(); // for(int i=0;i<=END_PRODUCE_NUMBER;i++) // { // try { // //等待缓冲区为空 // mProsumer.waitEmpty(name); // //互斥的访问缓冲区 // synchronized (mProsumer.objlock) { // int index = mProsumer.g_produtor.size(); // mProsumer.g_produtor.offer(i); // System.out.println(name+"将数据"+i+"放入缓冲区位置:"+(index+1)); // } // // //通知缓冲区有新数据了 // mProsumer.singalExist(name); // // } catch (Exception e) { // // TODO Auto-generated catch block // e.printStackTrace(); // break; // } // finally{ // } // }while(flag){//等待缓冲区为空 mProsumer.waitEmpty(name);//互斥的访问缓冲区 synchronized (mProsumer.objlock){if(mProsumer.mProductor<ProsumerExample.TOTAL_PRODUCTORS){int index = mProsumer.g_produtor.size();mProsumer.g_produtor.offer(mProsumer.mProductor);System.out.println(name+"将数据"+mProsumer.mProductor+"放入缓冲区位置:"+(index+1));++mProsumer.mProductor;if(mProsumer.mProductor>=ProsumerExample.TOTAL_PRODUCTORS){flag = false;mProsumer.releaseEmptySingal();} }else{flag = false;//结束操作break;//不用通知,应为没有产生新数据 }}//通知缓冲区有新数据了 mProsumer.singalExist(name);}System.out.println(name+"操作结束");mProsumer.mLatchDown.countDown();}}
public class CustomerThread extends Thread {volatile boolean flag = true;ProsumerExample mProsumer = null;int mProductor = 0;String name = null;public CustomerThread(ProsumerExample pe, String name) {mProsumer = pe;this.name = name;}@Overridepublic void run() {System.out.println("---" + name + "操作開始");mProsumer.mLatchStart.countDown();while (flag) {try {// 等待缓冲池有数据System.out.println("---" + name + "等待缓冲区数据");mProsumer.waitExist(name);// 互斥的访问缓冲区synchronized (mProsumer.objlock) {if (mProsumer.g_produtor.size() > 0) {mProductor = mProsumer.g_produtor.poll();System.out.println("---" + name + "将数据" + mProductor+ "取出缓冲区");if (mProductor == (ProsumerExample.TOTAL_PRODUCTORS-1)) {flag = false;mProsumer.bStopCustomFlag = true;// 释放其他消费线程 mProsumer.releaseExistSingal();}} else {System.out.println("---" + name + "缓冲区已空");// 其他消费者线程已停止,缓冲区已为空,此线程也要停止。if (mProsumer.bStopCustomFlag) {flag = false;break;//没有产生新的空间 }}}// 通知缓存区有空间 mProsumer.singalEmpty(name);// doing other thingsThread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {// TODO Auto-generated catch block e.printStackTrace();}}System.out.println("---" + name + "操作结束");mProsumer.mLatchDown.countDown();}}
转载于:https://www.cnblogs.com/deman/p/4091365.html
秒杀多线程第十篇 生产者消费者问题 (续)相关推荐
- 秒杀多线程第十篇 生产者消费者问题
继经典线程同步问题之后,我们来看看生产者消费者问题及读者写者问题.生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消 ...
- 秒杀多线程第十六篇 多线程十大经典案例之一 双线程读写队列数据
本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035 转载请标明出处,原文地址:http://blog.csdn.net/mo ...
- 秒杀多线程第十二篇 多线程同步内功心法——PV操作上
阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇一个经典的多线程同步问题> <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event& ...
- 秒杀多线程第十一篇 读者写者问题
与上一篇<秒杀多线程第十篇 生产者消费者问题>的生产者消费者问题一样,读者写者也是一个非常著名的同步问题.读者写者问题描述非常简单,有一个写者很多读者,多个读者可以同时读文件,但写者在写文 ...
- 秒杀多线程第八篇 经典线程同步 信号量Semaphore
阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇一个经典的多线程同步问题> <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event& ...
- 秒杀多线程第十一篇---读者写者问题
与上一篇<秒杀多线程第十篇 生产者消费者问题>的生产者消费者问题一样,读者写者也是一个非常著名的同步问题.读者写者问题描述非常简单,有一个写者很多读者,多个读者可以同时读文件,但写者在写文 ...
- 秒杀多线程第十五篇 关键段,事件,互斥量,信号量的“遗弃”问题
秒杀多线程第十五篇 关键段,事件,互斥量,信号量的"遗弃"问题 在<秒杀多线程第九篇 经典线程同步总结 关键段 事件 互斥量 信号量>中对经典多线程同步互斥问题进行了回 ...
- 秒杀多线程第十四篇 读者写者问题继 读写锁SRWLock
在<秒杀多线程第十一篇读者写者问题>文章中我们使用事件和一个记录读者个数的变量来解决读者写者问题.问题虽然得到了解决,但代码有点复杂.本篇将介绍一种新方法--读写锁SRWLock来解决这一 ...
- 秒杀多线程第六篇 经典线程同步 事件Event
阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇 一个经典的多线程同步问题> <秒杀多线程第五篇 经典线程同步关键段CS> 上一篇中使用关键段来解决经典的多线程同步互斥问题 ...
最新文章
- iOS 9应用开发教程之ios9的视图
- java实现图形界面输入半径求圆面积_测试开发工程师系列之Android自动化测试Appium(Python)连载(7)安卓图形界面...
- mysql text与blog的区别
- 图论模型迪杰斯特拉算法
- 三维卷积神经网络预测MNIST数字详解
- java键盘输入到文件中_在Linux中使用java和javac命令编译运行java文件
- 固态硬盘坏块修复工具_坏道和坏块什么区别?硬盘高级修复教程来了
- 阿里巴巴矢量图引入步骤
- SRP简介(SRP--Single-Responsibility Principle):
- 网秦创始人林宇称遭史文勇非法拘禁 后者称是恶意中伤
- 【ESAPI】WEB安全ESAPI使用
- Zookeeper详解(五):通过JMX查看Zookeeper信息
- windows无法连接到打印机
- 杯具,万达电商又换CEO
- clap与slap_(2xslap,slap,slap,clap,slap)Doinapos;theFACTSNowthatapos;s.ppt
- 一起赚美元① | 通过一个小工具月入12万美元的秘诀
- SQLServer日期函数的使用
- JDK11解压缩安装
- android 分割线 整洁,Android-RecyclerView分割线(水平/垂直/网格)
- [Android通信]基于socket的聊天app(七):好友分组