The source code examined here comes from the 1.0.3 jar.

1. The client submits the Topology to Nimbus
The command is:

storm jar WordCount.jar storm.starter.storm.starter wordcount

What actually gets invoked is:

storm -client WordCount.jar storm.starter.storm.starter wordcount

2. Use TopologyBuilder to wire the Spouts and Bolts together in the required logical order and build the Topology

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomIntegerSpout());
builder.setBolt("partialsum", new StatefulSumBolt("partial"), Integer.valueOf(1)).shuffleGrouping("spout");
builder.setBolt("printer", new PrinterBolt(), Integer.valueOf(2)).shuffleGrouping("partialsum");
builder.setBolt("total", new StatefulSumBolt("total"), Integer.valueOf(1)).shuffleGrouping("printer");

3. Call TopologyBuilder's createTopology() method to obtain the StormTopology instance. The source is as follows:

// Creates the Topology object
public StormTopology createTopology() {
    ComponentCommon common;
    java.util.Map boltSpecs = new HashMap();
    java.util.Map spoutSpecs = new HashMap();
    maybeAddCheckpointSpout();
    for (String boltId : this._bolts.keySet()) {
        IRichBolt bolt = (IRichBolt)this._bolts.get(boltId);
        bolt = maybeAddCheckpointTupleForwarder(bolt);
        common = getComponentCommon(boltId, bolt);
        try {
            maybeAddCheckpointInputs(common);
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        } catch (RuntimeException wrapperCause) {
            if ((wrapperCause.getCause() != null) && (NotSerializableException.class.equals(wrapperCause.getCause().getClass()))) {
                throw new IllegalStateException("Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    for (String spoutId : this._spouts.keySet()) {
        IRichSpout spout = (IRichSpout)this._spouts.get(spoutId);
        common = getComponentCommon(spoutId, spout);
        try {
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
        } catch (RuntimeException wrapperCause) {
            if ((wrapperCause.getCause() != null) && (NotSerializableException.class.equals(wrapperCause.getCause().getClass()))) {
                throw new IllegalStateException("Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    // Store the Spout and Bolt specs in the maps and return the Topology object
    StormTopology stormTopology = new StormTopology(spoutSpecs, boltSpecs, new HashMap());
    stormTopology.set_worker_hooks(this._workerHooks);
    return stormTopology;
}
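
The catch blocks in createTopology() exist because every Spout and Bolt is Java-serialized at build time, so a field that is not Serializable and was created before that point makes the build fail. The following is a minimal stdlib-only sketch of that failure mode and of the "instantiate it in prepare()" remedy the error message recommends. The classes Connection, EagerBolt and LazyBolt are hypothetical stand-ins, and javaSerialize here is a simplified imitation that assumes, as the getCause() check above suggests, that I/O failures are wrapped in a RuntimeException.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for illustration; none of these classes come from Storm.
final class SerializationSketch {

    // A resource that is deliberately not Serializable (think of a socket or DB client).
    static final class Connection {
    }

    // Creates the non-serializable field at construction time, i.e. before serialization.
    static final class EagerBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Connection connection = new Connection();
    }

    // Marks the field transient and creates it later, mirroring "instantiate it in prepare()".
    static final class LazyBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Connection connection;

        void prepare() {
            connection = new Connection();
        }
    }

    // Plain Java serialization; I/O failures are wrapped in a RuntimeException
    // so that callers can inspect getCause(), as createTopology() does.
    static byte[] javaSerialize(Object obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Returns true when serialization fails because of a non-serializable field.
    static boolean failsWithNotSerializable(Object obj) {
        try {
            javaSerialize(obj);
            return false;
        } catch (RuntimeException wrapperCause) {
            return wrapperCause.getCause() != null
                    && NotSerializableException.class.equals(wrapperCause.getCause().getClass());
        }
    }

    public static void main(String[] args) {
        System.out.println("eager fails: " + failsWithNotSerializable(new EagerBolt()));
        System.out.println("lazy fails:  " + failsWithNotSerializable(new LazyBolt()));
    }
}
```

In this sketch the eager variant is rejected and the lazy one serializes cleanly, which is the reason resources such as connections are usually created inside prepare() rather than in the constructor.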

4. Submit the job. The process is as follows
(1) Call StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); to submit the job (this is the submission path used on a cluster)

The submitTopologyWithProgressBar method:
public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
        throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
    // This method in turn calls submitTopology
    submitTopology(name, stormConf, topology, opts, new ProgressListener() {
        public void onStart(String srcFile, String targetFile, long totalBytes) {
            System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", new Object[] { srcFile, targetFile, Long.valueOf(totalBytes) });
        }

        public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
            int length = 50;
            int p = (int)(length * bytesUploaded / totalBytes);
            String progress = StringUtils.repeat("=", p);
            String todo = StringUtils.repeat(" ", length - p);
            System.out.printf("\r[%s%s] %d / %d", new Object[] { progress, todo, Long.valueOf(bytesUploaded), Long.valueOf(totalBytes) });
        }

        public void onCompleted(String srcFile, String targetFile, long totalBytes) {
            System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", new Object[] { srcFile, targetFile, Long.valueOf(totalBytes) });
        }
    });
}
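
The arithmetic in onProgress is easy to miss in the flattened source: the bar is 50 characters wide, and the number of "=" characters is the integer part of 50 * bytesUploaded / totalBytes. Below is a small stdlib-only sketch of the same calculation. The class ProgressBarSketch and its render and repeat helpers are hypothetical names, and the sketch assumes totalBytes is greater than zero.

```java
import java.util.Arrays;

// Hypothetical helper that reproduces the bar arithmetic from onProgress.
final class ProgressBarSketch {
    static final int LENGTH = 50;

    // Builds a string of n copies of c (a stand-in for StringUtils.repeat).
    static String repeat(char c, int n) {
        char[] chars = new char[n];
        Arrays.fill(chars, c);
        return new String(chars);
    }

    // Renders one frame of the bar; assumes totalBytes > 0.
    static String render(long bytesUploaded, long totalBytes) {
        int p = (int) (LENGTH * bytesUploaded / totalBytes);
        return String.format("[%s%s] %d / %d",
                repeat('=', p), repeat(' ', LENGTH - p), bytesUploaded, totalBytes);
    }

    public static void main(String[] args) {
        long total = 200;
        for (long uploaded = 0; uploaded <= total; uploaded += 50) {
            // The carriage return rewrites the same console line on each update.
            System.out.printf("\r%s", render(uploaded, total));
        }
        System.out.println();
    }
}
```

With 50 of 100 bytes uploaded, the frame holds 25 "=" characters followed by 25 spaces.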

(2) submitTopologyWithProgressBar in turn calls StormSubmitter's submitTopology method.
The submitTopology method:

public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener)
        throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
    submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
}

(3) submitTopology in turn calls StormSubmitter's submitTopologyAs method.
The submitTopologyAs method:

public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
        throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
    // The Storm conf must be JSON-serializable
    if (!(Utils.isValidConf(stormConf)))
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    stormConf = new HashMap(stormConf);
    // Read the command-line options and put them into stormConf
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    stormConf.putAll(prepareZookeeperAuthentication(conf));
    validateConfs(conf, topology);
    Map passedCreds = new HashMap();
    if (opts != null) {
        Credentials tmpCreds = opts.get_creds();
        if (tmpCreds != null)
            passedCreds = tmpCreds.get_creds();
    }
    Map fullCreds = populateCredentials(conf, passedCreds);
    if (!(fullCreds.isEmpty())) {
        if (opts == null)
            opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
        opts.set_creds(new Credentials(fullCreds));
    }
    try {
        // If localNimbus is not null, run in local mode
        if (localNimbus != null) {
            LOG.info("Submitting topology " + name + " in local mode");
            if (opts != null) {
                localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
            } else {
                localNimbus.submitTopology(name, stormConf, topology);
            }
            LOG.info("Finished submitting topology: " + name);
        } else {
            String serConf = JSONValue.toJSONString(stormConf);
            // Throw if a topology with this name already exists on the cluster
            if (topologyNameExists(conf, name, asUser))
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            // Upload the jar file via submitJarAs
            String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
            // The decompiled jar shows this try-with-resources expanded into nested try/finally blocks; it is written compactly here
            try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if (opts != null) {
                    client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
                } else {
                    client.getClient().submitTopology(name, jar, serConf, topology);
                }
                LOG.info("Finished submitting topology: " + name);
            } catch (InvalidTopologyException e) {
                LOG.warn("Topology submission exception: " + e.get_msg());
                throw e;
            } catch (AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            }
        }
    } catch (TException e) {
        throw new RuntimeException(e);
    }
    invokeSubmitterHook(name, asUser, conf, topology);
}
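
One detail worth spelling out is the order in which submitTopologyAs layers its configuration: the caller's conf is copied, the command-line options are put on top of it, and the result is put on top of the config read by Utils.readStormConfig(). Because Map.putAll overwrites existing keys, later layers win. The sketch below models only that layering with plain maps. The class ConfLayeringSketch and the method effectiveConf are hypothetical names, and the values are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the putAll ordering in submitTopologyAs; not a Storm API.
final class ConfLayeringSketch {

    // Later putAll calls overwrite earlier entries, so the precedence is:
    // command-line options > topology conf > cluster config.
    static Map<String, Object> effectiveConf(Map<String, Object> clusterConf,
                                             Map<String, Object> topologyConf,
                                             Map<String, Object> commandLineOpts) {
        Map<String, Object> stormConf = new HashMap<>(topologyConf);
        stormConf.putAll(commandLineOpts);
        Map<String, Object> conf = new HashMap<>(clusterConf);
        conf.putAll(stormConf);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> cluster = new HashMap<>();
        cluster.put("nimbus.thrift.port", 6627);
        cluster.put("topology.workers", 1);
        cluster.put("topology.debug", false);

        Map<String, Object> topology = new HashMap<>();
        topology.put("topology.workers", 3);
        topology.put("topology.debug", true);

        Map<String, Object> commandLine = new HashMap<>();
        commandLine.put("topology.workers", 5);

        System.out.println(effectiveConf(cluster, topology, commandLine));
    }
}
```

In this example topology.workers ends up as the command-line value, topology.debug as the topology's value, and nimbus.thrift.port falls through from the cluster config.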

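The submitTopologyAs() method does the following work:
1) Validates the Storm conf, which must be JSON-serializable: Utils.isValidConf(stormConf)

2) Determines the Topology's run mode
If localNimbus is not null, the topology runs in local mode: localNimbus.submitTopology(name, stormConf, topology);

3) Otherwise it runs in distributed cluster mode
Checks whether the Topology name already exists on the cluster: topologyNameExists(conf, name, asUser)

Calls submitJarAs to upload the jar file:
submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);

Submits the Topology with the opts parameter (the newer submission method):
client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
At this point the job submission is complete.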
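
The control flow summarized above can be modeled in a few lines. This is only a sketch under stated assumptions: SubmitFlowSketch is a hypothetical in-memory stand-in for a cluster, the "uploaded/" prefix stands in for whatever location the jar upload returns, and no real Storm class is involved.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for a cluster; not a Storm API.
final class SubmitFlowSketch {
    private final Set<String> runningTopologies = new HashSet<>();
    private final boolean localMode;

    SubmitFlowSketch(boolean localMode) {
        this.localMode = localMode;
    }

    // Mirrors the order of checks: run mode first, then name check, jar upload, submit.
    String submit(String name, String jarPath) {
        if (localMode) {
            // Local mode: no name check against a cluster and no jar upload.
            runningTopologies.add(name);
            return "local:" + name;
        }
        if (runningTopologies.contains(name)) {
            throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
        }
        // Made-up location standing in for the path returned by the jar upload.
        String uploadedJar = "uploaded/" + jarPath;
        runningTopologies.add(name);
        return "distributed:" + name + "@" + uploadedJar;
    }

    public static void main(String[] args) {
        SubmitFlowSketch cluster = new SubmitFlowSketch(false);
        System.out.println(cluster.submit("wordcount", "WordCount.jar"));
        try {
            cluster.submit("wordcount", "WordCount.jar");
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Submitting the same name twice in distributed mode is rejected in this model, which matches the topologyNameExists check performed before the jar is uploaded.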
