Master-Worker模式是常用的并行设计模式。核心思想是,系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理子任务。任务处理过程中,Master还负责监督任务进展和Worker的健康状态;Master将接收Client提交的任务,并将任务的进展汇总反馈给Client。各角色关系如下图

Master-Worker模式满足于可以将大任务划分为小任务的场景,是一种分而治之的设计理念。通过多线程或者多进程多机器的模式,可以将小任务处理分发给更多的CPU处理,降低单个CPU的计算量,通过并发/并行提高任务的完成速度,提高系统的性能。

具体细节如上图,Master对任务进行切分,并放入任务队列;然后,触发Worker处理任务。实际操作中,任务的分配有多种形式,如Master主动拉起Workder进程池或线程池,并将任务分配给Worker;或者由Worker主动领取任务,这样的Worker一般是常驻进程;还有一种解耦的方式,即Master指做任务的接收、切分和结果统计,指定Worker的数量和性能指标,但不参与Worker的实际管理,而是交由第三方调度监控和调度Worker。

代码实现Master-Worker模式:

Master代码:

 1 package com.hjf.master_worker;
 2
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.concurrent.ConcurrentHashMap;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7
 8 /**
 9  * Master
10  * @author huangjianfei
11  */
12 public class Master
13 {
14     //1:应该有一个承载任务的集合
15     private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
16
17     //2:使用hashmap去承载所有的worker对象 ThreadName------Worker
18     private HashMap<String,Thread> workers = new HashMap<>();
19
20     //3:使用一个容器承载每一个worker并行执行任务的结果集
21     private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String, Object>();
22
23     //4:构造方法
24     public Master(Worker worker,int workerCount){
25         //在worker中添加两个引用  workQueue用于任务的领取  resultMap用于任务的提交
26         worker.setWorkerQueue(this.workQueue);
27         worker.setResultMap(this.resultMap);
28
29         for (int i = 0; i < workerCount; i++)
30         {
31             workers.put("子节点 "+i, new Thread(worker));
32         }
33     }
34
35     //5:提交方法
36     public void submit(Task task){
37         workQueue.add(task);
38     }
39
40     //6:需要有一个执行的方法(启动应用程序 让所有的worker工作)
41     public void execute(){
42         //遍历workers 分别去执行每一个worker
43         for (Map.Entry<String,Thread> me: workers.entrySet())
44         {
45             me.getValue().start();
46         }
47     }
48
49     /**
50      * 判断所有的worker是否执行完毕
51      */
52     public boolean isCompleted()
53     {
54         //遍历所有的worker 只要有一个没有停止 那么就代表没有结束
55         for (Map.Entry<String,Thread> me: workers.entrySet())
56         {
57             if(me.getValue().getState() != Thread.State.TERMINATED){
58                 return false;
59             }
60         }
61         return true;
62     }
63
64     /**
65      * 计算最终的结果集
66      * @return
67      */
68     public int getResult(){
69         int result = 0;
70         for (Map.Entry<String,Object> me : resultMap.entrySet())
71         {
72             result += (Integer)me.getValue();
73         }
74         return result;
75     }
76 }

Worker代码实现:

 1 package com.hjf.master_worker;
 2
 3 import java.util.concurrent.ConcurrentHashMap;
 4 import java.util.concurrent.ConcurrentLinkedQueue;
 5
 6 /**
 7  * Worker
 8  * @author huangjianfei
 9  */
10 public class Worker implements Runnable
11 {
12     private ConcurrentLinkedQueue<Task> workQueue;
13
14     private ConcurrentHashMap<String, Object> resultMap;
15
16     public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue)
17     {
18         this.workQueue = workQueue;
19     }
20
21     public void setResultMap(ConcurrentHashMap<String, Object> resultMap)
22     {
23         this.resultMap = resultMap;
24     }
25
26     @Override
27     public void run()
28     {
29         //处理一个个任务
30         while(true){
31             //从队列中取出一个元素
32             Task input = this.workQueue.poll();
33             if(null == input) break;
34             //真正的去做业务处理
35             Object outPut = handle(input);
36             //存放任务的结果
37             this.resultMap.put(String.valueOf(input.getId()), outPut);
38         }
39     }
40
41     //单独抽出来 给子类重写,更加灵活
42     public Object handle(Task input){
43         return null;
44     }
45
46
47     /**
48      * 处理业务 应该抽象出来 子类去具体实现业务逻辑
49      * @param input
50      */
51 //    private Object handle(Task input)
52 //    {
53 //        Object outPut = null;
54 //        if(null == input) return null;
55 //        try
56 //        {
57 //            //表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库
58 //            Thread.sleep(5000);
59 //            //模拟真实的业务场景
60 //            outPut = input.getPrice();
61 //        } catch (InterruptedException e)
62 //        {
63 //            e.printStackTrace();
64 //        }
65 //        return outPut;
66 //    }
67
68 }

Task代码实现:

 1 package com.hjf.master_worker;
 2 /**
 3  * 任务
 4  * @author huangjianfei
 5  */
 6 public class Task
 7 {
 8     private int id;
 9     private String name;
10     private int price;
11     public int getId()
12     {
13         return id;
14     }
15     public void setId(int id)
16     {
17         this.id = id;
18     }
19     public String getName()
20     {
21         return name;
22     }
23     public void setName(String name)
24     {
25         this.name = name;
26     }
27     public int getPrice()
28     {
29         return price;
30     }
31     public void setPrice(int price)
32     {
33         this.price = price;
34     }
35
36 }

Worker子类,在以后的开发中可以按照自己的需求去设计相关的Worker的子类:

 1 package com.hjf.master_worker;
 2
 3 public class MyWorker1 extends Worker
 4 {
 5     @Override
 6     public Object handle(Task input)
 7     {
 8         Object outPut = null;
 9         if(null == input) return null;
10         try
11         {
12             //表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库
13             Thread.sleep(5000);
14             //模拟真实的业务场景
15             outPut = input.getPrice();
16         } catch (InterruptedException e)
17         {
18             e.printStackTrace();
19         }
20         return outPut;
21     }
22 }

Main测试类代码:

 1 package com.hjf.master_worker;
 2
 3 import java.util.Random;
 4 /**
 5  * 主线程测试类
 6  * @author huangjianfei
 7  */
 8 public class Main
 9 {
10     public static void main(String[] args)
11     {
12         System.out.println("我的机器可用的Processor数量:"+Runtime.getRuntime().availableProcessors());
13         // 使用worker子类实现具体的业务,更加灵活
14         Master master = new Master(new MyWorker1(), Runtime.getRuntime().availableProcessors());
15         Random r = new Random();
16         //提交100个任务
17         for (int i = 0; i <= 100; i++)
18         {
19             Task t = new Task();
20             t.setId(i);
21             t.setName("任务 "+i);
22             t.setPrice(r.nextInt(1000));
23             master.submit(t);
24         }
25
26         //执行所有的worker
27         master.execute();
28
29         long start = System.currentTimeMillis();//记录时间
30
31
32         while(true){
33             //全部的worker执行结束的时候去计算最后的结果
34             if(master.isCompleted()){
35                 long end = System.currentTimeMillis() - start;//计算耗时
36                 //计算结果集
37                 int result = master.getResult();
38                 System.out.println("执行最终结果: "+result + " 执行耗时 "+end);
39                 break;
40             }
41         }
42
43     }
44
45 }

转载于:https://www.cnblogs.com/little-fly/p/8902241.html

Master-Worker设计模式介绍相关推荐

  1. linux 并行 模式,并行设计模式-Master/Worker

    Master-Worker设计模式核心思想是将原来串行的逻辑并行化,并将逻辑拆分成很多独立模块并行执行,其中主要包含两个主要组件Master和Worker,Master主要讲逻辑进行查分,拆分为互相独 ...

  2. centos7.6使用kubeadm安装kubernetes的master worker节点笔记及遇到的坑

    个人博客原文地址:http://www.lampnick.com/php/760 本文目标 安装docker及设置docker代理 安装kubeadm 使用kubeadm初始化k8s Master节点 ...

  3. Nginx源码分析:master/worker工作流程概述

    nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> Nginx的master与worker工作模式 在生成环境中的Nginx启动模式基本都是以m ...

  4. 单例设计模式介绍||单例设计模式八种方式——1) 饿汉式(静态常量) 2) 饿汉式(静态代码块) 3) 懒汉式(线程不安全) 4) 懒汉式(线程安全,同步方法)

    单例模式 单例设计模式介绍 所谓类的单例设计模式,就是采取一定的方法保证在整个的软件系统中,对某个类只能存在一个对象实例,并且该类只提供一个取得其对象实例的方法(静态方法). 比如Hibernate的 ...

  5. 23种设计模式介绍(一)---- 创建型模式

    由于设计模式篇幅比较大,如果在一篇文章讲完所有的设计模式的话不利于阅读.于是我把它分为三篇文章 23种设计模式介绍(一)---- 创建型模式 23种设计模式介绍(二)---- 结构型模式 23种设计模 ...

  6. 责任链设计模式介绍及实战

    责任链设计模式介绍及实战 1.责任链模式 顾名思义,责任链模式(Chain of Responsibility Pattern)为请求创建了一个接收者对象的链.这种模式给予请求的类型,对请求的发送者和 ...

  7. Kubernetes 笔记(06)— 搭建多节点集群、kubeadm 安装 master/worker/console/flannel 网络插件

    1. kubeadm 官网:https://kubernetes.io/zh-cn/docs/reference/setup-tools/kubeadm/ 为了简化 Kubernetes 的部署工作, ...

  8. WINDOWS PE制作 - 主引导记录(Master Boot Record)介绍

    WINDOWS PE制作之主引导记录(Master Boot Record)介绍 本章内容的主要部分百度百科相关条目重新编辑而来,原文网址:https://wapbaike.baidu.com/ite ...

  9. 23种设计模式介绍以及在Java中的实现

    本文章出自:blog.csdn.net/anxpp/artic- 若要查看原文请点击 文章中的示例源码在github上:github.com/anxpp/JavaD- 由于CSDN上的下拉翻页比较麻烦 ...

  10. 23种设计模式介绍(Python示例讲解)

    文章目录 一.概述 二.设计模式七种原则 三.设计模式示例讲解 1)创建型模式 1.工厂模式(Factory Method) [1]简单工厂模式(不属于GOF设计模式之一) [2]工厂方法模式 2.抽 ...

最新文章

  1. c语言职专试题及答案,中等职业学校计算机应用专业c语言编程基础科试卷及答案.doc...
  2. Spring学习4之依赖注入(DI)
  3. 基于7个案例,分享我对“提示信息设计”的思考
  4. CentOS7中使用Docker安装SVN以及配置账号权限
  5. python3源码剖析_T-SNE源码剖析(python版)
  6. 每天一道LeetCode-----将有序序列转成高度平衡二叉搜索树
  7. zhuan zai suffix tree
  8. oracle sequence last_number,关于oracle序列的LAST_NUMBER
  9. aes离线解密工具_如何在Python中解密OpenSSL AES加密文件?
  10. easyui 添加下拉框数据_电商教父:关于淘宝关键词点击率以及提升数据的方法...
  11. 想成为有钱人,你要逼自己戒掉这5个坏习惯
  12. 电阻带行业调研报告 - 市场现状分析与发展前景预测(2021-2027年)
  13. 数组使用方法集合(建议收藏)
  14. 十个经典java开发项目及其描述-马上写到你的简历中去吧,祝你升职加薪
  15. VC调用3dmax自动化对象
  16. SQLSERVER读懂语句运行的统计信息
  17. 如何辨别BGP带宽的真假?
  18. Bootstrap 导航元素( tab导航)标签页
  19. 在计算机语言中的乘法,LOGO语言编程题  高精度乘法★★
  20. python pta实验八

热门文章

  1. 游戏筑基开发之单链表及其增删改查(C语言)
  2. Security+ 学习笔记34 硬件安全
  3. STP重新收敛过程和补充内容
  4. leetcode 21 合并两个有序链表 (python)
  5. 基础算法:与、或、异或运算
  6. ubuntu adduser
  7. CentOS7上squid的部署及两种模式(4.1版本)
  8. 借助 Resharper 和 StyleCop 让代码更整洁
  9. js中for循环的优化写法
  10. ubuntu 13.04 web开发从零配置到全装备手记(环境搭建全攻略)