前言

最近在研究Storm的任务调度相关的知识,于是就想要试着去改造一下Storm的任务调度,来满足一下现实状况中的一些场景。

Storm调度的相关术语

在看Storm的Scheduler代码么之前,得要弄明白几个概念,这样可以帮助大家更好的理解后面的调度过程。
1、slot。这代表一个Supervisor节点上的一个单位资源。每个slot对应一个port,一个slot只能被一个Worker占用。
2、Worker,Executor.Task,1个Worker包含1个或多个Executor执行器,每个执行器包含多个Task。
3、Executor的表现形式为[1-1],[2-2],中括号内的数字代表该Executor中的起始Task id到末尾Task id,1个Worker就相当于在外面加个大括号{[1-1],[2-2]}
4.Component。Storm中的每个组件就是指一类Spout或1个类型的Bolt,这里指的是名称类型,不包含个数。
下面是调度器的核心实现。

代码实现

import backtype.storm.scheduler.*;
import clojure.lang.PersistentArrayMap;
import java.util.*;/*** 直接分配调度器,可以分配组件到指定节点中* Created by zhexuan on 15/7/6.*/
public class DirectScheduler implements IScheduler{
@Override
public void prepare(Map conf) {}@Override
public void schedule(Topologies topologies, Cluster cluster) {System.out.println("DirectScheduler: begin scheduling");// Gets the topology which we want to scheduleCollection<TopologyDetails> topologyDetailes;TopologyDetails topology;//作业是否要指定分配的标识String assignedFlag;Map map;Iterator<String> iterator = null;topologyDetailes = topologies.getTopologies();for(TopologyDetails td: topologyDetailes){map = td.getConf();assignedFlag = (String)map.get("assigned_flag");//如何找到的拓扑逻辑的分配标为1则代表是要分配的,否则走系统的调度if(assignedFlag != null && assignedFlag.equals("1")){System.out.println("finding topology named " + td.getName());topologyAssign(cluster, td, map);}else {System.out.println("topology assigned is null");}}//其余的任务由系统自带的调度器执行new EvenScheduler().schedule(topologies, cluster);
}/*** 拓扑逻辑的调度* @param cluster* 集群* @param topology* 具体要调度的拓扑逻辑* @param map* map配置项*/
private void topologyAssign(Cluster cluster, TopologyDetails topology, Map map){Set<String> keys;PersistentArrayMap designMap;Iterator<String> iterator;iterator = null;// make sure the special topology is submitted,if (topology != null) {designMap = (PersistentArrayMap)map.get("design_map");if(designMap != null){System.out.println("design map size is " + designMap.size());keys = designMap.keySet();iterator = keys.iterator();System.out.println("keys size is " + keys.size());}if(designMap == null || designMap.size() == 0){System.out.println("design map is null");}boolean needsScheduling = cluster.needsScheduling(topology);if (!needsScheduling) {System.out.println("Our special topology does not need scheduling.");} else {System.out.println("Our special topology needs scheduling.");// find out all the needs-scheduling components of this topologyMap<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology);System.out.println("needs scheduling(component->executor): " + componentToExecutors);System.out.println("needs scheduling(executor->components): " + cluster.getNeedsSchedulingExecutorToComponents(topology));SchedulerAssignment currentAssignment = cluster.getAssignmentById(topology.getId());if (currentAssignment != null) {System.out.println("current assignments: " + currentAssignment.getExecutorToSlot());} else {System.out.println("current assignments: {}");}String componentName;String nodeName;if(designMap != null && iterator != null){while (iterator.hasNext()){componentName = iterator.next();nodeName = (String)designMap.get(componentName);System.out.println("现在进行调度 组件名称->节点名称:" + componentName + "->" + nodeName);componentAssign(cluster, topology, componentToExecutors, componentName, nodeName);}}}}
}/*** 组件调度* @param cluster* 集群的信息* @param topology* 待调度的拓扑细节信息* @param totalExecutors* 组件的执行器* @param componentName* 组件的名称* @param supervisorName* 节点的名称*/
private void componentAssign(Cluster cluster, TopologyDetails topology, Map<String, List<ExecutorDetails>> totalExecutors, String componentName, String supervisorName){if (!totalExecutors.containsKey(componentName)) {System.out.println("Our special-spout does not need scheduling.");} else {System.out.println("Our special-spout needs scheduling.");List<ExecutorDetails> executors = totalExecutors.get(componentName);// find out the our "special-supervisor" from the supervisor metadataCollection<SupervisorDetails> supervisors = cluster.getSupervisors().values();SupervisorDetails specialSupervisor = null;for (SupervisorDetails supervisor : supervisors) {Map meta = (Map) supervisor.getSchedulerMeta();if(meta != null && meta.get("name") != null){System.out.println("supervisor name:" + meta.get("name"));if (meta.get("name").equals(supervisorName)) {System.out.println("Supervisor finding");specialSupervisor = supervisor;break;}}else {System.out.println("Supervisor meta null");}}// found the special supervisorif (specialSupervisor != null) {System.out.println("Found the special-supervisor");List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);// 如果目标节点上已经没有空闲的slot,则进行强制释放if (availableSlots.isEmpty() && !executors.isEmpty()) {for (Integer port : cluster.getUsedPorts(specialSupervisor)) {cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port));}}// 重新获取可用的slotavailableSlots = cluster.getAvailableSlots(specialSupervisor);// 选取节点上第一个slot,进行分配cluster.assign(availableSlots.get(0), topology.getId(), executors);System.out.println("We assigned executors:" + executors + " to slot: [" + availableSlots.get(0).getNodeId() + ", " + availableSlots.get(0).getPort() + "]");} else {System.out.println("There is no supervisor find!!!");}}
}
}

说明部分

Storm自定义实现直接分配调度器,代码修改自Twitter Storm核心贡献者徐明明,此处为链接.

开发背景

在准备开发Storm自定义之前,事先已经了解了下现有Storm使用的调度器,默认是DefaultScheduler,调度原理大体如下:
* 在新的调度开始之前,先扫描一遍集群,如果有未释放掉的slot,则先进行释放
* 然后优先选择supervisor节点中有空闲的slot,进行分配,以达到最终平均分配资源的目标

现有scheduler的不足之处

上述的调度器基本可以满足一般要求,但是针对下面个例还是无法满足:
* 让spout分配到固定的机器上去,因为所需的数据就在那上面
* 不想让2个Topology运行在同一机器上,因为这2个Topology都很耗CPU

DirectScheduler的作用

DirectScheduler把划分单位缩小到组件级别,1个Spout和1个Bolt可以指定到某个节点上运行,如果没有指定,还是按照系统自带的调度器进行调度.这个配置在Topology提交的Conf配置中可配.

使用方法

集群配置

  • 打包此项目,将jar包拷贝到STORM_HOME/lib目录下,在nimbus节点上的Storm包
  • 在nimbus节点的storm.yaml配置中,进行如下的配置:

    storm.scheduler: "storm.DirectScheduler"
  • 然后是在supervisor的节点中进行名称的配置,配置项如下:


    supervisor.scheduler.meta:
    name: "your-supervisor-name"

在集群这部分的配置就结束了,然后重启nimbus,supervisor节点即可,集群配置只要1次配置即可.

拓扑逻辑配置

见下面的代码设置,主要是把组件名和节点名称作为映射值传入

int numOfParallel;
TopologyBuilder builder;
StormTopology stormTopology;
Config config;
//待分配的组件名称与节点名称的映射关系
HashMap<String, String> component2Node;//任务并行化数设为10个
numOfParallel = 2;builder = new TopologyBuilder();String desSpout = "my_spout";
String desBolt = "my_bolt";//设置spout数据源
builder.setSpout(desSpout, new TestSpout(), numOfParallel);builder.setBolt(desBolt, new TestBolt(), numOfParallel).shuffleGrouping(desSpout);config = new Config();
config.setNumWorkers(numOfParallel);
config.setMaxSpoutPending(65536);
config.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 40000);
config.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 40000);component2Node = new HashMap<>();component2Node.put(desSpout, "special-supervisor1");
component2Node.put(desBolt, "special-supervisor2");//此标识代表topology需要被调度
config.put("assigned_flag", "1");
//具体的组件节点对信息
config.put("design_map", component2Node);StormSubmitter.submitTopology("test", config, builder.createTopology());

拓扑逻辑作业具体要被调度时,传入配置参数即可.

调度器后期优化

DirectScheduler只是针对原有的调度实现做了1层包装,后期可以进行更深层次的改造,涉及到节点在分配的时候slot的排序等等.

完整代码地址

https://github.com/linyiqun/storm-scheduler

Storm自定义调度器实现--DirectScheduler相关推荐

  1. kubernetes 简介:调度器和调度算法((Affinity/Anti-Affinity, Taints and Tolerations, 自定义调度器 )

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 简介 scheduler 是 kubernetes 的调度器,主要的任务是把定义的 pod 分配到集群的节点上.听起来非常简单,但有很多要考虑的问题: ...

  2. JStorm与Storm源码分析(三)--Scheduler,调度器

    Scheduler作为Storm的调度器,负责为Topology分配可用资源. Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler. 其定义如下: public ...

  3. 面试八股文:你写过自定义任务调度器吗?

    最近入职了新公司,尝试阅读祖传代码,记录并更新最近的编程认知. 思绪由Q1引发,后续Q2.Q3基于Q1的发散探究 Q1. Task.Run.Task.Factory.StartNew 的区别? 我们常 ...

  4. 如何利用shell脚本和client-go实现自己的k8s调度器

    调度器介绍 scheduler 是k8s master的一部分,作为插件存在于k8s生态体系. 自定义调度器方式 添加功能重新编译 实现自己的调度器(multi-scheduler) schedule ...

  5. kubernetes调度器

    目录 文章目录 目录 实验环境 实验软件 本节实践 前置知识 调度器 1.调度流程 1.默认调度器 2.扩展调度器(extender) 3.调度框架 1.扩展点(Extension Points) 2 ...

  6. 编写LitmusRT调度器插件

    目录 背景 打桩 编译安装 引入TRACE模块来输出debug信息 为P-EDF定义每个CPU的状态 激活插件 模块测试 添加调度逻辑 帮助函数 demo_job_completion() demo_ ...

  7. paddle 12种学习率调度器

    目录 文本框检测的Cosine学习率调度器: 13种调度器 文本框检测的Cosine学习率调度器: 学习率 0.001 效果好像比较好,推荐使用 configs/det/ch_ppocr_v2.0/c ...

  8. k8s调度器扩展方式

    调度器介绍 scheduler 是k8s master的一部分,作为插件存在于k8s生态体系. 自定义调度器方式 添加功能重新编译 实现自己的调度器(multi-scheduler) schedule ...

  9. k8s-------(| 五 |)调度器 scheduler,亲和(affinity),污点(taint),容忍(tolerations),标签labels

    文章目录 一.调度说明 1. 简介 2. 调度过程 3. 自定义调度器 二.调度亲和性 1. node节点亲和性 (1)硬策略 (2)软策略 (3)硬策略与软策略 (4)节点标签相关操作 2. Pod ...

最新文章

  1. 未来的数据中心(三)
  2. 为什么会有宇宙?宇宙之外会有什么?
  3. STP RSTP MSTP PVST+学习 (1)
  4. snmpset对象不可写_别再问了,好吗?Java字符串一定是不可变的
  5. 【Python】一句话 if else 简洁写法
  6. Android—Socket服务端与客户端用字符串的方式互相传递图片
  7. 【NOI 2001】食物链(种类并查集)
  8. 拓端tecdat|R语言探索BRFSS数据可视化
  9. 【2022-01-06】JS逆向之QCC请求头参数
  10. 中国居民身份证号码验证
  11. UEFI edk2>edksetup.bat --nt32,build,无法解析的符号解决办法
  12. java动态图片_java实现gif效果(java显示动态图片)
  13. 7.1 RAID(独立冗余磁盘阵列)
  14. flowable 查询完成的流程_flowable中终止流程(一)
  15. 今年的奥运会延期至2021年夏季举行,盘点一下历届奥运会数据
  16. Thinkpad使用傲游浏览器的时候,无法滚动页面怎么办?
  17. Samsung 展示6G 原型,测试速度达6.2Gbps,最终目标为1000Gbps
  18. 湖北科技职业学院计算机专业代码,志愿填报:普通文理类专业代码
  19. 跟紧时代的脚步:梦想是一定要有的,万一实现了呢!
  20. uniapp开发APP和微信小程序——使用高德实现定位

热门文章

  1. javaweb开发过程中小工具系列之支持事务的JdbcUtils
  2. 团队作业-Beta冲刺(1)
  3. 绝地重生微软服务器,绝地重生打开麦克风 | 手游网游页游攻略大全
  4. 领先的易物网站-舍得网基于J2ee Resin实现的技术内幕
  5. python 装饰器模拟京东登陆
  6. 2023年全国最新交安安全员精选真题及答案8
  7. Java操作pdf的工具类itextpdf
  8. NLP的这一年2017:深度学习或成主角
  9. MySQL 教程基础介绍
  10. 计算机信息安全之所以重要,全国高校计算机等级考试广西考区一级笔试模拟试题5...