1 Storm Client

最开始使用storm命令来启动topology, 如下

storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology

这个storm命令是用python实现的, 看看其中的jar函数, 很简单, 调用exec_storm_class, 其中jvmtype="-client"
而exec_storm_class其实就是拼出一条java执行命令, 然后用os.system(command)去执行, 为何用Python写, 简单? 可以直接使用storm命令?
这儿的klass就是topology类, 所以java命令只是调用Topology类的main函数

def jar(jarfile, klass, *args):"""Syntax: [storm jar topology-jar-path class ...]Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
"""exec_storm_class(klass,jvmtype="-client",extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],args=args,childopts="-Dstorm.jar=" + jarfile)def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):nativepath = confvalue("java.library.path", extrajars)args_str = " ".join(map(lambda s: "\"" + s + "\"", args))command = "java" + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts() + " -Djava.library.path=" + nativepath + " " + childopts + " -cp" + get_classpath(extrajars) + " " + klass + " " + args_strprint "Running:" + commandos.system(command)

直接看看WordCountTopology例子的main函数都执行什么?

除了定义topology, 最终会调用StormSubmitter.submitTopology(args[0], conf, builder.createTopology()), 来提交topology

    public static void main(String[] args) throws Exception {        TopologyBuilder builder = new TopologyBuilder();        builder.setSpout("spout", new RandomSentenceSpout(), 5);        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));Config conf = new Config();conf.setDebug(true);if(args!=null && args.length > 0) {conf.setNumWorkers(3);            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} else {        conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());   Thread.sleep(10000);cluster.shutdown();}}

StormSubmitter

直接看看submitTopology,
1. 配置参数
   把命令行参数放在stormConf, 从conf/storm.yaml读取配置参数到conf, 再把stormConf也put到conf, 可见命令行参数的优先级更高
   将stormConf转化为Json, 因为这个配置是要发送到服务器的

2. Submit Jar
    StormSubmitter的本质是个Thrift Client, 而Nimbus则是Thrift Server, 所以所有的操作都是通过Thrift RPC来完成, Thrift参考Thrift, Storm-源码分析- Thrift的使用
    先判断topologyNameExists, 通过Thrift client得到现在运行的topology的状况, 并check
    然后Submit Jar, 通过底下三步         
    client.getClient().beginFileUpload();
    client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
    client.getClient().finishFileUpload(uploadLocation);
    把数据通过RPC发过去, 具体怎么存是nimbus自己的逻辑的事...

3. Submit Topology
    很简单只是简单的调用RPC
    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);

    /*** Submits a topology to run on the cluster. A topology runs forever or until * explicitly killed.*** @param name the name of the storm.* @param stormConf the topology-specific configuration. See {@link Config}. * @param topology the processing to execute.* @param options to manipulate the starting of the topology* @throws AlreadyAliveException if a topology with this name is already running* @throws InvalidTopologyException if an invalid topology was submitted*/public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {if(!Utils.isValidConf(stormConf)) {throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");}stormConf = new HashMap(stormConf);stormConf.putAll(Utils.readCommandLineOpts());Map conf = Utils.readStormConfig();conf.putAll(stormConf);try {String serConf = JSONValue.toJSONString(stormConf);if(localNimbus!=null) {LOG.info("Submitting topology " + name + " in local mode");localNimbus.submitTopology(name, null, serConf, topology);} else {NimbusClient client = NimbusClient.getConfiguredClient(conf);if(topologyNameExists(conf, name)) {throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");}submitJar(conf);try {LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);if(opts!=null) {client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);                    } else {// this is for backwards compatibilityclient.getClient().submitTopology(name, submittedJar, serConf, topology);                                            }} catch(InvalidTopologyException e) {LOG.warn("Topology submission exception", e);throw e;} catch(AlreadyAliveException e) {LOG.warn("Topology already alive exception", e);throw e;} finally {client.close();}}LOG.info("Finished submitting topology: " +  name);} catch(TException e) {throw new RuntimeException(e);}}

转载于:https://www.cnblogs.com/fxjwind/archive/2013/06/05/3119056.html

Storm-源码分析-Topology Submit-Client相关推荐

  1. JStorm与Storm源码分析(八)--计时器工具-mk-timer

    Storm使用计时器线程来处理一些周期性调度事件. 与计时器相关的操作主要有:创建计时器线程.查看线程是否活跃.向线程中加入新的待调度事件.取消计时器线程 mk-timer方法用于创建一个计时器线程. ...

  2. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  3. JStorm与Storm源码分析(三)--Scheduler,调度器

    Scheduler作为Storm的调度器,负责为Topology分配可用资源. Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler. 其定义如下: public ...

  4. JStorm与Storm源码分析(二)--任务分配,assignment

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: ;;参数nimbus为nimb ...

  5. JStorm与Storm源码分析(一)--nimbus-data

    Nimbus里定义了一些共享数据结构,比如nimbus-data. nimbus-data结构里定义了很多公用的数据,请看下面代码: (defn nimbus-data [conf inimbus]( ...

  6. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

  7. JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

    EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口,  由下面代码可以看出: (ns backtype.storm.scheduler.EvenSch ...

  8. JStorm与Storm源码分析(二)--任务分配,assignmen

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: 1 ;;参数nimbus为ni ...

  9. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector

    在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector.所以有必要对接口IOutputCollecto ...

  10. 【投屏】Scrcpy源码分析三(Client篇-投屏阶段)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

最新文章

  1. 每列大于0的个数_二进制中1的个数(剑指offer第十四天)
  2. RabbitMQ 6种工作模式
  3. MAT之NSL:CPK_NN神经网络实现预测哪个样本与哪个样本处在同一层,从而科学规避我国煤矿突水灾难
  4. 常用的几个JavaScript调试技巧
  5. Wtm携手LayUI -- .netcore 开源生态我们是认真的!
  6. android applybatch,android – 使用applyBatch插入成千上万的联系人条目很慢
  7. python顺序结构代码_Python代码结构——顺序、分支、循环
  8. ssh连接服务器时特别慢的问题的解决方法
  9. mysql php 空格函数_MySQL_mysql 强大的trim() 函数,mysql中的去除左空格函数: LTRI - phpStudy...
  10. 第三章:开始使用zookeeper的API
  11. cdsn 最大分类数相关
  12. 华为虚拟机eNSP命令大全(所有命令)
  13. 解决 Adobe 系列绿色版本无法打开的问题
  14. php读取 Excel文件
  15. GET和POST本质区别
  16. Pr 视频效果:过时
  17. 内存超频时序怎么调_超频讲解:内存时序设置一
  18. django建议入门-FYI
  19. 沉浸式体验参加网络安全培训班,学习过程详细到底!
  20. gazebo+turtlebot3+gmapping建二维地图

热门文章

  1. SpringBoot Scheduled Cron表达式范例记录
  2. 运用循环判断语句和列表的购物车程序
  3. 【转】请让孩子输在起跑线上
  4. 修改Oracle最大连接数
  5. JAVA 学生信息管理系统
  6. 物联网专题--基于APP Inventor的BLE蓝牙4.0数据通信
  7. java地球_java – 应用地球纹理地图的球体
  8. JS Statements var / let / const
  9. 7-12 藏头诗 (15 分)
  10. 安徽医科大学计算机考研,这所985院校将现场面试!安徽医科大学缺额超800人!调剂信息更新...