Flume原理和使用

1.定义

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集聚合和传输的系统。

Flume基于流式架构,灵活简单。

2.基础架构

2.1 Agent

1. Agent:Flume的部署单元,本质是一个JVM进程,Agent内部是以事件的形式将数据从源头送至目的。

2. **组成:**Agent主要有3个部分组成,Source、Channel、Sink

核心就是Source、Channel、Sink三个

source

1. Source:是负责接收数据到Flume Agent的组件。

2. **特点:**Source组件可以处理各种类型、各种格式的日志数据,

3. Source组件类型:

① avro:本质是RPC框架,支持跨语言、跨平台的数据传输,avro Source在flume中多用于Agent的连接

② netcat:本质是Linux下的端口类工具,netcat Source在Flume中用于采集端口传输的数据。

③ exec:支持执行命令的,并将命令执行后的标准输出作为数据采集,多用于采集一个可追加文件

④ spooling directory:支持对一个目录进行监听,采集目录中一个或多个新生成的文件数据

⑤ taildir:支持对多个目录进行监听,采集一个或多个目录下的一个或多个可追加文件,支持断点续传。

除此之外还有:thrift、jms、sequence generator、syslog、http、自定义Source。

Sink

1. Sink:是负责发送数据到外部系统的Flume Agent的组件。

2. **特点:**Sink组件不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量的、事务的写入到存储或索引系统、或者被发送到另一个Flume Agent。

3. Sink组件类型:

①logger:logger Sink组件则是将数据写到成Flume框架的运行日志中,配合运行参数-Dflume.root.logger=INFO,console可以将Flume运行日志(其中就包含了采集的数据)输出到控制台,多用于测试环境。

② hdfshdfs Sink组件是负责将数据传输到HDFS分布式文件系统中。

③ avro:avro Sink组件配合avro Source组件可以实现Agent的连接

④ file:file Sink组件是将采集到的数据直接输出到本地文件系统中,即linux的磁盘上。

除此之外还有:thrift、ipc、HBase、solr、自定义Sink。

Channel

1. Channel是负责暂存数据的,是位于Source和Sink组件之间的缓冲区。

2. 特点:

由于Channel组件的存在,使得Source和Sink组件可以运作在不同速率上。

Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

3. Flume自带两种Channel:

① Memory Channel:基于内存的队列存储事件,适用于对数据安全性要求不高的场景。 快,不安全

② File Channel:基于磁盘存储事件,宕机数据不丢失,适用于对数据安全敏感度高的场景。 慢,安全

Event

1. Event:agent中的事件,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。

2. 特点:Event由Header和Body两部分组成,

① Header:用来存放该event的一些属性,为K-V结构

② Body:用来存放该条数据,形式为字节数组。

3.入门

3.1 安装部署

安装过程看博客:

3.2入门案例

3.2.1 监控端口

1.案例需求:

使用Flume监听一个端口***,收集该端口数据,并打印到***控制台。

  1. 需求分析

  1. 实现步骤

软件环境配置

(1)安装netcat工具

[atguigu@hadoop102 ~]$ sudo yum install -y nc

(2)判断44444端口是否被占用

[atguigu@hadoop102 ~]$ sudo netstat -nlp | grep 44444

(3)在flume目录下创建job文件夹并进入job文件夹。

[atguigu@hadoop102 flume]$ mkdir -p job/simpleCase
[atguigu@hadoop102 flume]$ cd job/simpleCase

编写配置文件(自己手敲一遍)

在job/simpleCase文件夹下创建Flume Agent配置文件flume-1-netcat-logger.conf, 添加如下内容

#Name the components on this agent
a1.sources = r1                                      # 为a1的Source组件命名为r1,多个组件用空格间隔
a1.sinks = k1                                        # 为a1的Sink组件命名为k1,多个组件用空格间隔
a1.channels = c1                                    # 为a1的Channel组件命名为c1,多个组件用空格间隔# Describe/configure the source
a1.sources.r1.type = netcat                      # 配置r1的类型
a1.sources.r1.bind = localhost                  # 配置r1的绑定地址(注意localhost和hadoop102的区别)
a1.sources.r1.port = 44444                       # 配置r1的监听端口# Describe the sink
a1.sinks.k1.type = logger                        # 配置k1的类型为logger,输出到控制台# Use a channel which buffers events in memory
a1.channels.c1.type = memory                    # 配置c1的类型为memory
a1.channels.c1.capacity = 1000                 # 配置c1的容量为1000个事件
a1.channels.c1.transactionCapacity = 100     # 配置c1的事务容量为100个事件# Bind the source and sink to the channel
a1.sources.r1.channels = c1                    # 配置r1的channel属性,指定r1连接到那个channel
a1.sinks.k1.channel = c1                        # 配置k1的channel属性,指定k1连接到那个channel

部署运行flume监听端口

  • 第一种写法
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
  • 第二种写法:
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:

–conf/-c:表示配置文件存储在conf/目录

–name/-n:表示给agent起名为a1

–conf-file/-f:指定读取的配置文件是在job/simpleCase文件夹下的flume-1-1netcat-logger.conf文件

-Dflume.root.logger=INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

测试

(1)使用netcat工具向本机的44444端口发送内容

**[atguigu@hadoop102 flume]$** nc localhost 44444hello

(2)在Flume监听页面观察接收数据情况

3.2.2 实时监控目录下的多个追加文件

思考使用那种source,sink

① Exec source:适用于监控一个实时追加的文件,不能实现断点续传;

② Spooldir Source:适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;

③ Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

1. 案例需求:

使用Flume监听整个目录的实时追加文件,并上传至HDFS

  1. 需求分析

  1. 实现步骤
  • 环境准备

(1)在flume根目录下创建目录datas/tailCase/files和datas/tailCase/logs用于存放数据文件

[atguigu@hadoop102 flume]$ mkdir -p datas/tailCase/files datas/tailCase/logs
  • 编写配置文件

在job/simpleCase目录下,创建配置文件flume-2-taildir-hdfs.conf,编辑如下内容

[atguigu@hadoop102 simpleCase]$ vim flume-2-taildir-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a2.sources.r1.filegroups = f1 f2
a2.sources.r1.filegroups.f1 = /opt/module/flume/datas/tailCase/files/.*file.*
a2.sources.r1.filegroups.f2 = /opt/module/flume/datas/tailCase/logs/.*log.*# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/tailDir/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = tail-
# 是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,(可选择设置支持压缩的CompressedStream或者不支持压缩的DataStream)
a2.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

启动flume监控文件夹

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/simpleCase/flume-2-taildir-hdfs.conf

测试

(1)在/opt/module/flume/datas/目录下创建tailCase/files文件夹向files文件夹下文件追加内容

[atguigu@hadoop102 files]$ touch file1.txt
[atguigu@hadoop102 files]$ echo I am file1 >> file1.txt
[atguigu@hadoop102 files]$ touch log1.txt
[atguigu@hadoop102 files]$ echo I am log1 >> log1.txt

(2)在/opt/module/flume/datas/目录下创建tailCase/logs文件夹向logs文件夹下文件追加内容

[atguigu@hadoop102 tailCase]$ mkdir –p /opt/module/flume/datas/tailCase/logs
[atguigu@hadoop102 logs]$ touch file2.txt
[atguigu@hadoop102 logs]$ echo I am file2 >> file2.txt
[atguigu@hadoop102 logs]$ touch log2.txt
[atguigu@hadoop102 logs]$ echo I am log2 >> log2.txt

(3)查看HDFS上的数据,验证flume对多目录下文件的实时采集

(4)关掉flume采集程序,对logs/和files/下文件追加,再开启flume采集程序,验证flume的断点续传

//关掉flume采集程序
[atguigu@hadoop102 flume]$ cat /opt/module/flume/tail_dir.json       // 观察json文件
[atguigu@hadoop102 flume]$ cd datas/tailCase/files
[atguigu@hadoop102 files]$ echo I am file1 duandian >> file1.txt
[atguigu@hadoop102 files]$ cd /opt/module/flume/datas/tailCase/logs
[atguigu@hadoop102 logs]$ echo I am log2 xuchuan>> log2.txt

(5)Taildir说明:

Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下:

{“inode”:2496272,“pos”:12,“file”:“/opt/module/flume/datas/tailCase/files/file1.txt”}

{“inode”:2496275,“pos”:12,“file”:“/opt/module/flume/datas/tailCase/logs/log2.txt”}

注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

4. Flume进阶

4.1 Flume事务

1. 在Flume中一共有两个事务

·Put事务:在Source组件和Channel组件之间,保证Source组件到Channel组件之间数据传递的可靠性。

·take事务:在Channel组件和Sink组件之间,保证channel组件到Sink组件之间数据传输的可靠性。

2. Put事务流程

source组件采集外部数据到agent内部,并且将数据包装为事件

source组件开始将事件传输到Channel组件中

首先,会开启事务,在事务内部,通过doPut方法将一批数据放入到putlist中存储

之后,调用doCommit方法,把putList中的所有Event放到Channel中,成功之后就清空putList

  1. putList在像channel中发送数据前会先检查channel中的容量是否放得下,放不下一个都不会放,调用doRollback

  2. 调用doRollback方法后,doRollback方法会进行两步操作: ·将putList清空 ·抛出ChannelException异常。

  3. source组件会捕捉到doRollback抛出的异常后,source就将刚才的一批数据重新采集,然后就开启一个新的事务。

  4. 数据批的大小取决于Source组件的配置参数batch size的值

  5. putList的大小取决于Channel组件的配置参数transactionCapacity的值(capacity参数是指Channel的容量)

    思考:put事务能否保证采集数据不丢失?

  6. Take事务流程

Sink组件不断的轮询Channel,当其中有新的事件到达时,开启take事务

take事务开启后,会调用doTake方法将Channel组件中的Event剪切到takeList中。

当takeList中存放了batch size数量的Event之后,就会调用doCommit方法

doCommit方法中,首先会将数据写出到外部系统,成功后就会清空takeList

当事务失败时,就会调用doRollback方法来进行回滚,就是将takeList中的数据原封不动的还给channel。

当take事务失败时,可能向外部写了一半的数据了,但是回滚时,是将tabkeList中的全部数返给channel,当开启新的take事务时,又会将这批数据再次写出到外部,就造成了数据重复。

思考:take事务可能造成数据重复,如何避免呢?

4.2 Flume Agent内部原理

  1. 重要组件
组件名称 概述 组件包含类型 特点
ChannelSelector 选出Event将要发到那个channel Replication Channel Selector 复制,默认选项
Multiplexing Channel Seletctor 多路复用
SinkProcessor 通过配置不同类型的SinkProcess实现不同的功能 DefaultSinkProcessor 单个Sink,默认
LoadBalancingSinkProcessor 负载均衡
FailoverSinkProcessor 故障转移

2. 执行流程

Source组件采集外部数据到agent内部,并包装为Event

然后,将事件发送到ChannelProcessor中,

·通过拦截器链中每个拦截器的拦截过滤,符合要求的Event会返回到ChannelProcessor

·在通过ChannelSelector,根据不同的选择器来决定Event去往哪个Channel,然后返回到ChannelProcessor

开启Put事务,将批量的Event发送到Channel中

根据SinkProcessor组件配置的类型不同,实现相应的功能(负载均衡或故障转移),最终都会且同一时刻只能有一个Sink去拉取数据。

Sink组件不断的轮询Channel,当有新的Event到达Channel时,向外部系统写出。

4.3 Flume企业开发案例

flume 中文文档:https://flume.liyifeng.org/

flume英文文档:https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

4.3.1 复制

  1. 案例需求

使用Flume-1监控文件变动

·Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS

·同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。

  1. 需求分析

  1. 实现步骤

(1)准备工作

在/opt/module/flume/job目录下创建enterprise/copy文件夹,存放复制案例的配置文件

[atguigu@hadoop102 flume]$ mkdir -p /opt/module/flume/job/enterprise/copy

在/opt/module/flume/datas/目录下创建模拟日志文件realtime.log

[atguigu@hadoop102 flume]$ touch /opt/module/flume/datas/realtime.log

(2)编写配置文件

flume-1的agent配置文件flume-1-exec-avro.conf

其中配置1个source两个channel**、两个sink,分别输送给flume-2-avro-hdfs和**flume-3-avro-file。

[atguigu@hadoop102 copy]$ vim flume-1-exec-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel,其实默认就是replicating
a1.sources.r1.selector.type = replicating# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

编写flume-2的agent配置文件flume-2-avro-hdfs.conf,创建Flume-3的agent配置文件,创建flume-3-avro-file.conf,采集Flume-1的输出数据,输出到本地/opt/module/flume/datas/copy_result目录下

[atguigu@hadoop102 copy]$ vim flume-2-avro-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/copy/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = copy-
# 是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
[atguigu@hadoop102 copy]$ vim flume-3-avro-file.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/datas/copy_result# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(3)创建本地目录

[atguigu@hadoop102 flume]$ mkdir /opt/module/flume/datas/copy_result

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

测试

(1)运行flume,开启对数据的监控采集:启动顺序是先下游,再上游

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f /opt/module/flume/job/enterprise/copy/flume-3-avro-file.conf[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f /opt/module/flume/job/enterprise/copy/flume-2-avro-hdfs.conf[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f /opt/module/flume/job/enterprise/copy/flume-1-exec-avro.conf

(2)向文件中追加内容,模拟日志实时更新

[atguigu@hadoop102 datas]$ echo 2021-10-31 09-10-34 >> realtime.log

(3)检查HDFS上数据文件

(4)检查/opt/module/datas/copy_result目录中数据

[atguigu@hadoop102 copy_result]$ ll
总用量 8
-rw-rw-r--. 1 atguigu atguigu 5942 5月  22 00:09 1526918887550-3

注意:file Sink采集数据到本地磁盘时,本地文件是按照时间滚动产生的,即使没有时间采集过来,本地也会生成空文件

4.3.2 多路复用和拦截器的使用

1.案例需求

使用flume采集服务器端口日志数据,需要按照日志类型的不同,将不同种类的日志发往不同分析系统。

多路复用用于分类

2.需求分析

1)背景:在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。

此时会用到Flume的channel selecter中的Multiplexing结构。

2)Multiplexing的原理是:根据event中Header的某个key的值,将不同的event发送到不同的Channel中,3)自定义Interceptor:实现为不同类型的event的Header中的key赋予不同的值。

4)总结:在该案例中,我们以端口数据模拟日志,以数字和字母模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。

3.实现步骤

(1)创建一个maven项目,并引入以下依赖。

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>

(2)定义CustomInterceptor类并实现Interceptor接口。

package com.atguigu.flume.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;public class CustomInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 1. 从事件中获取数据byte[] body = event.getBody();// 2. 判断数据开头的字符是字母还是数据if (body[0] >= 'a' && body[0] <= 'z') {event.getHeaders().put("type", "letter");         // 是字母就在事件头部设置type类型为letter} else if (body[0] >= '0' && body[0] <= '9') {event.getHeaders().put("type", "number");         // 是数字就在事件头部设置type类型为number}// 3. 返回事件return event;}// 对批量事件进行拦截@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}// 拦截器对象的构造对象public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomInterceptor();}@Overridepublic void configure(Context context) {}}
}

(3)将项目打包,并导入到flume的lib目录下。

(4)编辑flume-1配置文件

在hadoop102上的/opt/module/flume/job/目录下创建文件夹/custom/multi,存放本案例配置文件

[atguigu@hadoop102 flume]$ mkdir -p /opt/module/flume/job/custom/multi

为hadoop102上的Flume1配置1个netcat source,1个sink group(2个avro sink),并配置相应的ChannelSelector和interceptor。

[atguigu@hadoop102 flume]$ vim /opt/module/flume/job/custom/multi/flume-1-netcat-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(5)编写flume-2的配置文件和flume-3的配置文件

分别为hadoop103上的flume-2和hadoop104上的flume-3配置一个avro source和一个logger sink。

[atguigu@hadoop103 flume] $ vim /opt/module/flume/job/custom/multi/flume-2-avro-logger.conf
# agent
a2.sources=r1
a2.sinks = k1
a2.channels = c1# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4141# sink
a2.sinks.k1.type = logger# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# bind
a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1
[atguigu@hadoop104 flume]$ vim /opt/module/flume/job/custom/multi/flume-3-avro-logger.conf
# agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4242# sink
a3.sinks.k1.type = logger# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# bind
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

(6)分别在hadoop102,hadoop103,hadoop104上启动flume进程,注意先后顺序。

(7)在hadoop102使用netcat向localhost:44444发送字母和数字。

(8)观察hadoop103和hadoop104打印的日志。

1. 实现 Interceptor

2. 重写四个方法

·initialize 初始化方法

·public Event intercept(Event event) 处理单个Event

·public List intercept(List events) 处理多个Event

·close 方法

·静态内部类,实现Interceptor.Builder

  1. 拦截器可以不用吗?

·可以不用;需要在下一级hive的dwd层和SparkSteaming里面处理

·优势:只处理一次,轻度处理;

·劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景。

4.3.3 聚合

将多个数据源的数据合并成一个

1.案例需求:

hadoop102上的flume-1监控文件/opt/module/flume/datas/.file.,

hadoop103上的flume-2监控某一个端口的数据流,

hadoop104上的flume-3,接收flume-1和flume-2的数据,flume-3将最终数据打印到控制台。

将多个数据源过来的信息聚合在一起然后输出

2.需求分析

  1. 实现步骤

(1)准备工作

在hadoop102、hadoop103以及hadoop104的/opt/module/flume/job/enterprise目录下创建juhe文件夹

[atguigu@hadoop102 flume]$ mkdir /opt/module/flume/job/enterprise/juhe

将Flume解压后的目录分发到集群的其他节点

[atguigu@hadoop102 flume]$ xsync /opt/module/flume

(2)在hadoop102上的/opt/module/flume/job/enterprise/juhe目录下,创建flume-1-exec-avro.conf文件

配置exec Source用于监控file1.log文件,配置avro Sink输出数据到下一级Flume 3中。

[atguigu@hadoop102 flume]$ vim /opt/module/flume/job/enterprise/juhe/flume-1-exec-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)在hadoop103上的/opt/module/flume/job/enterprise/juhe目录下,创建flume-1-netcat-avro.conf文件,配置Netcat Source监控端口44444数据流,配置avro Sink数据到下一级Flume 3中:

[atguigu@hadoop103 flume]$ vim /opt/module/flume/job/enterprise/juhe/flume-2-netcat-avro.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)在hadoop104上的/opt/module/flume/job/enterprise/juhe目录下,创建flume-3-avro-logger.conf文件配置Avro source用于接收flume1与flume2发送过来的数据流,最终合并后logger sink到控制台。

[atguigu@hadoop104 flume]$ vim /opt/module/flume/job/enterprise/juhe/flume-3-avro-logger.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(5)部署运行flume采集程序:

在hadoop104节点上运行flume3:

[atguigu@hadoop104 flume]$ /opt/module/flume/bin/flume-ng agent –c conf/ -n a3 -f /opt/module/flume/job/enterprise/juhe/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console

在hadoop103节点上运行flume2:

[atguigu@hadoop103 flume]$ /opt/module/flume/bin/flume-ng agent –c conf/ -n a2 -f /opt/module/flume/job/enterprise/juhe/flume-2-netcat-avro.conf

在hadoop102节点上运行flume1:

[atguigu@hadoop102 flume]$* /opt/module/flume/bin/flume-ng agent –c conf/ -n a1 -f /opt/module/flume/job/enterprise/juhe/flume-1-exec-avro.conf

(6)在hadoop102上向/opt/module/flume/datas/目录下的realtime.log追加内容

[atguigu@hadoop102 flume]$ echo ‘hello’ > /opt/module/flume/datas/realtime.log

(7)在hadoop103上向44444端口发送数据

[atguigu@hadoop103 flume]$ nc hadoop103 44444

(8)检查hadoop104上数据

.transactionCapacity = 100

Bind the source and sink to the channel

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1


(5)部署运行flume采集程序:在hadoop104节点上运行flume3:> **[atguigu@hadoop104 flume]$** /opt/module/flume/bin/flume-ng agent –c conf/ -n a3 -f /opt/module/flume/job/enterprise/juhe/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console在hadoop103节点上运行flume2:> **[atguigu@hadoop103 flume]$** /opt/module/flume/bin/flume-ng agent –c conf/ -n a2 -f /opt/module/flume/job/enterprise/juhe/flume-2-netcat-avro.conf在hadoop102节点上运行flume1:```shell
[atguigu@hadoop102 flume]$* /opt/module/flume/bin/flume-ng agent –c conf/ -n a1 -f /opt/module/flume/job/enterprise/juhe/flume-1-exec-avro.conf

(6)在hadoop102上向/opt/module/flume/datas/目录下的realtime.log追加内容

[atguigu@hadoop102 flume]$ echo ‘hello’ > /opt/module/flume/datas/realtime.log

(7)在hadoop103上向44444端口发送数据

[atguigu@hadoop103 flume]$ nc hadoop103 44444

(8)检查hadoop104上数据

Flume原理和使用相关推荐

  1. Flume原理详解(好文)

    Flume(一)Flume原理解析 转载地址:https://www.cnblogs.com/zhangyinhua/p/7803486.html#_label0 阅读目录(Content) 一.Fl ...

  2. Flume原理及使用案例

    本文为转载篇!原文: https://www.cnblogs.com/zhangyinhua/p/7803486.html https://www.cnblogs.com/ciade/p/549521 ...

  3. 大数据技术——Flume原理分析

    摘要 主要是分析和讲解Flum的原理源码分析 Flume概述 Flume是的一个分布式.高可用.高可靠的海量日志采集.聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据,同时提供了对数 ...

  4. Flume 原理介绍

    1 .背景 flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一.尤其近几年随着flume的不断被完善以及升级版本 ...

  5. Flume原理初探:基本执行原理概述

    Flume Flume是一款在数据收集领域使用较多的一个apache的开源工具,是一款分布式.可靠和高可用的系统,能够高效的从不同的源中收集.聚合上传大量的日志数据到结构化的存储模块中,Flume的使 ...

  6. 【Flume】Flume原理简述及示例实践

    文章目录 1. Flume是什么 2. Flume三大组件 3. Flume高级应用场景 3.1 多路复用 3.2 整合 4. 示例实践 4.1 配置 4.2 运行 4.2.1 运行结果输出 4.2. ...

  7. 大数据之Flume原理

    听完小吴老师讲的以后,做的笔记~~~ 1.Flume定义 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方 ...

  8. Apache Flume(1):Apache Flume原理

    1.概述 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的软件.Flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地( ...

  9. Flume原理深度解析

    一.Flume简介 flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generati ...

最新文章

  1. Windows实现appium+iOS自动化测试
  2. python编程在哪里写-Python自带的IDE在哪里
  3. Qt下使用OpenCV3打开摄像头并把图像显示到QLabel上
  4. NJUST4316(立体几何投影的面积交)
  5. 数据科学家 数据工程师_数据科学家实际上赚了多少钱?
  6. mysql sharding 读取_MySQL读写分离(一)——sharding-jdbc
  7. .NET 中文件嵌套,例如:cshtml文件下面嵌套css和js【机器翻译】
  8. PHP 8 中确认支持 JIT!
  9. Hadoop环境搭建(单机)
  10. mac电脑怎么配置adb环境变量
  11. LPC1768 Flash 存储器加速模块
  12. PG-FP6烧录机1拖16上位机项目
  13. 各双拼输入方案之间有明显的优劣之分吗?
  14. 梦熊杯-十二月月赛-白银组题解-A.自由
  15. 什么是云计算和大数据?他们之间的区别是什么?
  16. Python自动化办公:word文件操作教程
  17. 页面动态时间php,HTML制作网页动态时钟教程
  18. bootstrap3 表单构建器_Knex - 灵活轻便的 Node.js SQL 查询构建器
  19. 在pc端上操作手机工具分享
  20. Mybatis——动态sql

热门文章

  1. 1.架设邮件服务器-概念
  2. 短视频seo 矩阵系统源码私有化部署
  3. SpringBoot整合Mybatis-Pius(简单易懂!)
  4. win10 休眠不读u盘_win10系统无法识别u盘的原因及解决方法
  5. 2个月Java学习总结
  6. Redis学习之Redis概述及原理、基本操作及持久化
  7. C++最小生成树Kruskal算法
  8. 注意机制(CBAM)理解
  9. 计算机检测维修与数据恢复国赛培训班招生
  10. DPDK的基本原理、学习路线总结