2021SC@SDUSC

首先,我们再来看一下supervisor的工作:

Storm将每个节点分为主控节点和工作节点两种,其中主控节点只有一个,工作节点可以有多个。

每个工作节点运行Supervisor守护进程,负责监听工作节点上已经分配的主机作业,启动和停止Nimbus已经分配的工作进程。

supervisor会定时从zookeeper获取拓补信息topologies、任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。

在supervisor同步时,会根据新的任务分配情况来启动新的worker或者关闭旧的worker并进行负载均衡。

接下来,我们来分析代码:

1. 从Python脚本开始
还是从Python脚本开始看代码。Supervisor的启动由一句 "storm supervisor" 指令触发,这句指令也是通过Python脚本 $STORM_DIR/bin/storm 来调用java类backtype.storm.daemon.supervisor。具体是执行如下格示的指令:

java -server -Dstorm.options= -Dstorm.home=$STORM_DIR-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp $STORM_CLASSPATH -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=$STORM_DIR/logback/cluster.xmlbacktype.storm.daemon.supervisor

2.进入Java的supervisor类

这个类在 supervisor.clj 中定义,文件路径为 storm-core/src/clj/backtype/storm/daemon/supervisor.clj,下面是部分代码:

(ns backtype.storm.daemon.supervisor;...(:gen-class:methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))(defn -launch [supervisor](let [conf (read-storm-config)](validate-distributed-mode! conf)(let [supervisor (mk-supervisor conf nil supervisor)](add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))))(defn standalone-supervisor [](let [conf-atom (atom nil)id-atom (atom nil)](reify ISupervisor(prepare [this conf local-dir](reset! conf-atom conf)(let [state (LocalState. local-dir)curr-id (if-let [id (.get state LS-ID)]id(generate-supervisor-id))](.put state LS-ID curr-id)(reset! id-atom curr-id)))(confirmAssigned [this port]true)(getMetadata [this](doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))(getSupervisorId [this]@id-atom)(getAssignmentId [this]@id-atom)(killedWorker [this port])(assigned [this ports]))))(defn -main [](-launch (standalone-supervisor)))

其main函数也在代码底部。同nimbus类似,supervisor也有一个standalone-supervisor函数,这个函数用reify,返回一个实现了ISupervisor接口的类的对象,具体看一下 prepare 函数就行了,其它函数先不用深究。
第13~14行,定义了两个原子变量,对它们的操作都具有原子性。Supervisor里是一个多线程环境,因此这里有必要使用原子变量。
第16行,prepare函数的参数有两个,conf 是 supervisor 的所有配置信息,是一个Map;local-dir 是 supervisor 的本地目录路径。
第17行,把 conf-atom 变量置为 conf 参数的值。
第18行,new 一个 LocalState 类,并将其绑定到变量 state。LocalState 是 backtype.storm.utils 包中定义的纯 Java 类,是一个简单的、持久的、操作原子的 Key/Value 数据库。实际相当于一个持久化到硬盘的 Map,因此其效率会非常低,只适合偶尔需要读写请求的场景。
第19~21行,从 state 中读出 LS-ID 对应的 value,如果非空,则绑定给 curr-id 变量。若为空,则生成一个随机的 supervisor id 赋给 curr-id 变量。
第22行,将 (LS-ID, curr-id) 这对 key/value 写入 state 中。
实际上,prepare 做的就是检查 local-dir 目录里是否记录了 supervisor id 的信息,没有的话就随机生成一个存进去。

-launch 函数更简单,确认了配置文件里设置了分布式模式后,将 supervisor 变量绑定到 mk-supervisor 函数的返回值上。最后为本进程挂钩一个shutdown hook,即进程意外退出或被kill时,执行 supervisor 的 shutdown 函数。

supervisor启动的主要内容是在 mk-supervisor 函数里:

;; in local state, supervisor stores who its current assignments are
;; another thread launches events to restart any dead processes if necessary
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor](log-message "Starting Supervisor with conf " conf)(.prepare isupervisor conf (supervisor-isupervisor-dir conf))(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))(let [supervisor (supervisor-data conf shared-context isupervisor)[event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]                         sync-processes (partial sync-processes supervisor)synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)heartbeat-fn (fn [] (.supervisor-heartbeat!(:storm-cluster-state supervisor)(:supervisor-id supervisor)(SupervisorInfo. (current-time-secs)(:my-hostname supervisor)(:assignment-id supervisor)(keys @(:curr-assignment supervisor));; used ports(.getMetadata isupervisor)(conf SUPERVISOR-SCHEDULER-META)((:uptime supervisor)))))](heartbeat-fn);; should synchronize supervisor so it doesn't launch anything after being down (optimization)(schedule-recurring (:timer supervisor)0(conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)heartbeat-fn)(when (conf SUPERVISOR-ENABLE);; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up;; to date even if callbacks don't all work exactly right(schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))(schedule-recurring (:timer supervisor)0(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)(fn [] (.add processes-event-manager sync-processes))))(log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))(reifyShutdownable(shutdown [this](log-message "Shutting down supervisor " (:supervisor-id supervisor))(reset! (:active supervisor) false)(cancel-timer (:timer supervisor))(.shutdown event-manager)(.shutdown processes-event-manager)(.disconnect (:storm-cluster-state supervisor)))SupervisorDaemon(get-conf [this]conf)(get-id [this](:supervisor-id supervisor))(shutdown-all-workers [this](let [ids (my-worker-ids conf)](doseq [id ids](shutdown-worker supervisor id))))DaemonCommon(waiting? [this](or (not @(:active supervisor))(and(timer-waiting? (:timer supervisor))(every? (memfn waiting?) managers)))))))

第3行,使用 defserverfn 宏来定义一个函数,就是给函数体包了一层 try-cause 语句。
第5行,执行 isupervisor 的 prepare 函数,传入的目录路径是 $STORM_LOCAL_DIR/supervisor/isupervisor
第6行,清空 supervisor 的 tmp 目录,路径为 $STORM_LOCAL_DIR/supervisor/tmp
第7行,执行 supervisor-data 函数,生成一个包含了 supervisor 诸多变量的 map,绑定给 supervisor 变量。这个东西跟 nimbus-data 一样,如果要将 supervisor.clj 翻译成一个 Java 类,则这个 map 里的东西就基本是这个类的所有属性。

第8行,这里使用了 let 的一个 ":as" 用法,这句话的意思是event-manager 变量绑定到(event/event-manager false)的执行结果,processes-event-manager变量绑定到(event/event-manager false)的执行结果,最后将[event-manager processes-event-manager]绑定到managers这个变量。
event-manager 函数将生成一个线程,用来响应事件。这里的第二个参数填 false 的意思就是,生成一个用户线程(而不是守护线程)。

第9行,sync-processes 变量绑定到一个函数,这个函数很重要,其实现的功能就是将supervisor管理的worker状态与Zookeeper上的任务分配状态进行同步。即负责Worker的关闭与开启。具体实现等在讲 Topology 提交的博文时再来讨论。

第10行,synchronize-supervisor 变量也是绑定到 一个函数,这个函数也很重要,其实现的功能是将本地的任务分配信息与Zookeeper上的任务分配信息进行同步,当发现有更新时,它会唤醒 sync-processes 所在的线程去关闭或开启 worker。这里也先不讨论这个函数的实现。

第11~21行,将 heartbeat-fn 变量绑定到一个匿名函数。这个函数当然就是做 supervisor 心跳的,会在 Zookeeper 上写入该 supervisor 的心跳信息,也就是 SupervisorInfo 这个类型的对象。具体地看一下代码,
第11行,.supervisor-heartbeat!表示执行某个对象的supervisor-heartbeat!函数。
这个对象在第12行,前面说了 supervisor 变量是 supervisor-data 函数的返回值,是一个 Map。这里取出于:storm-cluster-state 域对应的那个 value。其是一个 StormClusterState 类型的变量,其实就是一个 Zookeeper 客户端,那么 heartbeat-fn 这个函数其实就是在调一个 StormClusterState 的 supervisor-hearbeat! 函数。具体这个类在 cluster.clj 中定义,有兴趣的读者可以看下。
第13行是得到 supervisor 的 id,作为 supervisor-heartbeat 函数的第一个参数。而后面的第14~21行则是构造出 supervisor-heartbeat! 函数的第二个参数。这里不再细说了,要注意的一点是,第16行的 assignment-id 实际也是 supervisor-id,见 standalone-supervisor 函数里的定义。为什么要多不同的名字?这个我也想不清楚
第22行,supervisor 先做一次心跳,以让 nimbus 及时地知道它还”活着“。现在还在启动过程中,可能有些操作会比较卡时间,因此首要之事是把心跳做 了。后面的第24~27行,将 heartbeat-fn 交给 timer,定时地执行,默认是每隔5秒(在defaults.yaml中有)

第28~35行,让两个 event-manager 线程定期地执行 synchronize-supervisor 函数和 sync-processes 函数。
最后从第37行开始,用 reify 实现 Shutdownable、SupervisorDaemon、DaemonCommon 三个接口,并返回一个该类型的对象。shutdown 函数负责让 Supervisor 干净地退出,Supervisor 里有三个线程(除了主线程),就是前面提到的 heartbeat、synchronize-supervisor 和 sync-processes。第42~44行依次把它们都关掉了。最后第45行断开与 Zookeeper 的连接。

之前提到一个问题,就是 Supervisor 的主线程好像直接就会运行完然后退出了。确实是这样子,但 Supervisor 进程不会退出,因为 JVM 里的线程有两种——用户线程 (User Thread) 和守护线程 (Daemon),当进程里只剩下守护线程时,程序才会退出。而 synchronize-supervisor 和 sync-processes 所在的线程都是用户线程,所以尽管主线程退出了,JVM 还是会继续运行这些用户线程。

Supervisor(一)相关推荐

  1. Supervisor使用详解

    一.supervisor简介 Supervisor是用Python开发的一套通用的进程管理程序,能将一个普通的命令行进程变为后台daemon,并监控进程状态,异常退出时能自动重启.它是通过fork/e ...

  2. Linux Supervisor的安装与使用入门

    在linux或者unix操作系统中,守护进程(Daemon)是一种运行在后台的特殊进程,它独立于控制终端并且周期性的执行某种任务或等待处理某些发生的事件.由于在linux中,每个系统与用户进行交流的界 ...

  3. python进程监控 supervisor_python supervisor进程监控工具的使用

    supervisor -- a process control system 另外一个类似 supervisor的工具,因为supervisor 不兼容python3, !!! Circus Proc ...

  4. 登录linux后台工具,linux后台进程管理工具-supervisor

    安装环境为:centos,如果是ubuntu的话命令可能会不一样. Supervisor 是一个用python编写的进程管理工具,能将一个普通的命令行进程变为后台的deamon,方便管理. 安装sup ...

  5. 【部署类】专题:消息队列MQ、进程守护Supervisor

    目录 1 背景需求 2 技术方案 2.1 消息队列 2.2 进程守护 3 源码介绍 3.1 supervisor部分 3.1.1 supervisord.conf 内容 3.1.2 MM3D.conf ...

  6. erlang supervisor simple_one_for_one实例

    http://www.cnblogs.com/little-ant/p/3196201.html simple_one_for_one vs one_for_one: 相同点: 这种Restart S ...

  7. [喵咪的Liunx(1)]计划任务队列脚本后台进程Supervisor帮你搞定

    喵咪的Liunx(1)]计划任务队列脚本后台进程Supervisor帮你搞定 前言 哈喽大家好啊,好久不见啊(都快一个月了),要问为什么没有更新博客呢只应为最近在录制PhalApi的视频教程时间比较少 ...

  8. Linux supervisor守护进程的安装和使用

    个人网站:http://xiaocaoshare.com/ supervisor守护进程的介绍 Supervisor(http://supervisord.org/)是用Python开发的一个clie ...

  9. supervisor源码分析

    Supervisor分析 1.运行原理概述: Supervisor生成主进程并将主进程变成守护进程,supervisor依次生成配置文件中的工作进程,然后依次监控工作进程的工作状态,并且主进程负责与s ...

  10. Nimbus/Supervisor本地目录结构

    为什么80%的码农都做不了架构师?>>>    注意:下面目录结构里面, nimbus机器上面只有/nimbus目录,supervisor机器上面只有/supervisor目录和/w ...

最新文章

  1. 【GoLang】tcmalloc jemalloc
  2. python语言入门m-Python学习基础篇 -1
  3. Nginx静态资源盗链的效果展示
  4. 项目整合一级缓存和二级缓存
  5. java 编写代码_Java 7:如何编写非常快速的Java代码
  6. kind富文本编辑器_在VueJs中集成UEditor 富文本编辑器
  7. return ,continue,break的用法与区别总结
  8. python静态变量_python静态变量
  9. IT都包括什么?零基础适合学哪一样?
  10. 面试官跟我扯了半小时 CountDownLatch 后,给我发 Offer?| 原力计划
  11. idea安装axios
  12. 专利学习笔记5:CPC客户端的安装方法
  13. steam换头像出现服务器错误_steam测试中国版 单机游戏强制防沉迷
  14. 科普:卡他妈滤波_拔剑-浆糊的传说_新浪博客
  15. 使用windows日志监控AD安全性的五大挑战
  16. QGraphicsView使用详解
  17. lol手游日服服务器未响应,LOL手游进不去怎么回事?日服登陆失败解决办法[多图]...
  18. 条码标签软件里如何将图片生成数据库
  19. redit高可用之集群
  20. BUUCTF [GXYCTF2019]Ping Ping Ping 1

热门文章

  1. 游戏SDK应用内悬浮窗的实现(四)
  2. 我把面试问烂了的⭐MySQL面试题⭐总结了一下(带答案,万字总结,精心打磨,建议收藏)
  3. 鱼C_python的一些题
  4. 8年经验面试官详解 Java 面试秘诀
  5. 疯狂的车子:它们居然可以上路
  6. Thinkpad T61升级记:64位操作系统win7_x64,8G内存
  7. 博文共赏:也谈大公司病2——减少错误不等于增加成功
  8. 物流单号保存在TXT文档,教你一键导入批量查询物流信息
  9. TCP/IP协议知识科普
  10. ARM开发(9)基于STM32的简单四则运算计算器