文章目录

  • 0x00 文章内容
  • 0x01 Kafka准备
    • 1. 启动Kafka
    • 2. 创建Topic
    • 3. 启动消费者与消费者
  • 0x02 Storm准备
    • 1. 构建Maven项目
    • 2. 编写代码
  • 0x03 校验结果
    • 1. 打包Storm代码
    • 2. 执行ZK与Storm
    • 3. 执行Storm作业
    • 4. 校验过程
  • 0xFF 总结

0x00 文章内容

  1. Kafka准备
  2. Storm准备
  3. 校验结果

0x01 Kafka准备

1. 启动Kafka

a. 后台启动Kafka(三台都要启动)

nohup ~/bigdata/kafka_2.11-1.0.0/bin/kafka-server-start.sh ~/bigdata/kafka_2.11-1.0.0/config/server.properties >~/bigdata/kafka_2.11-1.0.0/logs/server.log 2>&1 &
2. 创建Topic

a. 创建Topic:word-count-input

~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-input

b. 创建Topic:word-count-output

~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic word-count-output
3. 启动消费者与消费者

a. 启动一个producer,向word-count-input发送消息

进入到$KAFKA_HOME路径:
cd ~/bigdata/kafka_2.11-1.0.0

启动:

bin/kafka-console-producer.sh --broker-list master:9092 --topic word-count-input


b. 启动一个consumer,消费word-count-output的消息

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic word-count-output --property print.key=true

0x02 Storm准备

1. 构建Maven项目

a. 引入Storm依赖

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.2.2</version><scope>provided</scope>
</dependency>

b. 引入Kafka依赖

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>1.2.2</version>
</dependency>

c. 引入额外打包插件

<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target><testExcludes><testExclude>/src/test/**</testExclude></testExcludes><encoding>utf-8</encoding></configuration>
</plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id> <!-- this is used for inheritance merges --><phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 --><goals><goal>single</goal></goals></execution></executions>
</plugin>

d. 完整的pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.shaonaiyi</groupId><artifactId>stormlearning</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core --><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.2.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>1.2.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target><testExcludes><testExclude>/src/test/**</testExclude></testExcludes><encoding>utf-8</encoding></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id> <!-- this is used for inheritance merges --><phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 --><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
2. 编写代码

a. 项目代码结构

b. KafkaSpoutBuilder

package com.shaonaiyi.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;import java.util.List;/*** @author: shaonaiyi* @createTime: 2019/07/14 13:32* @description: KafkaSpout构建器*/public class KafkaSpoutBuilder {private List<String> brokers;private String topic;public KafkaSpoutBuilder brokers(List<String> v) {brokers = v;return this;}public KafkaSpoutBuilder topic(String v) {topic = v;return this;}public KafkaSpout build() {/** 配置kafka* 1. 需要设置consumer group(注意一个partition中的消息只能被同一group中的一个consumer消费)* 2. 起始消费策略:根据业务需要配置*/String allBrokers = String.join(",", brokers);KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder(allBrokers, topic).setProp(ConsumerConfig.GROUP_ID_CONFIG, "word-count-storm")//消费最新的数据.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST).build();return new KafkaSpout(conf);}}

c. KafkaSplitSentenceBolt

package com.shaonaiyi.kafka;/*** @author: shaonaiyi* @createTime: 2019/07/14 13:38* @description: 语句分割bolt*/import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 如,接收的Tuple是:Tuple("sentence" -> "I love teacher shao")* 则,输出的Tuple为:*      Tuple("word" -> "I")*      Tuple("word" -> "love")*      Tuple("word" -> "teacher")*      Tuple("word" -> "shao")*/
public class KafkaSplitSentenceBolt extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}@Overridepublic void execute(Tuple tuple) { // 实时接收SentenceSpout中输出的Tuple流String sentence = tuple.getStringByField("value"); // 根据key获取Tuple中的语句,"value"是Kafka中固定了的String[] words = sentence.split(" "); // 将语句按照空格进行切割for (String word: words) {this.collector.emit(new Values(word)); // 将切割之后的每一个单词作为Tuple的value输出到下一个bolt中}this.collector.ack(tuple); // 表示成功处理kafka-spout输出的消息,需要应答,要不然,kafka-spout会不断的重复发送消息}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word")); // 输出Tuple的key}}

d. KafkaWordCountBolt

package com.shaonaiyi.kafka;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.HashMap;
import java.util.Map;/*** @author: shaonaiyi* @createTime: 2019/07/14 13:42* @description: 单词计数bolt*/public class KafkaWordCountBolt extends BaseRichBolt {private OutputCollector collector;private HashMap<String, Long> counts = null; // 用于统计每隔单词的计数@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;this.counts = new HashMap<String, Long>();}@Overridepublic void execute(Tuple tuple) { // 实时接收SplitSentenceBolt中输出的Tuple流String word = tuple.getStringByField("word"); // 根据key获取Tuple中的单词// 统计每一个单词总共出现的次数Long count = counts.getOrDefault(word, 0L);count++;this.counts.put(word, count);// 将每一个单词以及这个单词出现的次数作为Tuple中的value输出到下一个bolt中this.collector.emit(new Values(word, count.toString()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {// 输出Tuple的key,有两个key,是因为每次输出的value也有两个outputFieldsDeclarer.declare(new Fields("key", "message"));}}

e. WordCountKafkaTopology

package com.shaonaiyi.kafka;import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;import java.util.Arrays;
import java.util.Properties;/*** @author: shaonaiyi* @createTime: 2019/07/15 22:54* @description: Kafka之WordCountTopology*/public class WordCountKafkaTopology {private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String KAFKA_BOLT_ID = "kafka-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {int workers = Integer.parseInt(args[0]);// 从Kafka中消费数据KafkaSpout kafkaSpout = new KafkaSpoutBuilder().brokers(Arrays.asList("master:9092")).topic("word-count-input").build();KafkaSplitSentenceBolt splitSentenceBolt = new KafkaSplitSentenceBolt();KafkaWordCountBolt wordCountBolt = new KafkaWordCountBolt();Properties props = new Properties();props.put("bootstrap.servers", "master:9092");// 此配置是表明当一次produce请求被认为完成时的确认值。// 特别是,多少个其他brokers必须已经提交了数据到他们的log并且向他们的leader确认了这些信息。典型的值包括:// 0: 表示producer从来不等待来自broker的确认信息(和0.7一样的行为)。// 这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。// 1:表示获得leader replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。// -1:producer会获得所有同步replicas都收到数据的确认props.put("acks", "1");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaBolt kafkaBolt = new KafkaBolt().withProducerProperties(props).withTopicSelector(new DefaultTopicSelector("word-count-output")).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());TopologyBuilder builder = new TopologyBuilder();builder.setSpout(SENTENCE_SPOUT_ID, kafkaSpout);builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));builder.setBolt(KAFKA_BOLT_ID, kafkaBolt).shuffleGrouping(COUNT_BOLT_ID);// 3、提交TopologyConfig config = new Config(); // 用来配置Topology运行时行为,对Topology所有组件全局生效的配置参数集合config.setNumWorkers(workers);StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); // 提交Topology}}

0x03 校验结果

1. 打包Storm代码

a. 打包

b. 上传到集群

2. 执行ZK与Storm

此步骤与教程:实时流处理框架之Storm的安装与部署
=>
0x03 启动并校验Storm 步骤一样

即:
a. 启动集群上的三台Zookeeper(查看进程是否存在,如果Kafka已经启动,应该还有Kafka的进程)

b. 启动Storm
在master上启动Nimbus和Web UI
cd ~/bigdata/apache-storm-1.2.2
nohup bin/storm nimbus 2>&1 &
然后回车,切换终端2,执行:
nohup bin/storm ui 2>&1 &
然后回车
在slave1和slave2上启动Supervisor
cd ~/bigdata/apache-storm-1.2.2
nohup bin/storm supervisor 2>&1 &

3. 执行Storm作业

a. 执行Storm作业

~/bigdata/apache-storm-1.2.2/bin/storm jar /home/hadoop-sny/jar/stormlearning-1.0-SNAPSHOT-jar-with-dependencies.jar com.shaonaiyi.kafka.WordCountKafkaTopology 1


b. 查看Web UI界面(master:8080

4. 校验过程

a. 目前各节点的进程情况

b. 发送消息到Kafka

c. 查看消费者信息

d. 查看Storm的Web UI界面

0xFF 总结

  1. 在生产者端多发送几个语句,你会发现这种统计的结果,并不是我们真正想要的结果,思考应该怎样才能想我们前面学习WordCount那种表现形式,请看后面的教程。
  2. 内容比较多,请大家认真操作。

作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |

福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。


Flume+Kafka+Storm实战:一、Kakfa与Storm整合相关推荐

  1. flume kafka storm mysql_flume+kafka+storm打通过程

    0.有的地方我已经整理成脚本了,有的命令是脚本里面截取的 1.启动hadoop和yarn $HADOOP_HOME/sbin/start-dfs.sh;$HADOOP_HOME/sbin/start- ...

  2. Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

    http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...

  3. 实时数据处理环境搭建flume+kafka+storm:4.storm安装配置

    1.解压 apache-storm-0.9.3.tar.gz 2.修改配置文件 conf/storm.yaml --zk地址  storm.zookeeper.servers:  - "wc ...

  4. flume+kafka+storm整合02---问题

    ###1.启动storm时总是报错 storm Session 0x0 for server null, unexpected error, closing socket connection ### ...

  5. storm 实战及实例讲解(二)

    storm 实战及实例讲解(二)   --comaple.zhang      前面已近介绍了storm集群的搭建,和使用场景,那么现在让我们一起来探讨一下storm具体该怎么使用吧. 首先,我们要明 ...

  6. storm 实战及实例讲解(一)

    storm 实战及实例讲解一 --应用场景分析,drpc服务器配置 --by comaple  2012-08-27 先给大家打打气,看看效果.这是taobao对外公布的storm使用情况,请大家欣赏 ...

  7. storm 实战及实例讲解(三)

    storm 实战及实例讲解(三)                                      --comaple.zhang                                ...

  8. Kafka项目实战-用户日志上报实时统计之编码实践

    1.概述 本课程的视频教程地址:<Kafka实战项目之编码实践>  该课程我以用户实时上报日志案例为基础,带着大家去完成各个KPI的编码工作,实现生产模块.消费模块,数据持久化,以及应用调 ...

  9. Flume+Kafka双剑合璧玩转大数据平台日志采集

    点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 大数据真好玩 点击右侧关注,大数据真好 ...

最新文章

  1. php 日志函数,php 写入日志函数
  2. java 分批同时处理_java中List集合分批处理
  3. 截网页全屏图的方法-截网页全屏软件-Web2Pic Pro
  4. (软件工程复习核心重点)第十章面向对象设计-第四节:设计人机交互子系统和设计任务管理子系统
  5. C语言整数的取值范围
  6. Tree树结构java实现
  7. 同时更改一条数据_数据仓库amp;面试总结
  8. Linux连接mysql 出现Access denied for user ‘root‘@‘localhost‘(using password: YES)错误解决方案
  9. 《人工智能及其应用》1-6章
  10. html 怎么设置时间函数,JavaScript日期函数 - 计时器、innerHTML
  11. Mac上编译C++报错
  12. 远程连接桌面报:这可能是由于credssp加密oracle修正
  13. (6)java的内存泄露问题
  14. 蚂蚁金服 CEO 突然辞职!去向很意外。。。
  15. Mybatis事务管理机制<transactionManager>
  16. html5分镜头脚本范例,分镜头脚本模板(小故事分镜头脚本范例)
  17. Sentiment140数据集
  18. Nginx专题:nginx+tomcat实现动静分离
  19. IOS 使用itms-services协议,服务端安装应用
  20. Instruments 之 Energy Log

热门文章

  1. 【电子文档】大批量电子文档的自动化生成 - 第一版
  2. 计算机培训机构内部分工,电教人员分工及岗位职责
  3. 利用CNN实现图像和数值数据融合
  4. 工业软件持续发展的动力:科学探索与制造业创新
  5. 一级消防工程师考试,知道这些不失分!
  6. Golang GC概述
  7. 2023年,下班后可以做什么副业?
  8. 4年估值140亿的元气森林 内忧外患的壁垒困境
  9. anchorPoint随记
  10. mediasoup详解