Future、Master-Worker和生产者-消费者模型

2018年03月06日 22:32:08 努力做最好的自己 阅读数:391

并行设计模式属于设计优化的一部分,它是对一些常用的多线程结构的总结和抽象。与串行结构相比,并行程序的结构通常更为复杂。因此合理的使用并行模式在多线程开发中更具有意义,在这里主要介绍Future、Master-Worker和生产者-消费者模型。

Future模式

Future模式有点类似于商品订单。比如在网购时,当看重某一件商品时,就可以提交订单,当订单处理完成后,在家里等待商品送货上门即可。或者说更形象的我们发送ajax请求的时候,页面是异步的进行后台处理,用户无须一直等待请求的结果,可以继续浏览或操作其他内容。

Future模式会异步创建一个子线程,去完成相关请求任务,然后将处理结果返回给主线程main。在子线程请求并处理数据的过程中,主线程可以继续做别的事情,即异步加载数据。

Future模式非常适合在处理耗时很长的业务逻辑时进行使用,可以有效减少系统的响应时间,提高系统的吞吐量。

Main.java

 
  1. public class Main {

  2. public static void main(String[] args) throws InterruptedException {

  3. FutureClient fc = new FutureClient();

  4. Data data = fc.request("请求参数");

  5. System.out.println("请求发送成功!");

  6. System.out.println("做其他的事情...");

  7. String result = data.getRequest();

  8. System.out.println(result);

  9. }

  10. }

FutureClient.java

 
  1. public class FutureClient {

  2. public Data request(final String queryStr){

  3. //1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情

  4. final FutureData futureData = new FutureData();

  5. //2 启动一个新的线程,去加载真实的数据,传递给这个代理对象

  6. new Thread(new Runnable() {

  7. @Override

  8. public void run() {

  9. //3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象

  10. RealData realData = new RealData(queryStr);

  11. futureData.setRealData(realData);

  12. }

  13. }).start();

  14. return futureData;

  15. }

  16. }

FutureData.java

 
  1. public class FutureData implements Data{

  2. private RealData realData ;

  3. private boolean isReady = false;

  4. public synchronized void setRealData(RealData realData) {

  5. //如果已经装载完毕了,就直接返回

  6. if(isReady){

  7. return;

  8. }

  9. //如果没装载,进行装载真实对象

  10. this.realData = realData;

  11. isReady = true;

  12. //进行通知

  13. notify();

  14. }

  15. @Override

  16. public synchronized String getRequest() {

  17. //如果没装载好 程序就一直处于阻塞状态

  18. while(!isReady){

  19. try {

  20. wait();

  21. } catch (InterruptedException e) {

  22. e.printStackTrace();

  23. }

  24. }

  25. //装载好直接获取数据即可

  26. return this.realData.getRequest();

  27. }

  28. }

RealData.java

 
  1. public class RealData implements Data{

  2. private String result ;

  3. public RealData (String queryStr){

  4. System.out.println("根据" + queryStr + "进行查询,这是一个很耗时的操作..");

  5. try {

  6. Thread.sleep(5000);

  7. } catch (InterruptedException e) {

  8. e.printStackTrace();

  9. }

  10. System.out.println("操作完毕,获取结果");

  11. result = "查询结果";

  12. }

  13. @Override

  14. public String getRequest() {

  15. return result;

  16. }

  17. }

上述程序执行的过程:在main主线程中,创建FutureClient对象并调用其request方法。在request方法体执行过程中,返回一个FutureData代理对象给main主线程,同时开启一个子线程A开始做真正请求处理工作。main主线程得到FutureData代理对象后,继续向下执行代码,执行String result = data.getRequest();,即调用FutureData对象中的getRequest()方法,这个方法是一个同步方法,使用了synchronized关键字修饰,main主线程获取当前FutureData的对象锁之后,执行了wait()方法,main主线程处于阻塞状态,并释放了FutureData的对象锁。在子线程A中创建RealData对象,执行RealData的构造函数,输出内容并使当前线程休眠5S并继续输出内容。RealData对象创建完毕后,在子线程A中执行futureData.setRealData(realData);,这个方法同样是一个同步方法,使用了synchronized关键字修饰,子线程A获取到FutureData对象的锁,执行notify(),发出通知,此时子线程A结束,阻塞的主线程main被唤醒,继续执行getRequest()中的return this.realData.getRequest();。最后,主线程再次执行输出。

Eclipse中console输出如下:

JDK中Future模式的封装

其实,在JDK中已经提供了Future模式的封装,使用示例如下:

UseFuture.java

 
  1. import java.util.concurrent.Callable;

  2. import java.util.concurrent.ExecutorService;

  3. import java.util.concurrent.Executors;

  4. import java.util.concurrent.Future;

  5. import java.util.concurrent.FutureTask;

  6. public class UseFuture implements Callable<String>{

  7. private String para;

  8. public UseFuture(String para){

  9. this.para = para;

  10. }

  11. /**

  12. * 这里是真实的业务逻辑,其执行可能很慢

  13. */

  14. @Override

  15. public String call() throws Exception {

  16. //模拟执行耗时

  17. Thread.sleep(5000);

  18. String result = this.para + "处理完成";

  19. return result;

  20. }

  21. //主控制函数

  22. public static void main(String[] args) throws Exception {

  23. String queryStr = "query";

  24. //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类

  25. FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));

  26. FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));

  27. //创建一个固定线程的线程池且线程数为2,

  28. ExecutorService executor = Executors.newFixedThreadPool(2);

  29. //这里提交任务future,则开启线程执行RealData的call()方法执行

  30. //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值

  31. Future f1 = executor.submit(future);//单独启动一个线程A去执行

  32. Future f2 = executor.submit(future2);//单独启动一个线程B去执行

  33. System.out.println("请求完毕");

  34. try {

  35. //在线程A、B的执行过程中,主线程main可以做额外的数据操作,也就是主程序执行其他业务逻辑

  36. System.out.println("处理实际的业务逻辑...");

  37. Thread.sleep(1000);

  38. } catch (Exception e) {

  39. e.printStackTrace();

  40. }

  41. //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待

  42. System.out.println("数据:" + future.get());//future.get()获取线程A执行任务的结果

  43. System.out.println("数据:" + future2.get());//future2.get()获取线程B执行任务的结果

  44. executor.shutdown();

  45. }

  46. }

创建了2个FutureTask对象,并且传入实现了Callable接口并进行真实业务逻辑处理的类的对象作为参数。创建一个固定数量为2的一个线程池,通过executor.submit(FutureTask对象)来将task任务交给线程池中的线程进行处理。在两个处理task任务的子线程执行过程中,main主线程可以继续执行下面的代码System.out.println("请求完毕");。主线程main执行到future.get()时,若处理该task的子线程已经将该任务处理完毕,则future.get()可以获得子线程A的任务处理结果,同理future2.get()可以获得子线程B的任务处理结果。若future.get()代码执行时,处理future任务的子线程A还没有处理完成,则主线程main需要等待,直到子线程A处理完成,则future.get()获得任务处理结果后,则main主线程才可以继续向下执行代码。最后,将线程池关闭。

Eclipse的console输出:

Master-Worker模式

Master-Worker模式是常用的并行计算模式,它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳和总结。其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。

在系统的数据量不是很大的场景,用Hadoop或者Storm有点大材小用,可以考虑Master-Worker。

Main.java

 
  1. import java.util.Random;

  2. public class Main {

  3. public static void main(String[] args) {

  4. System.out.println("本机器可用processor数量:"+Runtime.getRuntime().availableProcessors());

  5. Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors());

  6. Random r = new Random();

  7. for(int i = 1; i <= 100; i++){

  8. Task t = new Task();

  9. t.setId(i);

  10. t.setPrice(r.nextInt(1000));

  11. master.submit(t);

  12. }

  13. master.execute();

  14. long start = System.currentTimeMillis();

  15. while(true){

  16. if(master.isComplete()){

  17. long end = System.currentTimeMillis() - start;

  18. int priceResult = master.getResult();

  19. System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);

  20. break;

  21. }

  22. }

  23. }

  24. }

Master.java

 
  1. import java.util.HashMap;

  2. import java.util.Map;

  3. import java.util.concurrent.ConcurrentHashMap;

  4. import java.util.concurrent.ConcurrentLinkedQueue;

  5. public class Master {

  6. //1 有一个盛放任务的容器

  7. private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

  8. //2 需要有一个盛放worker的集合

  9. private HashMap<String, Thread> workers = new HashMap<String, Thread>();

  10. //3 需要有一个盛放每一个worker执行任务的结果集合

  11. private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

  12. //4 构造方法

  13. public Master(Worker worker , int workerCount){

  14. worker.setWorkQueue(this.workQueue);

  15. worker.setResultMap(this.resultMap);

  16. for(int i = 0; i < workerCount; i ++){

  17. this.workers.put(Integer.toString(i), new Thread(worker));

  18. }

  19. }

  20. //5 需要一个提交任务的方法

  21. public void submit(Task task){

  22. this.workQueue.add(task);

  23. }

  24. //6 需要有一个执行的方法,启动所有的worker方法去执行任务

  25. public void execute(){

  26. for(Map.Entry<String, Thread> me : workers.entrySet()){

  27. me.getValue().start();

  28. }

  29. }

  30. //7 判断是否运行结束的方法

  31. public boolean isComplete() {

  32. for(Map.Entry<String, Thread> me : workers.entrySet()){

  33. if(me.getValue().getState() != Thread.State.TERMINATED){

  34. return false;

  35. }

  36. }

  37. return true;

  38. }

  39. //8 计算结果方法

  40. public int getResult() {

  41. int priceResult = 0;

  42. for(Map.Entry<String, Object> me : resultMap.entrySet()){

  43. priceResult += (Integer)me.getValue();

  44. }

  45. return priceResult;

  46. }

  47. }

Task.java

 
  1. public class Task {

  2. private int id;

  3. private int price ;

  4. public int getId() {

  5. return id;

  6. }

  7. public void setId(int id) {

  8. this.id = id;

  9. }

  10. public int getPrice() {

  11. return price;

  12. }

  13. public void setPrice(int price) {

  14. this.price = price;

  15. }

  16. }

Worker.java

 
  1. import java.util.concurrent.ConcurrentHashMap;

  2. import java.util.concurrent.ConcurrentLinkedQueue;

  3. public class Worker implements Runnable {

  4. private ConcurrentLinkedQueue<Task> workQueue;

  5. private ConcurrentHashMap<String, Object> resultMap;

  6. public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {

  7. this.workQueue = workQueue;

  8. }

  9. public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {

  10. this.resultMap = resultMap;

  11. }

  12. @Override

  13. public void run() {

  14. while(true){

  15. Task input = this.workQueue.poll();//获取并移除队列的头元素

  16. if(input == null) break;

  17. Object output = MyWorker.handle(input);

  18. this.resultMap.put(Integer.toString(input.getId()), output);

  19. }

  20. }

  21. /*private Object handle(Task input) {

  22. Object output = null;

  23. try {

  24. //处理任务的耗时。。 比如说进行操作数据库。。。

  25. Thread.sleep(500);

  26. output = input.getPrice();

  27. } catch (InterruptedException e) {

  28. e.printStackTrace();

  29. }

  30. return output;

  31. }*/

  32. private static Object handle(Task input) {

  33. return null;

  34. }

  35. }

MyWorker.java

 
  1. import test.Worker;

  2. public class MyWorker extends Worker{

  3. public static Object handle(Task input){

  4. Object output = null;

  5. try {

  6. //处理任务的耗时。。 比如说进行操作数据库。。。

  7. Thread.sleep(500);

  8. output = input.getPrice();

  9. } catch (InterruptedException e) {

  10. e.printStackTrace();

  11. }

  12. return output;

  13. }

  14. }

Eclipse的console输出:

生产者-消费者

生产者-消费者也是一个非常经典的多线程模式,我们在实际开发中应用非常广泛的的思想理念。在生产者-消费者模式中:通常有两类线程,即若干个生产者的线程和若干个消费者的线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。

Main.java

 
  1. import java.util.concurrent.BlockingQueue;

  2. import java.util.concurrent.ExecutorService;

  3. import java.util.concurrent.Executors;

  4. import java.util.concurrent.LinkedBlockingQueue;

  5. public class Main {

  6. public static void main(String[] args) throws Exception {

  7. //内存缓冲区,生产者和消费者都需要拥有内存缓冲区的引用

  8. BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);

  9. //生产者

  10. Provider p1 = new Provider(queue);

  11. Provider p2 = new Provider(queue);

  12. Provider p3 = new Provider(queue);

  13. //消费者

  14. Consumer c1 = new Consumer(queue);

  15. Consumer c2 = new Consumer(queue);

  16. Consumer c3 = new Consumer(queue);

  17. //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)

  18. ExecutorService cachePool = Executors.newCachedThreadPool();

  19. //将3个生产者、3个消费者交给线程池去执行,线程池会分配线程去执行这些生产者、消费者的任务

  20. cachePool.execute(p1);//execute方法的参数为实现了Runnable接口的类的对象

  21. cachePool.execute(p2);

  22. cachePool.execute(p3);

  23. cachePool.execute(c1);

  24. cachePool.execute(c2);

  25. cachePool.execute(c3);

  26. try {

  27. Thread.sleep(3000);

  28. } catch (InterruptedException e) {

  29. e.printStackTrace();

  30. }

  31. p1.stop();

  32. p2.stop();

  33. p3.stop();

  34. try {

  35. Thread.sleep(2000);

  36. } catch (InterruptedException e) {

  37. e.printStackTrace();

  38. }

  39. // cachePool.shutdown();

  40. // cachePool.shutdownNow();

  41. }

  42. }

Provider.java

 
  1. import java.util.Random;

  2. import java.util.concurrent.BlockingQueue;

  3. import java.util.concurrent.TimeUnit;

  4. import java.util.concurrent.atomic.AtomicInteger;

  5. public class Provider implements Runnable{

  6. //共享缓存区

  7. private BlockingQueue<Data> queue;

  8. //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态

  9. private volatile boolean isRunning = true;

  10. //id生成器

  11. private static AtomicInteger count = new AtomicInteger();

  12. //随机对象

  13. private static Random r = new Random();

  14. public Provider(BlockingQueue queue){

  15. this.queue = queue;

  16. }

  17. @Override

  18. public void run() {

  19. while(isRunning){

  20. try {

  21. //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)

  22. Thread.sleep(r.nextInt(1000));

  23. //获取的数据进行累计...

  24. int id = count.incrementAndGet();

  25. //比如通过一个getData方法获取了

  26. Data data = new Data(Integer.toString(id), "数据" + id);

  27. System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");

  28. if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){

  29. System.out.println("提交缓冲区数据失败....");

  30. //do something... 比如重新提交

  31. }

  32. } catch (InterruptedException e) {

  33. e.printStackTrace();

  34. }

  35. }

  36. }

  37. public void stop(){

  38. this.isRunning = false;

  39. }

  40. }

Data.java

 
  1. public final class Data {

  2. private String id;

  3. private String name;

  4. public Data(String id, String name){

  5. this.id = id;

  6. this.name = name;

  7. }

  8. public String getId() {

  9. return id;

  10. }

  11. public void setId(String id) {

  12. this.id = id;

  13. }

  14. public String getName() {

  15. return name;

  16. }

  17. public void setName(String name) {

  18. this.name = name;

  19. }

  20. @Override

  21. public String toString(){

  22. return "{id: " + id + ", name: " + name + "}";

  23. }

  24. }

Consumer.java

 
  1. import java.util.Random;

  2. import java.util.concurrent.BlockingQueue;

  3. import java.util.concurrent.TimeUnit;

  4. public class Consumer implements Runnable{

  5. private BlockingQueue<Data> queue;

  6. public Consumer(BlockingQueue queue){

  7. this.queue = queue;

  8. }

  9. //随机对象

  10. private static Random r = new Random();

  11. @Override

  12. public void run() {

  13. while(true){

  14. try {

  15. //获取数据

  16. Data data = this.queue.take();

  17. //进行数据处理。休眠0 - 1000毫秒模拟耗时

  18. Thread.sleep(r.nextInt(1000));

  19. System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());

  20. } catch (InterruptedException e) {

  21. e.printStackTrace();

  22. }

  23. }

  24. }

  25. }

Eclipse的console中输出:

多线程三种设计模式-相关推荐

  1. java中控制反转_Java如何利用IOC控制反转的三种设计模式详解

    这篇文章主要为大家详细介绍了Java使用IOC控制反转的三种设计模式,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 对于许多开发人员来说,控制反演(IoC)都是一个模糊的概念,因为他们在现实世界中 ...

  2. 干掉if else!试试这三种设计模式,优化代码贼顺手!

    点击上方"芋道源码",选择"设为星标" 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | ...

  3. 数据库 -- 由数据库连接池引出的三种设计模式

    笔记摘要: 这里首先对数据库连接池的优化进行了说明,同时自己编写了一个数据库连接池,在实际开发中,为了获取标准的数据源,我们需要去实现javax.sal.DataSource接口, 在实现过程中对于链 ...

  4. PHP常见三种设计模式:单例、工厂、观察者

    1.单例模式 目的:保证一个类仅有一个实例,并提供一个访问它的全局访问点. 应用场景:数据库连接.缓存操作.分布式存储. /*** 设计模式之单例模式* $_instance必须声明为静态的私有变量* ...

  5. 23三种设计模式详解

    设计模式分为三大类: 创建型模式,共五种:工厂方法模式.抽象工厂模式.单例模式.建造者模式.原型模式. 结构型模式,共七种:适配器模式.装饰器模式.代理模式.外观模式.桥接模式.组合模式.享元模式. ...

  6. 多线程 三种创建方式及区别

    线程概念 进程:启动一个程序就是一个进程. 线程:在一个程序里面,多个事情同步进行,这个事情是由线程来完成 不使用多线程的效果 如果我们不使用线程,会怎么样呢?看下面代码 新建立一个hero类包含英雄 ...

  7. 多线程的三种设计模式的介绍

    前言 并发设计模型属于设计优化的一部分,它是对一些常用的多线程结构的总结和抽象.与串行程序相比,并行程序通常更为复杂.因此合理的使用并行模式在多线程开发中具有意义,本篇主要讲解一下Future,Mas ...

  8. Java三种设计模式

    Java设计模式 1,静态工厂方法模式 提供一个工厂类,构造方法私有,不允许外界直接创建该工厂类的对象. 在工厂类中添加一个静态方法,用于创建对象并返回. 缺点:不利于后期维护 如果后期要再创建类,还 ...

  9. java常用的三种设计模式

    一.单例模式 1.概述 单例模式的定义就是确保某一个类只有一个实例,并且提供一个全局访问点.属于设计模式三大类中的创建型模式. 单例模式具有典型的三个特点: 只有一个实例. 自我实例化. 提供全局访问 ...

最新文章

  1. 用Java调用WebService
  2. springboot使用shiro配置多个过滤器和session同步案例
  3. python模块--BeautifulSoup4 和 lxml
  4. 编写自己的Javascript库-1
  5. [转] 深入理解React 组件状态(State)
  6. Xuggler视频处理简介
  7. 同样是做大数据分析,你月薪8k他30k,到底差在了哪?
  8. 机器学习常见算法优缺点汇总
  9. iOS开源项目周报1229
  10. java 依赖倒置_Java设计原则—依赖倒置原则(转)
  11. 毕业设计之 --- 基于大数据分析的金融产品销售预测分析
  12. 利用百度地图开放平台的Web API实现检索定位
  13. 银行卡收单业务____单边账___现实生活中单边账的处理
  14. 算力狂热时代的冷静之道:宁畅总裁秦晓宁分享企业的算力最优解
  15. 构建makefile文件
  16. 安卓通过链接打开淘宝客户端
  17. c语言计算存储大小,在C语言中5种基本数据类型的存储空间长度的排列顺序
  18. Linux加法简单程序,Linux操作之——简单命令
  19. 《zw版·Halcon-delphi系列原创教程》 Halcon分类函数013,shape模型
  20. 架设PPPOE服务器

热门文章

  1. 第2节--深度学习基础介绍-机器学习--课程介绍(下)
  2. PV操作经典例题——哲学家进餐问题
  3. 调试串口导致烧录失败
  4. Maven:命令大全。
  5. LinkedIn领英账号达到一周添加好友邀请上限后怎么办?学会这四式三招,你也能解决LinkedIn领英账号添加好友的每周数量限制...
  6. [经验分享] 覃超线上直播课 如何快速搞定秋招算法面试
  7. oracle avg() 绝对平均值
  8. 泛型(泛型类、泛型方法)
  9. Python概述:C++程序员眼中的Python
  10. 揭露强奸犯的黑客被判有罪?审视CFAA计算机欺诈法