十、主仆模式(Master-Slave)
1、核心思想
一个基于分而治之思想设计模式,将一个任务(原始任务)分解为若干个语义等同的子任务,
并由专门的工作者线程来并行执行这些任务,原始任务的结果是通过整合各个子任务的处理结果形成的。
2、评价:
即提高计算效率,又实现了信息隐藏。
3、适用场景
a、并行计算,目的是提升计算性能。
b、容错处理,目的是提高计算的可靠性,因为原始任务的结果是通过整合各个子任务的处理结果形成的
c、计算精度,目的是提高计算的精确程度,因为原始任务的结果是所有子任务的处理结果不精确性最低的一个结果。
只有a场景中Slave参与者实例是同一个实现类,b、c场景不同Slave参与者对应着不同的实现类。使用策略模式继承同一个接口
4、收集子任务处理结果的2种方法
a Master和Slave使用同一个存储仓库
b 使用Future模式
5、可靠性与异常处理
可以捕获子任务异常,可以考虑由其自身重新执行处理失败的子任务,即在客户端线程执行。

/*** @author huzhiqiang* @param <T> 子任务类型*/
public interface TaskDivideStrategy<T> {/*** 返回下一个子任务,返回值为null,则表示无后续子任务* @return*/T nextChunk();
}/*** 对子任务派发算法策略的抽象* @author huzhiqiang** @param <T> 子任务类型* @param <V> 子任务处理结果类型*/
public interface SubTaskDispatchStrategy<T, V> {/*** 根据指定的原始任务分解策略,将分解得来的各个子任务派发给一组Slave参与者实例* @param slaves* @param taskDivideStrategy  原始任务分解策略* @return* @throws InterruptedException*/Iterator<Future<V>> dispatch(Set<? extends SlaveSpe<T, V>> slaves,TaskDivideStrategy<T> taskDivideStrategy) throws InterruptedException;
}/*** Master-Slave模式Slave参与者的抽象* @author huzhiqiang** @param <T> 子任务类型* @param <V> 子任务处理结果类型*/
public interface SlaveSpe<T, V> {public Future<V> submit(final T task) throws InterruptedException;public void init();public void shutdown();
}/*** 对处理失败的子任务进行重试所需的信息* @author huzhiqiang** @param <T>* @param <V>*/
public class RetryInfo<T, V> {public final T subTask;public final Callable<V> redoCommand;public RetryInfo(T subTask, Callable<V> redoCommand) {super();this.subTask = subTask;this.redoCommand = redoCommand;}
}public class SubTaskFailureException extends Exception {private static final long serialVersionUID = 1L;@SuppressWarnings("rawtypes")public final RetryInfo retryInfo;@SuppressWarnings("rawtypes")public SubTaskFailureException(RetryInfo retryInfo, Exception cause) {super(cause);this.retryInfo = retryInfo;}
}/*** 简单的轮询派发算法策略* @author huzhiqiang** @param <T>* @param <V>*/
public class RoundRobinSubTaskDispatchStrategy<T, V> implements SubTaskDispatchStrategy<T, V> {@Overridepublic Iterator<Future<V>> dispatch(Set<? extends SlaveSpe<T, V>> slaves, TaskDivideStrategy<T> taskDivideStrategy)throws InterruptedException {final List<Future<V>> subResults = new LinkedList<Future<V>>();T subTask;Object[] arrSlaves = slaves.toArray();int i = -1;final int slaveCount = arrSlaves.length;Future<V> subTaskResultPromise;//先划分子任务,然后通过轮询算法分配到子任务处理线程中//如果子任务处理线程不是同一个实例对象,则划分子任务时可做对应处理while(null != (subTask = taskDivideStrategy.nextChunk())){i = (i+1) % slaveCount;subTaskResultPromise = ((WorkerThreadSlave<T, V>) arrSlaves[i]).submit(subTask);subResults.add(subTaskResultPromise);}return subResults.iterator();}
}/*** 基于工作者线程的Slave参与者通用实现* @author huzhiqiang** @param <T>* @param <V>*/
public abstract class WorkerThreadSlave<T, V> extends AbstractTerminatableThread implements SlaveSpe<T, V> {private final BlockingQueue<Runnable> taskQueue;public WorkerThreadSlave(BlockingQueue<Runnable> taskQueue) {super();this.taskQueue = taskQueue;}@Overridepublic Future<V> submit(T task) throws InterruptedException {FutureTask<V> ft = new FutureTask<V>(new Callable<V>() {@Overridepublic V call() throws Exception {V result;try {result = doProcess(task);} catch (Exception e) {SubTaskFailureException ex = newSubTaskFailureException(task, e);throw ex;}return result;}});taskQueue.put(ft);terminationToken.reservations.incrementAndGet();return ft;}private SubTaskFailureException newSubTaskFailureException(final T subTask, Exception e) {RetryInfo<T, V> retryInfo = new RetryInfo<T, V>(subTask, new Callable<V>() {@Overridepublic V call() throws Exception {V result;result = doProcess(subTask);return result;}});return new SubTaskFailureException(retryInfo, e);}@Overridepublic void init() {start();}@Overridepublic void shutdown() {//会等待线程处理完成再返回terminate(true);}@Overrideprotected void doRun() throws Exception {try {Runnable task = taskQueue.take();//不能使用new Thread(task).start()task.run();} finally{terminationToken.reservations.decrementAndGet();}}/*** 用于实现子任务的处理逻辑* @param task* @return* @throws Exception*/public abstract V doProcess(T task) throws Exception;
}public abstract class AbstractMaster<T, V, R> {public volatile Set<? extends SlaveSpe<T, V>> slaves;//子任务派发算法策略public volatile SubTaskDispatchStrategy<T, V> dispatchStrategy;public AbstractMaster(){}public void init(){slaves = createSlaves();dispatchStrategy = newSubTaskDispatchStrategy();for(SlaveSpe<T, V> slave: slaves){slave.init();}}/*** 对子类暴露的服务方法,该类的子类需要定义一个比该方法命名更为具体的服务方法* 该方法用了 Template(模板)模式  strategy(策略)模式* @param params* @return* @throws Exception*/public R service(Object... params) throws Exception{final TaskDivideStrategy<T> taskDivideStrategy = newTaskDivideStrategy(params);Iterator<Future<V>> subResults = dispatchStrategy.dispatch(slaves, taskDivideStrategy);for(SlaveSpe<T, V> slave: slaves){slave.shutdown();}//slave.shutdown() 会等待线程处理完成再运行R result = combineResults(subResults);return result;}private SubTaskDispatchStrategy<T, V> newSubTaskDispatchStrategy() {return new RoundRobinSubTaskDispatchStrategy<T, V>();}//留给子类实现, 用于合并子任务处理结果public abstract R combineResults(Iterator<Future<V>> subResults);//留给子类实现, 用于创建原始任务分解算法策略public abstract TaskDivideStrategy<T> newTaskDivideStrategy(Object... params);//留给子类实现, 用于创建Slave参与者实例public abstract Set<? extends SlaveSpe<T, V>> createSlaves();
}public class Range {public final int lowerBound;public final int upperBound;public Range(int lowerBound, int upperBound) {if(upperBound < lowerBound){throw new IllegalArgumentException("upperBound should not be less than lowerBound");}this.lowerBound = lowerBound;this.upperBound = upperBound;}@Overridepublic String toString() {return "Range [lowerBound=" + lowerBound + ", upperBound=" + upperBound + "]";}
}/*** 质数生成器服务。模式角色: Master-Slave.Master*/
public class PrimeGeneratorService extends AbstractMaster<Range, Set<BigInteger>, Set<BigInteger>> {public PrimeGeneratorService(){this.init();}//对外提供的具体方法名public Set<BigInteger> generatePrime(int upperBound) throws Exception{return this.service(upperBound);}@Overridepublic Set<BigInteger> combineResults(Iterator<Future<Set<BigInteger>>> subResults) {Set<BigInteger> result = new TreeSet<BigInteger>();while(subResults.hasNext()){try {result.addAll(subResults.next().get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {Throwable cause = e.getCause();if(SubTaskFailureException.class.isInstance(cause)){@SuppressWarnings("rawtypes")RetryInfo retryInfo = ((SubTaskFailureException) cause).retryInfo;Object subTask = retryInfo.subTask;System.out.println("failed subTask: " + subTask);}}}return result;}@Overridepublic TaskDivideStrategy<Range> newTaskDivideStrategy(Object... params) {final int numOfSlaves = slaves.size();final int originalTaskScale = (int) params[0];final int subTaskScale = originalTaskScale / numOfSlaves;final int subTaskCount = (0 == (originalTaskScale % numOfSlaves)) ? numOfSlaves: numOfSlaves+1;TaskDivideStrategy<Range> tds = new TaskDivideStrategy<Range>() {private int i=1;@Overridepublic Range nextChunk() {int upperBound;if(i<subTaskCount){upperBound = i * subTaskScale;}else if(i==subTaskCount){upperBound = originalTaskScale;}else{return null;}int lowerBound = (i-1)*subTaskScale + 1;i++;return new Range(lowerBound, upperBound);}};return tds;}@Overridepublic Set<? extends SlaveSpe<Range, Set<BigInteger>>> createSlaves() {Set<PrimeGenerator> slaves = new HashSet<>();for(int i=0; i<Runtime.getRuntime().availableProcessors(); i++){slaves.add(new PrimeGenerator(new ArrayBlockingQueue<Runnable>(200)));}return slaves;}/*** 质数生成器。模式角色: Master-Slave.Slave*/private static class PrimeGenerator extends WorkerThreadSlave<Range, Set<BigInteger>>{public PrimeGenerator(BlockingQueue<Runnable> taskQueue){super(taskQueue);}@Overridepublic Set<BigInteger> doProcess(Range range) throws Exception {Set<BigInteger> result = new TreeSet<BigInteger>();BigInteger start = BigInteger.valueOf(range.lowerBound);BigInteger end = BigInteger.valueOf(range.upperBound);//java.math.BigInteger.nextProbablePrime() 此方法返回一个整数大于该BigInteger的素数while(-1 == (start = start.nextProbablePrime()).compareTo(end)){result.add(start);}return result;}}
}public class Main {public static void main(String[] args) throws Exception {for(int i=0;i<10;i++){PrimeGeneratorService service = new PrimeGeneratorService();Set<BigInteger> result = null;try {result = service.generatePrime(Integer.valueOf(1000*(i+1)));} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println("Generated " + result.size() + " prime:");System.out.println(result.toString());}}
}

主仆模式(Master-Slave)相关推荐

  1. 适合MySQL master/slave模式的JDBC driver: lbpool

    去年写了篇MySQL分表实现上百万上千万记录分布存储的批量查询设计模式的文章,思路是基于MySQL手动分表的.缺点是增加了程序的复杂性.现在有一个更简单和方便的现成的产品了,那就是lbpool lbp ...

  2. Redis: Redis的主从复制(Master/Slave),一主二仆,薪火相传,反客为主,哨兵模式sentinel

    命令: slaveof 主库ip  主库端口         配置从库 info replication                         查看redis连接情况 slaveof  no ...

  3. MongoDB学习笔记——Master/Slave主从复制

    Master/Slave主从复制 主从复制MongoDB中比较常用的一种方式,如果要实现主从复制至少应该有两个MongoDB实例,一个作为主节点负责客户端请求,另一个作为从节点负责从主节点映射数据,提 ...

  4. Mongodb源码分析--Replication之主从模式--Master

    mongodb中提供了复制(Replication)机制,通过该机制可以帮助我们很容易实现读写分离方案,并支持灾难恢复(服务器断电)等意外情况下的数据安全. 在老版本(1.6)中,Mongo提供了两种 ...

  5. 主从多机matlab代码,Jenkins的Master Slave主从进行多机多环境部署-配置

    当我们写好一个程序以后,需要在多台机器上进行部署操作,如果我们使用每台机器独立部署的话,就很耗时间,这个时候我们可以利用Jenkins的主从机制来进行部署操作. 首先看个图,大概理解一下Master ...

  6. 【嵌入式】Libmodbus之RTU模式Master端程序示例

    00. 目录 文章目录 00. 目录 01. 软件开发流程 02. 获取版本信息 03. 读写单个线圈程序示例 04. 读写多个线圈程序示例 05. 读写单个保持寄存器程序示例 06. 读写多个保持寄 ...

  7. Redis(八):Redis的复制(Master/Slave)

    Redis的复制(Master/Slave)目录导航: 是什么 能干嘛 怎么玩 复制原理 哨兵模式(sentinel) 复制的缺点 是什么 官网 行话:也就是我们所说的主从复制,主机数据更新后根据配置 ...

  8. redis主从复制_Redis 的主从复制(Master/Slave)

    1. 是什么 行话:也就是我们所说的主从复制,主机数据更新后根据配置和策略自动同步到备机的 master/slave 机制,Master以写为主,Slave 以读为主 2. 能干嘛 数据冗余:主从复制 ...

  9. PGPool-II master/slave mode using caveat

    PGPool-II的master/slave模式类似load balance模式.只不过master/slave是与PGPool-II以外的第三方复制软件结合使用的.(如当前支持的slony和stre ...

  10. pgpool-II(二)pgpool-II+repmgr(master/slave)+balance+pgpool

    用 repmgr实现 pg的master/slave 搭建高可用和自动切换过程略 (详见htthttps://blog.csdn.net/zhaowenzhong/article/details/80 ...

最新文章

  1. 【小项目关键技术二】UGV电机编码测速
  2. python 连接字符的方法(全)
  3. Spring-Cloud组件:eureka
  4. Oracle Comment 获取并修改表或字段注释
  5. services.xml应该放在项目的哪里_新轮胎应该放在前轮还是后轮?
  6. Hadoop2.2.0--Hadoop Federation、Automatic HA、Yarn完全分布式集群结构
  7. 北京理工大学计算机考研真题,北京理工大学计算机专业基础历年考研真题汇编附答案...
  8. 海美迪盒子android升级包,海美迪H5固件升级ROM系统刷机包下载_刷机教程
  9. dojo省份地市级联之省份Dao实现类(五)
  10. 【Matlab综合设计】开环Buck-Boost升压-降压式变换器Simulink仿真(含仿真模块选择和参数计算过程)
  11. asp.net 将bmp格式图片怎么转换为jpg_如何把jpeg转换成jpg?分享两种jpeg转换jpg的方法...
  12. 我那牛逼老公,创业成功但最后被辞退,仍然一毛钱股份没有拿到……
  13. c语言括号里三种字符,c语言的基本数据类型都有什么,麻烦知道用大括号分类,非常感谢...
  14. python如何画神经网络特征图
  15. Js中身份证验证及社会统一信用代码验证
  16. shell命令 vxworks5.5_VxWorks Shell下常用的命令
  17. 汉明码的原理、生成和检验
  18. 《Android》Chap.11 网络技术
  19. 记一次由于临时变量导致的CPU使用率过高问题
  20. Java获取代理地址和端口

热门文章

  1. outlook2013升级_Microsoft Outlook 2013入门指南
  2. Android学习计划[转载Sammy_Snail]
  3. aruba交换机配置命令_ArubaOS交换机日常配置指导
  4. python实现排序的lowB三人组:冒泡、插入、选择
  5. [Unity]摘录笔记UnityShader(解读shader代码构成)
  6. kubernetes入门之Downward API
  7. “携手共建互联网安全生态”研讨会在京召开
  8. 【多线程】解决SpringBoot热部署时layering-cache线程池被Tomcat Graceful shutdown,重新赋值方法区中 StatsServi
  9. html网页什么样的字体最好看,css设置各种中文字体样式代码
  10. Bitmap、CBitmap、HBITMAP以及BITMAP的相互转换