微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑。

Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法)。

您只需要使用Flux就是将其作为依赖项添加到“ pom.xml”中,通过单个YAML文件进行配置(请检查助焊剂示例 ),然后将其用作主类以在Storm集群中部署拓扑(或作为本地测试)。

为了初始化KafkaBolt ,需要执行以下步骤:

  1. 通过“ withTopicSelector ”方法定义“ topicSelector
  2. 通过“ withTupleToKafkaMapper ”方法定义一个“ kafkaMapper”
  3. 通过“ withProducerProperties ”方法定义一个“ kafkaProducerProps”
  4. 使用以上配置初始化“ org.apache.storm.kafka.bolt.KafkaBolt
  5. 在流中包含以上KafkaBolt

KafkaBolt的最小Flux配置示例:

components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"- id: "stringMultiScheme"className: "org.apache.storm.spout.SchemeAsMultiScheme"constructorArgs:- ref: "stringScheme"- id: "zkHosts"className: "org.apache.storm.kafka.ZkHosts"constructorArgs:- "localhost:2181"- id: "topicSelector"className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"constructorArgs:- "myTopicName"- id: "kafkaMapper"className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"- id: "kafkaProducerProps"className: "java.util.Properties"configMethods:- name: "put"args:- "bootstrap.servers"- "localhost:9092"- name: "put"args:- "acks"- "1"- name: "put"args:- "key.serializer"- "org.apache.kafka.common.serialization.StringSerializer"- name: "put"args:- "value.serializer"- "org.apache.kafka.common.serialization.StringSerializer" bolts:    - id: "bolt-kafka"className: "org.apache.storm.kafka.bolt.KafkaBolt"parallelism: 1configMethods:- name: "withProducerProperties"args: [ref: "kafkaProducerProps"]- name: "withTopicSelector"args: [ref: "topicSelector"]- name: "withTupleToKafkaMapper"args: [ref: "kafkaMapper"]streams:- name: "spout --> kafkaBolt"from: "spout-1"to: "bolt-kafka"grouping:type: LOCAL_OR_SHUFFLE

有关完整的工作配置示例,请选中此项 ,可以像这样使用 。

在Storm上部署拓扑的示例命令:

storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml

KafkaSpout的助焊剂配置已作为官方助焊剂示例进行了描述。 Flux是一个非常有用的框架,它消除了定义和初始化拓扑所需的自定义代码

翻译自: https://www.javacodegeeks.com/2016/05/apache-storm-configure-kafkabolt-flux.html

Apache Storm:如何使用Flux配置KafkaBolt相关推荐

  1. flux storm_Apache Storm:如何使用Flux配置KafkaBolt

    flux storm 微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑. Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自 ...

  2. kite 使用 go_使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

    kite 使用 go 从我担任软件工程师的第一天起,我总是听到很多方面的相同要求: " 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以 ...

  3. 使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

    从我担任软件工程师的第一天起,我总是听到很多方面的相同要求: " 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置 ...

  4. Apache Storm的实时情绪分析示例

    实时情感分析是指处理自然语言文本(或语音)流以提取主观信息. 琐碎的用例用于构建推荐引擎或查找社交媒体趋势. 我选择了Apache Storm作为实时处理引擎. Storm非常强大(我们正在生产中使用 ...

  5. Spotify如何对Apache Storm进行规模扩展

    [编者的话]Spotify是一家音乐流媒体服务商,最新的数据显示他们已经有6000万用户.Spotify内部使用Apache Storm来构建实时类系统,包括广告定位.音乐推荐以及数据可视化等.本文来 ...

  6. Apache Storm 实时流处理系统ACK机制以及源码分析

    1.ACK机制简介 Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理.完全处理的意思是该MessageId绑定的源Tuple以及由该源Tupl ...

  7. Apache Storm 实时流处理系统通信机制源码分析

    我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...

  8. apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)

    apache ignite 在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处 ...

  9. 使用Apache Storm和Apache Ignite进行复杂的事件处理(CEP)

    在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处理. 本文的一部分摘自 书 ...

最新文章

  1. CentOS Linux下VNC Server远程桌面配置详解
  2. hdu A Simple Math Problem
  3. 【类库】私房干货.Net数据层方法的封装
  4. sql 2012先分离迁移mdf mlf 文件到别的机器后附加 数据库成只读的修复方法
  5. mysql 内存溢出_mysql - MySQL在非常大的表上计算性能 - 堆栈内存溢出
  6. 提升开发效率的十个工具
  7. 循序渐进db2 第3版_「图书推荐」焊接工程师手册第3版
  8. MySQL高级 —— 查询性能优化
  9. hc sr04流程图_HC-SR04超声模块示例代码/原理图/说明书等全套资料
  10. 吊打面试官系列:你会「递归」么?
  11. error: R_LARCH_SOP_PUSH_PCREL against `x264_log_default‘:PLT shouldn‘t be with r_addend.
  12. win10计算机内存,win10系统电脑怎么升级内存?win10系统升级内存的方法
  13. 矩阵分析与应用-17-Moore-Penrose逆矩阵01
  14. <JVM上篇:内存与垃圾回收篇>01-JVM与Java体系结构
  15. 高纯度高活性艾美捷人重组MEGACD40L蛋白(可溶性)
  16. Android中DialogFragment自动弹出输入法
  17. 瓶子换水问题java_空瓶子换水问题
  18. 来自一个入行三年半的大数据练习生自述
  19. 根据国家法定节假日来计算出报告时间
  20. 什么是数据可视化?为什么要可视化?

热门文章

  1. 简化springboot部署,太灵活方便了!
  2. hibernate配置详情3(Dept)
  3. 系统架构设计师考试999999999999
  4. eclipse 创建ssm spring+springmvc+mybatis 实现登录注册
  5. mysql fpmmm_zabbix配fpmmm(mpm)数据传送不了问题解决
  6. 小微企业名录查询系统_欢迎访问辽宁小微企业名录系统
  7. ReviewForJob——最小生成树(prim + kruskal)源码实现和分析
  8. DFS应用——找出无向图的割点
  9. maven原型_创建自定义Maven原型
  10. java线程池返回线程状态_Java线程的不同状态