Fluk

Fluk是storm中的一个框架,主要功能是简化storm 任务在管理、配置topology中的一些问题和麻烦。

产生的原因背景

在管理storm 的topolgy的过程中,我们最常做的工作就是提交jar包。如下代码所示:

public static void main(String[] args) throws Exception {// 返回的逻辑值用来判断我们是否在本地上运行// 创建必要的配置选项...boolean runLocal = shouldRunLocal();if(runLocal){LocalCluster cluster = new LocalCluster();cluster.submitTopology(name, conf, topology);} else {StormSubmitter.submitTopology(name, conf, topology);}
}

上述提交任务的代码,通常都是位于上层的java任务管理器中。同时关于topoplgy的定义也位于其中,每当任务有变动,都需要重新编译jar包,重新提交才可以。为了减少这部分工作,Fluk就将这部分工作接手了过来,改为配置的方式进行。基于此得出Fluk的主要功能点如下:

  • 安装和部署storm 拓扑采用配置的方式而不是内置的方式进行。
  • 通过使用yaml dsl 定义storm core api(spouts/bolt)
  • yaml dsl 支持对storm-kafka、storm-hdfs、storm-hbase等。

Flux 使用

在pom 文件中加入对flux的依赖,如下所示:

<!-- 在shaded jar文件中包含FLux和用户依赖包 -->
<dependencies><!-- Flux include --><dependency><groupId>org.apache.storm</groupId><artifactId>flux-core</artifactId><version>${storm.version}</version></dependency><!-- Flux Wrappers include --><dependency><groupId>org.apache.storm</groupId><artifactId>flux-wrappers</artifactId><version>${storm.version}</version></dependency><!-- 在这里添加用户依赖包... --></dependencies>
<!-- 创建一个包括所有依赖包的大大的jar文件 -->
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>1.4</version><configuration><createDependencyReducedPom>true</createDependencyReducedPom></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.apache.storm.flux.Flux</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

部署和运行Flux

和以前使用storm jar 提交任务的方式基本相同。只是由于当前的任务中可能已经没有了对topology的定义,所以需要定义你使用的flux的yaml文件是哪一个,以便找到对应的topolgy逻辑,如下所示:

storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml

根据以上可以看到Flux的和兴就在对yaml文件的编写配置。下面具体说明一下

yaml配置

Flux 拓扑定义

  1. 拓扑的名字及拓扑使用资源的配置如topolgy.workers
  2. 拓扑组件列表
  3. 一个DSL 拓扑定义
  • 一个spouts 列表,每一个项通过一个唯一的ID区别
  • 一个bolts列表,每一个项通过一个唯一的ID区别
  • 一个可以创建 org.apache.storm.generated.StormTopology 实例的JVM类
name: "yaml-topology"
config:topology.workers: 1# spout定义
spouts:- id: "spout-1"className: "org.apache.storm.testing.TestWordSpout"parallelism: 1# bolt定义
bolts:- id: "bolt-1"className: "org.apache.storm.testing.TestWordCounter"parallelism: 1- id: "bolt-2"className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"parallelism: 1# stream定义,定义流的流向
streams:- name: "spout-1 --> bolt-1" #name暂时未用上(可以在logging,UI等中作为placeholder)from: "spout-1"to: "bolt-1"grouping:type: FIELDSargs: ["word"]- name: "bolt-1 --> bolt2"from: "bolt-1"to: "bolt-2"grouping:type: SHUFFLE

上面是一个大的配置框架,下面说一些具体的特征配置

组件

组件从本质来说是对象实例,用来在对spouts和bolts的配置选项中获取。如果你对Spring框架很熟悉,这里的组件大概就类比于Spring中的beans。每一个组件都是可被识别的,至少是可以通过一个唯一的标识符(字符串)和一个类名(字符串)。举个例子,以下的例子将会创建一个 org.apache.storm.kafka.StringScheme 类的实例作为关键字 "stringScheme" 的引用。这里我们假设这个类 org.apache.storm.kafka.StringScheme 有一个默认的构造函数。

components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"(假设有默认的构造函数)

上面提到的定义的组件使用的是class类采用的是默认构造函数,如果需要自己构造则采用如下的方法

自定义构造函数

通过在yaml文件中增加constructorArgs的方法来定义构造函数。

- id: "zkHosts"className: "org.apache.storm.kafka.ZkHosts"constructorArgs:- "localhost:2181"(通过调用一个把单个字符串“localhost:2181”作为参数传递给构造函数来创建一个对象)

contructorArgs 是一个列表,其元素是对象。这个列表会被传递给类的构造函数们。上面的例子就是将zk的配置用参数的方式传入到class的构造函数中。

引用

每一个组件实例都通过一个唯一的id可悲其他组件重复使用。为了引用一个已存在的组件,你需要在使用 ref 这个标签的时候指明这个组件的id。

在以下的例子中,一个名为的组件被创建,之后将被作为另一个组件的构造函数的参数被引用

components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"- id: "stringMultiScheme"className: "org.apache.storm.spout.SchemeAsMultiScheme"constructorArgs:- ref: "stringScheme" # component with id "stringScheme" must be declared above.
属性

除去允许在调用构造函数的时候传进不同的参数,Flux同样允许在配置组件的时候使用被声明为 public 的类似JavaBean的setter方法和域

 - id: "spoutConfig"className: "org.apache.storm.kafka.SpoutConfig"constructorArgs:# brokerHosts- ref: "zkHosts"# topic- "myKafkaTopic"# zkRoot- "/kafkaSpout"# id- "myId"properties:- name: "ignoreZkOffsets"value: true- name: "scheme"ref: "stringMultiScheme"

参考链接:http://storm.apachecn.org/releases/cn/1.1.0/flux.html

转载于:https://www.cnblogs.com/angellst/p/8660686.html

Storm-Flux简介相关推荐

  1. BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略

    BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...

  2. Storm Trident简介

    转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...

  3. flux storm_Apache Storm:如何使用Flux配置KafkaBolt

    flux storm 微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑. Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自 ...

  4. Apache Storm:如何使用Flux配置KafkaBolt

    微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑. Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法) ...

  5. BigData:大数据开发的简介、核心知识(linux基础+Java/Python编程语言+Hadoop{HDFS、HBase、Hive}+Docker)、经典场景应用之详细攻略

    BigData:大数据开发的简介.核心知识(linux基础+Java/Python编程语言+Hadoop{HDFS.HBase.Hive}+Docker).经典场景应用之详细攻略 BigData:大数 ...

  6. storm apache_Apache Storm的实时情绪分析示例

    storm apache 实时情感分析是指处理自然语言文本(或语音)流以提取主观信息. 琐碎的用例用于构建推荐引擎或查找社交媒体趋势. 我选择了Apache Storm作为实时处理引擎. Storm非 ...

  7. Apache Storm的实时情绪分析示例

    实时情感分析是指处理自然语言文本(或语音)流以提取主观信息. 琐碎的用例用于构建推荐引擎或查找社交媒体趋势. 我选择了Apache Storm作为实时处理引擎. Storm非常强大(我们正在生产中使用 ...

  8. DevOpsSOP 基于阿里云VPC搭建Storm+Kafka+Zookeeper集群

    集群搭建之 zookeeper + kafka 环境要求 pre-install Centos下安装Java开发环境 JDK1.8 Cenos下安装Supervisor守护 zookeeper clu ...

  9. storm apache java_Apache Storm 示例 Java 拓扑 - Azure HDInsight | Microsoft Docs

    您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 以 Java 语言创建 ...

最新文章

  1. C#实现的18位×××格式验证算法
  2. 找对象的过程中,我竟然理解了什么是机器学习!
  3. ubuntu获取root权限
  4. 三星GT-I9308 Galaxy SIII 移动定制机 root方法 (亲测可用)
  5. python中文版-Python中文版
  6. CynosDB技术详解——存储集群管理【文末有福利】
  7. 利用Hyperledger Fabric开发第一个区块链应用
  8. POJ 1410 Intersection 判断线段交和点在矩形内 【计算几何】
  9. 分成收益破5000后,我决定将付费专栏开源了
  10. python打包成exe_Python打包成exe时,再犯这几个错误就说不过去了
  11. linux su命令位置,Linux命令大全su详解
  12. Spring MVC+ Spring + Mybatis “三大框架”介绍
  13. 【Unity】添加 Device Simulator功能
  14. 迅雷手机版苹果版_手机迅雷安卓版下载2019_迅雷手机版下载最新版
  15. ReactNative进阶(五十三):Keystore file ‘..android.keystore‘ not found for signing config ‘debug‘问题解决
  16. css 设置冻结表格头,固定/冻结行表头、列表头的做法
  17. B2C之新岛咖啡 一堂62元的供应链管理课
  18. c语言 指针 pdf,彻底搞定C指针.pdf
  19. Cadence16.6 > OrCAD Capture CIS >原理图统一改器件属性
  20. kettle多表数据迁移

热门文章

  1. EXCHANGE 2010 DAG 实验总结
  2. AttributeError: 'StatusHandler' object has no attribute 'async_callback'
  3. gitee中同步github的repository提示:账户或密码错误
  4. Cython与CPython的区别
  5. python 的几个内置函数(lambda ,zip, filter, map, reduce )用法
  6. python实现最小二乘法(转)
  7. 无法定位程序输入点 _ZdaPvj 于动态链接库 libstdc++-6.dll
  8. 深入理解计算机操作系统:链接
  9. 深度学习(二十六)——VAE
  10. [学习笔记]后缀自动机