Storm-Flux简介
Fluk
Fluk是storm中的一个框架,主要功能是简化storm 任务在管理、配置topology中的一些问题和麻烦。
产生的原因背景
在管理storm 的topolgy的过程中,我们最常做的工作就是提交jar包。如下代码所示:
public static void main(String[] args) throws Exception {// 返回的逻辑值用来判断我们是否在本地上运行// 创建必要的配置选项...boolean runLocal = shouldRunLocal();if(runLocal){LocalCluster cluster = new LocalCluster();cluster.submitTopology(name, conf, topology);} else {StormSubmitter.submitTopology(name, conf, topology);}
}
上述提交任务的代码,通常都是位于上层的java任务管理器中。同时关于topoplgy的定义也位于其中,每当任务有变动,都需要重新编译jar包,重新提交才可以。为了减少这部分工作,Fluk就将这部分工作接手了过来,改为配置的方式进行。基于此得出Fluk的主要功能点如下:
- 安装和部署storm 拓扑采用配置的方式而不是内置的方式进行。
- 通过使用yaml dsl 定义storm core api(spouts/bolt)
- yaml dsl 支持对storm-kafka、storm-hdfs、storm-hbase等。
Flux 使用
在pom 文件中加入对flux的依赖,如下所示:
<!-- 在shaded jar文件中包含FLux和用户依赖包 -->
<dependencies><!-- Flux include --><dependency><groupId>org.apache.storm</groupId><artifactId>flux-core</artifactId><version>${storm.version}</version></dependency><!-- Flux Wrappers include --><dependency><groupId>org.apache.storm</groupId><artifactId>flux-wrappers</artifactId><version>${storm.version}</version></dependency><!-- 在这里添加用户依赖包... --></dependencies>
<!-- 创建一个包括所有依赖包的大大的jar文件 -->
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>1.4</version><configuration><createDependencyReducedPom>true</createDependencyReducedPom></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.apache.storm.flux.Flux</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>
部署和运行Flux
和以前使用storm jar 提交任务的方式基本相同。只是由于当前的任务中可能已经没有了对topology的定义,所以需要定义你使用的flux的yaml文件是哪一个,以便找到对应的topolgy逻辑,如下所示:
storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
根据以上可以看到Flux的和兴就在对yaml文件的编写配置。下面具体说明一下
yaml配置
Flux 拓扑定义
- 拓扑的名字及拓扑使用资源的配置如topolgy.workers
- 拓扑组件列表
- 一个DSL 拓扑定义
- 一个spouts 列表,每一个项通过一个唯一的ID区别
- 一个bolts列表,每一个项通过一个唯一的ID区别
- 一个可以创建 org.apache.storm.generated.StormTopology 实例的JVM类
name: "yaml-topology"
config:topology.workers: 1# spout定义
spouts:- id: "spout-1"className: "org.apache.storm.testing.TestWordSpout"parallelism: 1# bolt定义
bolts:- id: "bolt-1"className: "org.apache.storm.testing.TestWordCounter"parallelism: 1- id: "bolt-2"className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"parallelism: 1# stream定义,定义流的流向
streams:- name: "spout-1 --> bolt-1" #name暂时未用上(可以在logging,UI等中作为placeholder)from: "spout-1"to: "bolt-1"grouping:type: FIELDSargs: ["word"]- name: "bolt-1 --> bolt2"from: "bolt-1"to: "bolt-2"grouping:type: SHUFFLE
上面是一个大的配置框架,下面说一些具体的特征配置
组件
组件从本质来说是对象实例,用来在对spouts和bolts的配置选项中获取。如果你对Spring框架很熟悉,这里的组件大概就类比于Spring中的beans。每一个组件都是可被识别的,至少是可以通过一个唯一的标识符(字符串)和一个类名(字符串)。举个例子,以下的例子将会创建一个 org.apache.storm.kafka.StringScheme 类的实例作为关键字 "stringScheme" 的引用。这里我们假设这个类 org.apache.storm.kafka.StringScheme 有一个默认的构造函数。
components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"(假设有默认的构造函数)
上面提到的定义的组件使用的是class类采用的是默认构造函数,如果需要自己构造则采用如下的方法
自定义构造函数
通过在yaml文件中增加constructorArgs的方法来定义构造函数。
- id: "zkHosts"className: "org.apache.storm.kafka.ZkHosts"constructorArgs:- "localhost:2181"(通过调用一个把单个字符串“localhost:2181”作为参数传递给构造函数来创建一个对象)
contructorArgs 是一个列表,其元素是对象。这个列表会被传递给类的构造函数们。上面的例子就是将zk的配置用参数的方式传入到class的构造函数中。
引用
每一个组件实例都通过一个唯一的id可悲其他组件重复使用。为了引用一个已存在的组件,你需要在使用 ref 这个标签的时候指明这个组件的id。
在以下的例子中,一个名为的组件被创建,之后将被作为另一个组件的构造函数的参数被引用
components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"- id: "stringMultiScheme"className: "org.apache.storm.spout.SchemeAsMultiScheme"constructorArgs:- ref: "stringScheme" # component with id "stringScheme" must be declared above.
属性
除去允许在调用构造函数的时候传进不同的参数,Flux同样允许在配置组件的时候使用被声明为 public 的类似JavaBean的setter方法和域
- id: "spoutConfig"className: "org.apache.storm.kafka.SpoutConfig"constructorArgs:# brokerHosts- ref: "zkHosts"# topic- "myKafkaTopic"# zkRoot- "/kafkaSpout"# id- "myId"properties:- name: "ignoreZkOffsets"value: true- name: "scheme"ref: "stringMultiScheme"
参考链接:http://storm.apachecn.org/releases/cn/1.1.0/flux.html
转载于:https://www.cnblogs.com/angellst/p/8660686.html
Storm-Flux简介相关推荐
- BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略
BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...
- Storm Trident简介
转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...
- flux storm_Apache Storm:如何使用Flux配置KafkaBolt
flux storm 微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑. Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自 ...
- Apache Storm:如何使用Flux配置KafkaBolt
微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑. Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法) ...
- BigData:大数据开发的简介、核心知识(linux基础+Java/Python编程语言+Hadoop{HDFS、HBase、Hive}+Docker)、经典场景应用之详细攻略
BigData:大数据开发的简介.核心知识(linux基础+Java/Python编程语言+Hadoop{HDFS.HBase.Hive}+Docker).经典场景应用之详细攻略 BigData:大数 ...
- storm apache_Apache Storm的实时情绪分析示例
storm apache 实时情感分析是指处理自然语言文本(或语音)流以提取主观信息. 琐碎的用例用于构建推荐引擎或查找社交媒体趋势. 我选择了Apache Storm作为实时处理引擎. Storm非 ...
- Apache Storm的实时情绪分析示例
实时情感分析是指处理自然语言文本(或语音)流以提取主观信息. 琐碎的用例用于构建推荐引擎或查找社交媒体趋势. 我选择了Apache Storm作为实时处理引擎. Storm非常强大(我们正在生产中使用 ...
- DevOpsSOP 基于阿里云VPC搭建Storm+Kafka+Zookeeper集群
集群搭建之 zookeeper + kafka 环境要求 pre-install Centos下安装Java开发环境 JDK1.8 Cenos下安装Supervisor守护 zookeeper clu ...
- storm apache java_Apache Storm 示例 Java 拓扑 - Azure HDInsight | Microsoft Docs
您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 以 Java 语言创建 ...
最新文章
- C#实现的18位×××格式验证算法
- 找对象的过程中,我竟然理解了什么是机器学习!
- ubuntu获取root权限
- 三星GT-I9308 Galaxy SIII 移动定制机 root方法 (亲测可用)
- python中文版-Python中文版
- CynosDB技术详解——存储集群管理【文末有福利】
- 利用Hyperledger Fabric开发第一个区块链应用
- POJ 1410 Intersection 判断线段交和点在矩形内 【计算几何】
- 分成收益破5000后,我决定将付费专栏开源了
- python打包成exe_Python打包成exe时,再犯这几个错误就说不过去了
- linux su命令位置,Linux命令大全su详解
- Spring MVC+ Spring + Mybatis “三大框架”介绍
- 【Unity】添加 Device Simulator功能
- 迅雷手机版苹果版_手机迅雷安卓版下载2019_迅雷手机版下载最新版
- ReactNative进阶(五十三):Keystore file ‘..android.keystore‘ not found for signing config ‘debug‘问题解决
- css 设置冻结表格头,固定/冻结行表头、列表头的做法
- B2C之新岛咖啡 一堂62元的供应链管理课
- c语言 指针 pdf,彻底搞定C指针.pdf
- Cadence16.6 > OrCAD Capture CIS >原理图统一改器件属性
- kettle多表数据迁移
热门文章
- EXCHANGE 2010 DAG 实验总结
- AttributeError: 'StatusHandler' object has no attribute 'async_callback'
- gitee中同步github的repository提示:账户或密码错误
- Cython与CPython的区别
- python 的几个内置函数(lambda ,zip, filter, map, reduce )用法
- python实现最小二乘法(转)
- 无法定位程序输入点 _ZdaPvj 于动态链接库 libstdc++-6.dll
- 深入理解计算机操作系统:链接
- 深度学习(二十六)——VAE
- [学习笔记]后缀自动机