一、Flume集成Kafka

在实际工作中flume和kafka会深度结合使用

1:flume采集数据,将数据实时写入kafka
2:flume从kafka中消费数据,保存到hdfs,做数据备份

下面我们就来看一个综合案例

使用flume采集日志文件中产生的实时数据,写入到kafka中,然后再使用flume从kafka中将数据消费出来,保存到hdfs上面

那为什么不直接使用flume将采集到的日志数据保存到hdfs上面呢?

因为中间使用kafka进行缓冲之后,后面既可以实现实时计算,又可以实现离线数据备份,最终实现离线计算,所以这一份数据就可以实现两种需求,使用起来很方便,所以在工作中一般都会这样做。

下面我们来实现一下这个功能

其实在Flume中,针对Kafka提供的有KafkaSource和KafkaSink
KafkaSource是从kafka中读取数据
KafkaSink是向kafka中写入数据

所以针对我们目前这个架构,主要就是配置Flume的Agent。
需要配置两个Agent:
第一个Agent负责实时采集日志文件,将采集到的数据写入Kafka中
第二个Agent负责从Kafka中读取数据,将数据写入HDFS中进行备份

针对第一个Agent:
source:ExecSource,使用tail -F监控日志文件即可
channel:MemoryChannel
sink:KafkaSink

针对第二个Agent
Source:KafkaSource
channel:MemoryChannel
sink:HdfsSink

这里面这些组件其实只有KafkaSource和KafkaSink我们没有使用过,其它的组件都已经用过了。

1、实时采集日志文件,将采集到的数据写入Kafka中

下面来配置第一个Agent:
文件名为:file-to-kafka.conf

内容如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/test.log# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 指定topic名称
a1.sinks.k1.kafka.topic = test_r2p5
# 指定kafka地址,多个节点地址使用逗号分割
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
# 一次向kafka中写多少条数据,默认值为100,在这里为了演示方便,改为1
# 在实际工作中这个值具体设置多少需要在传输效率和数据延迟上进行取舍
# 如果kafka后面的实时计算程序对数据的要求是低延迟,那么这个值小一点比较好
# 如果kafka后面的实时计算程序对数据延迟没什么要求,那么就考虑传输性能,一次多传输一些数据,这样吞吐量会有所提升
# 建议这个值的大小和ExecSource每秒钟采集的数据量大致相等,这样不会频繁向kafka中写数据,并且对kafka后面的实时计算程序也没有很大影响,1秒的数据延迟一般是可以接收的
a1.sinks.k1.kafka.flumeBatchSize = 1
a1.sinks.k1.kafka.producer.acks = 1
# 一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去
# linger.ms和flumeBatchSize,哪个先满足先按哪个规则执行,这个值默认是0,在这设置为1表示每隔1毫秒就将这一个Batch中的数据发送出去
a1.sinks.k1.kafka.producer.linger.ms = 1
# 指定数据传输时的压缩格式,对数据进行压缩,提高传输效率
a1.sinks.k1.kafka.producer.compression.type = snappy# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、从Kafka中读取数据,将数据写入HDFS中进行备份

下面来配置第二个Agent:
文件名为:kafka-to-hdfs.conf

内容如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 一次性向channel中写入的最大数据量,在这为了演示方便,设置为1
# 这个参数的值不要大于MemoryChannel中transactionCapacity的值
a1.sources.r1.batchSize = 1
# 最大多长时间向channel写一次数据
a1.sources.r1.batchDurationMillis = 2000
# kafka地址
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
# topic名称,可以指定一个或者多个,多个topic之间使用逗号隔开
# 也可以使用正则表达式指定一个topic名称规则
a1.sources.r1.kafka.topics = test_r2p5
# 指定消费者组id
a1.sources.r1.kafka.consumer.group.id = flume-con1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/kafkaout
a1.sinks.k1.hdfs.filePrefix = data-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在bigdata04机器的flume目录下复制两个目录

[root@bigdata04 apache-flume-1.9.0-bin]# cd /data/soft/apache-flume-1.9.0-bin
[root@bigdata04 apache-flume-1.9.0-bin]# cp -r conf conf-file-to-kafka
[root@bigdata04 apache-flume-1.9.0-bin]# cp -r conf conf-kafka-to-hdfs

修改 conf_file_to_kafka和conf_kafka_to_hdfs中log4j的配置。

[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-file-to-kafka
[root@bigdata04 conf_file_to_kafka]# vi log4j.properties
flume.root.logger=ERROR,LOGFILE
flume.log.file=flume-file-to-kafka.log[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-kafka-to-hdfs
[root@bigdata04 conf_kafka_to_hdfs]# vi log4j.properties
flume.root.logger=ERROR,LOGFILE
flume.log.file=flume-kafka-to-hdfs.log

把刚才配置的两个Agent的配置文件复制到这两个目录下。

[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-file-to-kafka
[root@bigdata04 conf-file-to-kafka]# vi file-to-kafka.conf
.....把file-to-kafka.conf文件中的内容复制进来即可[root@bigdata04 apache-flume-1.9.0-bin]# cd conf-kafka-to-hdfs/
[root@bigdata04 conf-kafka-to-hdfs]# vi kafka-to-hdfs.conf
.....把kafka-to-hdfs.conf文件中的内容复制进来即可

启动这两个Flume Agent
确保zookeeper集群、kafka集群和Hadoop集群是正常运行的
以及Kafka中的topic需要提前创建好

创建topic

[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181  --partitions 5 --replication-factor 2 --topic test_r2p5

先启动第二个Agent,再启动第一个Agent

[root@bigdata04 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf-kafka-to-hdfs --conf-file conf-kafka-to-hdfs/kafka-to-hdfs.conf[root@bigdata04 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf-file-to-kafka --conf-file conf-file-to-kafka/file-to-kafka.conf

模拟产生日志数据

[root@bigdata04 ~]# cd /data/log/
[root@bigdata04 log]# echo hello world >> /data/log/test.log

到HDFS上查看数据,验证结果:

[root@bigdata04 ~]# hdfs dfs -ls /kafkaout
Found 1 items
-rw-r--r--   2 root supergroup         12 2020-06-09 22:59 /kafkaout/data-.1591714755267.tmp
[root@bigdata04 ~]# hdfs dfs -cat /kafkaout/data-.1591714755267.tmp
hello world

此时Flume可以通过tail -F命令实时监控文件中的新增数据,发现有新数据就写入kafka,然后kafka后面的flume落盘程序,以及kafka后面的实时计算程序就可以使用这份数据了。

Kafka09:【案例】Flume集成Kafka相关推荐

  1. Flume虞兮叹一(为什么要把kafka和flume集成)

    为什么要集成Flume和Kafka 我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成? 那首先就应该明白业务需求,一般使用Flume+Kafka架构都是希望完 ...

  2. Flume与Kafka整合案例详解

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Zookeeper 3.4.5   Flume 1.6.0   Kafka 2.1.0   flume笔记 直接贴配置文件 [roo ...

  3. Flume+HBase+Kafka集成与开发

    先把flume1.7的源码包下载 http://archive.apache.org/dist/flume/1.7.0/ 下载解压后 我们通过IDEA这个软件来打开这个工程 点击ok后我们选择打开一个 ...

  4. 大数据流处理:Flume、Kafka和NiFi对比

    在构建大数据流水线时,我们需要考虑处理数据的数量,种类和速度,这些数据通常出现在Hadoop生态系统的入口.在决定采用哪种工具来满足我们的要求时,都会考虑到可扩展性.可靠性.适应性.开发时间方面的成本 ...

  5. kafka数据到flume_大数据摄取:Flume,Kafka和NiFi

    kafka数据到flume 初赛 在构建大数据管道时,我们需要考虑如何吸收出现在通常是Hadoop生态系统大门口的数据量,多样性和速度. 在决定采用哪种工具来满足我们的要求时,诸如可伸缩性,可靠性,适 ...

  6. 大数据摄取:Flume,Kafka和NiFi

    初赛 在构建大数据管道时,我们需要考虑如何吸收出现在通常是Hadoop生态系统大门口的数据量,多样性和速度. 在决定采用哪种工具来满足我们的要求时,诸如可伸缩性,可靠性,适应性,开发时间成本等方面的初 ...

  7. storm如何集成kafka

    之前的kafka案例:http://blog.csdn.net/weixin_35757704/article/details/77196539 之前的storm案例:http://blog.csdn ...

  8. flume与kafka的整合

    案例1:syslog-memory-kafka 将flume采集到的数据落地到kafka上,即sink是kafka(生产者身份) vim syslog-mem-kafka.conf # 命名个组件 a ...

  9. 大数据开发超高频面试题!大厂面试必看!包含Hadoop、zookeeper、Hive、flume、kafka、Hbase、flink、spark、数仓等

    大数据开发面试题 包含Hadoop.zookeeper.Hive.flume.kafka.Hbase.flink.spark.数仓等高频面试题. 数据来自原博主爬虫获取! 文章目录 大数据开发面试题 ...

最新文章

  1. mysql教程日志_mysql日志文件的详细说明
  2. 登上更高峰!颜水成、程明明团队开源ViP,引入三维信息编码机制,无需卷积与注意力...
  3. “2021知乎高赞好物100”榜单揭晓 知乎为美好生活奉上参考答案
  4. 神奇!C语言还可以这样用来仿真
  5. python中定义字符串_Python中的字符串String
  6. 编译html成qch,在应用程序编译过程中运行qcollectiongenerator
  7. 2020年12月最新OneDrive网盘免费领取5TB教程
  8. aws s3 静态网站_使用AWS S3存储桶启动静态网站
  9. centos7 如何使用ReaR进行系统备份(如何使用NFS方法设置ReaR备份)
  10. Delphi窗体显示Echarts图表
  11. DeepStream参数配置之sink
  12. 什么是基金前端收费和后端收费
  13. top conference in AI
  14. 解决安装youtubedownloader的流氓插件“雅虎助手”造成浏览器首页劫持的事件
  15. H5API ---(Web存储-拖拽事件-通信-websocket-geolocation)
  16. java 1/2等于多少_1/2-2等于多少?
  17. windows虚拟机给C盘扩容
  18. Win10--MySQL8.0.29 免安装版本的配置教程及问题解决
  19. 局长在计算机审计培训班的讲话,审计干部培训班开班典礼上的讲话
  20. 转:激励,如何做更有效

热门文章

  1. 服务器项目命名规则,云服务器命名规范
  2. 网络服务器未运行是什么原因是,Win7系统网络诊断提示诊断策略服务未运行怎么办?...
  3. 图灵学院Java架构师课程,基于java
  4. Jenkins构建从github上克隆时,报Host key verification failed.
  5. AppCrash explorer问题(解决方法)
  6. 抖音屏保Java_java编写抖音超火时钟屏保 swing编写
  7. HyperV修改分辨率
  8. C# WPF、Winform中Show()和ShowDialog()区别
  9. 姜小白的Python日记Day13 jason序列化与开发规范
  10. npm run serve stage1@0.1.0 serve vue-cli-service serve node:internal/modules/cjs/loader:936