Flume+Kafka+Storm实战:一、Kakfa与Storm整合
文章目录
- 0x00 文章内容
- 0x01 Kafka准备
- 1. 启动Kafka
- 2. 创建Topic
- 3. 启动消费者与消费者
- 0x02 Storm准备
- 1. 构建Maven项目
- 2. 编写代码
- 0x03 校验结果
- 1. 打包Storm代码
- 2. 执行ZK与Storm
- 3. 执行Storm作业
- 4. 校验过程
- 0xFF 总结
0x00 文章内容
- Kafka准备
- Storm准备
- 校验结果
0x01 Kafka准备
1. 启动Kafka
a. 后台启动Kafka(三台都要启动)
nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &
2. 创建Topic
a. 创建Topic:word-count-input
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-input
b. 创建Topic:word-count-output
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-output
3. 启动消费者与消费者
a. 启动一个producer,向word-count-input
发送消息
进入到$KAFKA_HOME
路径:
cd ~/bigdata/kafka_2.11-1.0.0
启动:
bin/kafka-console-producer.sh --broker-list master:9092 --topic word-count-input
b. 启动一个consumer,消费word-count-output
的消息
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic word-count-output --property print.key=true
0x02 Storm准备
1. 构建Maven项目
a. 引入Storm依赖
<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.2.2</version><scope>provided</scope>
</dependency>
b. 引入Kafka依赖
<dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>1.2.2</version>
</dependency>
c. 引入额外打包插件
<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target><testExcludes><testExclude>/src/test/**</testExclude></testExcludes><encoding>utf-8</encoding></configuration>
</plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id> <!-- this is used for inheritance merges --><phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 --><goals><goal>single</goal></goals></execution></executions>
</plugin>
d. 完整的pom.xml
文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.shaonaiyi</groupId><artifactId>stormlearning</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core --><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.2.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>1.2.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target><testExcludes><testExclude>/src/test/**</testExclude></testExcludes><encoding>utf-8</encoding></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id> <!-- this is used for inheritance merges --><phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 --><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
2. 编写代码
a. 项目代码结构
b. KafkaSpoutBuilder
package com.shaonaiyi.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;import java.util.List;/*** @author: shaonaiyi* @createTime: 2019/07/14 13:32* @description: KafkaSpout构建器*/public class KafkaSpoutBuilder {private List<String> brokers;private String topic;public KafkaSpoutBuilder brokers(List<String> v) {brokers = v;return this;}public KafkaSpoutBuilder topic(String v) {topic = v;return this;}public KafkaSpout build() {/** 配置kafka* 1. 需要设置consumer group(注意一个partition中的消息只能被同一group中的一个consumer消费)* 2. 起始消费策略:根据业务需要配置*/String allBrokers = String.join(",", brokers);KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder(allBrokers, topic).setProp(ConsumerConfig.GROUP_ID_CONFIG, "word-count-storm")//消费最新的数据.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST).build();return new KafkaSpout(conf);}}
c. KafkaSplitSentenceBolt
package com.shaonaiyi.kafka;/*** @author: shaonaiyi* @createTime: 2019/07/14 13:38* @description: 语句分割bolt*/import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 如,接收的Tuple是:Tuple("sentence" -> "I love teacher shao")* 则,输出的Tuple为:* Tuple("word" -> "I")* Tuple("word" -> "love")* Tuple("word" -> "teacher")* Tuple("word" -> "shao")*/
public class KafkaSplitSentenceBolt extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}@Overridepublic void execute(Tuple tuple) { // 实时接收SentenceSpout中输出的Tuple流String sentence = tuple.getStringByField("value"); // 根据key获取Tuple中的语句,"value"是Kafka中固定了的String[] words = sentence.split(" "); // 将语句按照空格进行切割for (String word: words) {this.collector.emit(new Values(word)); // 将切割之后的每一个单词作为Tuple的value输出到下一个bolt中}this.collector.ack(tuple); // 表示成功处理kafka-spout输出的消息,需要应答,要不然,kafka-spout会不断的重复发送消息}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word")); // 输出Tuple的key}}
d. KafkaWordCountBolt
package com.shaonaiyi.kafka;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.HashMap;
import java.util.Map;/*** @author: shaonaiyi* @createTime: 2019/07/14 13:42* @description: 单词计数bolt*/public class KafkaWordCountBolt extends BaseRichBolt {private OutputCollector collector;private HashMap<String, Long> counts = null; // 用于统计每隔单词的计数@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;this.counts = new HashMap<String, Long>();}@Overridepublic void execute(Tuple tuple) { // 实时接收SplitSentenceBolt中输出的Tuple流String word = tuple.getStringByField("word"); // 根据key获取Tuple中的单词// 统计每一个单词总共出现的次数Long count = counts.getOrDefault(word, 0L);count++;this.counts.put(word, count);// 将每一个单词以及这个单词出现的次数作为Tuple中的value输出到下一个bolt中this.collector.emit(new Values(word, count.toString()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {// 输出Tuple的key,有两个key,是因为每次输出的value也有两个outputFieldsDeclarer.declare(new Fields("key", "message"));}}
e. WordCountKafkaTopology
package com.shaonaiyi.kafka;import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;import java.util.Arrays;
import java.util.Properties;/*** @author: shaonaiyi* @createTime: 2019/07/15 22:54* @description: Kafka之WordCountTopology*/public class WordCountKafkaTopology {private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String KAFKA_BOLT_ID = "kafka-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {int workers = Integer.parseInt(args[0]);// 从Kafka中消费数据KafkaSpout kafkaSpout = new KafkaSpoutBuilder().brokers(Arrays.asList("master:9092")).topic("word-count-input").build();KafkaSplitSentenceBolt splitSentenceBolt = new KafkaSplitSentenceBolt();KafkaWordCountBolt wordCountBolt = new KafkaWordCountBolt();Properties props = new Properties();props.put("bootstrap.servers", "master:9092");// 此配置是表明当一次produce请求被认为完成时的确认值。// 特别是,多少个其他brokers必须已经提交了数据到他们的log并且向他们的leader确认了这些信息。典型的值包括:// 0: 表示producer从来不等待来自broker的确认信息(和0.7一样的行为)。// 这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。// 1:表示获得leader replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。// -1:producer会获得所有同步replicas都收到数据的确认props.put("acks", "1");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaBolt kafkaBolt = new KafkaBolt().withProducerProperties(props).withTopicSelector(new DefaultTopicSelector("word-count-output")).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());TopologyBuilder builder = new TopologyBuilder();builder.setSpout(SENTENCE_SPOUT_ID, kafkaSpout);builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));builder.setBolt(KAFKA_BOLT_ID, kafkaBolt).shuffleGrouping(COUNT_BOLT_ID);// 3、提交TopologyConfig config = new Config(); // 用来配置Topology运行时行为,对Topology所有组件全局生效的配置参数集合config.setNumWorkers(workers);StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); // 提交Topology}}
0x03 校验结果
1. 打包Storm代码
a. 打包
b. 上传到集群
2. 执行ZK与Storm
此步骤与教程:实时流处理框架之Storm的安装与部署
=>
0x03 启动并校验Storm 步骤一样
即:
a. 启动集群上的三台Zookeeper(查看进程是否存在,如果Kafka已经启动,应该还有Kafka的进程)
b. 启动Storm
在master上启动Nimbus和Web UI
cd ~/bigdata/apache-storm-1.2.2
nohup bin/storm nimbus 2>&1 &
然后回车,切换终端2,执行:
nohup bin/storm ui 2>&1 &
然后回车
在slave1和slave2上启动Supervisor
cd ~/bigdata/apache-storm-1.2.2
nohup bin/storm supervisor 2>&1 &
3. 执行Storm作业
a. 执行Storm作业
~/bigdata/apache-storm-1.2.2/bin/storm jar /home/hadoop-sny/jar/stormlearning-1.0-SNAPSHOT-jar-with-dependencies.jar com.shaonaiyi.kafka.WordCountKafkaTopology 1
b. 查看Web UI界面(master:8080
)
4. 校验过程
a. 目前各节点的进程情况
b. 发送消息到Kafka
c. 查看消费者信息
d. 查看Storm的Web UI界面
0xFF 总结
- 在生产者端多发送几个语句,你会发现这种统计的结果,并不是我们真正想要的结果,思考应该怎样才能想我们前面学习WordCount那种表现形式,请看后面的教程。
- 内容比较多,请大家认真操作。
作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |
福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。
Flume+Kafka+Storm实战:一、Kakfa与Storm整合相关推荐
- flume kafka storm mysql_flume+kafka+storm打通过程
0.有的地方我已经整理成脚本了,有的命令是脚本里面截取的 1.启动hadoop和yarn $HADOOP_HOME/sbin/start-dfs.sh;$HADOOP_HOME/sbin/start- ...
- Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示
http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...
- 实时数据处理环境搭建flume+kafka+storm:4.storm安装配置
1.解压 apache-storm-0.9.3.tar.gz 2.修改配置文件 conf/storm.yaml --zk地址 storm.zookeeper.servers: - "wc ...
- flume+kafka+storm整合02---问题
###1.启动storm时总是报错 storm Session 0x0 for server null, unexpected error, closing socket connection ### ...
- storm 实战及实例讲解(二)
storm 实战及实例讲解(二) --comaple.zhang 前面已近介绍了storm集群的搭建,和使用场景,那么现在让我们一起来探讨一下storm具体该怎么使用吧. 首先,我们要明 ...
- storm 实战及实例讲解(一)
storm 实战及实例讲解一 --应用场景分析,drpc服务器配置 --by comaple 2012-08-27 先给大家打打气,看看效果.这是taobao对外公布的storm使用情况,请大家欣赏 ...
- storm 实战及实例讲解(三)
storm 实战及实例讲解(三) --comaple.zhang ...
- Kafka项目实战-用户日志上报实时统计之编码实践
1.概述 本课程的视频教程地址:<Kafka实战项目之编码实践> 该课程我以用户实时上报日志案例为基础,带着大家去完成各个KPI的编码工作,实现生产模块.消费模块,数据持久化,以及应用调 ...
- Flume+Kafka双剑合璧玩转大数据平台日志采集
点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 大数据真好玩 点击右侧关注,大数据真好 ...
最新文章
- php 日志函数,php 写入日志函数
- java 分批同时处理_java中List集合分批处理
- 截网页全屏图的方法-截网页全屏软件-Web2Pic Pro
- (软件工程复习核心重点)第十章面向对象设计-第四节:设计人机交互子系统和设计任务管理子系统
- C语言整数的取值范围
- Tree树结构java实现
- 同时更改一条数据_数据仓库amp;面试总结
- Linux连接mysql 出现Access denied for user ‘root‘@‘localhost‘(using password: YES)错误解决方案
- 《人工智能及其应用》1-6章
- html 怎么设置时间函数,JavaScript日期函数 - 计时器、innerHTML
- Mac上编译C++报错
- 远程连接桌面报:这可能是由于credssp加密oracle修正
- (6)java的内存泄露问题
- 蚂蚁金服 CEO 突然辞职!去向很意外。。。
- Mybatis事务管理机制<transactionManager>
- html5分镜头脚本范例,分镜头脚本模板(小故事分镜头脚本范例)
- Sentiment140数据集
- Nginx专题:nginx+tomcat实现动静分离
- IOS 使用itms-services协议,服务端安装应用
- Instruments 之 Energy Log