Scheduler作为Storm的调度器,负责为Topology分配可用资源。
Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler。
其定义如下:

public interface IScheduler {//接收当前Nimbus的Storm配置作为参数,进行一些初始化工作void prepare(Map conf);/*** 真正进行任务分配的方法,在Nimbus进行任务分配的时候会调用该方法.* 参数为topologies、cluster:前者含有当前集群中所有Topology的静态信息,* cluster包含了Topology的运行态信息,比如用户自定义调度逻辑时所需要的所有资源、* Supervisor信息、当前可用的所有slot* 以及任物分配情况等,根据topologies和cluster信息,就可以进行调度分配任务了*/void schedule(Topologies topologies, Cluster cluster);
}

真正选择哪个调度器来对Topology进行分配的方法是mk-assignments。
mk-assignments方法定义与解释如下:

;;参数:stormConf和接口INimbus的实现类实例
(defn mk-scheduler [conf inimbus];;调用inimbus中的getForcedScheduler方法,并将返回值赋给临时变量forced-scheduler(let [forced-scheduler (.getForcedScheduler inimbus)scheduler (cond;;若调用的getForcedScheduler方法,返回的是非null的IScheduler,则返回该IScheduler实例forced-scheduler(do (log-message "Using forced scheduler from INimbus " (class forced-scheduler)) forced-scheduler);;如果用户实现了自定义的IScheduler,并且在storm.yaml中有配置,;;则返回用户自定义的IScheduler.(conf STORM-SCHEDULER)(do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER)) (-> (conf STORM-SCHEDULER) new-instance));;如果上述都不满足则返回默认的DefaultScheduler:else(do (log-message "Using default scheduler") (DefaultScheduler.)))](.prepare scheduler conf)scheduler))

从上述代码可以看出,如果调用inimbus中的getForcedScheduler方法,且返回的是非null的IScheduler,则返回该IScheduler实例;如果用户实现了自定义的IScheduler,并且在storm.yaml中有配置,则返回用户自定义的IScheduler;如果两者都没有实现,则采用默认调度器DefaultScheduler进行任务的分配。现在我们只关心DefaultScheduler。
DefaultScheduler的定义与解释如下:

;;DefaultScheduler是Storm默认的调度器,如果用户没有指定自己实现的调度器,
;;Storm就会使用该调度器进行Topology的任务分配。
;;DefaultScheduler实现了IScheduler接口
(ns backtype.storm.scheduler.DefaultScheduler(:use [backtype.storm util config])(:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler])(:import [backtype.storm.scheduler IScheduler TopologiesCluster TopologyDetails WorkerSlot SchedulerAssignmentEvenScheduler ExecutorDetails])(:gen-class:implements [backtype.storm.scheduler.IScheduler]))
;;default-schedule方法主要是计算当前集群中所有可供分配的slot资源,
;;并判断当前已经分配给该Topology的slot资源是否需要重新分配,
;;利用这些信息,对新提交的Topology进行资源分配
(defn default-schedule [^Topologies topologies ^Cluster cluster];;调用cluster的needsSchedulingTopologies方法获取所需要进行任务调度的Topology集合;;needsSchedulingTopologies方法定义如fn1所示.(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)];;这部分代码块的作用是对每一个需要进行任务调度的Topology进行处理(doseq [^TopologyDetails topology needs-scheduling-topologies;;通过调用getId获取topology-id:let [topology-id (.getId topology) ;;调用cluster的getAvailableSlots方法获取当前集群中所有可用的slot资源, ;;并将其转换为<node,port>集合赋给available-slots变量. ;;getAvailableSlots方法定义如下fn2所示 available-slots (->> (.getAvailableSlots cluster) (map #(vector (.getNodeId %) (.getPort %)))) ;;调用getExecutors获取Topology的所有Executor信息, ;;并将其转换为<start-task-id,end-task-id>集合 all-executors (->> topology .getExecutors (map #(vector (.getStartTask %) (.getEndTask %))) set) ;;调用EvenScheduler的get-alive-assigned-node+port->executors方法 ;;计算当前Topology已经分配的任务信息,以<[node,port],executors>信息保存到alive-assigned变量中 alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id) ;; alive-executors (->> alive-assigned vals (apply concat) set) ;;调用slots-can-reassign方法对alive-assigned的slot信息进行判断, ;;选出其中可被重新分配的slot集合并保存到can-reassign-slots.  ;;slots-can-reassign方法定义如fn3所示: can-reassign-slots (slots-can-reassign cluster (keys alive-assigned)) ;;计算当前Topology所能使用的全部slot数目,它取以下两个量中较小的值作为total-slots-to-use total-slots-to-use (min (.getNumWorkers topology) (+ (count can-reassign-slots) (count available-slots))) ;;用于判断如果total-slots-to-use的数目大于当前已经分配的slot数目, ;;或者正在运行的executors数目不等于所有的executors数 ;;则调用bad-slots方法计算所有可被释放的slot. ;;bad-slots方法的具体定义如fn4所示. bad-slots (if (or (> total-slots-to-use (count alive-assigned)) (not= alive-executors all-executors)) (bad-slots alive-assigned (count all-executors) total-slots-to-use) [])]];;调用cluster的freeSlots方法释放前面计算出来的bad-slots(.freeSlots cluster bad-slots);;调用EvenScheduler的schedule-topologies-evenly方法将系统中的资源均匀分配给Topology(EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))

fn1:

/*** 获取所有需要调度的topology,并以集合的形式返回*/
public List<TopologyDetails> needsSchedulingTopologies(Topologies topologies) {List<TopologyDetails> ret = new ArrayList<TopologyDetails>();for (TopologyDetails topology : topologies.getTopologies()) {if (needsScheduling(topology)) {ret.add(topology);}}return ret;
}

fn2:

//根据supervisor信息获取所有可用的slot资源,并封装在WorkerSlot中,以集合的形式返回
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {Set<Integer> ports = this.getAvailablePorts(supervisor);List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());for (Integer port : ports) {slots.add(new WorkerSlot(supervisor.getId(), port));}return slots;
}

fn3:

;;该方法将对传入的slots资源进行过滤,选出其中仍然可以继续使用的slot,组成新的集合
;;过滤方法:先判断slot的node信息是否存在于集群的黑名单里,
;;如果不在则继续判断slot的port信息是否在于node相对应的Supervisor的所有可用端口列表中
;;如果在,则表示该slot可以继续使用
(defn slots-can-reassign [^Cluster cluster slots](->> slots(filter(fn [[node port]](if-not (.isBlackListed cluster node) (if-let [supervisor (.getSupervisorById cluster node)] (.contains (.getAllPorts supervisor) (int port)) ))))))

fn4:

;;该方法用于计算一个Topology已经分配的资源中哪些是不再需要的
;;existing-slots:已经分配出去的资源(分配给Topology),它是一个<[node,port],executors>集合
;;num-executors:Topology的所有Executor(包括已分配和未分配的)
;;num-workers:Topology可使用的全部slot数目
(defn- bad-slots [existing-slots num-executors num-workers];;判断num-workers是否为0。如果是,意味着当前没有可供该Topology使用的slot,这时返回一个空集合(if (= 0 num-workers)'();;定义distribution集合和keepers集合,distribution集合通过调用integer-divided方法生成;;实际所做的事是将num-executors均匀地分配到num-workers中.;;keepers集合为一个空集合(let [distribution (atom (integer-divided num-executors num-workers))keepers (atom {})];;对于传入的existing=slots中的每一项,计算其对象的executor-count,;;然后以该executor-count作为键从前面计算的distribution集合中获取值.如果获取的值大于0,;;则意味着存在至少一个Worker上有executor-count个Executor的分配,并且,这个分配信息便继续维持,不更新。;;这时,会将<[node,port],executors>信息放入keepers中,同时将distribution中该executor-count的对应值减一.(doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]](when (pos? (get @distribution executor-count 0)) (swap! keepers assoc node+port executor-list) (swap! distribution update-in [executor-count] dec) ));;从existing-slots中移除keepers中记录的需要继续维持的分配情况.如果移除完之后还存在slot信息,;;表明这些slot可以被释放掉,将其转换为WorkerSlot对象集合并返回.(->> @keeperskeys(apply dissoc existing-slots)keys(map (fn [[node port]] (WorkerSlot. node port)))))))

*注:学习李明等老师Storm源码分析和陈敏敏等老师Storm技术内幕与大数据实践的笔记整理。
欢迎关注下面二维码进行技术交流:*

JStorm与Storm源码分析(三)--Scheduler,调度器相关推荐

  1. JStorm与Storm源码分析(一)--nimbus-data

    Nimbus里定义了一些共享数据结构,比如nimbus-data. nimbus-data结构里定义了很多公用的数据,请看下面代码: (defn nimbus-data [conf inimbus]( ...

  2. JStorm与Storm源码分析(二)--任务分配,assignment

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: ;;参数nimbus为nimb ...

  3. JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

    EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口,  由下面代码可以看出: (ns backtype.storm.scheduler.EvenSch ...

  4. JStorm与Storm源码分析(二)--任务分配,assignmen

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: 1 ;;参数nimbus为ni ...

  5. JStorm与Storm源码分析(八)--计时器工具-mk-timer

    Storm使用计时器线程来处理一些周期性调度事件. 与计时器相关的操作主要有:创建计时器线程.查看线程是否活跃.向线程中加入新的待调度事件.取消计时器线程 mk-timer方法用于创建一个计时器线程. ...

  6. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  7. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector

    在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector.所以有必要对接口IOutputCollecto ...

  8. JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式

    本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式. 首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明 ...

  9. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

最新文章

  1. seata 如何开启tcc事物_如何能在实战中完成分布式事务?知道这些点很重要
  2. jQuery找兄弟系列next(),nextAll(),nextUntil(),prev(),prevAll(),prevUntil(),siblings()
  3. centos mysql 访问_centos下mysql开启远程访问
  4. HDU - 5820 Lights(主席树)
  5. RDD, DataFrame or Dataset
  6. warning: implicit declaration of function ‘typeof’
  7. flink cdc 2.0.0 sql 开发模板,及踩坑记录
  8. python与会计学_财务与会计前沿讲座——“大数据集训”开讲
  9. windows10插入耳机没有反应的问题
  10. Android创建圆形或圆角按钮Button 真菜鸟食用
  11. 实测办公场景下,国产远程控制软件的表现力如何?(技术解析)
  12. amd显卡跑人工神经网络,amd显卡能跑神经网络吗
  13. 应付款与分工之利读后感
  14. 人工智能会话代理在医疗保健中的有效性:系统综述
  15. Mysql Remark
  16. python入门三剑客怎么样_python三剑客
  17. Workbench中DM建模草图修改不了的解决办法
  18. 【Verilog语法001】Verilog log2 函数
  19. linux u盘更新程序,嵌入式linux下插u盘自动更新的设计
  20. seesion cookie鉴权 与 token鉴权

热门文章

  1. jupyter notebook dead kernel问题解决
  2. js浮点数精度丢失问题及如何解决js中浮点数计算不精准
  3. BigDecimal空指针异常——个人应用
  4. 使字符串的首字母大写(具有最佳性能)
  5. Struts 2 --ONGL介绍
  6. 设计模式(创建型模式)——单例模式(Singleton)
  7. IOS 面试 --- 动画 block
  8. 无法安装gem包RMagick解决办法
  9. 毕业后的第二个月的一点思绪
  10. 小学生家庭教育与学习心理