Storm系列(四)Topology提交校验过程
功能:提交一个新的Topology,并为Topology创建storm-id(topology-id),校验其结构,设置必要的元数据,最后为Topology分配任务.
实现源码:
1 | (^void submitTopology |
2 | [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] |
3 | (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology |
4 | (SubmitOptions. TopologyInitialStatus/ACTIVE))) |
从以上源码中看出submitTopology内部是对submitTopologyWithOpts方法的调用。
submitTopologyWithOpts函数原型如下:
1 | ^void submitTopologyWithOpts |
2 | [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology |
3 | ^SubmitOptions submitOptions] |
在submitTopologyWithOpts中主要做了以下几件事情:
- 校验submitOptions参数不能为空。
- 检查storm-name中是否包含非法字符。
- 校验storm-name与正在运行的Topology是否有重名,重名将造成冲突。
- 将nimbus(nimbus-data类型)中的submitted-count已提交Topology计数字段加1。
- 为所提交的Topology创建唯一的storm-id(topology-id),格式:<storm-name>-<submitted-count>-<当前时间>
- 通过normalize-conf获取提交的Topology的Storm配置,首先将参数serializedConf进行反序列化,然后加入storm-name,storm-id等。
- 将Storm默认的配置(conf)与第六步得到的Storm配置进行合并,合并原则为两份配置中重复的配置项以第六步中的配置为准。
- 调用normalize-topology计算提交的Topology中每个组件并行度及更新TOPOLOGY_TASKS配置项.
- 获取nimbus(nimbus-data类型)中storm-cluster-state对象。
- 调用System-topology!方法对Topology结构进行校验。
- 获取nimbus中的submit-lock锁。
- 调用setup-storm-code为Topology创建对应的本地文件夹、复制jar并写入序列化后的Storm配置项和Topology信息.
- 调用setup-hearbeats!为Topology在Zookeeper中创建心跳路径,/storm/workerbeats/topology-id.
- 定义一个从thrift-status到keyword-status的哈希表,该哈希表用来将传入的submitOptions中的thrift-status转化为对应的keyword-status.
- 调用start-storm设置stormBase,它在Zookeeper中路径是/storm/storms/<topology-id>,stormBase的信息将做为该路径所对应的存储值。
- 调用mk-assignments为所提交的Topology分配资源.
normalize-topology
实现源码:
1 | (defn normalize-topology [storm-conf ^StormTopology topology] |
2 | (let [ret (.deepCopy topology)] |
3 | (doseq [[_ component] (all-components ret)] |
4 | (.set_json_conf |
5 | (.get_common component) |
6 | (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)} |
7 | (merge (component-conf component)) |
8 | to-json ))) |
9 | ret )) |
实现说明:
- 调用deepCopy对topology进行深度拷贝,赋值给ret.
- 遍历topology(ret)所有组件,调用component-parallelism更新组件配置中的TOPOLOGY_TASKS信息。
component-parallelism实现源码(计算组件并行度):
1 | (defn- component-parallelism [storm-conf component] |
2 | (let [storm-conf (merge storm-conf (component-conf component)) |
3 | num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component)) |
4 | max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM) |
5 | ] |
6 | (if max-parallelism |
7 | (min max-parallelism num-tasks) |
8 | num-tasks))) |
实现说明:
- 将Topology配置信息与组件(component)配置信息进行合并,两者存在重复的配置项时以组件的配置项为准。
- 计算组件并行度(num-tasks),若果配置storm-conf中配置了TOPOLOGY-TASKS信息,就以该配置值做为组件的并行度,否则通过调用num-start-executors获取用户对组件设置的并行度做为num-tasks.
- 获取storm-conf配置中TOPOLOGY-MAX-TASK-PARALLELISM配置项的值。
- 返回TOPOLOGY-MAX-TASK-PARALLELISM与num-tasks较小的值做为组件的并行度。
1 | TopologyBuilder builder = new TopologyBuilder(); |
2 | // 4对应对用用户设置的组件并行度,10对应TOPOLOGY-TASK配置项的值 |
3 | builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random").setNumTasks(6); Config conf = new Config(); |
4 | // 8对应 TOPOLOGY-MAX-TASK-PARALLELISM配置项的值 |
5 | Conf.setMaxTaskParallelism(8); |
system-topology!
功能:
验证用户提交的Topology,同时为提交的topology添加一些系统组件和流。
实现源码:
1 | (defn system-topology! [storm-conf ^StormTopology topology] |
2 | (validate-basic! topology) |
3 | (let [ret (.deepCopy topology)] |
4 | (add-acker! storm-conf ret) |
5 | (add-metric-components! storm-conf ret) |
6 | (add-system-components! storm-conf ret) |
7 | (add-metric-streams! ret) |
8 | (add-system-streams! ret) |
9 | (validate-structure! ret) |
10 | ret |
11 | )) |
实现说明:
- 使用validate-basic!校验所提交的Topology.
主要用于确保topology中的组件id不重复而且不是系统id,以及确保每个组件的TOPOLOGY-TASKS配置项大于0时,组件的并行度设置也一定大于0. - 调用deepCopy对topology进行深度拷贝,赋值给ret.
- 为Topology添加acker-bolt.
用于追踪发送出去的消息是否被成功处理。 - 使用add-metric-components为Topology添加metric-bolt.
- 为Topology添加system-bolt.
System-bolt没有输入流只有输出流分别为:SYSTEM-TICK-STREAM-ID,声明字段是[“rate_secs”],非直接模式;另一个为METRICS-TICK-STREAM-ID,声明字段为[“interval”]非直接模式,并行度为0. - 为Topology中的所有组件添加统计流。
Stream-id为METRICS-STREAM-ID,声明字段为[“task-info”,”data-points”],非直接流模式. - 为Topology中的所有组件添加系统流。
stream-id为SYSTEM-STREAM-ID,声明字段为[“event”],非直接流模式. - 使用validate-structure!检验以上步骤所组合后的Topology.
验证过程:
获取Topology中所有组件和组件的输入(包括component-id、stream-id、Grouping),对输入组件依次判断输入组件ID(component-id)是否在该Topology中,不存在则抛出异常,存在则再判断该组件的流类型是否为所对应的stream-id,若不存在则抛出异常,存在则继续检查该流的分组方式(Grouping)是否与能对应,所有组件检查完毕后没有异常抛出表示该Topology有效.
转载于:https://www.cnblogs.com/jianyuan/p/4792443.html
Storm系列(四)Topology提交校验过程相关推荐
- Storm/JStorm之Topology提交过程
咱以一个简单的案例来说明Topology提交流程: public static void main(String[] args) throws AlreadyAliveException,Invali ...
- Storm概念学习系列之Topology拓扑
不多说,直接上干货! Hadoop 上运行的是 MapReduce 作业,而在 Storm 上运行的是拓扑 Topology,这两者之间是非常不同的.一个关键的区别是:一个MapReduce 作业 ...
- .NetCore框架Surging系列(四)RPC客户端过程
.NetCore框架Surging系列(一)介绍 .NetCore框架Surging系列(二)HTTP .NetCore框架Surging系列(三)HTTP本地路由发现过程 .NetCore框架Sur ...
- [jQuery学习系列四 ]4-Jquery学习四-事件操作
[jQuery学习系列四 ]4-Jquery学习四-事件操作 前言: 今天看知乎偶然看到中国有哪些类似于TED的节目, 回答中的一些推荐我给记录下来了, 顺便也在这里贴一下: 一席 云集 听道 推酷 ...
- Spring Cloud Alibaba系列四:集成 seata 实现分布式事务
文章目录 Spring Cloud Alibaba系列四:集成 seata 实现分布式事务 前言 Seata 是什么? Seata 术语 安装 seata 1.创建 seata 数据库,并添加对应的表 ...
- Paddle Graph Learning (PGL)图学习之图游走类deepwalk、node2vec模型[系列四]
Paddle Graph Learning (PGL)图学习之图游走类模型[系列四] 更多详情参考:Paddle Graph Learning 图学习之图游走类模型[系列四] https://aist ...
- sed修炼系列(四):sed中的疑难杂症
sed系列文章: sed修炼系列(一):花拳绣腿之入门篇 sed修炼系列(二):武功心法(info sed翻译+注解) sed修炼系列(三):sed高级应用之实现窗口滑动技术 sed修炼系列(四):s ...
- 效率系列(四) VS常用快捷键
写在前面的话 :最近博主整理了一些关于 Visual Studio 2017 的常用快捷键,希望可以帮助到大家更高效更愉快的打码 1.打开 快捷键 描述 Ctrl + Shift + N 新建项目 C ...
- 机器学习入门系列四(关键词:BP神经网络)
机器学习入门系列四(关键词:BP神经网络) 标签: 机器学习神经网络 2016-01-12 15:28 80人阅读 评论(0) 收藏 举报 本文章已收录于: 分类: 机器学习(3) 作者同类文章X 版 ...
最新文章
- .NET Core 2.0 Preview 2为开发人员带来改进
- cuda 核函数 for循环_【CUDA 基础】6.2 并发内核执行
- 转载 python扩展问题”unable to find vcvarsall.bat“的解决
- 116-数学运算符更多的使用
- 基于Sql Server 2008的分布式数据库的实践(三)
- python pop check mail_python初学者,用python3实现基本的学生管理系统代码实例
- Android开发之高德地图定位成功返回的定位信息
- Python: 反方向迭代一个序列
- 网络编程-TCP/IP协议栈-TCP协议
- 界面设计方法 (1) — 4. 看板功能的设计
- python fromarray_python --- 之pil image.fromarray
- python 如何调用另一个路径下py文件的函数
- HTML5学习笔记(六):CSS基本样式
- 【图的有向路径检查】程序员面试金典——4.2有向路径检查
- Answers To The Questions from GiGabyte
- 责任链(Chain of Responsibility)模式
- 详解:什么是VXLAN?
- 通过js脚本处理剪切板(简单而强大的效率工具)
- 鸿蒙之始有几个老婆,先天五太
- 16.04Ubuntu桌面版搭建
热门文章
- 【Pytorch神经网络实战案例】02 CIFAR-10数据集:Pytorch使用GPU训练CNN模版-方法②
- 六、Webpack详解学习笔记——webpack的安装、起步、配置、loader的使用、webpack中配置Vue、plugin的使用、搭建本地服务器、webpack配置的分离
- 十四、CSS 3新特性详解(二)——2D转换(transform)、动画(animation)、动画序列
- MapReduce既是编程模型又是计算框架
- 程序员面试金典 - 面试题 01.04. 回文排列(哈希map)
- LeetCode 202. 快乐数(快慢指针)
- linux下实现内存监视,shell脚本来监视Linux上的内存使用情况
- css 百分比 怎么固定正方形_你未必知道的49个CSS知识点
- 基于BERT的多模学习——VL-BERT篇
- Spark Streaming + Elasticsearch构建App异常监控平台