1. 前言

Schedulerx2.0的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。

这篇文章重点是介绍基于schedulerx2.0的分布式执行引擎原理和最佳实践,相信看完这篇文章,大家都能写出高效率的分布式作业,说不定速度能提升好几倍:)

2. 可扩展的执行引擎

Worker总体架构参考Yarn的架构,分为TaskMaster, Container, Processor三层:

  • TaskMaster:类似于yarn的AppMaster,支持可扩展的分布式执行框架,进行整个jobInstance的生命周期管理、container的资源管理,同时还有failover等能力。默认实现StandaloneTaskMaster(单机执行),BroadcastTaskMaster(广播执行),MapTaskMaster(并行计算、内存网格、网格计算),MapReduceTaskMaster(并行计算、内存网格、网格计算)。
  • Container:执行业务逻辑的容器框架,支持线程/进程/docker/actor等。
  • Processor:业务逻辑框架,不同的processor表示不同的任务类型。

以MapTaskMaster为例,大概的原理如下图所示:

3. 分布式编程模型之Map模型

Schedulerx2.0提供了多种分布式编程模型,这篇文章主要介绍Map模型(之后的文章还会介绍MapReduce模型,适用更多的业务场景),简单几行代码就可以将海量数据分布式到多台机器上进行分布式跑批,非常简单易用。

针对不同的跑批场景,map模型作业还提供了并行计算、内存网格、网格计算三种执行方式:

  • 并行计算:子任务300以下,有子任务列表。
  • 内存网格:子任务5W以下,无子任务列表,速度快。
  • 网格计算:子任务100W以下,无子任务列表。

4. 并行计算原理

因为并行任务具有子任务列表:


如上图,子任务列表可以看到每个子任务的状态、机器,还有重跑、查看日志等操作。

因为并行计算要做到子任务级别的可视化,并且worker挂了、重启还能支持手动重跑,就需要把task持久化到server端:


如上图所示:

  1. server触发jobInstance到某个worker,选中为master。
  2. MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
  3. MapTaskMaster收到map方法,会把task持久化到server端。
  4. 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。

5. 网格计算原理

网格计算要支持百万级别的task,如果所有任务都往server回写,server肯定扛不住,所以网格计算的存储实际上是分布式在用户自己的机器上的:

如上图所示:

  1. server触发jobInstance到某个worker,选中为master。
  2. MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
  3. MapTaskMaster收到map方法,会把task持久化到本地h2数据库。
  4. 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。

6. 最佳实践

6.1 需求

举个例子:

  1. 读取A表中status=0的数据。
  2. 处理这些数据,插入B表。
  3. 把A表中处理过的数据的修改status=1。
  4. 数据量有4亿+,希望缩短时间。

6.2 反面案例

我们先看下如下代码是否有问题?

public class ScanSingleTableProcessor extends MapJobProcessor {private static int pageSize = 1000;@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {int recordCount = queryRecordCount();int pageAmount = recordCount / pageSize;//计算分页数量for(int i = 0 ; i < pageAmount ; i ++) {List<Record> recordList = queryRecord(i);//根据分页查询一页数据map(recordList, "record记录");//把子任务分发出去并行处理}return new ProcessResult(true);//true表示执行成功,false表示失败} else if ("record记录".equals(taskName)) {//TODOreturn new ProcessResult(true);}return new ProcessResult(false);}
}

如上面的代码所示,在root任务中,会把数据库所有记录读取出来,每一行就是一个Record,然后分发出去,分布式到不同的worker上去执行。逻辑是没有问题的,但是实际上性能非常的差。结合网格计算原理,我们把上面的代码绘制成下面这幅图:

如上图所示,root任务一开始会全量的读取A表的数据,然后会全量的存到h2中,pull线程还会全量的从h2读取一次所有的task,还会分发给所有客户端。所以实际上对A表中的数据:

  • 全量读2次
  • 全量写一次
  • 全量传输一次

这个效率是非常低的。

6.3 正面案例

下面给出正面案例的代码:

public class ScanSingleTableJobProcessor extends MapJobProcessor {private static final int pageSize = 100;static class PageTask {private int startId;private int endId;public PageTask(int startId, int endId) {this.startId = startId;this.endId = endId;}public int getStartId() {return startId;}public int getEndId() {return endId;}}@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {System.out.println("start root task");Pair<Integer, Integer> idPair = queryMinAndMaxId();int minId = idPair.getFirst();int maxId = idPair.getSecond();List<PageTask> taskList = Lists.newArrayList();int step = (int) ((maxId - minId) / pageSize); //计算分页数量for (int i = minId; i < maxId; i+=step) {taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));}return map(taskList, "Level1Dispatch");} else if (taskName.equals("Level1Dispatch")) {PageTask record = (PageTask)task;long startId = record.getStartId();long endId = record.getEndId();//TODOreturn new ProcessResult(true);}return new ProcessResult(true);}@Overridepublic void postProcess(JobContext context) {//TODOSystem.out.println("all tasks is finished.");}private Pair<Integer, Integer> queryMinAndMaxId() {//TODO select min(id),max(id) from xxxreturn null;}
}

如上面的代码所示,

  • 每个task不是整行记录的record,而是PageTask,里面就2个字段,startId和endId。
  • root任务,没有全量的读取A表,而是读一下整张表的minId和maxId,然后构造PageTask进行分页。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每个task处理A表不同的数据。
  • 在下一级task中,如果拿到的是PageTask,再根据id区间去A表处理数据。

根据上面的代码和网格计算原理,得出下面这幅图:

如上图所示,

  • A表只需要全量读取一次。
  • 子任务数量比反面案例少了上千、上万倍。
  • 子任务的body非常小,如果recod中有大字段,也少了上千、上万倍。

综上,对A表访问次数少了好几倍,对h2存储压力少了上万倍,不但执行速度可以快很多,还保证不会把自己本地的h2数据库搞挂。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

Schedulerx2.0分布式计算原理最佳实践相关推荐

  1. 从零到一构建完整知识体系,阿里最新SpringBoot原理最佳实践真香

    Spring Boot不用多说,是咱们Java程序员必须熟练掌握的基本技能.工作上它让配置.代码编写.部署和监控都更简单,面试时互联网企业招聘对于Spring Boot这个系统开发的首选框架也是考察的 ...

  2. Rocketmq原理最佳实践

    一. MQ背景&选型 消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性. 目前主流的MQ主要是Rocketmq.kafka.Rabbitmq,Rocketmq ...

  3. spring 2.0核心技术与最佳实践 pdf_推荐 Spring Boot 实践学习案例大全 数据缓存 和中间件 安全权限...

    概况 spring boot 实践学习案例 spring boot 初学者及核心技术巩固的最佳实践 目录 『 Spring Boot 2 快速教程 』 Spring Boot 2:WebFlux集成 ...

  4. 任务调度Schedulerx2.0分布式计算之MapReduce模型

    简介 阿里巴巴任务调度Schedulerx2.0自研轻量级分布式模型MapReduce,可以进行大数据的实时/离线跑批.通过一个map方法就能将海量数据分布式到多台机器上执行,通过process方法处 ...

  5. [译]高效的TensorFlow 2.0:应用最佳实践以及有什么变化

    Tensorflow团队早早就放出了风声,Tensorflow 2.0就快来了,这是一个重要的里程碑版本,重点放在简单和易用性上.我对Tensorflow 2.0的到来充满期待,因此翻译了这篇Tens ...

  6. Android 6.0 权限管理最佳实践

    博客: Android 6.0 运行时权限管理最佳实践 github: https://github.com/yanzhenjie/AndPermission

  7. 学习笔记TF061:分布式TensorFlow,分布式原理、最佳实践

    分布式TensorFlow由高性能gRPC库底层技术支持.Martin Abadi.Ashish Agarwal.Paul Barham论文<TensorFlow:Large-Scale Mac ...

  8. Spring Validation最佳实践及其实现原理,参数校验没那么简单!

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:六点半起床 juejin.im/post/685654110 ...

  9. Spring Validation 最佳实践及其实现原理,参数校验没那么简单!

    之前也写过一篇关于Spring Validation使用的文章,不过自我感觉还是浮于表面,本次打算彻底搞懂Spring Validation.本文会详细介绍Spring Validation各种场景下 ...

最新文章

  1. Apache的服务端包含--SSI
  2. Java中利用socket实现简单的服务端与客户端的通信(入门级)
  3. ajax头文件报错,AJAX的CSRF保护
  4. 【实战分享】漫谈 gRPC的选型
  5. 【Calcite】Calcite入门
  6. grootJsAPI文档
  7. 3.0-rsync格式
  8. Angular之constructor和ngOnInit差异及适用场景
  9. php scrscriptipt,xss跨站脚本攻击 (初级-中级-高级)
  10. 20201125 plecs更新
  11. 1386 - Cellular Automaton
  12. 电脑如何实现微信多开
  13. IComponent2 Interface 学习
  14. 简历制作(项目经验)
  15. 去除Ninja的提醒
  16. python实现1/n倍频程计算
  17. 【038】MySQL中varchar与char的区别以及varchar(50)中的50代表的涵义?
  18. java基于SSM的宠物医院信息管理系统-计算机毕业设计
  19. 微信小程序图片空白问题处理
  20. 用c语言编写基于sht10传感器的仓库温湿度监测系统的程序,单片机远程仓库湿度监测系统仿真max487+sht11源程序+电路原理图...

热门文章

  1. Java程序员常犯的几类错误
  2. int转unsigned int_谢劲课题组在基于锰催化的转金属化基元反应取得系列进展
  3. C/C++初学者快速提升?
  4. oracle安装过程掉电,Oracle数据库掉电后ORA-01172磁盘坏块解决方法
  5. html一行中怎么写空格,html – 用一行填写空格
  6. python urllib发送post请求_python爬虫 urllib模块发起post请求过程解析
  7. 北京重磅发布:杰青、优青放宽女性年龄限制,基金人才评审“同等条件下女性优先”...
  8. 如何理解傅立叶级数、傅立叶变换公式?
  9. 【福利】周志华教授专著《集成学习:基础与算法》上市,豆瓣满分森林书破解AI实践难题...
  10. Kubernetes存储卷的使用