第一章:Flume的简介

1.1 大数据处理流程

在企业中,大数据的处理流程一般是:

  • 1.数据采集
  • 2.数据存储
  • 3.数据清洗
  • 4.数据分析
  • 5.数据展示

参考下图:

1.2 Flume的简介

Flume是一种分布式的,可靠的、高可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据流的简单灵活的体系结构。它具有可调整的可靠性机制以及许多故障转移和恢复机制,具有强大的功能和容错能力。它使用一个简单的可扩展数据模型,允许在线分析应用程序。

参考官网: http://flume.apache.org/

flume 最开始是由 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。但随着 flume 功能的扩展,flume的代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点渐渐暴露出来,尤其是在发行版本 0.9.4中,日志传输不稳定的现象尤为严重。

为了解决这些问题,2011 年 10 月 22 号,cloudera 对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,并将 Flume 纳入 apache 旗下,从cloudera Flume 改名为 Apache Flume。

1.3 版本区别

​ 为了与之前版本区分开,重构后的版本统称为 Flume NG(next generation),重构前的版本被统称为 Flume OG(original generation),Flume目前只有Linux系统的启动脚本,没有Windows环境的启动脚本。

第二章:Flume的体系结构

2.1 体系结构简介

Flume 运行的核心是 Agent。Flume是以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方。如下图所示:

2.2 组件及其作用

- Client:客户端,Client生产数据,运行在一个独立的线程中- Event: 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)- Flow:Event从源点到达目的点的迁移的抽象。- Agent: 一个独立的Flume进程,运行在JVM中,包含组件Source、 Channel、 Sink。每台机器运行一个agent,但是一个agent中可以包含多个sources和sinks。- Source: 数据收集组件。source从Client收集数据,传递给Channel- Channel: 管道,负责接收source端的数据,然后将数据推送到sink端。- Sink: 负责从channel端拉取数据,并将其推送到持久化系统或者是下一个Agent。- selector:选择器,作用于source端,然后决定数据发往哪个目标。- interceptor:拦截器,flume允许使用拦截器拦截数据。允许使用拦截器链,作用于source和sink阶段。

第三章:Flume的安装

3.1 安装和配置环境变量

3.1.1 准备软件包

将apache-flume-1.8.0-bin.tar.gz 上传到linux系统中的/root/soft/目录中

3.1.2 解压软件包

[root@qianfeng01 soft]# pwd
/root/soft
[root@qianfeng01 soft]# tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/

3.1.3 更名操作

[root@master soft]# cd /usr/local/
[root@master local]# mv apache-flume-1.8.0-bin/ flume

3.1.4 配置环境变量

[root@master local]# vi /etc/profile
........省略..........
export FLUME_HOME=/usr/local/flume
export PATH=$FLUME_HOME/bin:$PATH# 加载环境变量
[root@master apps]# source /etc/profile

3.1.5 验证环境变量

[root@master local]# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d

3.2 配置文件

[root@master local]# cd flume/conf/
[root@master conf]# ll   #查看里面是否有一个flume-env.sh.template文件
[root@master conf]# cp flume-env.sh.template flume-env.sh
[root@master conf]# vi flume-env.sh
........省略..........
export JAVA_HOME=/usr/local/jdk
........省略..........

第四章:Flume的部署

4.1 数据模型

- 单一数据模型
- 多数据流模型

4.1.1 单一数据模型

在单个 Agent 内由单个 Source, Channel, Sink 建立一个单一的数据流模型,如下图所示,整个数据流为 Web Server --> Source --> Channel --> Sink --> HDFS。

4.1.2 多数据流模型

1)多 Agent 串行传输数据流模型

2)多 Agent 汇聚数据流模型

**3)**单 Agent 多路数据流模型

**4)**Sinkgroups 数据流模型

4.1.3 小总结

在flume提供的数据流模型中,几个原则很重要。Source--> Channel1.单个Source组件可以和多个Channel组合建立数据流,既可以replicating 和 multiplexing。2.多个Sources可以写入单个 ChannelChannel-->Sink1.多个Sinks又可以组合成Sinkgroups从Channel中获取数据,既可以loadbalancing和failover机制。2.多个Sinks也可以从单个Channel中取数据。3.单个Sink只能从单个Channel中取数据根据上述 5 个原则,你可以设计出满足你需求的数据流模型。

4.2 配置介绍

4.2.1 定义组件名称

要定义单个代理中的流,您需要通过通道链接源和接收器。您需要列出给定代理的源,接收器和通道,然后将源和接收器指向一个通道。一个源实例可以指定多个通道,但是一个接收器实例只能指定一个通道。格式如下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2># set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

案例如下:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

4.2.2 配置组件属性

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue># properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue># properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

案例如下:

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1# set channel for sources, sinks# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata#...

4.3 常用的source和sink种类

参考flume官方文档

4.3.1 常用的flume sources

# Avro source:avro
# Syslog TCP source:syslogtcp
# Syslog UDP Source:syslogudp
# HTTP Source:http
# Exec source:exec
# JMS source:jms
# Thrift source:thrift
# Spooling directory source:spooldir
# Kafka source:org.apache.flume.source.kafka,KafkaSource
.....

4.3.2 常用的flume channels

# Memory Channelmemory
# JDBC Channeljdbc
# Kafka Channelorg.apache.flume.channel.kafka.KafkaChannel
# File Channelfile

4.3.3 常用的flume sinks

# HDFS Sinkhdfs
# HIVE Sinkhive
# Logger Sinklogger
# Avro Sinkavro
# Kafka Sinkorg.apache.flume.sink.kafka.KafkaSink
# Hbase Sinkhbase

第五章:案例演示

5.1 案例演示:avro+memory+logger

Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger

5.1.1 编写采集方案

[root@master flume]# mkdir flumeconf
[root@master flume]# cd flumeconf
[root@master flumeconf]# vi avro-logger.conf
#定义各个组件的名字
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1#定义sources组件的相关属性
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=master
a1.sources.avro-sour1.port=9999#定义channels组件的相关属性
a1.channels.mem-chan1.type=memory#定义sinks组件的相关属性
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100#组件之间进行绑定
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1

5.1.2 启动Agent

[root@master flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.1.3 测试数据

[root@master ~]# mkdir flumedata
[root@master ~]# cd flumedata/
[root@master flumedata]#
[root@master flumedata]# date >> test.data
[root@master flumedata]# cat test.data
2019年 11月 21日 星期四 21:22:36 CST
[root@master flumedata]# ping master >> test.data
[root@master flumedata]# cat test.data
....省略....
[root@master flumedata]# flume-ng avro-client -c /usr/local/flume-1.6.0/conf/ -H qianfeng01 -p 9999 -F ./test.dat

5.2 实时采集(监听文件):exec+memory+hdfs

Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容

memory:传输数据的Channel为Memory

hdfs 是输出目标为Hdfs

5.2.1 配置方案

[root@master  flumeconf]# vi exec-hdfs.conf
a1.sources=r1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /usr/local/flume-1.8/flumedata/test.dataa1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://master :8020/flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix=events-
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=second
a1.sinks.k1.hdfs.rollInterval=3
a1.sinks.k1.hdfs.rollSize=20
a1.sinks.k1.hdfs.rollCount=5
a1.sinks.k1.hdfs.batchSize=1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStreama1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

5.2.2 启动Agent

[root@master flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

5.2.3 测试数据

[root@master flumedata]# ping master >> test.data

5.3 实时采集(监听文件) exec+memory+logger

Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 ,

logger为日志格式输出

5.3.1 配置方案

[root@master flumeconf]# vi exec-logger.conf
a2.sources = r1
a2.channels = c1
a2.sinks = s1a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /home/flume/log.01a2.channels.c1.type=memory
a2.channels.c1.capacity=1000
a2.channels.c1.transactionCapacity=100
a2.channels.c1.keep-alive=3
a2.channels.c1.byteCapacityBufferPercentage=20
a2.channels.c1.byteCapacity=800000a2.sinks.s1.type=logger
a2.sinks.s1.maxBytesToLog=30a2.sources.r1.channels=c1
a2.sinks.s1.channel=c1

5.3.2 启动agent

[root@master flumeconf]# flume-ng agent -c ../conf -f ./exec-logger.conf -n a2 -Dflume.root.logger=INFO,console

5.3.3 测试:

[root@master ~]# echo "nice" >> /home/flume/log.01

5.4 实时采集(监听目录):spool +file + hdfs

spool 是Source来源于目录,有文件进入目录就摄取,File Channel将它暂存到磁盘,最终目的地是HDFS
即只要某个目录不断有文件,HDFS上也会同步到所有数据。

5.4.1 配置方案

[root@master flumeconf]# vi spool-hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/flume/input/2020/01/a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/flume/checkpoint
a1.channels.c1.dataDirs = /home/flume/dataa1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:8020/flume/spooldir
a1.sinks.k1.hdfs.filePrefix =
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileSuffix= .log
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Texta1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.4.2 启动Agent

[root@master flumeconf]# flume-ng agent -c ../conf/ -f ./spool-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

5.4.3 测试

--1. 向/home/flume/input/2020/01/目录里添加几个文件
[root@master ~]# cd /home/flume/input/2020/01/
[root@master 01]# echo "hello world" >>a1.log
[root@master 01]# echo "no zuo no die" >>a2.log
[root@master 01]# echo "ao li gei" >>a3.sh
--2. 查看hdfs的目录内容
--3. 在查看一下home/flume/input/2020/01/目录里的内容
[root@master 01]# ll

5.6 案例演示:http+ mem+logger

http: 表示数据来源是http网络协议,一般接收的请求为get或post请求. 所有的http请求会通过插件格式的Handle转化为一个flume的Event数据.

mem:表示用内存传输通道

logger:表示输出格式为Logger格式

5.6.1 配置方案

[root@master flumeconf]# vi http-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
a1.sources.r1.handler.nickname = random propsa1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

5.6.2 启动agent的服务:

[root@master ~]# flume-ng agent -c ../conf -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.6.3 测试:

[root@master ~]# curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://master:6666

5.7 案例演示:Syslog tcp+mem+logger

syslog tcp: syslog广泛应用于系统日志。syslog日志消息既可以记录在本地文件中,也可以通过网络发送到接收syslog的服务器。接收syslog的服务器可以对多个设备的syslog消息进行统一的存储,或者解析其中的内容做相应的处理。本数据源指的是syslog的通过tcp来端口来传送数据

mem:通过内存选择器来处理

logger:输出目的地为logger

5.7.1 配置方案

[root@master flumeconf]# vi syslogtcp-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

5.7.2 启动agent

[root@master flumeconf]# flume-ng agent -c ../conf -f ./syslogtcp-logger.conf -n a1 -Dflume.root.logger=INFO,console

5.7.3 测试:需要先安装nc

[root@master ~]# yum -y install nmap-ncat
[root@master ~]# echo "hello world" | nc master 6666

5.8 案例演示:Syslogtcp+mem+hdfs

syslogtcp:同上

mem:表示从内存传送

hdfs:表示目的地为HDFS服务器

5.8.1 配置方案

[root@master flumeconf]# vi syslogtcp-hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=flume-hdfs
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

5.8.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./syslogtcp-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

5.8.3 测试:

[root@master ~]# echo "hello world hello master " | nc master 6666

5.9 案例演示:Syslogtcp+file+hdfs

syslogtcp 同上

file:表示用file作为channel传送介质

hdfs:表示目的地为HDFS

5.9.1 配置方案

[root@master flumeconf]# vi syslogtcp-fh.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666a1.channels.c1.type=file
a1.channels.c1.dataDirs=/home/flume/filechannel/data
a1.channels.c1.checkpointDir=/home/flume/filechannel/point
a1.channels.c1.transactionCapacity=10000
a1.channels.c1.checkpointInterval=30000
a1.channels.c1.capacity=1000000
a1.channels.c1.keep-alive=3a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=flume-hdfs
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

5.9.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./syslogtcp-fh.conf -n a1 -Dflume.root.logger=INFO,console

5.9.3 测试:

[root@master ~]# echo "hello world hello master " | nc master 6666

第六章:拦截器的使用

在Flume运行过程中 ,Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。拦截器有下面几个特点:

  • 拦截器需要实现org.apache.flume.interceptor.Interceptor接口。

  • 拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。

  • 拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。

  • 一个拦截器返回的事件列表被传递给链中的下一个拦截器。

  • 如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。

6.1 常用拦截器:

  1. Timestamp Interceptor :时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多
  2. Host Interceptor:主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)
  3. Static Interceptor:静态拦截器,用于在events header中加入一组静态的key和value。

6.2 案例演示:Timestamp+Syslogtcp+file+hdfs

通过时间拦截器,数据源为SyslogTcp,传送的通道模式是FileChannel,最后输出的目的地为HDFS

6.2.1 配置方案:

[root@master flumeconf]# vi ts.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i1.preserveExisting=false
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=hn
a1.sources.r1.interceptors.i3.value=master a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

6.2.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

6.2.3 测试:

[root@master ~]# echo "hello world hello master " | nc master 6666

6.3 案例演示:regex+Syslogtcp+file+hdfs

拦截器为正则表达式拦截器, 数据源为Syslogtcp格式,传送通道为FileChannel,最后传送的目的地是HDFS

6.3.1 配置方案

[root@master flumeconf]# vi regex-ts.conf
a1.sources = r1
a1.channels = c1
a1.sinks = s1a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
#不要加引号包裹正则
a1.sources.r1.interceptors.i1.regex=^[0-9].*$
a1.sources.r1.interceptors.i1.excludeEvents=falsea1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

6.3.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./regex-ts.conf -n a1 -Dflume.root.logger=INFO,console

6.3.3 测试:

[root@master ~]# echo "hello world hello master " | nc master 6666
[root@master ~]# echo "123123123 hello world hello master " | nc master 6666

6.4 自定义拦截器

6.4.0 需求:

为了提高Flume的扩展性,用户可以自己定义一个拦截器, 对每一组的item_type和active_time都过滤出相应的HOST和USERID

6.4.1 pom.xml

可以参考/code/pom.xml

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.8.0</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.48</version></dependency>
</dependencies>

6.4.2 代码

具体代码可以参考 code/MyInterceptor

/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description  自定义拦截器:对每一组的item_type和active_time都过滤出相应的HOST和USERID*/
public class MyInterceptor implements Interceptor {@Overridepublic void initialize() {//初始化方法,写拦截器初始化时的业务}@Overridepublic void close() {//关闭方法,写拦截器关闭时的代码}/*** 解析单条event* @param event* @return*/@Overridepublic Event intercept(Event event) {//输入String inputeBody=null;//输出byte[] outputBoday=null;//解析---这里定义对单条Event处理规则try {inputeBody=new String(event.getBody(), Charsets.UTF_8);ArrayList<String> temp = new ArrayList<>();JSONObject bodyObj = JSON.parseObject(inputeBody);//1)公共字段String host = bodyObj.getString("host");String user_id = bodyObj.getString("user_id");JSONArray data = bodyObj.getJSONArray("items");//2)Json数组=>every json objfor (Object item : data) {JSONObject itemObj = JSON.parseObject(item.toString());HashMap<String, Object> fields = new HashMap<>();fields.put("host",host);fields.put("user_id",user_id);fields.put("item_type",itemObj.getString("item_type"));fields.put("active_time",itemObj.getLongValue("active_time"));temp.add(new JSONObject(fields).toJSONString());}//3)Json obj 拼接outputBoday=String.join("\n",temp).getBytes();}catch (Exception e){System.out.println("输入数据:"+inputeBody);e.printStackTrace();}event.setBody(outputBoday);return event;}/*** 解析一批event* @param events* @return*/@Overridepublic List<Event> intercept(List<Event> events) {//输出---一批EventArrayList<Event> result = new ArrayList<>();//输入---一批Eventtry{for (Event event : events) {//一条条解析Event interceptedEvent = intercept(event);byte[] interceptedEventBody = interceptedEvent.getBody();if(interceptedEventBody.length!=0){String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);String[] multiEventArr = multiEvent.split("\n");for (String needEvent : multiEventArr) {SimpleEvent simpleEvent = new SimpleEvent();simpleEvent.setBody(needEvent.getBytes());result.add(simpleEvent);}}}}catch (Exception e){e.printStackTrace();}return result;}/*** 实现内部类接口*/public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new MyInterceptor();}@Overridepublic void configure(Context context) {}}}

6.4.3 打包上传

使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下

6.4.4 编写方案

[root@master flumeconf]# vi mytest.conf
a1.sources = s1
a1.channels = c1
a1.sinks = r1a1.sources.s1.type = TAILDIR
a1.sources.s1.positionFile = /home/flume/taildir_position.json
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1=/home/flume/data/.*log
a1.sources.s1.fileHeader = true
#使用自定义拦截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = com.flume.interceptor.MyInterceptor$Buildera1.channels.c1.type = file
a1.channels.c1.dataDirs = /home/flume/filechannle/dataDirs
a1.channels.c1.checkpointDir = /home/flume/filechannle/checkpointDir
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.r1.type = hdfs
a1.sinks.r1.hdfs.path = hdfs://master:8020/flume/spooldir
a1.sinks.r1.hdfs.filePrefix =
a1.sinks.r1.hdfs.round = true
a1.sinks.r1.hdfs.roundValue = 10
a1.sinks.r1.hdfs.roundUnit = minute
a1.sinks.r1.hdfs.fileSuffix= .log
a1.sinks.r1.hdfs.rollInterval=60
a1.sinks.r1.hdfs.fileType=DataStream
a1.sinks.r1.hdfs.writeFormat=Texta1.sources.s1.channels = c1
a1.sinks.r1.channel = c1

6.4.5 启动agent

[root@master flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

6.4.6 测试:

[root@master ~]# vi my.sh
#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[{"item_type":"eat","active_time":156234},{"item_type":"car","active_time":156233}]
}'
echo $log>> /home/flume/data/test.log[root@master ~]# bash my.sh

第七章:选择器的使用

7.1 说明

​ Flume中的Channel选择器作用于source阶段 ,是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。

Agent中各个组件的交互
由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。

Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。

Flume分类两种选择器,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器.

  • replicating:该选择器复制每个事件到通过Source的Channels参数指定的所有Channel中。
  • multiplexing:是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入到哪个Channel,基于一个特定的事件头的值进行路由

7.2 案例演示:replicating selector

7.2.1 配置方案

[root@master flumeconf]# vi rep.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2a1.sources.r1.type=syslogtcp
a1.sources.r1.host = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=replicatinga1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

7.2.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console

7.2.3 测试:

[root@master ~]# echo "hello world hello master " | nc master 6666

7.3 案例演示:Multiplexing selector

7.3.1 配置方案

[root@master flumeconf]# vi mul.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2a1.sources.r1.type=http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
a1.sources.r1.selector.default = c1a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

7.3.2 启动agent的服务:

[root@master flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

7.3.3 测试:

[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://master:6666
[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://master:6666

=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://master:8020/flume/%Y/%m/%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2


### 7.3.2 启动agent的服务:```shell
[root@master flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console

7.3.3 测试:

[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://master:6666
[root@master ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://master:6666

Flume的学习及使用相关推荐

  1. Flume的学习笔记

    Flume的学习笔记 文章目录 Flume的学习笔记 1. Flume简介 1.1 Flume的基本概念 (1) 什么是Flume (2) Flume 目的 1.2 Flume 基本组件 (1) Fl ...

  2. 大数据开发笔记(六):Flume基础学习

      ✨大数据开发笔记推荐: 大数据开发面试知识点总结_GoAI的博客-CSDN博客_大数据开发面试​本文详细介绍大数据hadoop生态圈各部分知识,包括不限于hdfs.yarn.mapreduce.h ...

  3. Flume NG 学习笔记(五)Sinks和Channel配置

    一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. ...

  4. Flume NG 学习笔记(八)Interceptors(拦截器)测试

    版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据 一.Event Serializers file ...

  5. Hadoop学习笔记系列文章导航

    一.为何要学习Hadoop? 这是一个信息爆炸的时代.经过数十年的积累,很多企业都聚集了大量的数据.这些数据也是企业的核心财富之一,怎样从累积的数据里寻找价值,变废为宝炼数成金成为当务之急.但数据增长 ...

  6. Flume基本原理及使用

    Flume是一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统.Flume最主要是用在分布式系统中,例如读取服务器本地的磁盘数据,并将数据写入到HDFS中. 对Flume的学习,最好就是结 ...

  7. 大数据开发工程师目录

    阶段一:走进大数据 第1周   学好大数据先攻克Linux 在步入大数据殿堂之前,先带领大家快速掌握大数据的必备技能:Linux的操作使用,为后面学习大数据技术打下坚实基础. 课程安排: 1.掌握Li ...

  8. 大数据学习笔记53:Flume Sink Processors(Flume接收器处理器)

    文章目录 一.Flume Sink Processors用户指南 二.Default Sink Processor 三.FailOver Sink Processor 四.Load Balancing ...

  9. 大数据之_数据采集Flume_Flume了解_学习内容介绍---Flume工作笔记002

    可以看到flume是个海量日志的采集,聚合和传输的系统 可以看到比如我们之前用的hive,有大量的日志可以用flume进行采集到hdfs中去 然后再看一下flume的具体学习内容.

最新文章

  1. 为什么不建议直接使用 Async 注解?
  2. 帮助你构建自适应布局的30款优秀 jQuery 插件(下篇)
  3. Flink + Iceberg 在去哪儿的实时数仓实践
  4. SAP 电商云 Spartacus UI Angular Component 动态创建的单步调试
  5. RxJava线程控制
  6. Linux新安装后设置root密码
  7. shareSDK 提示#warning:尚未配置[新浪微博]URL Scheme:sinaweibosso.或wb
  8. 和利时DCS系统设服务器,和利时DCS系统全套资料.pdf
  9. Matlab矩阵的运算
  10. AI嘻哈写歌词软件总结
  11. 项目中GIT的红色和绿色的标识不显示
  12. Endnote 域代码已更改
  13. MATLAB泰勒展开
  14. 仿真软件proteus构建LCD1602四线驱动实验
  15. Freebase中的基本概念
  16. 股票量化分析(11)——第二个策略(5日移动均线、双均线、MACD策略)
  17. sql语句分类(附mysql实操语句)
  18. cocoscreator 模拟点击
  19. GPU Skinning介绍
  20. DIT和DIF的基2FFT算法

热门文章

  1. php 加密解密方法,PHP加密解密方法
  2. 解决IDEA里提示“Spring Configuration Check“ “Unmapped Spring configuration files found.“的问题
  3. 我们得无盘工作站的做法
  4. 通往WinDbg的捷径(一
  5. Python 打包适用于win 7/xp系统的应用程序(exe)
  6. 4万高考冒名顶替事件_高考生冒名顶替上大学事件内幕调查
  7. 如何获取Flickr图片链接地址作为外链图片
  8. 深度学习 (四)Keras利用CNN实现图片识别(Mnist、Cifar10)
  9. java machine 报错_Rhapsody启动过程显示[Cannot Find Java Virtual Machine file]错误信息
  10. 终于结束了TensorFlow的安装。我不会但是我jio的要记录下来