作者 | HelloGitHub

来源 | HelloGitHub(ID:GitHub520)

头图 | CSDN 下载自视觉中国

HelloGitHub 推出的《讲解开源项目》系列。讲解 PowerJob 系列即将接近尾声,本系列的干货你还喜欢吗?欢迎留言说下你的感受和后面想看的内容。

项目地址:https://github.com/KFCFans/PowerJob

MapReduce 概念介绍

MapReduce 是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念 Map(映射)和 Reduce(归约),是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。当前的软件实现是指定一个 Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的 Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

以上这一大段不算难懂的文字就是 MapReduce 的官方概念,从“大规模数据集”这个关键字可以看出,MapReduce 是面向大数据处理领域设计的,是分治思想的一种经典实现,简单概括下就是把一大坨数据通过 Map 方法切分为较小的、单机能够处理的数据块进行处理(shuffle),处理完成后通过 Reduce 方法汇总结果,具体流程如下图所示。

需求背景

PowerJob 作为任务调度中间件,核心职责是负责任务的调度。而 MapReduce 作为一个大数据处理模型,核心功能是大规模数据的并行处理。从表象看,PowerJob 和 MapReduce 纯属八杆子打不着的关系~相信很多人第一眼看到 PowerJob 和 MapReduce 这两个关键词一起出现时,都会有以下心理活动:

“你一个任务调度框架咋就硬要扯上 MapReduce 那么高端的概念呢?就硬蹭呗?”

其实这个问题,换个角度来思考,就能找到答案。

一般来讲,需要定时调度执行的都是离线数据同步任务,对于一些有一定体量的业务来说,这个离线数据规模可能很大,单机无法很好的完成计算。为了解决这个问题,目前市面上的调度框架普遍支持静态分片这种相对比较简陋的方式来完成分布式计算,即通过指定分片数量来调动固定数量的机器执行固定区间的任务。但很显然,这种方式非常不灵活,局限性也非常大。

那么如何实现复杂且庞大任务的分布式计算呢?阿里巴巴的 SchedulerX 团队给出了 MapReduce 这样的答案。通过自己编程的形式,实现 Map 方法,完成任务的切分,再通过 Reduce 汇总子任务结果,即可完成高度可定制的分布式计算。

PowerJob 的 MapReduce 实现便是借鉴了这一先进的思想,这里再次感谢 SchedulerX 团队~

示例用法

在 PowerJob 中,MapReduce 不再是高高在上、难以触碰的概念。得益于强大的底层实现和优雅的 API 设计,开发者仅需要寥寥数行代码便可完成大型任务的分布式计算,具体示例如下。

对于有分布式计算需求的任务,我们需要继承特定的抽象类 MapReduceProcessor 来开启分布式计算能力,该接口要求开发者实现两个方法,分别是 process 和 reduce。前者负责任务的具体执行,后者负责汇集所有子任务得出具体的结果。同时,该抽象类默认提供两个可用方法:isRootTask 和 map。通过调用 isRootTask 方法可以判断出当前 Task 是否为根任务,如果是根任务,则进行任务的切分(PowerJob 支持任意级 map,并不只有在根任务才能切分任务),然后调用 map 方法分发子任务。

下面放一段简单的代码示例帮助大家理解。下面这段代码模拟了目前市面上主流的“静态分片”式分布式处理,即通过控制台指定分片数量和参数(比如分3片,分片参数为:1=a&2=b&3=c)来控制参与计算的机器数量和起始参数。虽然是“杀鸡焉用牛刀”的示例,不过还是能帮助大家很好理解 PowerJob MapReduce 处理器的强大之处!

首先,我们通过 context 的 getJobParams 方法获取控制台配置的参数,即分片参数 1=a&2=b&3=c。这个分片参数代表现在需要有 3 台机器参与执行,每台机器上子任务的起始参数分别为 a、b、c。因此,我们可以根据该规则创建子任务对象 SubTask,传入分片索引 index 和 分片参数 params。

完成子任务的切分后,即可调用 map 方法完成任务的分发。

分发后该子任务会再次进入 process 方法,只不过本次是以 SubTask 而不是 RootTask 的身份进入。我们可以通过 context.getSubTask() 方法获取之前 map 出去的对象,该方法的返回值是 Object,因此我们需要使用 Java instaneof 关键字判断类型(当然,如果没有多级 map,那么该对象只可能是 SubTask 类型,直接强转即可),如果该对象为 SubTask 类型,即进行了子任务处理阶段,开始编写子任务处理逻辑即可。

当所有子任务执行完毕后,PowerJob 会调用 reduce 方法,传入所有子任务的运行结果,便于开发者构建该任务的最终结果。

@Component
public class StaticSliceProcessor extends MapReduceProcessor {@Overridepublic ProcessResult process(TaskContext context) throws Exception {OmsLogger omsLogger = context.getOmsLogger();// root task 负责分发任务if (isRootTask()) {// 从控制台传递分片参数,假设格式为KV:1=a&2=b&3=cString jobParams = context.getJobParams();Map<String, String> paramsMap = Splitter.on("&").withKeyValueSeparator("=").split(jobParams);List<SubTask> subTasks = Lists.newLinkedList();paramsMap.forEach((k, v) -> subTasks.add(new SubTask(Integer.parseInt(k), v)));return map(subTasks, "SLICE_TASK");}Object subTask = context.getSubTask();if (subTask instanceof SubTask) {// 实际处理// 当然,如果觉得 subTask 还是很大,也可以继续分发哦return new ProcessResult(true, "subTask:" + ((SubTask) subTask).getIndex() + " process successfully");}return new ProcessResult(false, "UNKNOWN BUG");}@Overridepublic ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {// 按需求做一些统计工作... 不需要的话,直接使用 Map 处理器即可return new ProcessResult(true, "xxxx");}@Getter@NoArgsConstructor@AllArgsConstructorprivate static class SubTask {private int index;private String params;}
}

原理实现

PowerJob 的 MapReduce 思想主要来源于《Schedulerx2.0 分布式计算原理&最佳实践》这篇文章。

出于功能职责的划分(powerjob-server 仅负责任务的调度和运维),整个 MapReduce 任务的计算由执行器 powerjob-worker 自主完成。

为了便于模型的设计和功能的划分,PowerJob 为执行器节点分配了 3 种角色,分别是 TaskTracker、ProcessorTracker 和 Processor。

  • TaskTracker 是每一个任务的主节点,充当集群中的 master 角色,因此每个任务每次只会产生一个 TaskTracker。它负责子任务的分发、状态监控和集群中各执行节点的健康检查,并定期将任务的运行时信息上报给 server。

  • ProcessorTracker 是每一个执行器节点中负责执行器管理的角色,每个任务在每个执行器节点(JVM 实例)上都会产生一个 ProcessorTracker。它负责管理执行器节点任务的执行,包括接受来自 TaskTracker 的任务、上报本机任务执行情况和执行状态等功能。

  • Processor 是每一个执行器节点中负责具体执行任务的角色,也就是真正的执行单元,每个任务在每个执行器节点都会生成若干个 Processor(没错!就是控制台“实例并发数”所决定的数量)。它接受来自 ProcessorTracker 派发的执行任务并完成计算。

当需要执行分布式任务时,powerjob-server 会根据集群中各个 worker 节点的内存占用、CPU 使用率和磁盘使用率进行健康度计算,得分最高的节点将作为本次任务的 master 节点,即承担 TaskTracker 的职责。TaskTracker 在接收到来自 server 的任务执行请求时被创建,并完成三个阶段的初始化:

  • 首先需要初始化内嵌的 H2 数据库,用于存储所有子任务的派发情况和执行情况。

  • 存储就位后,TaskTracker 会根据 server 派发下来的任务内容,构建根任务,并将其持久化到内嵌数据库。

  • 最后 TaskTracker 会创建一系列定时任务,包括子任务定时派发、子任务执行状态检查、worker 健康度检查和任务整体执行状态上报。

ProcessorTracker 在接收到来自 TaskTracker 的子任务执行请求时被创建,并根据请求中携带的任务信息构建出执行所需要的线程池和对应的处理器。当子任务的运行状态发生变更后,ProcessorTracker 需要及时将最新状态反馈给 TaskTracker。

至于 Processor,本质上就是封装了每个子任务上下文信息的线程,由 ProcessorTracker 提交到执行线程池进行执行,并向上级汇报自己的执行状态。

上图清晰地展示了 PowerJob MapReduce 的工作原理,由于 MapReduce 确实算得上是非常复杂和精妙的实现,一篇文章的篇幅肯定是无法将细节说的一清二楚的。因此本文偏向于整体上的介绍,为大家讲述核心组件的划分依据和主要功能。如果对具体的细节有兴趣,那么源码是最好的资料~在本文的指导下,我个人认为花不了一天时间就能差不多看懂~

更多精彩推荐
☞程序员删库被判 6 年,公司损失近亿,云原生时代如何打造安全防线?☞8次迭代5大升级,旷视天元1.0预览版正式发布☞曾是谷歌程序员,抛下百万年薪创业,4 年成就 7 亿用户,今身价百亿!
☞首次在手机端不牺牲准确率实现BERT实时推理,比TensorFlow-Lite快近8倍,每帧只需45ms
☞Service Mesh 在超大规模场景下的落地挑战
☞比特币背后的技术,是否已成为科技领军代表?
点分享点点赞点在看

PowerJob 应对庞大任务的锦囊妙计:MapReduce相关推荐

  1. hadoop生态圈面试精华之MapReduce(二)

    hadoop生态圈面试精华之MapReduce(二) shuGle为什么要排序? 问过的一些公司:携程(2021.09),网易有道(2021.09) 参考答案: shuffle排序,按字典顺序排序的, ...

  2. 夜来风雨声,MapReduce知多少?

    文章目录 引言 批处理系统 MapReduce 把数据放在一起 排序-合并join 输出 容错 落后者 straggler 改进 总结 引言 正如DDIA上所说,MapReduce论文发表时从某种意义 ...

  3. 苏宁“砍价团”高可用、高并发架构实践

    " 苏宁拼购 808 的火爆见证了砍价团的成功,作为一种新兴的购物营销玩法,砍价团展现出了巨大的商业潜力.不同于传统购物流程的单一模式,砍价团凝练了购物玩法和社群营销的精髓. 来自:51ct ...

  4. 英特尔详解5G将如何助力VR的未来发展

    来源:亿欧智库 摘要:英特尔中国研究院通过重点研究通信和计算融合的方式,解释了5G在面对巨大数据洪流时数据处理和传输的方式,通过边缘计算的新方法,使VR达到传输要求.打造一个高效的通信和计算系统,通往 ...

  5. 索引初识一 MySql

    1 mysql索引类型[主要分4类索引] 创建索引: 1.添加PRIMARY KEY(主键索引) [主键:一种唯一性索引,必须指定为primary key ] mysql> ALTER TABL ...

  6. 云计算革命对国际关系的影响

    云计算对未来国际关系的影响主要集中在两个方面,一是对未来国际体系的影响,二是对国际关系中行为体力量对比的影响. 云计算战略的出现及应用 人类社会已迈入到一个由互联网构成的虚拟网络空间与现实空间交互重叠 ...

  7. 从双11看技术趋势:金融行业总动员之阿里金融云

    刚刚结束的天猫双11,让全世界震撼的不只是逆天的销售额,还有深不可见的科技力量. 在这场全球共振的狂欢中,阿里巴巴展现了从一家电商公司转型到一家以技术为驱动的科技公司的深刻转变.阿里巴巴的支付.物流. ...

  8. 产品经理如何锻炼自己看透事物本质的能力

    <教父>里有句话,"花半秒钟就看透事物本质的人和花一辈子看不清的人,注定是截然不同的命运."可见看透事物本质能力对我们影响之大. 而我们常说的,需求分析,就是看透事物本 ...

  9. software reporter tool占用高_看完这篇DBA工作详解,你觉得平均月薪17000真的高吗?...

    DBA(数据库管理员)是企业关键业务应用中非常重要的角色, 数据库管理系统 (DBMS)管理和维护的守护神,位置举足轻重.然而在企业招聘中极少出现相关岗位,更多时候出现在删库跑路的段子里面. 据了解, ...

最新文章

  1. 第二十二课.DeepGraphLibrary(三)
  2. 1119 Pre- and Post-order Traversals (30 分)【难度: 难 / 知识点: 树的构建】
  3. 复旦大学吴立德《数值优化》、《深度学习》和
  4. Linux安装Nginx使用负载均衡
  5. 【渝粤题库】陕西师范大学151205 财务管理原理作业(笔试题型)
  6. 四川汶川地震祈福赈灾宣传画、报纸头版精选第二版(超多图)
  7. 你相信逛B站也能学编程吗?
  8. java类型提升_java表达式中类型的自动提升(转)
  9. TCP系列51—拥塞控制—14、TLP、ER与拥塞控制
  10. C#首席设计师Anders Hejlsberg专访
  11. 手把手玩转win8开发系列课程(9)
  12. python笔记-动态类型
  13. 手机内置摄像头接线图解_1000以下手机哪款好?8款千元以内性价比最高的手机推荐...
  14. winform中notifyIcon的ShowBalloonTip方法的坑
  15. Day2 Excel与数据处理之定位条件、选择性粘贴及查找功能
  16. Linux下ps -ef和ps aux
  17. android sku 库存管理,建议收藏!为什么合理的SKU设置对有效库存管理与销售至关重要?...
  18. python pandas 讲解ppt_Python数据分析之pandas基本功能讲解
  19. Android-vold源码分析之连接电脑OTG(11)
  20. 非盈利组织能力建设探讨

热门文章

  1. jQuery学习四——效果
  2. 分析日志下载时间脚本
  3. 页面自动刷新代码大全
  4. [FFmpeg] 多个图片合成视频
  5. [FFmpeg] RGBA 和 YUV 存储方式
  6. 第3章 关系数据模型
  7. [链表遍历|模拟] leetcode 2 两数相加
  8. spring容器_Spring 容器的启动过程探秘
  9. resolving xxx failed: Temporary failure in name resolution解决
  10. 二叉树中最大的二叉搜索子树