欢迎来到Apache Flume

Flume是一个分布式的,高可靠的,高可用的,高性能的海量日志数据采集、聚合和传输的系统。它是基于数据流的简单的灵活的架构。它具有高鲁棒性并且有着可调节的可靠的故障恢复机制和许多的失效备援。它使用一个简单的可扩展的数据模型,该模型可适用于在线分析应用。

一丶概述

Apache Flume是一个分布式的可靠的可用的系统,该系统用于有效的采集,聚集和传输大量的日志数据,这些数据来源于许多不同的集中式数据存储目的地。`
Apache Flume的功能并不仅仅局限于日志数据的聚合。既然数据源是可定制的,Flume能够被用于传输大量的事件数据,包括但不局限于网络运输数据,社交媒体产生的数据,邮件信息和其他等等可能的数据源
Apache Flume在Apache Software Foundation是一个顶级的工程项目
目前有两个发行版本,0.9x版本和1.x版本
新老用户被鼓励使用1.x发行版本,以便于利用在最新的架构中改进的性能和更灵活的配置

二丶系统要求

1.java运行环境-Java1.8或之后的版本
2.内存-足够的内存用于配置source,channel和sink
3.硬盘空间-足够的硬盘空间用于channel或者sink使用
4.目录权限-允许agent读写权限

三丶数据流模型

一个Flume事件被定为一个数据流单元,该单元有着1字节的有效载荷和一些可选择的一串属性。一个Fulme用户代理是一个运行与java虚拟机上的进程,主持着事件从外部的数据源到下一个目的地(下一跳)。即Flume是一个JVM进程实例,控制管理着事件流的来源和目的地。
Flume的数据流模型有三个组件构成,分别为source,channelsink
source消耗着由外部数据源(例如web服务器)发送来的数据。外部数据源发送的数据的格式必须是目的Flume source所能识别的。例如,一个Avro Flume source能能够用于接收Avro的事件或者其他Flume代理(该代理必须发送来源于Avro sink的事件)。一个类似的流可以被定义为使用Thrift Flume Source接收来自Thrift sink或者Thrift RPC客户机或者由任何语言编写的遵循thrift协议的thrift客户机。当一个source接收到一个事件,source将存储该事件到一个或者多个channel中。channel被动的存储着事件直到事件被sink所消费。例如,文件channel是基于本地文件系统的。sink从channel中移出事件并将其放入一个外部的仓库中例如HDFS(通过HDFS sink放入),或者放入下一个Flume代理的source中(下一跳)。source和sink在一个特定的Flume代理中是异步执行的,都需要使用分期存储在channel中的事件。

重要概念释义:
flume Event:flume 事件,被定义为一个具有有效荷载的字节数据流和可选的字符串属性集。
flume Agent:flume 代理,是一个进程承载从外部源事件流到下一个目的地的过程。包含source channel 和 sink。
Source:数据源,消耗外部传递给他的事件,外部源将数据按照flume Source 能识别的格式将Flume 事件发送给flumeSource。
Channel:数据通道,是一个被动的存储,用来保持事件,直到由一个flume Sink消耗。
Sink:数据汇聚点,代表外部数据存放位置。发送flume event到指定的外部目标。

理解:类似于操作系统中的生产者消费者模型。channel是缓冲区,既可以是基于由操作系统提供的文件管理系统生成,也可以是其他形式(如数据库)。source从外部数据源中获取数据,存储到channel中。sink从channel从获取数据,并发送到外部仓库或者发送到下一跳的Flume代理的source中(作为整个流的过程的一个部分)。注意到虽然source和sink是异步执行的(即不相互影响,不等待同步应答),但是由于两者都要操作channle(即数据文件),该文件是临界资源,为了保持数据的完整性和一致性,source和sink对channle的操作必须是需要控制。另外,传递的事件的格式需要遵从一定的协议。如Avro就只能接受和发送来源于Avro source或者Avro sink的数据流,thirft就只能接受和发送遵从thirft协议的数据流。类似于计算机网络中协议的概念,协议定义了传输的数据的格式和接收或发送数据时所需采取的动作。事件类似于一个分组,含有一个字节的有效载荷和若干首部字段,首部字段有许多可选择属性字段,用于控制。总之,Flume是一个运行于JVM上的进程,该进程依赖于两个线程(source和sink)和一个数据存储结构(channle),遵循不同的Flume协议(如Avro和thift),发送和接收数据,将数据存于本地或者发送给下一跳(类似于路由选择算法的下一跳)

四丶复杂流动性

Flume允许用户创建进行多级流动的流,事件在到达目的地之前可以经过多种多样的Flume代理。Flume也允许用户定义扇入(多到一)和扇出流(一到多),基于上下文的路由选择和故障转移、失败处理 (如果路由到下一跳失败的话所采取的策略)

理解: 扇入(fan-in)和扇出(fan-out)原本是电子技术中电子元件及其输入端和输出端的结构。在软件工程中,扇入定义为一个模块被多个模板调用,扇出定义为一个模块调用多个模块。在Flume中,一个Flume代理类似于一个路由器,用户可以定义多级路由或者选择多种路由选择算法和路由失败时的默认处理。

五丶可靠性

在每一个Flume代理中,事件被存储于channel中。事件接下来将被发送至下一个代理或者临时的仓库(例如HDFS)中。当且仅当事件被存储于下一个代理或者仓库中后,才会从channel中移除。这就是Flume实现端到端单跳可靠数据传输的方式。
Flume采用事务型的方法Kauai保证事件的可靠交付。source和sink分别在一个事务中封装事件的存储和恢复,每个事件是由channel存储或者提供的。这保证了事件在流中的点对点可靠传输。在多级流动中,来自上一跳的sink和来自下一跳的source各自运行着他们的事务,以此来保证事件安全的存储于下一跳的channel中。
理解:Flume通过事务型的数据传递以及事件的删除机制(仅当成功传输完毕后才删除),来保证数据的可靠传输。source和sink独自的运行着自己的事务,以保证数据的安全性

六丶恢复性

事件存储于channle中,出现故障时可以恢复。Flume提供了一个持久的文件channel(基于本地文件系统的)。Flume也提供了内存channle,该channle仅仅简单的将数据存于内存中,这将更快但是如果代理进程出现故障的话数据不能恢复。
理解:Flume提供了两种channel手段,即channel可以以内存或文件的方式实现,内存更快,但是不可恢复,而文件虽然比较慢但提供了可恢复性。

七丶Flume配置

7.1 配置一个代理

Flume代理配置被存储于一个本地的配置文件。它是一个遵从java格式的文本文件。一个配置文件中可以配置多个Flume代理。配置文件中包含每个source,sink,channel的属性和它们是如何在流中关联到一起的

7.2 配置个人的组件

Flume每个组件(source,sink或者channel)都有名字,类型和一系列特殊的属性。例如,一个Avro source需要一个主机名称(或者ip地址)和一个端口地址来接收数据。一个内存channle需要指定能够有的最大的序列长度(称为capacity),一个HDFS sink需要知道文件系统的URI,创建文件的地址,文件旋转的频率。所有的这些组件的属性都需要在配置文件中设置

7.3 将各个组件组合在一起

Flume 代理为了构建流,需要知道载入哪个独立的组件并且知道他们是如何连接在一起的。完成这项任务需要列出每个组件的名称,和指定连接每对source和sink的channel。例如,假设有个,一个来源于Avro source的事件(叫做avroWeb),要传输到一个HDFS sink(叫做hdfs-cluster1),通过一个文件channel(叫做file-channel),那么配置文件需要包含这些组件的名称和将file-channel作为连接avriWeb source和hdfs-cluster1 sink的共享channel。

7.4 开启代理

Flume代理采用shell脚本(称为flume-ng,位于Flume distribution的bin目录中)开启。你需要在命令行中指定代理名称,配置文件目录和配置文件:

$bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

现在,代理将会根据给定的配置文件中的配置,开始运行

7.5 一个简单的实例

这里,我们给出一个简单的配置文件例子,描述一个单结点的Flume 部署。这个配置文件让用户生成事件随后记录他们到控制台中。

# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

该配置定义了一个叫做al的单结点代理。a1有一个source(监听端口44444),一个缓存数据到内存的channel和一个记录事件到控制台的sink。这个配置首先声明不同的组件,之后描述他们的类型和配置参数。一个给定的配置文件可以定义多个不同的代理。当一个代理进程被创建时,一个标识将被发送(用于显示哪一个代理正在运行)
根据上文给出的配置文件,我们可以按照下文来开启Flume

  $bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

注意到,在一个完整的部署中,我们需要典型的多包括一个选项: –conf=. 将包含一个shell脚本flume-env.sh和可能包含一个log4j属性文件。在这个例子中,我们传递了一个java选项,让Flume将日志输出到控制台当中,并且我们没有使用用户环境脚本。
在另一个命令行中,我们能够telnet端口44444然后发送Flume一个事件:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

原来的终端将输出事件(以日志消息的形式)。
恭喜你,你已经成功配置和部署了一个Flume代理,后面的章节仅仅包含更多的代理配置细节。
运行过程和结果如下:

7.5.1 从官网下载源文件:

7.5.2 解压,在在解压后的文件apache-flume-1.7.0-bin/conf下创建一个example.conf,内容如上文所述:

7.5.3运行cmd, 进入到apache-flume-1.7-0-bin\bin目录下,运行上文的命令。


将显示:

7.5.4 打开另一个cmd,输入telnet localhost 44444,连接成功后将变成输出状态,输入hello world


同时,源cmd端口将以日志的形式显示结果:

7.6 在配置文件中使用环境变量

Flume有能力在配置文件中替代环境变量,例如:

 a1.sources = r1a1.sources.r1.type = netcata1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = ${NC_PORT}a1.sources.r1.channels = c1

注意:一般只能对值进行操作,不能对变量操作(也就是说,只能放在等式的右边)
这可以通过java system的特质来完成。上文仅仅是一个例子,配置环境变量可以采用其他方式,例如在conf/flume-env.sh中配置。

7.7 记录原始数据

通过管道摄取来记录原始的数据流,这并不是许多生产环境所需求的行为,因为这可能导致泄露敏感数据或安全相关的配置(例如密钥)。缺省条件下,Flume并不会记录这些信息。另一方面,如果数据管道破损了,Flume将会企业提供线索来调试问题。
一个用事件管道来调试问题的方式是设置一个附加的Memory Channel 关联Logger Sink,这将会输出所有的事件数据到Flume日志中。然而,在某些情况下,这方法是不足的。
为了确保记录事件和配置相关数据,一些Java系统属性必须被配置(除了log4j属性)
为了记录配置相关的记录,设置Java系统属性

Dorg.apache.flume.log.printconfig=true

这既可以在命令行中设置,也可以在flume-env.sh.中配置JAVA_OPTS变量
为了记录数据记录,设置Java系统属性:

 Dorg.apache.flume.log.rawdata=true

设置方式和上文所述相同。对于大多数的组件,log4j日志级别必须设置调试或者追踪,来确保事件相关的日志出现在Flume记录中。
下面是一个既允许配置记录和原始数据记录,又设置了log4j日志等级的DEBUG(输出在控制台中):

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

7.8 安装第三方插件

Flume有着完整的基于插件的架构,有着许多即插即用的source,channel,sink,序列化器,也有许多实现是和Flume分离的。
在flueme-env.sh中添加用户自己的包到FLUME_CLASSPATH 变量中,来实现定制的Flume组件是可行的,Flume现在支持一个特殊的目录(称为plugins.d),它会自动的拾取插件(只要这个插件是按特别的格式打包的)。这使得更方便的插件包管理以及更简单的调试以排除故障和寻找故障(尤其是类库依赖冲突)

7.8.1 plugins.d目录

plugins.d目录存在于

  $FLUME_HOME/plugins.d

在Java启动时,flume-ng运行脚本,寻找位于plugins.d目录下符合特定格式并且放于恰当的路径下的的插件

7.8.2 插件目录布局

每个插件(是plugins.d的子目录下)能够有三个子目录:
1.lib-插件的包(jar)
2.libext-插件所依赖的包
3.native-任何所需的本地库,例如.so文件
下面是一个插件目录结构的例子(插件位于plugins.d目录下):

plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so

八丶数据获取

Flume支持一系列的从外部数据源获取数据的机制

8.1 RPC

一个Avro客户端(包含在Flume分布中),采用avro RPC机制,能够发送一个给定的文件到Flume Avro source中:

$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

上面的命令,将发送/usr/logs/log.10的内容到监听该端口的Flume Source中

8.2 网络流

Flume支持下面的机制来读流行的记录流类型,例如:
1. Avro
2. Thrift
3. Syslog
4. Netcat

8.3 设置多级代理流

为了让数据流穿过多个代理或者下一跳,前一个代理的sink和目前的下一跳的source都需要是avro类型,并且sink必须指向指向主机名(或ip地址)和下一跳source的端口

九丶合并

在记录收集中一个非常常见的场景是大量的产生记录的客户机发送数据到少量的消费者代理(这些代理代理依附于存储子系统)。例如,来源于成百上千的web服务器的记录发送给一些写入HDFS簇群的代理。

这在Flume中能被实现,通过配置一系列的第一层代理(有着avro sink,均指向一个单一代理的avro source),下文将阐述thrift是如何实现这种场景的。位于第二层代理的source合并接收到的事件到单一channel中,然后被sink所消费,发送到目的地。

十丶多级流

Flume支持多路传输的流到一个或多个目的地。这通过定义一个流多路转接器来实现。流多路转接器能够复制或者有选择的按照某天路线发送一个事件到一个或多个channel中。

上图的例子展示了一个source(来自于foo代理)扇出流到三个不同的channel中。这个扇出可以复制或者多路复用。在复制流的情况下,每个事件被发送到所有三个channel中。在多路复用的情况下,一个事件仅仅发送到所有可用channle的某个子集中(子集中的元素满足:事件的属性匹配硬件预设的属性)。例如,如果一个事件属性(称为txntype)被设置为”customer”,那么应该送往channel1和channle2。如果属性是”vendor”,那么应该送往channel2,否则channel3。这个地图可以在配置文件中设置。

十一丶配置

正如上面章节所述,Flume的代理配置是读一个文件(类似于java性质文件格式,有着分层的属性设置)

11.1 定义流

为了在一个单一代理中定义流,你需要将sources和sinks通过channel连接在一起。你需要列出给定的代理的source,sink,channels,然后指定source和sink到一个channle。一个source实例能够指定多个channel,但是一个sink仅仅只能指定一个特别的channle,格式如下所示:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2># set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

例如,一个叫agent_foo的代理正在读来自一个外部avro客户机的数据,并通过一个内存channel发送到HDFS中,配置文件weblog.config如下所示:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

这将使得事件流通过 mem-channel-1从avro-AppSrv-source到 hdfs-Cluster1-sink。当这个代理根据配置文件weblog.config启动时,它将实例化这个流。

11.2 配置个人的流

在定义流之后,你需要配置每个组件的属性,这和你配置其他组件的类型、值是相同的分层的名字空间风格。

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue># properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue># properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

属性”type”必须对每个组件设置以让Flume明白它应该属于那种类型的对象。每个组件都有一系列的自己的属性,这些都需要按照需要设置。在上一个例子中,我们有一个通过 mem-channel-1从avro-AppSrv-source到 hdfs-Cluster1-sink的流,下面展示每个组件的属性定义:

 agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1# set channel for sources, sinks# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

11.3 在代理中添加多级流

11.4 配置一个多代理流

11.5 扇出流

正如我们在前面的章节所讨论的那样,Flume支持扇出流(从一个source输出到多个channel)。扇出流有两种实现方法,复制和多路复用。在复制流中,事件将被发送到所有配置的channel中。对于多路复用的情况,事件仅仅发送到那些具有接收资格的子集中。为了实现扇出流,你需要为一个source指定一系列的channel和扇出的策略。这是通过添加channel选择器来实现的。选择器既能复制又能多路复用。之后进一步的,如果是多路复用的话,为选择题指定选择规则,如果不指定选择器,那么默认情况下选择器将是复制的:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2># set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2># set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2><Agent>.sources.<Source1>.selector.type = replicating

多路复用的选择有着更进一步的需要定义的属性来分叉流。这需要指定一个特定的事件归属于哪个channel集合。选择器将对每个配置好的属性进行检查(在事件头中)。如果匹配成功,那么这个事件将会送往所有匹配成功的映射到的channel集合中。如果没有匹配成功的,那么事件将会送往配置好的默认的channel中:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...<Agent>.sources.<Source1>.selector.default = <Channel2>

这个映射允许channel的每个属性值部分重叠。
下面的例子有一个单一的流(该流有两条路径)。被命名为agent_foo的代理有一个avro source和两个channel连接到两个sink:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

选择器将检测事件的头(称为State)。如果其值为”CA”,那么送往mem-channel-1,如果是”AZ”,那么送往file-channel-2,如果是”NY”,则两个都送往。如果”State”头并不匹配任何,那么将送往mem-channel-1(这是配置文件中指定的默认配置)
选择器也支持可选择的channel。为了指定header中的选项,配置参数字段”optional”按如下的方式:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

选择器首先将试图写入需要的channel,之后如果任何一个这些channel消耗失败的话,选择器将终止事务。事务将对所有的channel进行再尝试,一旦所有的需要的channel都成功消费了,选择器将试图写入可选择的channel。任何可选择的channel中出现的故障,则该事件被可选择channel忽略并且不再重试。
如果在可选择的channel和需要的channel中有重叠(对于某些特殊的header),那么这个channel将被视为是需要的channel,任何一个channel出现故障将会导致所有的channel重试。例如,在上面的例子中,因为头”CA”, mem-channel-1 被视为需要的(即使它被标记为可选择的),并且如果写入这个channel失败,将会导致所有的channel重试。
注意到,如果一个header并不含有任何需要的channel,那么这个世界将被送往默认channel并且企图写入这个header书写的可选择的channel中。如果没有指定需要的channel的话,那么指定的可选择的channel仍会导致事件写入默认channel中。如果没有指派默认channel并且没有需要的channel的话,选择器将试图将事件写入可选择的channel中。任何故障仅仅只会被简单的忽略(不会导致所有的重试)

十二丶FLume Source

12.1 Avro Source

监听AVRO端口来接受来自外部AVRO客户端的事件流。

利用Avro Source可以实现多级流动、扇出流、扇入流等效果。

另外也可以接受通过flume提供的Avro客户端发送的日志信息。
属性说明:

字段 含义
!type 类型名称,”AVRO”
!bind 需要监听的主机名或IP
!port 要监听的端口
threads 工作线程最大线程数
interceptors 空格分隔的拦截器列表
compression-type none 压缩类型,可以是“none”或“default”,这个值必须和AvroSource的压缩格式匹配
ssl false 是否启用ssl加密,如果启用还需要配置一个“keystore”和一个“keystore-password”。
keystore 为SSL提供的java密钥文件所在路径。
keystore-password 为SSL提供的java密钥文件 密码。
keystore-type JKS 密钥库类型可以是“JKS”或“PKCS12”。
exclude-protocols SSLv3 空格分隔开的列表,用来指定在SSL / TLS协议中排除。SSLv3将总是被排除除了所指定的协议。
ipFilter false 如果需要为netty开启ip过滤,将此项设置为true
ipFilterRules 定义netty的ip过滤设置表达式规则

对于一个叫a1的代理,下面是一个例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

ip过滤器规则:
ip过滤器定义了N个网状的ip过滤器,其定义必须按照如下的格式:

<’allow’ or deny>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern

例如:

 ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

注意,第一条规则将会被应用匹配于一个客户机连接到localhost,正如下面的例子所示:
“allow:name:localhost,deny:ip:” This will
deny the client on localhost be allow clients from any other ip

“deny:name:localhost,allow:ip:“

12.2 Thift Source

监听Thrift端口来接受来自外部Thrift客户端的事件流。

12.3 Exec Source

可以将命令产生的输出作为源。
属性说明:

项目 价格
Computer $1600
Phone $12
Pipe $1
字段 含义
!type 类型名称,需要是”exec”
!command 要执行的命令
shell Ashell invocation used to run the command. e.g. /bin/sh -c. Required only forcommands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 毫秒为单位的时间,用来声明等待多久后尝试重试命令
restart false 如果cmd挂了,是否重启cmd
logStdErr false 无论是否是标准错误都该被记录
batchSize 20 同时发送到通道中的最大行数
batchTimeout 3000 如果缓冲区没有满,经过多长时间发送 数据
selector.type 复制还是多路复用
selector.* Depends on the selector.type value
interceptors 空格分隔的拦截器列表

12.4 JMS Source

接收来自JMS目的地的消息。提供一批可配置的大小,选择器,过滤器和Flume事件转换器。注意到使用时需要提供JMS Jar的路径在Flume的类路径中。

12.4.1 转换器

JMS Source允许即插即用的转换器,尽管默认的转换器已经适用于大多数情况了。默认的转换器可以转换字节类型,文本类型和对象类型到Flume类型,在所有的情况下,信息中的头将被加入到Flume事件的header中。

12.5 Spooling Directory Source

这个Source允许你将将要收集的数据放置到”自动搜集”目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入通道,它会被重命名或可选的直接删除。
要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错。

12.6 Taildir Source, Kafka Source

这些是预先浏览,具有实验性的特色,在Windows中并不能运行

12.7 NetCat TCP Source

一个很像netcat的source,监听给定的端口并将文本的每一行转换为一个事件。也就是说,它打开一个给定的端口然后监听数据。例外情况是收到的数据时一个新行的分开的文本。每一行都会被转化为一个Flume事件,并且通过链接到的channel发送出去。

12.8 NetCat UDP Source

与NetCat TCP Source类似,运行在UDP之上。

12.9 Sequence Generator Source

一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1。例如:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

主要用来进行测试。

12.10 HTTP Source

12.10.1 概述

TTP Source接受HTTP的GET和POST请求作为Flume的事件,其中GET方式应该只用于试验。

该Source需要提供一个可插拔的”处理器”来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口,该处理器接受一个 HttpServletRequest对象,并返回一个Flume Envent对象集合。

从一个HTTP请求中得到的事件将在一个事务中提交到通道中。因此允许像文件通道那样对通道提高效率。

如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。

如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。

12.10.2 属性说明

字段 属性
!type 类型,必须为”HTTP”
!port 监听的端口
bind 0.0.0.0,监听的主机名或ip
handler org.apache.flume.source.http.JSONHandler处理器类,需要实现HTTPSourceHandler接口
handler.* 处理器的配置参数
enableSSL 是否开启SSL,如果需要设置为true。注意,HTTP不支持SSLv3。
excludeProtocols SSLv3 空格分隔的要排除的SSL/TLS协议。SSLv3总是被排除的。
keystore 密钥库文件所在位置。
keystorePasswordKeystore 密钥库密码

12.10.3 常用的Handler

12.10.3.1 JSONHandler

可以处理JSON格式的数据,并支持UTF-8 UTF-16 UTF-32字符集,该handler接受Evnet数组,并根据请求头中指定的编码将其转换为Flume Event。
如果没有指定编码,默认编码为UTF-8.
JSON格式如下:

[{"headers": {"timestamp" :"434324343","host" :"random_host.example.com"},"body": "random_body"},{"headers": {"namenode" :"namenode.example.com","datanode" :"random_datanode.example.com"},"body": "really_random_body"}]

设置字符集时,请求必须包含content type 并设置为

application/json;charset=UTF-8。
Typetype=newTypeToken<List<JSONEvent>>(){}.getType();

12.10.3.2 Blob Handler

BlobHandler是一种将请求中上传文件信息转化为event的处理器。

12.11 Custom Source

自定义源是自己实现源接口得到的,自定义源的类和其依赖包必须在开始时就放置到Flume的类加载目录下。

十三丶Flume Sink

13.1 HDFS Sink

这个Sink将事件写入HDFS中(即Hadoop分布式文件系统)。它目前支持创建文本文件和序列化文件。它支持两种文件类型的压缩。文件可以被定期的关闭和打开新的文件(基于定时器或者数据的大小或者事件的个数)。它也可以通过属性(例如时间戳或者引起这个事件的主机)来划分或打包数据。一个HDFS的目录路径可能包含格式化的转义序列(将会被HDFS Sink 替换来形成存储事件的文件/目录)。使用这个sink需要你首先安装Hadhoop,由Flume能够使用Hadhoop的包来访问HDFS簇。注意,此版本hadoop必须支持sync()调用。
转义序列如下所示:

Alias Description 含义
%{host} Substitute(代用品) value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds(毫秒)
%a locale’s short weekday name (Mon, Tue, …)
%A locale’s full weekday name (Monday, Tuesday, …)
%b locale’s short month name (Jan, Feb, …)
%B locale’s long month name (January, February, …)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%n month without padding (1..12)
%M minute (00..59)
%p locale’s equivalent(等价的) of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y l ast two digits(数字) of year (00..99)
%Y year (2010)
%z +hhmm numeric(数) timezone(时区) (for example, -0400)
%[localhost] Substitute(代用品) the hostname(主机名称) of the host where the agent is running
%[IP] Substitute the IP address of the host where the agent is running
%[FQDN] Substitute(代用品) the canonical(依教规的) hostname(主机名称) of the host where the agent is running

例如:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

上方的配置将时间戳下舍入到十分钟,例如一个事件的时间戳为:

11:54:34 AM, June 12, 2012

其生产的hdfs路径将为:

/flume/events/2012-06-12/1150/00.

13.2 Hive Sink

这个Sink可以将含分隔符的文本或JSON数据事件直接导入Hive的表或分区。

事件Event是使用Hive transactions编写的,当一组Event被提交到Hive中,他们立即可以通过Hive被查询出来。

Flume要写入数据的分区即可以预先创建好,也可以在缺失时由Flume来创建。

Flume收到的数据字段将映射到Hive表的列上。

此功能是一个预览功能,不推荐在生产环境下使用。

13.3 Custom Sink

自定义接收器,是自己实现的接收器接口Sink来实现的。
自定义接收器的类及其依赖类须在Flume启动前放置到Flume类加载目录下。

十四丶Flume Channel

14.1 Memory Channel

Memory Channel,内存通道;事件将被存储在内存中的具有指定大小的队列中。
非常适合那些需要高吞吐量但是失败时会丢失数据的场景下。

14.2 JDBC Channel

事件被持久存储在可靠的数据库中。目前支持嵌入式的Derby数据库。如果可恢复性非常的重要可以使用这种方式。

14.3 Spillable Memory Channel

Spillable Memory Channel:内存溢出通道。

事件被存储在内存队列和磁盘中,内存队列作为主存储,而磁盘作为溢出内容的存储。

内存存储通过embeddedFile channel来进行管理,当内存队列已满时,后续的事件将被存储在文件通道中,这个通道适用于正常操作期间适用内存通道已期实现高效吞吐,而在高峰期间适用文件通道实现高耐受性。通过降低吞吐效率提高系统可耐受性。

如果Agent崩溃,则只有存储在文件系统中的事件可以被恢复;此通道处于试验阶段,不建议在生产环境中使用。

十五丶Flume Channel Selector

Selector(选择器)可以工作在复制或多路复用(路由) 模式下。如果没有指定类型,则默认是复制模式

15.1 复制Channel选择器(默认)

下面是一个配置文件的例子,代理名为a1,source名为r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

在上面的配置中,C3是一个可选择的channel。如果写入C3失败,则仅仅简单的忽略。既然C1和C2并没有标记是可选择的,那么写入这些Chanel中任何一个失败都会导致事务失败

15.2 多路传输Channel选择器

下面是一个配置文件的例子,代理名为a1,source名为r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

15.3 定制Channel选择器

个定制的channel选择器是你自己通过ChannelSelector接口实现的。当代理启动时,它的类和依赖必须放入代理的类路径中,例如如下所示:

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

十六丶Flume Sink Processor

Sink Group允许用户将多个Sink组合成一个实体。
Flume Sink Processor 可以通过切换组内Sink用来实现负载均衡的效果,或在一个Sink故障时切换到另一个Sink。

16.1 默认Sink处理器

Default Sink Processor 只接受一个 Sink,不要求用户为单一Sink创建processor.

16.2失效备援Sink处理器

Failover Sink Processor 维护一个sink们的优先表。确保只要一个是可用的就事件就可以被处理。

失败处理原理为:为失效的sink指定一个冷却时间,在冷却时间到达后再重新使用。

sink们可以被配置一个优先级,数字越大优先级越高,如果sink发送事件失败,则下一个最高优先级的sink将会尝试接着发送事件;如果没有指定优先级,则优先级顺序取决于sink们的配置顺序,先配置的默认优先级高于后配置的。

在配置的过程中,设置一个groupprocessor ,并且为每个sink都指定一个优先级。优先级必须是唯一的。另外可以设置maxpenalty属性指定限定失败时间。

16.3 负载均衡Sink处理器

Load balancing Sink processor 提供了在多个sink之间实现负载均衡的能力,它维护了一个活动sink的索引列表,并支持轮询或随机方式的负载均衡,默认值是轮询方式,可以通过配置指定,也可以通过实现AbstractSinkSelector接口实现自定义的选择机制。

16.4 定制Sink处理器

目前尚未支持。

十七丶Flume Interceptor

Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。

拦截器需要实现org.apache.flume.interceptor.Interceptor接口。

拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。

拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。

一个拦截器返回的事件列表被传递给链中的下一个拦截器。

如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。

如果要删除所有事件,只需返回一个空列表。

17.1 时间戳拦截器

这个拦截器在事件头中插入以毫秒为单位的当前处理时间。

头的名字为timestamp,值为当前处理的时间戳。

如果在之前已经有这个时间戳,则保留原有的时间戳。

17.2 主机拦截器

这个拦截器插入当前处理Agent的主机名或ip

头的名字为host或配置的名称

值是主机名或ip地址,基于配置。

17.3 静态拦截器

此拦截器允许用户增加静态头信息使用静态的值到所有事件。

目前的实现中不允许一次指定多个头。

如果需要增加多个静态头可以指定多个Static interceptors

17.4 全局唯一识别拦截器

这个拦截器在所有事件头中增加一个全局一致性标志,其实就是UUID。

17.5 搜索替代拦截器

这个拦截器提供了简单的基于字符串的正则搜索和替换功能。使用规则和Java相同。

17.6 正则表达式过滤拦截器

使用指定正则表达式匹配事件,并将匹配到的组作为头加入到事件中,它也支持插件化的序列化器用来格式化匹配到的组在加入他们作为头之前。
例1:
如果Flume的事件体包含1:2:3:4foobar5,你需要使用下面的配置:

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

被提取的事件将会包含相同的头,但是下面的头将会被添加:

one=>1, two=>2, three=>3

例子2:
如果Flume的事件体包含2012-10-18 18:47:57,614 some log line,那么你需要使用下面的配置:

a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

被提取的事件将会包含相同的头,但是下面的头将会被添加:
timestamp=>1350611220000

十八丶所有可用组件

Component Interface Type Alias Implementation Class
org.apache.flume.Channel memory org.apache.flume.channel.MemoryChannel
org.apache.flume.Channel jdbc org.apache.flume.channel.jdbc.JdbcChannel
org.apache.flume.Channel file org.apache.flume.channel.file.FileChannel
org.apache.flume.Channel org.apache.flume.channel.PseudoTxnMemoryChannel
org.apache.flume.Channel org.example.MyChannel
org.apache.flume.Source avro org.apache.flume.source.AvroSource
org.apache.flume.Source netcat org.apache.flume.source.NetcatSource
org.apache.flume.Source seq org.apache.flume.source.SequenceGeneratorSource
org.apache.flume.Source exec org.apache.flume.source.ExecSource
org.apache.flume.Source syslogtcp org.apache.flume.source.SyslogTcpSource
org.apache.flume.Source multiport_syslogtcp org.apache.flume.source.MultiportSyslogTCPSource
org.apache.flume.Source syslogudp org.apache.flume.source.SyslogUDPSource
org.apache.flume.Source spooldir org.apache.flume.source.SpoolDirectorySource
org.apache.flume.Source http org.apache.flume.source.http.HTTPSource
org.apache.flume.Source thrift org.apache.flume.source.ThriftSource
org.apache.flume.Source jms org.apache.flume.source.jms.JMSSource
org.apache.flume.Source org.apache.flume.source.avroLegacy.AvroLegacySource
org.apache.flume.Source org.apache.flume.source.thriftLegacy.ThriftLegacySource
org.apache.flume.Source org.example.MySource
org.apache.flume.Sink null org.apache.flume.sink.NullSink
org.apache.flume.Sink logger org.apache.flume.sink.LoggerSink
org.apache.flume.Sink avro org.apache.flume.sink.AvroSink
org.apache.flume.Sink hdfs org.apache.flume.sink.hdfs.HDFSEventSink
org.apache.flume.Sink hbase org.apache.flume.sink.hbase.HBaseSink
org.apache.flume.Sink asynchbase org.apache.flume.sink.hbase.AsyncHBaseSink
org.apache.flume.Sink elasticsearch org.apache.flume.sink.elasticsearch.ElasticSearchSink
org.apache.flume.Sink file_roll org.apache.flume.sink.RollingFileSink
org.apache.flume.Sink irc org.apache.flume.sink.irc.IRCSink
org.apache.flume.Sink thrift org.apache.flume.sink.ThriftSink

十九丶Is Flume a good fit for your problem?

如果你需要获取文本的日志数据放入Hadhoop/HDFS中,那么Flume很适合你的问题,可以完美的解决你的问题。对于其他情况,下面是一些指导方针:
Flume被设计来通过一种相对稳定的,潜在的,复杂拓的扑结构来传输和获取日常产生的事件数据。”事件数据”的概念是相当宽泛的。对于Flume,一个事件仅是一类的二进制对象。对于一个事件能有多大有一些限制,例如,它不能大于内存的最大容量或磁盘最大容量(对于单机而言),但是实际上,Flume的事件可以是任何文件(文本文件,图像文件等等)。一个事件的关键属性是他们是在一个连续的流环境中产生的。如果你的数据不是定期产生的(例如,你想将一个批量数据放入Hadhoop簇群中),那么Flume仍然可以工作,但是对你的情况而言可能是有点大材小用的。Flume喜欢相对稳定的拓扑结构。你的拓扑结构不必是一成不变的,因为Flume可以处理拓扑结构的变化,并且不会丢失数据,而且能够忍受周期性的重新配置(因为其容错机制和自动精简配置)。当然,如果你计划每天都更改拓扑结构的话,Flume可能不会很好的工作,因为重新配置需要一些配置时间和费用。

Flume官方文档阅读笔记及实际操作相关推荐

  1. Qt官方文档阅读笔记-对官方Star Delegate Example实例的解析

    对应的博文为: 目录 Star Delegate Example StarDelegate Class Definition StarDelegate Class Implementation Sta ...

  2. spring官方文档阅读笔记

    前言 几个月前阅读spring文档时做的笔记,记录了以写我认为重要的内容. IOC container IOC(Inverse of Control) 控制反转,也称为DI(Dependency In ...

  3. Qt官方文档阅读笔记-QStyledItemDelegate Class描述

    对应的原文为: 笔记如下: 简单描述: QStyledItemDelegate提供了展示和编辑item的功能,让这两种功能更有个性化.QStyledItemDelegate是所有Item View的默 ...

  4. Ti 官方文档阅读笔记

    文章目录 参考资料 Optimizing TI mmWave Radar Configurations for FCC Certification Programming Chirp Paramete ...

  5. Javassist 官方文档 随手笔记

    Javassist 官方文档 随手笔记 Javassist.CtClass Class search path Introspection and customization \$0, \$1, \$ ...

  6. ZooKeeper官方文档学习笔记03-程序员指南03

    我的每一篇这种正经文章,都是我努力克制玩心的成果,我可太难了,和自己做斗争. ZooKeeper官方文档学习笔记04-程序员指南03 绑定 Java绑定 客户端配置参数 C绑定 陷阱: 常见问题及故障 ...

  7. Open3D官方文档学习笔记

    Open3D官方文档学习笔记 第一部分--点云 1 可视化点云 2 体素降采样 3 顶点法线评估 4 访问顶点法线 补充:Numpy在Open3D中的应用 5 裁剪点云 补充1:获取点云坐标 补充2: ...

  8. JMeter官方文档阅读及实践笔记(上)

    JMeter笔记 一.测试计划元件概览 本节简单介绍测试计划的不同部分. 最小测试将包括测试计划.线程组和一个或多个采样器. 1.Thread Group,线程组 线程组元素是任何测试计划的起点.所有 ...

  9. vue.js 2.0 官方文档学习笔记 —— 01. vue 介绍

    这是我的vue.js 2.0的学习笔记,采取了将官方文档中的代码集中到一个文件的形式.目的是保存下来,方便自己查阅. !官方文档:https://cn.vuejs.org/v2/guide/ 01. ...

最新文章

  1. linux bash字符串截取
  2. CI Weekly #7 | Instgram/Quora 等大公司如何做持续部署?
  3. Windows 系统版本判断
  4. 【双100%解法】剑指 Offer 22. 链表中倒数第k个节点
  5. HBase shell命令行
  6. 资深架构专家聊架构之道:规划、简化和演化
  7. Selenium自动化测试-JavaScript定位
  8. 尝试从远程计算机访问Web服务不显示调用按钮
  9. html英文字体汇总,笔记 CSS常用中文字体英文名称对照表
  10. VM虚拟机手动配置IP地址
  11. 10年专业导师整理的单片机控制电动机正反转设计类毕业论文文献
  12. 迅雷有linux版本吗,迅雷 - Linux Wiki
  13. ict中的it和ct_ICT.Social – IT专业人员的社交网络
  14. Latex排版论文——傻瓜式操作一晚排出毕业论文
  15. linux计算机连接PEAP企业网的wi-fi
  16. 异构图注意力网络(Heterogeneous Graph Attention Network)
  17. HTTPS中CA证书的签发及使用过程
  18. 用Python写中文数字对照表
  19. 双亲表示法、孩子表示法、孩子兄弟表示法(二叉树表示法),森林和二叉树的转换
  20. 分享我的疯狂Linux内核知识

热门文章

  1. ESP8266-Arduino编程实例-VEML6040颜色传感器驱动
  2. vue项目中如何使用阿里的字体图标iconfont
  3. 2018春运火车票务系统:每天1500亿浏览量,1秒卖票700张
  4. 利用python爬虫电影分析_python 爬虫分析30年香港电影
  5. SpringSecurity(四)——自定义数据源(Filter)
  6. python微信公众号爬虫_微信公众号推送信息爬取---python爬虫
  7. 2022年大一网页期末作业(纯HTML+CSS实现)
  8. 草蟒“逗号表”模块介绍及中文命名随想
  9. Python异步爬虫之协程抓取妹子图片(aiohttp、aiofiles)
  10. 坤湛科技获2000万美元融资 由前阿里云机器智能首席科学家闵万里创办