mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理。代码注释如下:

 1 ;;参数nimbus为nimbus-data对象,:scratch-topology-id为需要重新调度的Topology的id
 2 (defnk mk-assignments [nimbus :scratch-topology-id nil]
 3   (let [conf (:conf nimbus);;分别从nimbus-data中获取conf,storm-cluster-state和inimbus对象,并将其保存为临时变量
 4         storm-cluster-state (:storm-cluster-state nimbus)
 5         ^INimbus inimbus (:inimbus nimbus)
 6         ;;从zk中读取所有活跃的Topologies,获取他们id的集合
 7         topology-ids (.active-storms storm-cluster-state)
 8         ;;根据前面得到的Topology-id的集合,对每一个id调用read-topology-details方法
 9         ;;从参数nimbus-data中获取topology-details信息,并以<topology-id,topology-details>保存在集合中
10         topologies (into {} (for [tid topology-ids]
11                               {tid (read-topology-details nimbus tid)}))
12         ;;利用前面得到的<topology-id,topology-details>集合创建Topologies对象
13         topologies (Topologies. topologies)
14         ;;读取所有已经分配资源的Topology的id的集合。
15         assigned-topology-ids (.assignments storm-cluster-state nil)
16         existing-assignments (into {} (for [tid assigned-topology-ids]
17           ;; 对于那些已经分配资源但需要重新调度的Topology(由scratch-topology-id指定),
18           ;; 我们忽略其之前的分配,故之前分配占用的所有slot将被视为空闲slot(空闲资源),可重新被调度使用。
19           (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
20               {tid (.assignment-info storm-cluster-state tid nil)})))
21         ;; 调用compute-new-topology->executor->node+port方法为所有Topologies计算新的调度,
22         ;; 并返回topology->executor->node+port
23         topology->executor->node+port (compute-new-topology->executor->node+port
24                                        nimbus
25                                        existing-assignments
26                                        topologies
27                                        scratch-topology-id)
28
29         ;;获取当前系统时间(秒)
30         now-secs (current-time-secs)
31         ;;调用basic-supervisor-details-map方法获取ZooKeeper中所有的SupervisorInfo信息,
32         ;;然后将其转换为<supervisor-id,SupervisorDetails>集合,具体操作看1
33         basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
34
35         ;; 对topology->executor->node+port中各项进行处理,通过添加开始时间等构建最终的作业
36         ;; 返回得到<topology-id Assignment>集合
37         new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
38             ;;根据topology-id获取Topology的任务分配情况
39             :let [existing-assignment (get existing-assignments topology-id)
40                   ;;从executor->node+port信息中提取所有的节点信息
41                   all-nodes (->> executor->node+port vals (map first) set)
42                   ;;根据all-nodes获取每个节点的主机名信息,并返回一个<node hostname>集合
43                   node->host (->> all-nodes
44                                   (mapcat (fn [node]
45                                             (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
46                                               [[node host]]
47                                               )))
48                                   (into {}))
49                   ;;将上述获取到的<node, hostname>集合和<node, host>集合,得到所有<node host>关系.
50                   ;;如果存在相同的node,则与其对应的主机名将采用<node,hostname>集合中的值
51                   all-node->host (merge (:node->host existing-assignment) node->host)
52                   ;;调用changed-executors,通过将executor->node+port信息同existing-assignment中的信息进行比对,
53                   ;;计算出所有被重新分配的Executor
54                   reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
55                   ;;通过将已经存在的assignment中的executor->start-time-secs信息
56                   ;;与所有被重新分配的通过将已经存在的assignment中的executor->start-time-secs进行合并,
57                   ;;获得最新的所有<executor,start-time-secs>集合
58                   start-times (merge (:executor->start-time-secs existing-assignment)
59                                     (into {}
60                                           (for [id reassign-executors]
61                                             [id now-secs]
62                                             )))]]
63        ;;创建Assignment对象,参数分别为该Topology在Nimbus服务器上的root文件夹路径、
64        ;;<node,host>集合、新的executor->node+port映射关系以及新的<executor,start-time-secs>集合
65        {topology-id (Assignment.
66                      (master-stormdist-root conf topology-id)
67                      (select-keys all-node->host all-nodes)
68                      executor->node+port
69                      start-times)}))]
70
71     ;; 对于新计算的<topology-id,assignment>集合中的每一项,比较其新的调度与当前运行时的调度之间是否发生了变化
72     ;; 如果没有发生变化,就打印一条记录;否则将该Topology在ZooKeeper中保存的调度结果更新assignment
73     (doseq [[topology-id assignment] new-assignments
74             :let [existing-assignment (get existing-assignments topology-id)
75                   topology-details (.getById topologies topology-id)]]
76       (if (= existing-assignment assignment)
77         (log-debug "Assignment for " topology-id " hasn't changed")
78         (do
79           (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
80           (.set-assignment! storm-cluster-state topology-id assignment)
81           )))
82     ;;对于前面得到的new-assignments中的每一项,首先计算出新增的slot,
83     ;;再将其转换化为worker-slot对象,返回的是<topology-id,worker-slot>集合,
84     ;;最后调用inimbus的assignSlots方法来分配slot
85     (->> new-assignments
86           (map (fn [[topology-id assignment]]
87             (let [existing-assignment (get existing-assignments topology-id)]
88               [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
89               )))
90           (into {})
91           (.assignSlots inimbus topologies))
92     ))

在该过程中,如果某个Slot不存在Executor的超时,而Supervisor的ZooKeeper心跳超时时,
认为当前Slot依然有效,可以分配认为.最坏的情况就是这些分配过去的Executor会超时,在下一轮的分配过程中,则不会分配。

mk-assignments的详细过程如下:
1.从ZooKeeper中读取所有活跃的Topologies
2.从ZooKeeper中读取当前的assignments,获取所有已经分配资源的Topology的id的集合。
3.对Topologies进行新assignments
3.1通过调用computer-topology->executors取出所有已经assignment的topology的executors
3.2 update-all-heartbeats,对每一个Topology进行更新心跳
3.3调用compute-topology->alive-executors过滤topology->executors,保留alive的executors
3.4调用compute->supervisor->dead-ports找出dead ports
3.5调用compute-topology->scheduler-assignment转换ZooKeeper中的assignment为SchedulerAssignment
3.6通过调用missing-assignment-topologies找出需要从新assign的Topology
3.7通过调用all-scheduling-slots得到所有Supervisor节点中可用的slot数量
3.8调用read-all-supervisor-details得到所有的Supervisor节点SupervisorDetails
3.9获取backtype.storm.scheduler.Cluster
3.10调用scheduler.schedule分配所有的Topologies
3.11通过调用compute-topology->executor->node_port转换SchedulerAssignment为Assignment,输出ressign日志
4.通过将已经存在的assignment中的executor->start-time-secs信息与所有被重新分配的通过将已经存在的assignment中的executor->start-time-secs进行合并,获得最新的所有<executor,start-time-secs>集合,补充start-times等信息,获得new-assignments。
5.调用set-assignment!将新的assignment结果写入ZooKeeper.

  

mk-assignments负责对当前集群中所有Topology进行新一轮的任务调度。先检查已运行的Topology所占用的资源,判断它们是否有问题以及重新分配;根据系统当前的可用资源,为新提交的Topology分配任务。mk-assignments会将所有assignment信息更新到ZooKeeper中,Supervisor周期性地检查这些分配信息,并根据这些分配信息做相应的调度处理。

注:学习李明老师Storm源码分析和陈敏敏老师Storm技术内幕与大数据实现的笔记的整理。 
欢迎关注下面二维码进行技术交流:

JStorm与Storm源码分析(二)--任务分配,assignmen相关推荐

  1. JStorm与Storm源码分析(二)--任务分配,assignment

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: ;;参数nimbus为nimb ...

  2. JStorm与Storm源码分析(三)--Scheduler,调度器

    Scheduler作为Storm的调度器,负责为Topology分配可用资源. Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler. 其定义如下: public ...

  3. JStorm与Storm源码分析(一)--nimbus-data

    Nimbus里定义了一些共享数据结构,比如nimbus-data. nimbus-data结构里定义了很多公用的数据,请看下面代码: (defn nimbus-data [conf inimbus]( ...

  4. JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

    EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口,  由下面代码可以看出: (ns backtype.storm.scheduler.EvenSch ...

  5. JStorm与Storm源码分析(八)--计时器工具-mk-timer

    Storm使用计时器线程来处理一些周期性调度事件. 与计时器相关的操作主要有:创建计时器线程.查看线程是否活跃.向线程中加入新的待调度事件.取消计时器线程 mk-timer方法用于创建一个计时器线程. ...

  6. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  7. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector

    在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector.所以有必要对接口IOutputCollecto ...

  8. JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式

    本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式. 首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明 ...

  9. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

最新文章

  1. R语言ggplot2可视化:可视化堆叠的直方图、添加每个分组的每个bin的计数标签、在堆叠直方图的bin中的每个分组部分添加数值标签
  2. Linux发布环境,linux发布环境初始化脚本
  3. VS2012下安装配置OpenCV2.4.9的方法
  4. C语言经典算法100例-037-给10个数排序
  5. spring的PathMatchingResourcePatternResolver基于ant通配符匹配路径遍历项目所有xml文件
  6. Day-6:创建计算字段
  7. windows商店_Windows记事本应用现在可以从Microsoft Store中获得
  8. [置顶] Android adb root权限
  9. java file 字典查询_File listFiles
  10. 西门子1200控制台达A2伺服458通讯控制博图V15.1
  11. JS代码错误:Deleting local variable in strict mode
  12. 交中IB课程中心2022届早申阶段录取成果汇总
  13. python拼图游戏代码_Python图像处理——人物拼图游戏
  14. Pedestrian Detection paper
  15. Linux下GPT分区,gdisk修复损坏的分区表
  16. 针对L型区域的椭圆方程的差分法
  17. mysql_upgrade --force,MySQL force upgrade
  18. 计算机网络第二章选择题,计算机网络技术第二章习题
  19. Android 拨打电话各安卓版本适用
  20. Java实现QQ第三方登录

热门文章

  1. Pyth笔记-高级装饰器
  2. SQL中删除数据,保留表结构。
  3. 《MobileNetV2: Inverted Residuals and Linear Bottlenecks》论文学习笔记
  4. 通过lutris在linux上玩桌面版阴阳师
  5. vue制作一个好看的网页
  6. 微信小程序组件仿某音
  7. scrapy 解决中途中断爬取问题
  8. 【调剂】兰州理工大学2021年调剂工作研究生招生学院招生专业及联系方式
  9. docker 报错 Structure needs cleaning
  10. Quantum Espresso + Phonopy 计算声子过程