【源码分析】storm拓扑运行全流程源码分析

@(STORM)[storm]

  • 源码分析storm拓扑运行全流程源码分析
  • 一拓扑提交流程
    • 一stormpy

      • 1storm jar
      • 2def jar
      • 3exec_storm_class
      • 4get_classpath
    • 二拓扑提交之一
      • 1用户代码调用submitTopology
      • 2StormSubmittersubmitTopologyWithProgressBar
      • 3StormSubmittersubmitTopology
    • 二提交拓扑之二StormSubmittersubmitTopologyAs
      • 1加载配置
      • 2使用NimbusClient提交拓扑
  • 二 拓扑运行流程
    • 一概述

一、拓扑提交流程

拓扑提交的总体流程如下:
1、客户端通过thrift RPC提交topology的配置及jar包到nimbus。
2、nimbus针对该topology建立本地目录。
3、nimbus调度器根据topology的配置计算task,并把task分配到不同的worker上,调度的结果写入zookeeper。
4、zk上建立assignment节点,存储task和supervisor中的worker的对应关系。同时在zk上创建workerbeats节点来监控worker的心跳。
5、supervisor去zk上获取分配的task信息,启动一个或者多个worker来执行。
6、每个worker上运行一个或多个executor,每个executor对应一个线程,worker内部的executor之间通过DisrupterQueue进行通信,不同worker间默认采用netty来通信。
7、executor运行一个或者多个task(spout/bolt)
到此,topology就正式运行起来了。

具体流程图如下:(参考自《storm技术内幕与大数据实践》P96)

本文介绍了通过调用storm jar如何向nimbus提交拓扑的过程,即上述的第一步,主要的工作是加载配置信息,classpath,并将其与用户的jar包通过thrift协调上传至nimbus,等待nimbus的调用。

(一)storm.py

在这部分,请尤其注意classpath的设置。
依次将下列内容加入classpath中:

\$STORM_HOME\$STORM_HOME/lib\$STORM_HOME/extlib用户代码的jar包~/.storm\$STORM_HOME/bin

详见下面的分析。

1、storm jar

用户可以通过storm jar命令向storm集群提交一个拓扑,如:

/home/hadoop/storm/bin/storm jar storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology word-count

其实,storm执行的是bin/目录下的storm.py文件

2、def jar

jar函数只有一行,就是执行exec_storm_class函数。

def jar(jarfile, klass, *args):exec_storm_class(klass,jvmtype="-client",extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR],args=args,daemon=False,jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])

其中的几个变量为:

USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm")
STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")

因此用户jar包,~/.storm及$STORM_HOME/bin目录下的jar包会被自动加载到classpath中。

3、exec_storm_class

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, daemonName=""):global CONFFILEstorm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])if(storm_log_dir == None or storm_log_dir == "nil"):storm_log_dir = os.path.join(STORM_DIR, "logs")all_args = [JAVA_CMD, jvmtype,"-Ddaemon.name=" + daemonName,get_config_opts(),"-Dstorm.home=" + STORM_DIR,"-Dstorm.log.dir=" + storm_log_dir,"-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon),"-Dstorm.conf.file=" + CONFFILE,"-cp", get_classpath(extrajars, daemon),] + jvmopts + [klass] + list(args)print("Running: " + " ".join(all_args))if fork:os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)elif is_windows():# handling whitespaces in JAVA_CMDsub.call(all_args)else:os.execvp(JAVA_CMD, all_args)

可以看出,最后就是运行一条java命令,主类是用户main函数的类。
看一下classpath的设置。

4、get_classpath

def get_classpath(extrajars, daemon=True):ret = get_jars_full(STORM_DIR)ret.extend(get_jars_full(STORM_DIR + "/lib"))ret.extend(get_jars_full(STORM_DIR + "/extlib"))if daemon:ret.extend(get_jars_full(STORM_DIR + "/extlib-daemon"))if STORM_EXT_CLASSPATH != None:for path in STORM_EXT_CLASSPATH.split(os.pathsep):ret.extend(get_jars_full(path))if daemon and STORM_EXT_CLASSPATH_DAEMON != None:for path in STORM_EXT_CLASSPATH_DAEMON.split(os.pathsep):ret.extend(get_jars_full(path))ret.extend(extrajars)return normclasspath(os.pathsep.join(ret))

依次将下列内容加入classpath中:

"-Dstorm.jar=" + jarfile
\$STORM_HOME\$STORM_HOME/lib\$STORM_HOME/extlib用户代码的jar包~/.storm\$STORM_HOME/bin

(二)拓扑提交之一

1、用户代码调用submitTopology

用户一般通过StormSubmitter.submitTopology提交拓扑

if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());}

这里使用了submitTopologyWithProgressBar,只是在submitTopology的基础上增加了一些进度信息,见下面代码。

2、StormSubmitter.submitTopologyWithProgressBar

public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {// show a progress bar so we know we're not stuck (especially on slow connections)submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {@Overridepublic void onStart(String srcFile, String targetFile, long totalBytes) {System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);}@Overridepublic void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {int length = 50;int p = (int)((length * bytesUploaded) / totalBytes);String progress = StringUtils.repeat("=", p);String todo = StringUtils.repeat(" ", length - p);System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);}@Overridepublic void onCompleted(String srcFile, String targetFile, long totalBytes) {System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);}});}

本质上就是调用submitTopology方法,同时在start, progress和complete阶段输出一些信息。

3、StormSubmitter.submitTopology

@SuppressWarnings("unchecked")
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
}

StormSubmitter.submitTopology其实就是调用StormSubmitter.submitTopologyAs。下面我们详细分析一下StormSubmitter.submitTopologyAs

(二)提交拓扑之二:StormSubmitter.submitTopologyAs

1、加载配置

在submitTopologyAs中,第一件事就是将拓扑的配置加载到一个HashMap中

   if(!Utils.isValidConf(stormConf)) {throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");}stormConf = new HashMap(stormConf);stormConf.putAll(Utils.readCommandLineOpts());Map conf = Utils.readStormConfig();conf.putAll(stormConf);stormConf.putAll(prepareZookeeperAuthentication(conf));

上述代码完成了以下功能:
(1)检查拓扑传进来的conf是否有效,是否能json化,然后将其转换为HashMap。这里的conf是用户在建立拓扑时通过以下类似代码传进来的:

    Config config = new Config();config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 200);config.setNumWorkers(topoNumWorker);config.setMaxTaskParallelism(20);config.put(Config.NIMBUS_HOST, nimbusHost);config.put(Config.NIMBUS_THRIFT_PORT, 6627);config.put(Config.STORM_ZOOKEEPER_PORT, 2181);config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zk));config.put(Config.TOPOLOGY_NAME, topologyName);

(2)将命令行中的参数加载进stormConf中
(3)调用readStormConfig,加载配置文件中的内容:

public static Map readStormConfig() {Map ret = readDefaultConfig();String confFile = System.getProperty("storm.conf.file");Map storm;if (confFile==null || confFile.equals("")) {storm = findAndReadConfigFile("storm.yaml", false);} else {storm = findAndReadConfigFile(confFile, true);}ret.putAll(storm);ret.putAll(readCommandLineOpts());return ret;
}

先加载defaults.yaml, 然后再加载storm.yaml

(4)最后,加载zk认证相关信息。
(5)除此之外,还可以组件中覆盖getComponentConfiguration方法以修改其组件的配置。
(6)最后,还可以使用spoutDeclare与boltDeclare设置外部组件。

注意,这里有conf和stormConf2个变量,conf才是全部的配置,stormConf不包括defaults.yaml和storm.yaml。先将用户配置加载到stormConf,然后将defaults.yaml和storm.yaml回到conf,最后将stormConf加载到conf.

2、使用NimbusClient提交拓扑

当配置准备好以后,就开始向nimbus提交拓扑。在storm中,nimbus是一个thrift服务器,它接受客户端通过json文件提交RPC调用,即NimbusClient向nimbus提供一份json格式的字符串,用于提交拓扑信息。

            String serConf = JSONValue.toJSONString(stormConf);NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);if(topologyNameExists(conf, name, asUser)) {throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");}String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);try {LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);if(opts!=null) {client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);} else {// this is for backwards compatibilityclient.getClient().submitTopology(name, jar, serConf, topology);}} catch(InvalidTopologyException e) {LOG.warn("Topology submission exception: "+e.get_msg());throw e;} catch(AlreadyAliveException e) {LOG.warn("Topology already alive exception", e);throw e;} finally {client.close();}

核心步骤包括:
(1)将配置文件改为json格式的string

String serConf = JSONValue.toJSONString(stormConf);

(2)获取Nimbus client对象

NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);

getConfiguredClientAs的代码中的其中一行是指定nimbus的地址:

String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);

(3)检查拓扑名称是否已经存在

    if(topologyNameExists(conf, name, asUser)) {throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");}

(4)将jar包上传至nimbus

String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);

(5)最后调用submitTopologyWithOpts正式向nimbus提交拓扑,参数包括:

client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);

submitTopologyWithOpts方法就只有2行:

  send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);recv_submitTopologyWithOpts();

即将信息发送至thrift server及接收返回信息。发送的信息包括:

  args.set_name(name);args.set_uploadedJarLocation(uploadedJarLocation);args.set_jsonConf(jsonConf);args.set_topology(topology);args.set_options(options);

其中set_uploadedJarLocation指定了jar包的上传路径。

综上所述,其实所谓的提交拓扑,就是将拓扑的配置信息通过thrift发送到thrift server,并把jar包上传到nimbus,等待nimbus的后续处理,此时拓扑并未真正起来,直至recv_submitTopologyWithOpts获得成功的返回信息为止。

二、 拓扑运行流程

(一)概述

拓扑数据流如下:
1、Spout读取或者产生数据
2、通过netty/ZMQ将数据从所在的worker发送到下一个Executor所在的worker(如果下一个Executor与spout的executor在同一个worker,则直接发送到自身worker内部的Disruptor Queue)
3、worker根据TaskId将消息放入Executor的输入Disruptor Queue中
4、Executor处理完数据后,将其放到自身的输出Disruptor Queue中
5、然后Executor还会启动一个线程将输出Disruptor Queue中的内容通过netty发送到其它worker中,或者直接发送至其它Executor相对应的输入Disruptor Queue(源executor与目标executor在同一个worker的情况)。
6、如此循环3~5步骤,直至所有executor都处理完成数据。

executor的执行方式是一个典型的生产者消费者模式

【源码分析】storm拓扑运行全流程源码分析相关推荐

  1. YOLOv3反向传播原理 之 全流程源码分析

    YOLOv3反向传播原理 之 全流程源码分析 1.YOLOv3网络训练中反向传播主体流程 1.1 初始化 1.2 batch内梯度累加 1.3 network和layer 中的关键变量 2.YOLO层 ...

  2. Myth源码解析系列之六- 订单下单流程源码解析(发起者)

    前面一章我们走完了服务启动的源码,这次我们进入下单流程的源码解析~ 订单下单流程源码解析(发起者) 首先保证myth-demo-springcloud-order.myth-demo-springcl ...

  3. 大数据_MapperReduce_Hbase_批处理batchMutate源码分析_数据的写入流程源码分析---Hbase工作笔记0032

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 我们知道hbase是用java语言编写的,所以我们再来看一下源码: 然后我们再去看一下这个batc ...

  4. [源码学习][知了开发]WebMagic-总体流程源码分析

    写在前面 前一段时间开发[知了]用到了很多技术(可以看我前面的博文http://blog.csdn.net/wsrspirit/article/details/51751568),这段时间抽空把这些整 ...

  5. SpringBoot启动全流程源码解析(超详细版)

    我们在使用SpringBoot启动项目的时候,可能只需加一个注解,然后启动main,整个项目就运行了起来,但事实真的是所见即所得吗,还是SpringBoot在背后默默做了很多?本文会通过源码解析的方式 ...

  6. SpringBoot2 | SpringBoot启动流程源码分析(一)

    首页 博客 专栏·视频 下载 论坛 问答 代码 直播 能力认证 高校 会员中心 收藏 动态 消息 创作中心 SpringBoot2 | SpringBoot启动流程源码分析(一) 置顶 张书康 201 ...

  7. hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)

    , 一.概述  之前几篇文章对Spark集群的Master.Worker启动流程进行了源码剖析,后面直接从客户端角度出发,讲解了spark-submit任务提交过程及driver的启动:集群启动.任务 ...

  8. nimble源码学习——广播流程源码分析1

    广播流程源码分析1 在controller层有多种状态:广播.空闲.连接等等,这次分析的是广播这个状态或者叫 做角色.在前面controller层循环的分析中,可以明确controller层也有eve ...

  9. OkHttp原理流程源码分析

    OkHttp已经是非常流行的android客户端的网络请求框架,我其实在项目中使用也已经好几年了,之前一直把重心放在如何快速的搞定业务上.迭代的效率上,这一点来讲,对于一个公司优秀员工是没有毛病的.但 ...

最新文章

  1. 使用最新版(2020)IntelliJ IDEA 创建Servlet项目
  2. Java / Android String.format 的使用
  3. 配置red hat的ip 自动地址
  4. 介绍一款 API 敏捷开发工具,告别加班!
  5. 【jQuery 区别】.click()和$(document).on(click,指定的元素,function(){});的区别
  6. 单词evolve pro legacy launcher session
  7. python中索引是从什么开始_python索引从0开始,那负数索引算什么?三秋道果说python...
  8. 做人的态度,本人的工作和生活的感悟。
  9. PWN-PRACTICE-BUUCTF-29
  10. jsonp react 获取返回值_Django+React全栈开发:文章列表
  11. python批量裁剪图片_用Python写了一个图片格式批量处理工具
  12. DNS IP DOMAIN 详解
  13. VC++中文件类型小结
  14. Bee Framework_百度百科
  15. Hikari 数据库连接池配置详解
  16. poj 4105 拯救公主(bfs+二进制状态压缩)
  17. PHP正则表达式笔记与实例详解
  18. 深度学习常见数据集汇总
  19. 【论文阅读】RAPTOR: Robust and Perception-Aware Trajectory Replanning for Quadrotor Fast Flight
  20. 小议费雪线性判别(Fisher Linear Discriminant Analysis)

热门文章

  1. Paas是什么——Go语言相关学习笔记
  2. 详细解读CSS优先级——Web前端系列学习笔记
  3. Mybatis的全局配置文件中的标签
  4. gis怎么改鼠标滚轮缩放_PhotoShop缩放画面的快捷方式
  5. JFreeChart基本的用法实例(一)
  6. srve0255e尚未定义要怎么办_皮肤干燥怎么办?四大方法帮你冬季补?
  7. kubectl 创建pvc_k8s的持久化存储PVPVC
  8. windows 改变文件大小 函数_手写 bind call apply 方法 与 实现节流防抖函数
  9. 限制服务器访问指定网站,如何允许或限制某一国或地区的用户访问网站
  10. java map join_HashMap 常见应用:实现 SQL JOIN