之前的kafka案例:http://blog.csdn.net/weixin_35757704/article/details/77196539

之前的storm案例:http://blog.csdn.net/weixin_35757704/article/details/77246313

首先要提醒:导包的时候要注意导入正确的包,如果出现强制类型转换或是方法不存在,对于新手来说多半是导包导错了

kafka与storm各自的模块了解之后,就要开始集成了.他们都要拿出自己的一部分东西:

  • storm拿出spout给kafka,即数据源交给kafka
  • kafka拿出自身的消费者,将消费者变为storm
那么,逻辑就很简单了,现在只需要在Spout类上动手脚就可以了.如果到这里不是很懂,建议参考头顶上的那两篇博客;
修改即可(在开头的storm案例里的代码的基础上进行修改)
直接更改代表着整个拓扑的Topology类就好:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.*;public class TopologyWordCount {public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();// BrokerHosts接口有2个实现类StaticHosts和ZkHosts,ZkHosts会定时(默认60秒)从ZK中更新brokers的信息,StaticHosts是则不会// 要注意这里的第二个参数brokerZkPath要和kafka中的server.properties中配置的zookeeper.connect对应,没有专门配置就可以不填写,使用默认值BrokerHosts brokerHosts = new ZkHosts("192.168.0.171:2181," + "192.168.0.207:2181," + "192.168.0.204:2181");// 定义spoutConfig// 第一个参数hosts是上面定义的brokerHosts// 第二个参数topic是该Spout订阅的topic名称,需要提前创建这个topic// 第三个参数zkRoot是存储消费的offset(存储在zookeeper中了),也就是当这个Topology遇到故障重启后会将故障期间未消费的message继续消费而不会丢失(可不配置)// 第四个参数id是当前spout的唯一标识,当存在多个spout时,id不能冲突SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "shuaige33333", "", "0");// 定义kafkaSpout如何解析数据,这里是将kafka的producer send的数据放入到String// 类型的str变量中输出,这个str是StringSchema定义的变量名称/*通过字符串的方式解析数据*/spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());/*开始拓扑套路*/builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));/*kafka 的spout*/builder.setBolt("WordSplitBolt", new WordSplitBolt()).shuffleGrouping("kafkaSpout");/*WordCountBolt获得从WordSplitBolt中传递过来的单词并统计词频*/builder.setBolt("WordCountBolt", new WordCountBolt()).fieldsGrouping("WordSplitBolt", new Fields("word"));// 本地运行或者提交到集群if (args != null && args.length == 1) {// 集群运行StormSubmitter.submitTopology(args[0], new Config(), builder.createTopology());} else {// 本地运行LocalCluster cluster = new LocalCluster();cluster.submitTopology("local", new Config(), builder.createTopology());Thread.sleep(10000000);cluster.shutdown();}}
}

只需要更改TopologyWordCount,就可以将storm集成kafka,将storm作为kafka的消费者,之后的那些逻辑就可以自己来写了.

注意这里:
builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));

这里的KafkaSpout 是官方的Spout,在下一个bolt进行数据的提取时,要使用的方法是:

public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {String line = input.getStringByField("str");//得到进入kafka的字符串for (String word : line.split(" ")) {basicOutputCollector.emit(new Values(word));}}

然后就可以测试了:

整体思路:
先把向kafka发送数据的那个类写好(看头顶第一篇博客),然后再练习storm(第二篇博客),然后更改掉Topology类与Spout类就好

storm如何集成kafka相关推荐

  1. Storm集成Kafka

    一.整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持: ...

  2. 大数据Spark Structured Streaming集成 Kafka

    目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...

  3. SpringBoot笔记:SpringBoot2.3集成Kafka组件配置

    文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...

  4. springboot集成kafka及kafka web UI的使用

    springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...

  5. .Net Core 集成 Kafka

    最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰.考察了一些产品,最终决定使用kafka来当做消息队列.以下是关于kafka的一些知识的整理笔记. kafka kafka 是分布式流式平 ...

  6. Splunk集成Kafka配置方法

    [摘要]Splunk是业界赫赫有名的数据分析工具,比较擅长BI和安全分析,我司很多部门都有购买其产品和服务.最近有个需求要把Splunk和分部署消息队列Kafka做个集成,Splunk官方提供的一个K ...

  7. SpringCloud学习之SpringCloudStream集成kafka

    一.关于Spring-Cloud-Stream Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框 ...

  8. Kafka09:【案例】Flume集成Kafka

    一.Flume集成Kafka 在实际工作中flume和kafka会深度结合使用 1:flume采集数据,将数据实时写入kafka 2:flume从kafka中消费数据,保存到hdfs,做数据备份 下面 ...

  9. springboot 集成kafka 实现多个customer不同group

    springboot正常集成kafka 这个网上很多资料都有些集成,我就不浪费太多篇幅和时间了,笔者找了篇还算很容易理解的博客,自行学习 https://blog.csdn.net/tzs_10412 ...

最新文章

  1. Failed to load module script: The server responded with a non-JavaScript MIME type of “text/plain“.
  2. 开放大学MySQL形考_95至尊考试网-国家开放大学《数据库基础与应用》形考任务1...
  3. python查询MySQL写入Excel
  4. 大剑无锋之DQL、DML、DDL、DCL,简单举个例子【面试推荐】
  5. LeetCode 1305. 两棵二叉搜索树中的所有元素(二叉树迭代器)
  6. Linux之防火墙开通端口
  7. WinXp怎么开机进入Dos
  8. Python爬虫辅助库BeautifulSoup4用法精要
  9. jQuery CSS 添加/删除类名
  10. Unicode编码在JavaScript中的作用是什么?
  11. Canon imageRUNNER 2525i打印机驱动通过IP地址进行安装
  12. python调用dm.dll
  13. windows磁盘管理压缩卷只能压缩一部分的问题解决办法
  14. C#数字转字母,ASCII码转换
  15. Java语言为excel添加水印,使用原生POI, (XSSFWorkbook, XSSFSheet), 真正背景图水印效果,非普通图片张贴
  16. 【最近抖音上元宇宙虚拟项目七国争霸,直播互动游戏源码解析】
  17. 在Linux上yum安装snmp,centos7配置安装snmp
  18. 【增强版短视频去水印源码】去水印微信小程序+去水印软件源码
  19. 后端组装PHP,后端程序安装
  20. oracle时差,oracle的时差

热门文章

  1. 用html编写勾股定理,一种勾股定理演示器的制作方法
  2. 为虚幻引擎4设置Visual Studio
  3. vue实现两重列表集合,点击显示,点击隐藏的折叠效果,(默认显示集合最新一条数据,点击展开,显示集合所有数据)...
  4. 为Unity项目生成文档(一)
  5. linux sz rz 下载与上传命令
  6. Mysql数据库Sql语句执行效率-Explain
  7. Win2003远程桌面报错:RPC错误 解决办法
  8. 3G时代 一起走近无线运维的3A标准——柳州市劳动和社会保障局
  9. bgb邻居关系建立模型_今日 Paper | 新闻推荐系统;多路编码;知识增强型预训练模型等...
  10. C语言考试题及答案(8),2015年计算机二级C语言测试题及答案(8)