Apache Storm:如何使用Flux配置KafkaBolt
微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑。
Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法)。
您只需要使用Flux就是将其作为依赖项添加到“ pom.xml”中,通过单个YAML文件进行配置(请检查助焊剂示例 ),然后将其用作主类以在Storm集群中部署拓扑(或作为本地测试)。
为了初始化KafkaBolt ,需要执行以下步骤:
- 通过“ withTopicSelector ”方法定义“ topicSelector ”
- 通过“ withTupleToKafkaMapper ”方法定义一个“ kafkaMapper”
- 通过“ withProducerProperties ”方法定义一个“ kafkaProducerProps”
- 使用以上配置初始化“ org.apache.storm.kafka.bolt.KafkaBolt ”
- 在流中包含以上KafkaBolt
KafkaBolt的最小Flux配置示例:
components:- id: "stringScheme"className: "org.apache.storm.kafka.StringScheme"- id: "stringMultiScheme"className: "org.apache.storm.spout.SchemeAsMultiScheme"constructorArgs:- ref: "stringScheme"- id: "zkHosts"className: "org.apache.storm.kafka.ZkHosts"constructorArgs:- "localhost:2181"- id: "topicSelector"className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"constructorArgs:- "myTopicName"- id: "kafkaMapper"className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"- id: "kafkaProducerProps"className: "java.util.Properties"configMethods:- name: "put"args:- "bootstrap.servers"- "localhost:9092"- name: "put"args:- "acks"- "1"- name: "put"args:- "key.serializer"- "org.apache.kafka.common.serialization.StringSerializer"- name: "put"args:- "value.serializer"- "org.apache.kafka.common.serialization.StringSerializer" bolts: - id: "bolt-kafka"className: "org.apache.storm.kafka.bolt.KafkaBolt"parallelism: 1configMethods:- name: "withProducerProperties"args: [ref: "kafkaProducerProps"]- name: "withTopicSelector"args: [ref: "topicSelector"]- name: "withTupleToKafkaMapper"args: [ref: "kafkaMapper"]streams:- name: "spout --> kafkaBolt"from: "spout-1"to: "bolt-kafka"grouping:type: LOCAL_OR_SHUFFLE
有关完整的工作配置示例,请选中此项 ,可以像这样使用 。
在Storm上部署拓扑的示例命令:
storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml
KafkaSpout的助焊剂配置已作为官方助焊剂示例进行了描述。 Flux是一个非常有用的框架,它消除了定义和初始化拓扑所需的自定义代码
翻译自: https://www.javacodegeeks.com/2016/05/apache-storm-configure-kafkabolt-flux.html
Apache Storm:如何使用Flux配置KafkaBolt相关推荐
- flux storm_Apache Storm:如何使用Flux配置KafkaBolt
flux storm 微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑. Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自 ...
- kite 使用 go_使用Apache Storm和Kite SDK Morphlines的可配置ETL处理
kite 使用 go 从我担任软件工程师的第一天起,我总是听到很多方面的相同要求: " 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以 ...
- 使用Apache Storm和Kite SDK Morphlines的可配置ETL处理
从我担任软件工程师的第一天起,我总是听到很多方面的相同要求: " 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置 ...
- Apache Storm的实时情绪分析示例
实时情感分析是指处理自然语言文本(或语音)流以提取主观信息. 琐碎的用例用于构建推荐引擎或查找社交媒体趋势. 我选择了Apache Storm作为实时处理引擎. Storm非常强大(我们正在生产中使用 ...
- Spotify如何对Apache Storm进行规模扩展
[编者的话]Spotify是一家音乐流媒体服务商,最新的数据显示他们已经有6000万用户.Spotify内部使用Apache Storm来构建实时类系统,包括广告定位.音乐推荐以及数据可视化等.本文来 ...
- Apache Storm 实时流处理系统ACK机制以及源码分析
1.ACK机制简介 Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理.完全处理的意思是该MessageId绑定的源Tuple以及由该源Tupl ...
- Apache Storm 实时流处理系统通信机制源码分析
我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...
- apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)
apache ignite 在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处 ...
- 使用Apache Storm和Apache Ignite进行复杂的事件处理(CEP)
在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处理. 本文的一部分摘自 书 ...
最新文章
- CentOS Linux下VNC Server远程桌面配置详解
- hdu A Simple Math Problem
- 【类库】私房干货.Net数据层方法的封装
- sql 2012先分离迁移mdf mlf 文件到别的机器后附加 数据库成只读的修复方法
- mysql 内存溢出_mysql - MySQL在非常大的表上计算性能 - 堆栈内存溢出
- 提升开发效率的十个工具
- 循序渐进db2 第3版_「图书推荐」焊接工程师手册第3版
- MySQL高级 —— 查询性能优化
- hc sr04流程图_HC-SR04超声模块示例代码/原理图/说明书等全套资料
- 吊打面试官系列:你会「递归」么?
- error: R_LARCH_SOP_PUSH_PCREL against `x264_log_default‘:PLT shouldn‘t be with r_addend.
- win10计算机内存,win10系统电脑怎么升级内存?win10系统升级内存的方法
- 矩阵分析与应用-17-Moore-Penrose逆矩阵01
- <JVM上篇:内存与垃圾回收篇>01-JVM与Java体系结构
- 高纯度高活性艾美捷人重组MEGACD40L蛋白(可溶性)
- Android中DialogFragment自动弹出输入法
- 瓶子换水问题java_空瓶子换水问题
- 来自一个入行三年半的大数据练习生自述
- 根据国家法定节假日来计算出报告时间
- 什么是数据可视化?为什么要可视化?
热门文章
- 简化springboot部署,太灵活方便了!
- hibernate配置详情3(Dept)
- 系统架构设计师考试999999999999
- eclipse 创建ssm spring+springmvc+mybatis 实现登录注册
- mysql fpmmm_zabbix配fpmmm(mpm)数据传送不了问题解决
- 小微企业名录查询系统_欢迎访问辽宁小微企业名录系统
- ReviewForJob——最小生成树(prim + kruskal)源码实现和分析
- DFS应用——找出无向图的割点
- maven原型_创建自定义Maven原型
- java线程池返回线程状态_Java线程的不同状态