目录

一、Flume简介

二、Flume架构

2.1 Flume基本组件

2.2 Flume常见数据流模型

三、Source,Channel,Sink 详解

3.1 Source

3.2 Channel

3.3 Sink

四、小结


本文仅用于学习记录总结所用,配置方面有大量参考官方文档,以方便查阅,不喜勿喷。

Flume可以说是配置型框架,通过简单的配置实现数据的收集和发送,比较简单,所以建议大家学习Flume还是学会看官方文档。

一、Flume简介

Flume是一个分布式的,可靠的,高可用的海量日志采集,聚合和传输的系统。它也是Hadoop生态圈中的关键组件之一。

Flume通过可扩展,插件化,组合式,高可用,高容错的设计模式,为用户提供了高效准确的轻量化大数据采集工具。

Flume可以通过简单的配置,收集不同数据源的海量数据,并将数据准确,高效的传入到不同中心存储里面。目前Flume可以对接主流的大数据框架有,Hadoop,Kafka,Spark等。

在使用Flume的过程中,通过配置文件就可以实现整个数据收集过程的负载均衡和故障转移,整个流程不需要修改Flume的任何代码。Flume具有的这些优良的特性,得益于它优秀的架构设计。

Apache 官方的简介

Apache Flume是一个分布式,可靠且可用的系统,用于有效地收集,聚合大量日志数据并将其从许多不同的源移动到集中式数据存储中。

Apache Flume的使用不仅限于日志数据聚合。由于数据源是可定制的,因此Flume可用于传输大量事件数据,包括但不限于网络流量数据,社交媒体生成的数据,电子邮件消息以及几乎所有可能的数据源。

Apache Flume是Apache Software Foundation的顶级项目。

二、Flume架构

Flume事件定义为具有字节有效负载和可选字符串属性集的数据流单位。Flume是使用Java开发的,它的代理是一个(JVM)进程,承载了组件,Event通过这些组件从外部源流到下一个目标(hop)。

2.1 Flume基本组件

Event:数据消息的基本单位,有header(键值对)和body(字节数组)组成。

Agent:Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方

一个Agent包括Source、Channel和Sink,还有一些可选的组件(稍后会介绍),如下图所示。

组件 1 --> Source : 从外部来源读取Event,并写入到Channel当中。

组件 2 --> Channel : Event临时存储组件,Source写入后,Event将会一直保存,直到被Sink成功消费。

组件 3 --> Sink :从Channel读取Event,并写入目的地。

其实在Agent里面我们还可以配置三个可选的组件,分别是Interceptor(拦截器),Selector(选择器),SinkProcessor(event处理器)。

可选组件 1 --> Interceptor(拦截器):可以处理Event中的header信息,可以在Header中添加消息处理的时间戳,还可以添加主机名称,还有一些静态值,也可以根据正则表达式对Event进行过滤,选择那些Event可以继续向后进行传输,也可以决定哪些Event被删除,停止向后传输。Interceptor可以配置多个,多个Interceptor可以顺序执行

可选组件 2 --> Selector(多路复用选择器):Interceptor处理完Event之后会传输给Selector(选择器)。选择器的作用是选择哪种方式把Event写入到Channel当中。

可选组件 3 --> SinkProcessor(event处理器):当Sink从Channel中消费消息之前,可以选择SinkProcessor(event处理器),根据配置可以选择使用故障转移处理器或者负载均衡处理器。然后进Sink消费消息。

2.2 Flume常见数据流模型

2.2.1 基本数据流模型

Flume启动一个Agent,通过Source从外部数据源(如Web服务器)对数据消息(Event)进行采集,然后放在Channel当中临时存储,由Sink从Channel里面进行消费消息,写入到HDFS里面进行持久化存储。

2.2.2 多Agent的复杂流

日志收集中的一种非常常见的情况是,大量的日志生成,客户端将数据发送到连接存储子系统的几个消费者代理。例如,从数百台Web服务器收集的日志发送到许多写入HDFS群集的代理。其中用到的主要架构是多级的Flume流:如下图所示

下面我们举个例子来说明生产中常见的使用场景:

说明:收集多个节点的服务器数据,然后进行汇总,汇总之后再进行持久化存储。(如下图所示)

有三个WebServer服务器分别产生数据,针对这三台服务器分别启动三个Agent,Agent1,Agent2,Agent3对应三个WebServer,每个Agent负责收集对应服务器产生的数据消息(Event),收集完之后,这三个Agent再Sink阶段,都把数据发送到了Agent4,Agent4的作用是负责收集其他三个Agent输出的数据,并把收集到的所有数据写入到HDFS当中。

这里我们看出,Agent不仅可以和外部的数据源和外部的存储机制进行连接,在多个Agent之间也是可以进行连接的,可以根据不同的使用场景对Agent进行灵活的组合。

2.2.3 多路复用流

Flume还支持将Event流复用到一个或多个目的地。这是通过定义一种流多路复用器来实现的,该流多路复用器可以将Event复制或选择性地路由到一个或多个通道。

这个数据流是将不同类型的数据流写入到不同的Channel当中,然后不通的Channel对应的Sink将收集到的数据消息(Event)输出到不同的目的地。

三、Source,Channel,Sink 详解

3.1 Source

对接各种外部数据源,将收集到的Event发送到Channel中,一个Source可以向多个Channel发送Event,Flume内置非常丰富的Source,同时用户可以自定义Source。

篇幅原因,这里只介绍集中常见的Source,其他的可以区官网自行查看。

3.1.1 Avro Source

flume可以多级代理,然后代理与代理之间用Avro去连接。也就是Avro Source可以和上一个Agent进行连接。

Avro Source监听Avro端口接收从外部Avro客户端发送来的数据流。如果与上一层Agent的 Avro Sink 配合使用就组成了一个分层的拓扑结构。 必需的参数已用 粗体 标明。

属性 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: avro
bind 监听的服务器名hostname或者ip
port 监听的端口
threads 生成的最大工作线程数量
selector.type   可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.*   channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors 该source所使用的拦截器,多个用空格分开
interceptors.*   拦截器的相关属性
compression-type none 可选值: none 或 deflate 。这个类型必须跟Avro Source相匹配
ssl false 设置为 true 可启用SSL加密,如果为true必须指定下面的 keystore 和 keystore-password 。
keystore SSL加密使用的Java keystore文件路径
keystore-password Java keystore的密码
keystore-type JKS Java keystore的类型. 可选值有 JKS 、 PKCS12 。
exclude-protocols SSLv3 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
ipFilter false 设置为true可启用ip过滤(netty方式的avro)
ipFilterRules netty ipFilter的配置(参考下面的ipFilterRules详细介绍和例子)

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#avro-source

中文翻译版参考:https://flume.liyifeng.org/#avro-source

3.1.2 Thrift Source

ThriftSource 与Avro Source 基本一致。只要把source的类型改成thrift即可,例如a1.sources.r1.type = thrift 。

监听Thrift 端口,从外部的Thrift客户端接收数据流。如果从上一层的Flume Agent的 Thrift Sink 串联后就创建了一个多层级的Flume架构(同 Avro Source 一样,只不过是协议不同而已)。Thrift Source可以通过配置让它以安全模式(kerberos authentication)运行,具体的配置看下表。 必需的参数已用 粗体 标明。

属性 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: thrift
bind 监听的 hostname 或 IP 地址
port 监听的端口
threads 生成的最大工作线程数量
selector.type   可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.*   channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors 该source所使用的拦截器,多个用空格分开
interceptors.*   拦截器的相关属性
ssl false 设置为true可启用SSL加密,如果为true必须指定下面的keystore和keystore-password。
keystore SSL加密使用的Java keystore文件路径
keystore-password Java keystore的密码
keystore-type JKS Java keystore的类型. 可选值有 JKS 、 PKCS12
exclude-protocols SSLv3 排除支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
kerberos false 设置为 true ,开启kerberos 身份验证。在kerberos模式下,成功进行身份验证需要 agent-principal 和 agent-keytab 。 安全模式下的Thrift仅接受来自已启用kerberos且已成功通过kerberos KDC验证的Thrift客户端的连接。
agent-principal 指定Thrift Source使用的kerberos主体用于从kerberos KDC进行身份验证。
agent-keytab —- Thrift Source与Agent主体结合使用的keytab文件位置,用于对kerberos KDC进行身份验证。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#thrift-source

中文翻译版参考:https://flume.liyifeng.org/#thrift-source

3.1.3 Exec Source

执行Unix命令,获取标准输出,例如:tail -f 。

这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据(stderr 信息会被丢弃,除非属性 logStdErr 设置为 true )。 如果进程因任何原因退出, 则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果,而date这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date这种命令)只会产生单个Event并退出。

提示:

cat [named pipe]和tail -F [file]都能持续地输出内容,那些不能持续输出内容的命令不可以。这里注意一下cat命令后面接的参数是命名管道(named pipe)不是文件。

必需的参数已用 粗体 标明。

属性 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: exec
command 所使用的系统命令,一般是cat 或者tail
shell 设置用于运行命令的shell。 例如 / bin / sh -c。 仅适用于依赖shell功能的命令,如通配符、后退标记、管道等。
restartThrottle 10000 尝试重新启动之前等待的时间(毫秒)
restart false 如果执行命令线程挂掉,是否重启
logStdErr false 是否会记录命令的stderr内容
batchSize 20 读取并向channel发送数据时单次发送的最大数量
batchTimeout 3000 向下游推送数据时,单次批量发送Event的最大等待时间(毫秒),如果等待了batchTimeout毫秒后未达到一次批量发送数量,则仍然执行发送操作。
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.*   channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors 该source所使用的拦截器,多个用空格分开
interceptors.*   拦截器相关的属性配置

警告

ExecSource相比于其他异步source的问题在于,如果无法将Event放入Channel中,ExecSource无法保证客户端知道它。在这种情况下数据会丢失。例如,最常见的用法是用tail -F [file]这种,应用程序负责向磁盘写入日志文件, Flume 会用tail命令从日志文件尾部读取,将每行作为一个Event发送。这里有一个明显的问题:如果channel满了然后无法继续发送Event,会发生什么?由于种种原因,Flume无法向输出日志文件的应用程序指示它需要保留日志或某些Event尚未发送。 总之你需要知道:当使用ExecSource等单向异步接口时,您的应用程序永远无法保证数据已经被成功接收!作为此警告的延伸,此source传递Event时没有交付保证。为了获得更强的可靠性保证,请考虑使用 Spooling Directory Source, Taildir Source 或通过SDK直接与Flume集成。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#exec-source

中文翻译版参考:https://flume.liyifeng.org/#exec-source

3.1.4 JMS Source

从JMS源获取数据。

JMS Source是一个可以从JMS的队列或者topic中读取消息的组件。按理说JMS Source作为一个JMS的应用应该是能够与任意的JMS消息队列无缝衔接工作的,可事实上目前仅在ActiveMQ上做了测试。 JMS Source支持配置batch size、message selector、user/pass和Event数据的转换器(converter)。 注意所使用的JMS队列的jar包需要在Flume实例的classpath中,建议放在专门的插件目录plugins.d下面,或者启动时候用-classpath指定,或者编辑flume-env.sh文件的FLUME_CLASSPATH来设置。

必需的参数已用 粗体 标明。

属性 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: jms
initialContextFactory 初始上下文工厂类,比如: org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory 连接工厂应显示为的JNDI名称
providerURL JMS 的连接URL
destinationName 目的地名称
destinationType 目的地类型, queue 或 topic
messageSelector 创建消费者时使用的消息选择器
userName 连接JMS队列时的用户名
passwordFile 连接JMS队列时的密码文件,注意是文件名不是密码的明文
batchSize 100 消费JMS消息时单次发送的Event数量
converter.type DEFAULT 用来转换JMS消息为Event的转换器类,参考下面参数。
converter.* 转换器相关的属性
converter.charset UTF-8 转换器把JMS的文本消息转换为byte arrays时候使用的编码,默认转换器的专属参数
createDurableSubscription false 是否创建持久化订阅。 持久化订阅只能在 destinationType = topic 时使用。 如果为 true ,则必须配置 clientId 和 durableSubscriptionName
clientId 连接创建后立即给JMS客户端设置标识符。持久化订阅必配参数。
durableSubscriptionName 用于标识持久订阅的名称。持久化订阅必配参数。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#jms-source

中文翻译版参考:https://flume.liyifeng.org/#jms-source

3.1.5 Spooling Directory Source

监听目录下的新增文件。

这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。

与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:

  1. 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。

  2. 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。

为了避免上述问题,生成新文件的时候文件名加上时间戳是个不错的办法。

尽管有这个Source的可靠性保证,但是仍然存在这样的情况,某些下游故障发生时会出现重复Event的情况。这与其他Flume组件提供的保证是一致的。

属性名 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: spooldir.
spoolDir Flume Source监控的文件夹目录,该目录下的文件会被Flume收集
fileSuffix .COMPLETED 被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED
deletePolicy never 是否删除已完成收集的文件,可选值: never 或 immediate
fileHeader false 是否添加文件的绝对路径名(绝对路径+文件名)到header中。
fileHeaderKey file 添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用)
basenameHeader false 是否添加文件名(只是文件名,不包括路径)到header 中
basenameHeaderKey basename 添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用)
includePattern ^.*$ 指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高
ignorePattern ^$ 指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePattern 和 includePattern 两个正则都匹配到,这个文件会被忽略。
trackerDir .flumespool 用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建
consumeOrder oldest 设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldest 、 youngest 和 random 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。 当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集
pollDelay 500 Flume监视目录内新文件产生的时间间隔,单位:毫秒
recursiveDirectorySearch false 是否收集子目录下的日志文件
maxBackoff 4000 等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。
batchSize 100 每次批量传输到channel时的size大小
inputCharset UTF-8 解析器读取文件时使用的编码(解析器会把所有文件当做文本读取)
decodeErrorPolicy FAIL 当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD; IGNORE :忽略无法解析的字符。
deserializer LINE 指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口
deserializer.*   解析器的相关属性,根据解析器不同而不同
bufferMaxLines (已废弃)
bufferMaxLineLength 5000 (已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.*   channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors 该source所使用的拦截器,多个用空格分开
interceptors.*   拦截器相关的属性配置

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spooling-directory-source

中文翻译版参考:https://flume.liyifeng.org/#spooling-directory-source

3.1.6 Taildir Source

用于监听目录或文件。

Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。

Taildir Source是可靠的,即使发生文件轮换(译者注1)也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。

Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。

文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。

Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。

需要注意的是:Taildir Source目前只是个预览版本,还不能运行在windows系统上。

属性名 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: TAILDIR.
filegroups 被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔
filegroups.<filegroupName> 被监控文件夹的绝对路径。正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名
positionFile ~/.flume/taildir_position.json 用来设定一个记录每个文件的绝对路径和最近一次读取位置inode的文件,这个文件是JSON格式。
headers.<filegroupName>.<headerKey> 给某个文件组下的Event添加一个固定的键值对到header中,值就是value。一个文件组可以配置多个键值对。
byteOffsetHeader false 是否把读取数据行的字节偏移量记录到Event的header里面,这个header的key是byteoffset
skipToEnd false 如果在 positionFile 里面没有记录某个文件的读取位置,是否直接跳到文件末尾开始读取
idleTimeout 120000 关闭非活动文件的超时时间(毫秒)。如果被关闭的文件重新写入了新的数据行,会被重新打开
writePosInterval 3000 向 positionFile 记录文件的读取位置的间隔时间(毫秒)
batchSize 100 一次读取数据行和写入channel的最大数量,通常使用默认值就很好
backoffSleepIncrement 1000 在最后一次尝试未发现任何新数据时,重新尝试轮询新数据之前的时间延迟增量(毫秒)
maxBackoffSleep 5000 每次重新尝试轮询新数据时的最大时间延迟(毫秒)
cachePatternMatching true 对于包含数千个文件的目录,列出目录并应用文件名正则表达式模式可能非常耗时。 缓存匹配文件列表可以提高性能。 消耗文件的顺序也将被缓存。 要求文件系统支持以至少秒级跟踪修改时间。
fileHeader false 是否在header里面存储文件的绝对路径
fileHeaderKey file 文件的绝对路径存储到header里面使用的key

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#taildir-source

中文翻译版参考:https://flume.liyifeng.org/#taildir-source

3.1.7 Kafka Source

读取Kafka数据。

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。

属性名 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers Source使用的Kafka集群实例列表
kafka.consumer.group.id flume 消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组
kafka.topics 将要读取消息的目标 Kafka topic 列表,多个用逗号分隔
kafka.topics.regex 会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。
batchSize 1000 一批写入 channel 的最大消息数
batchDurationMillis 1000 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。
backoffSleepIncrement 1000 当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。
maxBackoffSleep 5000 Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。
useFlumeEventFormat false 默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。
setTopicHeader true 当设置为true时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。
topicHeader topic 如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。
migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。
kafka.consumer.security.protocol PLAINTEXT 设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXT、SASL_SSL 和 SSL有关安全设置的其他信息,请参见下文。
more consumer security props   如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置
Other Kafka Consumer Properties 其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset

注意

Kafka Source 覆盖了两个Kafka 消费者的参数:auto.commit.enable 这个参数被设置成了false,Kafka Source 会提交每一个批处理。Kafka Source 保证至少一次消息恢复策略。 Source 启动时可以存在重复项。Kafka Source 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

已经弃用的一些属性:

属性名 默认值 解释
topic 改用 kafka.topics
groupId flume 改用 kafka.consumer.group.id
zookeeperConnect 自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#kafka-source

中文翻译版参考:https://flume.liyifeng.org/#kafka-source

3.1.8 HTTP Source

启动一个HTTPServer。

这个Source从HTTP POST 和 GET请求里面解析 Event,GET方式目前还只是实验性的。把HTTP请求解析成Event是通过配置一个“handler”来实现的,这个“handler”必须实现 HTTPSourceHandler 接口, 这个接口其实就一个方法,收到一个HttpServletRequest后解析出一个 Event 的List。从一次请求解析出来的若干个Event会以一个事务提交到channel, 从而在诸如『文件channel』的一些channel上提高效率。如果handler抛出异常,这个HTTP的响应状态码是400。如果channel满了或者无法发送Event到channel,此时会返回HTTP状态码503(服务暂时不可用)。

在一个POST请求中发送的所有 Event 视为一个批处理,并在一个事务中插入到 channel。

属性 默认值 解释
channels 与Source绑定的channel,多个用空格分开
type   组件类型,这个是: http
port 要监听的端口
bind 0.0.0.0 要监听的hostname或者IP地址
handler org.apache.flume.source.http.JSONHandler 所使用的handler,需填写handler的全限定类名
handler.* handler的一些属性配置
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.*   channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors 该source所使用的拦截器,多个用空格分开
interceptors.*   拦截器相关的属性配
enableSSL false 设置为true启用SSL,HTTP Source不支持SSLv3协议
excludeProtocols SSLv3 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
keystore   keystore 文件的位置
keystorePassword   Keystore 的密码

提示

Flume里面很多组件都明确表示强制不支持SSLv3协议,是因为SSLv3协议的不安全,各大公司很早就表示不再支持了。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#http-source

中文翻译版参考:https://flume.liyifeng.org/#http-source

3.1.9 其他 Source

除了以上介绍的,Flume还提供了很多类型的Source,比如NetCat TCP Source,NetCat UDP Source,Sequence Generator Source,Syslog Sources,Stress Source,Custom Source,Scribe Source等这里就不逐一介绍了,其他source 可以去官网自行查看:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources

中文翻译版参考:https://flume.liyifeng.org/#flume-sources

3.2 Channel

channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。

channel被设计为event中转暂存区,存储Source收集并且没有被Sink消费的event,为了平衡source收集和sink读取数据的速度,可视为Flume内部的消息队列。

channel是线程安全的并且具有事务性,支持source写失败重复写和sink读失败重复读等操作。

常用的Channel类型:Memory Channel、File Channel、Kafka Channel等。

3.2.1 Memory Channel

内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。 必需的参数已用 粗体 标明。

属性 默认值 解释
type 组件类型,这个是: memory
capacity 100 内存中存储 Event 的最大数
transactionCapacity 100 source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)
keep-alive 3 添加或删除一个 Event 的超时时间(秒)
byteCapacityBufferPercentage 20 指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比
byteCapacity   Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

提示

举2个例子来帮助理解最后两个参数吧:

两个例子都有共同的前提,假设JVM最大的可用内存是100M(或者说JVM启动时指定了-Xmx=100m)。

例子1: byteCapacityBufferPercentage 设置为20, byteCapacity 设置为52428800(就是50M),此时内存中所有 Event body 的总大小就被限制为50M *(1-20%)=40M,内存channel可用内存是50M。

例子2: byteCapacityBufferPercentage 设置为10, byteCapacity 不设置,此时内存中所有 Event body 的总大小就被限制为100M * 80% *(1-10%)=72M,内存channel可用内存是80M。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#memory-channel

中文翻译版参考:https://flume.liyifeng.org/#memory-channel

3.2.2 JDBC Channel

JDBC Channel会通过一个数据库把Event持久化存储。目前只支持Derby。这是一个可靠的channel,非常适合那些注重可恢复性的流使用。 必需的参数已用 粗体 标明。

属性 默认值 解释
type 组件类型,这个是: jdbc
db.type DERBY 使用的数据库类型,目前只支持 DERBY.
driver.class org.apache.derby.jdbc.EmbeddedDriver 所使用数据库的 JDBC 驱动类
driver.url (constructed from other properties) JDBC 连接的 URL
db.username “sa” 连接数据库使用的用户名
db.password 连接数据库使用的密码
connection.properties.file JDBC连接属性的配置文件
create.schema true 如果设置为 true ,没有数据表的时候会自动创建
create.index true 是否创建索引来加快查询速度
create.foreignkey true 是否创建外键
transaction.isolation “READ_COMMITTED” 面向连接的隔离级别,可选值: READ_UNCOMMITTED , READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ
maximum.connections 10 数据库的最大连接数
maximum.capacity 0 (unlimited) channel 中存储 Event 的最大数
sysprop.*   针对不同DB的特定属性
sysprop.user.home   Derby 的存储主路径

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#jdbc-channel

中文翻译版参考:https://flume.liyifeng.org/#jdbc-channel

3.2.3 Kafka Channel

将 Event 存储到Kafka集群(必须单独安装)。Kafka提供了高可用性和复制机制,因此如果Flume实例或者 Kafka 的实例挂掉,能保证Event数据随时可用。 Kafka channel可以用于多种场景:

  1. 与source和sink一起:给所有Event提供一个可靠、高可用的channel。
  2. 与source、interceptor一起,但是没有sink:可以把所有Event写入到Kafka的topic中,来给其他的应用使用。
  3. 与sink一起,但是没有source:提供了一种低延迟、容错高的方式将Event发送的各种Sink上,比如:HDFS、HBase、Solr。

由于依赖于该版本附带的Kafka客户端,Flume1.8需要Kafka 0.9或更高版本。 与之前的Flume版本相比,channel的配置发生了一些变化。

配置参数组织如下:

  1. 通常与channel相关的配置值应用于channel配置级别,比如:a1.channel.k1.type =
  2. 与Kafka相关的配置值或Channel运行的以“kafka.”为前缀(这与CommonClient Configs类似),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。 这与hdfs sink的运行方式没有什么不同
  3. 特定于生产者/消费者的属性以kafka.producer或kafka.consumer为前缀
  4. 可能的话,使用Kafka的参数名称,例如:bootstrap.servers 和 acks

当前Flume版本是向下兼容的,但是第二个表中列出了一些不推荐使用的属性,并且当它们出现在配置文件中时,会在启动时打印警告日志。

必需的参数已用 粗体 标明。

属性 默认值 解释
type 组件类型,这个是: org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers channel使用的Kafka集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用支持。格式为hostname:port,多个用逗号分隔
kafka.topic flume-channel channel使用的Kafka topic
kafka.consumer.group.id flume channel 用于向 Kafka 注册的消费者群组ID。 多个 channel 必须使用相同的 topic 和 group,以确保当一个Flume实例发生故障时,另一个实例可以获取数据。请注意,使用相同组ID的非channel消费者可能会导致数据丢失。
parseAsFlumeEvent true 是否以avro基准的 Flume Event 格式在channel中存储Event。 如果是Flume的Source向channel的topic写入Event则应设置为true; 如果其他生产者也在向channel的topic写入Event则应设置为false。 通过使用 flume-ng-sdk 中的 org.apache.flume.source.avro.AvroFlumeEvent 可以在Kafka之外解析出Flume source的信息。
migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过 kafka.consumer.auto.offset.reset 配置如何处理偏移量。
pollTimeout 500 消费者调用poll()方法时的超时时间(毫秒) https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId 指定channel中所有Event将要存储的分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被Kafka生产者的分发程序分发,包括key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发。
partitionIdHeader 从Event header中读取要存储Event到目标Kafka的分区的属性名。 如果设置了,生产者会从Event header中获取次属性的值,并将消息发送到topic的指定分区。 如果该值表示的分区无效,则Event不会存入channel。如果该值有效,则会覆盖 defaultPartitionId 配置的分区ID。
kafka.consumer.auto.offset.reset latest 当Kafka中没有初始偏移量或者当前偏移量已经不在当前服务器上时(比如数据已经被删除)该怎么办。 earliest:自动重置偏移量到最早的位置; latest:自动重置偏移量到最新的位置; none:如果没有为消费者的组找到任何先前的偏移量,则向消费者抛出异常; else:向消费者抛出异常。
kafka.producer.security.protocol PLAINTEXT 设置使用哪种安全协议写入Kafka。可选值: SASL_PLAINTEXT 、 SASL_SSL 和 SSL 有关安全设置的其他信息,请参见下文。
kafka.consumer.security.protocol PLAINTEXT 与上面的相同,只不过是用于消费者。
more producer/consumer security props   如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者、消费者增加安全相关的参数配置

下表是弃用的一些参数:

属性 默认值 解释
brokerList 改用 kafka.bootstrap.servers
topic flume-channel 改用 kafka.topic
groupId flume 改用 kafka.consumer.group.id
readSmallestOffset false 改用 kafka.consumer.auto.offset.reset

注意

由于channel是负载均衡的,第一次启动时可能会有重复的Event出现。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#kafka-channel

中文翻译版参考:https://flume.liyifeng.org/#kafka-channel

3.2.4 File Channel

必需的参数已用 粗体 标明。

属性 默认值 解释
type 组件类型,这个是: file.
checkpointDir ~/.flume/file-channel/checkpoint 记录检查点的文件的存储目录
useDualCheckpoints false 是否备份检查点文件。如果设置为 true , backupCheckpointDir 参数必须设置。
backupCheckpointDir 备份检查点的目录。 此目录不能与**数据目录**或检查点目录 checkpointDir 相同
dataDirs ~/.flume/file-channel/data 逗号分隔的目录列表,用于存储日志文件。 在不同物理磁盘上使用多个目录可以提高文件channel的性能
transactionCapacity 10000 channel支持的单个事务最大容量
checkpointInterval 30000 检查点的时间间隔(毫秒)
maxFileSize 2146435071 单个日志文件的最大字节数。这个默认值约等于2047MB
minimumRequiredSpace 524288000 最小空闲空间的字节数。为了避免数据损坏,当空闲空间低于这个值的时候,文件channel将拒绝一切存取请求
capacity 1000000 channel的最大容量
keep-alive 3 存入Event的最大等待时间(秒)
use-log-replay-v1 false (专家)是否使用老的回放逻辑 (Flume默认是使用v2版本的回放方法,但是如果v2版本不能正常工作可以考虑通过这个参数改为使用v1版本,v1版本是从Flume1.2开始启用的,回放是指系统关闭或者崩溃前执行的校验检查点文件和文件channel记录是否一致程序)
use-fast-replay false (专家)是否开启快速回放(不适用队列)
checkpointOnClose true channel关闭时是否创建检查点文件。开启次功能可以避免回放提高下次文件channel启动的速度
encryption.activeKey 加密数据所使用的key名称
encryption.cipherProvider 加密类型,目前只支持:AESCTRNOPADDING
encryption.keyProvider key类型,目前只支持:JCEKSFILE
encryption.keyProvider.keyStoreFile keystore 文件路径
encrpytion.keyProvider.keyStorePasswordFile keystore 密码文件路径
encryption.keyProvider.keys 所有key的列表,包含所有使用过的加密key名称
encyption.keyProvider.keys.*.passwordFile 可选的秘钥密码文件路径

注意

默认情况下,文件channel使用默认的用户主目录内的检查点和数据目录的路径(说的就是上面的checkpointDir参数的默认值)。 如果一个Agent中有多个活动的文件channel实例,而且都是用了默认的检查点文件, 则只有一个实例可以锁定目录并导致其他channel初始化失败。 因此,这时候有必要为所有已配置的channel显式配置不同的检查点文件目录,最好是在不同的磁盘上。 此外,由于文件channel将在每次提交后会同步到磁盘,因此将其与将Event一起批处理的sink/source耦合可能是必要的,以便在多个磁盘不可用于检查点和数据目录时提供良好的性能。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#file-channel

中文翻译版参考:https://flume.liyifeng.org/#file-channel

3.2.5 Spillable Memory Channel

这个channel会将Event存储在内存队列和磁盘上。 内存队列充当主存储,内存装满之后会存到磁盘。 磁盘存储使用嵌入的文件channel进行管理。 当内存队列已满时,其他传入Event将存储在文件channel中。 这个channel非常适用于需要高吞吐量存储器channel的流,但同时需要更大容量的文件channel,以便更好地容忍间歇性目的地侧(sink)中断或消费速率降低。 在这种异常情况下,吞吐量将大致降低到文件channel速度。 如果Agent程序崩溃或重新启动,只有存储在磁盘上的Event能恢复。 这个channel目前是实验性的,不建议用于生产环境 。

必需的参数已用 粗体 标明。有关其他必需属性,请参阅文件channel

提示

这个channel的机制十分像Windows系统里面的「虚拟内存」。兼顾了内存channel的高吞吐量和文件channel的可靠、大容量优势。

属性 默认值 解释
type 组件类型,这个是: SPILLABLEMEMORY
memoryCapacity 10000 内存队列存储的Event最大数量。如果设置为0,则会禁用内存队列。
overflowCapacity 100000000 磁盘(比如文件channel)上存储Event的最大数量,如果设置为0,则会禁用磁盘存储
overflowTimeout 3 当内存占满时启用磁盘存储之前等待的最大秒数
byteCapacityBufferPercentage 20 指定Event header所占空间大小与channel中所有Event的总大小之间的百分比
byteCapacity   内存中最大允许存储Event的总字节数。 默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算Event的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。 注意,当你在一个Agent里面有多个内存channel的时候,而且碰巧这些channel存储相同的物理Event(例如:这些channel通过复制机制(复制选择器)接收同一个source中的Event), 这时候这些Event占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。
avgEventSize 500 估计进入channel的Event的平均大小(单位:字节)
<file channel properties> see file channel 可以使用除“keep-alive”和“capacity”之外的任何文件channel属性。 文件channel的“keep-alive”由Spillable Memory Channel管理, 而channel容量则是通过使用 overflowCapacity 来设置。

如果达到 memoryCapacity 或 byteCapacity 限制,则内存队列被视为已满。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spillable-memory-channel

中文翻译版参考:https://flume.liyifeng.org/#spillable-memory-channel

3.2.6 Pseudo Transaction Channel

 这个伪事务 channel 仅用于单元测试目的,不适用于生产用途。

必需的参数已用 粗体 标明。

属性 默认值 解释
type 组件类型,这个是: org.apache.flume.channel.PseudoTxnMemoryChannel
capacity 50 channel中存储的最大Event数
keep-alive 3 添加或删除Event的超时时间(秒)

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#pseudo-transaction-channel

中文翻译版参考:https://flume.liyifeng.org/#pseudo-transaction-channel

3.2.7 Custom Channel

可以自己实现Channel接口来自定义一个channel,启动时这个自定义channel类以及依赖必须都放在flume Agent的classpath中。 必需的参数已用 粗体 标明。

属性 默认值 解释
type 你自己实现的channel类的全限定类名,比如:org.example.myCustomChannel

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#custom-channel

中文翻译版参考:https://flume.liyifeng.org/#custom-channel

3.3 Sink

篇幅原因,这里只列几种常见的Sink,其他类型的Sink没有详细介绍,有需要可以去官方文档看一下。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sinks

中文翻译版参考:https://flume.liyifeng.org/#flume-sinks

3.3.1 HDFS Sink

这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本 。

以下是支持的转义符:

转义符 解释
%{host} Event header中key为host的值。这个host可以是任意的key,只要header中有就能读取,比如%{aabc}将读取header中key为aabc的值
%t 毫秒值的时间戳(同 System.currentTimeMillis() 方法)
%a 星期的缩写(Mon、Tue等)
%A 星期的全拼(Monday、 Tuesday等)
%b 月份的缩写(Jan、 Feb等)
%B 月份的全拼(January、February等)
%c 日期和时间(Thu Feb 14 23:05:25 2019)
%d 月份中的天(00到31)
%e 月份中的天(1到31)
%D 日期,与%m/%d/%y相同 ,例如:02/09/19
%H 小时(00到23)
%I 小时(01到12)
%j 年中的天数(001到366)
%k 小时(0到23),注意跟 %H的区别
%m 月份(01到12)
%n 月份(1到12)
%M 分钟(00到59)
%p am或者pm
%s unix时间戳,是秒值。比如2019/2/14 18:15:49的unix时间戳是:1550139349
%S 秒(00到59)
%y 一年中的最后两位数(00到99),比如1998年的%y就是98
%Y 年(2010这种格式)
%z 数字时区(比如:-0400)
%[localhost] Agent实例所在主机的hostname
%[IP] Agent实例所在主机的IP
%[FQDN] Agent实例所在主机的规范hostname

注意,%[localhost], %[IP] 和 %[FQDN]这三个转义符实际上都是用java的API来获取的,在一些网络环境下可能会获取失败。

正在打开的文件会在名称末尾加上“.tmp”的后缀。文件关闭后,会自动删除此扩展名。这样容易排除目录中的那些已完成的文件。 必需的参数已用 粗体 标明。

注意

对于所有与时间相关的转义字符,Event header中必须存在带有“timestamp”键的属性(除非 hdfs.useLocalTimeStamp 设置为 true )。快速自动添加此时间戳的一种方法是使用 时间戳添加拦截器 。

属性名 默认值 解释
channel 与 Sink 连接的 channel
type 组件类型,这个是: hdfs
hdfs.path HDFS目录路径(例如:hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Flume在HDFS文件夹下创建新文件的固定前缀
hdfs.fileSuffix Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefix Flume正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix .tmp Flume正在写入的临时文件后缀
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize 1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout 0 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize 100 向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.codeC 压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop` 、 ``snappy
hdfs.fileType SequenceFile 文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
hdfs.maxOpenFiles 5000 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。
hdfs.writeFormat Writable 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。
hdfs.callTimeout 10000 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
hdfs.threadsPoolSize 10 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.rollTimerPoolSize 1 每个HDFS Sink实例调度定时文件滚动的线程数
hdfs.kerberosPrincipal 用于安全访问 HDFS 的 Kerberos 用户主体
hdfs.kerberosKeytab 用于安全访问 HDFS 的 Kerberos keytab 文件
hdfs.proxyUser   代理名
hdfs.round false 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
hdfs.roundValue 1 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
hdfs.roundUnit second 向下舍入的单位,可选值: second 、 minute 、 hour
hdfs.timeZone Local Time 解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai
hdfs.useLocalTimeStamp false 使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)
hdfs.closeTries 0

开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。

如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;

如果设置为0,Sink会一直尝试重命名文件直到成功为止;

关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。

hdfs.retryInterval 180 连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。
serializer TEXT Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。
serializer.*   根据上面 serializer 配置的类型来根据需要添加序列化器的参数

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#hdfs-sink

中文翻译版参考:https://flume.liyifeng.org/#hdfs-sink

3.3.2 Hive Sink

此Sink将包含分隔文本或JSON数据的 Event 直接流式传输到 Hive表或分区上。 Event 使用 Hive事务进行写入, 一旦将一组 Event 提交给Hive,它们就会立即显示给Hive查询。 即将写入的目标分区既可以预先自己创建,也可以选择让 Flume 创建它们,如果没有的话。 写入的 Event 数据中的字段将映射到 Hive表中的相应列。

属性 默认值 解释
channel 与 Sink 连接的 channel
type 组件类型,这个是: hive
hive.metastore Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database Hive 数据库名
hive.table Hive表名
hive.partition 逗号分隔的要写入的分区信息。 比如hive表的分区是(continent: string, country :string, time : string), 那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。
hive.txnsPerBatchAsk 100 Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。 Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。
heartBeatInterval 240 发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。
autoCreatePartitions true Flume 会自动创建必要的 Hive分区以进行流式传输
batchSize 15000 写入一个 Hive事务中最大的 Event 数量
maxOpenConnections 500 允许打开的最大连接数。如果超过此数量,则关闭最近最少使用的连接。
callTimeout 10000 Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。
serializer   序列化器负责解析 Event 中的字段并把它们映射到 Hive表中的列,选择哪种序列化器取决于 Event 中的数据格式,支持的序列化器有:DELIMITED 和 JSON
round false 是否启用时间戳舍入机制
roundUnit minute 舍入值的单位,可选值:second 、 minute 、 hour
roundValue 1 舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位) 例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10; 例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30;
timeZone Local Time 应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等
useLocalTimeStamp false 替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp )

下面介绍Hive Sink的两个序列化器:

JSON :处理UTF8编码的 Json 格式(严格语法)Event,不需要配置。 JSON中的对象名称直接映射到Hive表中具有相同名称的列。 内部使用 org.apache.hive.hcatalog.data.JsonSerDe ,但独立于 Hive表的 Serde 。 此序列化程序需要安装 HCatalog。

DELIMITED: 处理简单的分隔文本 Event。 内部使用 LazySimpleSerde,但独立于 Hive表的 Serde。

属性 默认值 解释
serializer.delimiter , (类型:字符串)传入数据中的字段分隔符。 要使用特殊字符,请用双引号括起来,例如“\t”
serializer.fieldnames 从输入字段到Hive表中的列的映射。 指定为Hive表列名称的逗号分隔列表(无空格),按顺序标识输入字段。 要跳过字段,请保留未指定的列名称。 例如, ‘time,,ip,message’表示输入映射到hive表中的 time,ip 和 message 列的第1,第3和第4个字段。
serializer.serdeSeparator Ctrl-A (类型:字符)自定义底层序列化器的分隔符。如果 serializer.fieldnames 中的字段与 Hive表列的顺序相同,则 serializer.delimiter 与 serializer.serdeSeparator 相同, 并且 serializer.fieldnames 中的字段数小于或等于表的字段数量,可以提高效率,因为传入 Event 正文中的字段不需要重新排序以匹配 Hive表列的顺序。 对于’\t’这样的特殊字符使用单引号,要确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将本参数也设置为相同的字符。

以下是支持的转义符:

转义符 解释
%{host} Event header中 key 为 host 的值。这个 host 可以是任意的 key,只要 header 中有就能读取,比如%{aabc}将读取 header 中 key 为 aabc 的值
%t 毫秒值的时间戳(同 System.currentTimeMillis() 方法)
%a 星期的缩写(Mon、Tue等)
%A 星期的全拼(Monday、 Tuesday等)
%b 月份的缩写(Jan、 Feb等)
%B 月份的全拼(January、February等)
%c 日期和时间(Thu Feb 14 23:05:25 2019)
%d 月份中的天(00到31)
%D 日期,与%m/%d/%y相同 ,例如:02/09/19
%H 小时(00到23)
%I 小时(01到12)
%j 年中的天数(001到366)
%k 小时(0到23),注意跟 %H 的区别
%m 月份(01到12)
%M 分钟(00到59)
%p am 或者 pm
%s unix时间戳,是秒值。比如:2019/4/1 15:12:47 的unix时间戳是:1554102767
%S 秒(00到59)
%y 一年中的最后两位数(00到99),比如1998年的%y就是98
%Y 年(2010这种格式)
%z 数字时区(比如:-0400)

注意

对于所有与时间相关的转义字符,Event header 中必须存在带有“timestamp”键的属性(除非 useLocalTimeStamp 设置为 true )。快速添加此时间戳的一种方法是使用 时间戳添加拦截器 ( TimestampInterceptor)。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#hive-sink

中文翻译版参考:https://flume.liyifeng.org/#hive-sink

3.3.3 Logger Sink

使用INFO级别把Event内容输出到日志中,一般用来测试、调试使用。这个 Sink 是唯一一个不需要额外配置就能把 Event 的原始内容输出的Sink,参照 输出原始数据到日志 。

提示

在 输出原始数据到日志 一节中说过,通常在Flume的运行日志里面输出数据流中的原始的数据内容是非常不可取的,所以 Flume 的组件默认都不会这么做。但是总有特殊的情况想要把 Event 内容打印出来,就可以借助这个Logger Sink了。

必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: logger
maxBytesToLog 16 Event body 输出到日志的最大字节数,超出的部分会被丢弃

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#logger-sink

中文翻译版参考:https://flume.liyifeng.org/#logger-sink

3.3.4 Avro Sink

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为Avro Event发送到指定的主机/端口上。Event 从 channel 中批量获取,数量根据配置的 batch-size 而定。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: avro.
hostname 监听的服务器名(hostname)或者 IP
port 监听的端口
batch-size 100 每次批量发送的 Event 数
connect-timeout 20000 第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout 20000 请求超时时间,单位:毫秒
reset-connection-interval none 重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
compression-type none 压缩类型。可选值: none 、 deflate 。压缩类型必须与上一级Avro Source 配置的一致
compression-level 6 Event的压缩级别 0:不压缩,1-9:进行压缩,数字越大,压缩率越高
ssl false 设置为 true 表示Sink开启 SSL 下面的 truststore 、 truststore-password 、 truststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs )
trust-all-certs false 如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。
truststore 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为Oracle JRE中的“jssecacerts”或“cacerts”)。
truststore-password 上面配置的信任库的密码
truststore-type JKS Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型
exclude-protocols SSLv3 要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。
maxIoWorkers 2 * 机器上可用的处理器核心数量 I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#avro-sink

中文翻译版参考:https://flume.liyifeng.org/#avro-sink

3.3.5 Thrift Sink

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为 Thrift Event 发送到指定的主机/端口上。Event 从 channel 中获取批量获取,数量根据配置的 batch-size 而定。 可以通过启用 kerberos 身份验证将 Thrift Sink 以安全模式启动。如果想以安全模式与 Thrift Source 通信,那么 Thrift Sink 也必须以安全模式运行。 client-principal 和 client-keytab 是 Thrift Sink 用于向 kerberos KDC 进行身份验证的配置参数。 server-principal 表示此Sink将要以安全模式连接的 Thrift Source 的主体,必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: thrift.
hostname 远程 Thrift 服务的主机名或 IP
port 远程 Thrift 的端口
batch-size 100 一起批量发送 Event 数量
connect-timeout 20000 第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout 20000 请求超时时间,单位:毫秒
reset-connection-interval none 重置连接到下一跳之前的时间量(秒)。 这将强制 Thrift Sink 重新连接到下一跳。 允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
ssl false 设置为 true 表示Sink开启 SSL。下面的 truststore 、 truststore-password 、 truststore-type 就是开启 SSL 后使用的参数
truststore 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为 Oracle JRE 中的“jssecacerts”或“cacerts”)。
truststore-password 上面配置的信任库的密码
truststore-type JKS Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型
exclude-protocols SSLv3 要排除的以空格分隔的 SSL/TLS 协议列表
kerberos false 设置为 true 开启 kerberos 身份验证。在 kerberos 模式下,需要 client-principal 、 client-keytab 和 server-principal 才能成功进行身份验证并与启用了 kerberos 的 Thrift Source 进行通信。
client-principal —- Thrift Sink 用来向 kerberos KDC 进行身份验证的 kerberos 主体。
client-keytab —- Thrift Sink 与 client-principal 结合使用的 keytab 文件路径,用于对 kerberos KDC 进行身份验证。
server-principal Thrift Sink 将要连接到的 Thrift Source 的 kerberos 主体。

提示

官方英文文档 connection-reset-interval 这个参数是错误的,在源码里面是 reset-connection-interval ,本文档已经纠正。

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#thrift-sink

中文翻译版参考:https://flume.liyifeng.org/#thrift-sink

3.3.6 IRC Sink

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#irc-sink

中文翻译版参考:https://flume.liyifeng.org/#irc-sink

3.3.7 File Roll Sink

把 Event 存储到本地文件系统。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: file_roll.
sink.directory Event 将要保存的目录
sink.pathManager DEFAULT

配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: default 、 rolltime

default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;

rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;

注:prefix 和 extension 如果没有配置则不会附带

sink.pathManager.extension 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名
sink.pathManager.prefix 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀
sink.rollInterval 30 表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。
sink.serializer TEXT 配置 Event 序列化器,可选值有:text 、 header_and_text 、 avro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。 text 只会把 Event 的 body 的文本内容序列化; header_and_text 会把 header 和 body 内容都序列化。
batchSize 100 每次请求批处理的 Event 数

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#file-roll-sink

中文翻译版参考:https://flume.liyifeng.org/#file-roll-sink

3.3.8 Null Sink

丢弃所有从 channel 读取到的 Event。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: null.
batchSize 100 每次批处理的 Event 数量

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#null-sink

中文翻译版参考:https://flume.liyifeng.org/#null-sink

3.3.9 HBaseSinks

此Sink将数据写入 HBase。 Hbase 配置是从classpath中遇到的第一个 hbase-site.xml 中获取的。 配置指定的 HbaseEventSerializer 接口的实现类用于将 Event 转换为 HBase put 或 increments。 然后将这些 put 和 increments 写入 HBase。 该Sink提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。 如果 Hbase 无法写入某些 Event,则Sink将重试该事务中的所有 Event。

这个Sink支持以安全的方式把数据写入到 HBase。为了使用安全写入模式,运行 Flume 实例的用户必须有写入 HBase 目标表的写入权限。可以在配置中指定用于对 KDC 进行身份验证的主体和密钥表。 Flume 的 classpath 中的 hbase-site.xml 必须将身份验证设置为 kerberos(有关如何执行此操作的详细信息,请参阅HBase文档)。

Flume提供了两个序列化器。第一个序列化器是 SimpleHbaseEventSerializer ( org.apache.flume.sink.hbase.SimpleHbaseEventSerializer ) ,它把 Event body 原样写入到HBase,并可选增加HBase列,这个实现主要就是提供个例子。 第二个序列化器是 RegexHbaseEventSerializer ( org.apache.flume.sink.hbase.RegexHbaseEventSerializer ) ,它把 Event body 按照给定的正则进行分割然后写入到不同的列中。

必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: hbase
table 要写入的 Hbase 表名
columnFamily 要写入的 Hbase 列族
zookeeperQuorum Zookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值
znodeParent /hbase ZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml中 zookeeper.znode.parent 的值。
batchSize 100 每个事务写入的 Event 数量
coalesceIncrements false 每次提交时,Sink是否合并多个 increment 到一个 cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能。
serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer 指定序列化器。默认的increment column = “iCol”, payload column = “pCol”。
serializer.* 序列化器的属性
kerberosPrincipal 以安全方式访问 HBase 的 Kerberos 用户主体
kerberosKeytab 以安全方式访问 HBase 的 Kerberos keytab 文件目录

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#hbasesinks

中文翻译版参考:https://flume.liyifeng.org/#hbasesinks

3.3.10 MorphlineSolrSink

3.3.11 ElasticSearchSink

3.3.12 Kite Dataset Sink

3.3.13 Kafka Sink

这个 Sink 可以把数据发送到 Kafka topic上。目的就是将 Flume 与 Kafka 集成,以便基于拉的处理系统可以处理来自各种 Flume Source 的数据。目前支持 Kafka 0.9.x 发行版。

Flume1.8 不再支持 Kafka 0.9.x(不包括0.9.x)以前的版本。

必需的参数已用 粗体 标明。

属性 默认值 解释
type 组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers Kafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔
kafka.topic default-flume-topic 用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性, Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换, 比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。
flumeBatchSize 100 一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。
kafka.producer.acks 1 在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。 设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。
useFlumeEventFormat false 默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。 与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。
defaultPartitionId 指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发
partitionIdHeader 设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。 如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID
allowTopicOverride true 如果设置为 true,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。
topicHeader topic 与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称
kafka.producer.security.protocol PLAINTEXT 设置使用哪种安全协议写入 Kafka。可选值:SASL_PLAINTEXT 、 SASL_SSL 和 SSL, 有关安全设置的其他信息,请参见下文。
more producer security props   如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置
Other Kafka Producer Properties 其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:kafka.producer.linger.ms

注意

Kafka Sink使用 Event header 中的 topic 和其他关键属性将 Event 发送到 Kafka。 如果 header 中存在 topic,则会将Event发送到该特定 topic,从而覆盖为Sink配置的 topic。 如果 header 中存在指定分区相关的参数,则Kafka将使用相关参数发送到指定分区。 header中特定参数相同的 Event 将被发送到同一分区。 如果为空,则将 Event 会被发送到随机分区。 Kafka Sink 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

弃用的一些参数:

属性 默认值 解释
brokerList 改用 kafka.bootstrap.servers
topic default-flume-topic 改用 kafka.topic
batchSize 100 改用 kafka.flumeBatchSize
requiredAcks 1 改用 kafka.producer.acks

Kafka Sink还有其他安全与加密、使用TLS、Kerberos安全配置等配置,这里由于篇幅原因就不介绍了,可以去官网详细看下。

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#kafka-sink

中文翻译版参考:https://flume.liyifeng.org/#kafka-sink

3.3.14 HTTP Sink

HTTP Sink 从 channel 中获取 Event,然后再向远程 HTTP 接口 POST 发送请求,Event 内容作为 POST 的正文发送。

错误处理取决于目标服务器返回的HTTP响应代码。 Sink的 退避 和 就绪 状态是可配置的,事务提交/回滚结果以及Event是否发送成功在内部指标计数器中也是可配置的。

状态代码不可读的服务器返回的任何格式错误的 HTTP 响应都将产生 退避 信号,并且不会从 channel 中消耗该Event。

必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: http.
endpoint 将要 POST 提交数据接口的绝对地址
connectTimeout 5000 连接超时(毫秒)
requestTimeout 5000 一次请求操作的最大超时时间(毫秒)
contentTypeHeader text/plain HTTP请求的Content-Type请求头
acceptHeader text/plain HTTP请求的Accept 请求头
defaultBackoff true 是否默认启用退避机制,如果配置的 backoff.CODE 没有匹配到某个 http 状态码,默认就会使用这个参数值来决定是否退避
defaultRollback true 是否默认启用回滚机制,如果配置的 rollback.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否回滚
defaultIncrementMetrics false 是否默认进行统计计数,如果配置的 incrementMetrics.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否参与计数
backoff.CODE 配置某个 http 状态码是否启用退避机制(支持200这种精确匹配和2XX一组状态码匹配模式)
rollback.CODE 配置某个 http 状态码是否启用回滚机制(支持200这种精确匹配和2XX一组状态码匹配模式)
incrementMetrics.CODE 配置某个 http 状态码是否参与计数(支持200这种精确匹配和2XX一组状态码匹配模式)

注意 backoff, rollback 和 incrementMetrics 的 code 配置通常都是用具体的HTTP状态码,如果2xx和200这两种配置同时存在,则200的状态码会被精确匹配,其余200~299(除了200以外)之间的状态码会被2xx匹配。

提示

Flume里面好多组件都有这个退避机制,其实就是下一级目标没有按照预期执行的时候,会执行一个延迟操作。比如向HTTP接口提交数据发生了错误触发了退避机制生效,系统等待30秒再执行后续的提交操作, 如果再次发生错误则等待的时间会翻倍,直到达到系统设置的最大等待上限。通常在重试成功后退避就会被重置,下次遇到错误重新开始计算等待的时间。

任何空的或者为 null 的 Event 不会被提交到HTTP接口上。

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#http-sink

中文翻译版参考:https://flume.liyifeng.org/#http-sink

3.3.15 Custom Sink

你可以自己写一个 Sink 接口的实现类。启动 Flume 时候必须把你自定义 Sink 所依赖的其他类配置进 classpath 内。custom source 在写配置文件的 type 时候填你的全限定类名。 必需的参数已用 粗体 标明。

属性 默认值 解释
channel 与 Sink 绑定的 channe
type 组件类型,这个填你自定义class的全限定类名

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

官网配置举例参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#custom-sink

中文翻译版参考:https://flume.liyifeng.org/#custom-sink

四、小结

关于Flume的介绍到这里就结束了,首先感谢大家的阅读,由于为了方便查阅加了一些官方的配置介绍,导致篇幅有点长,不过这些配置应该是选择性的查看,用到哪个看哪个就行,Flume使用起来还是比较容易一点的,希望可以 帮助到大家!

关于Flume的启动,这里列举一条 kafkaSource-kafkachannel-hbaseSink:

flume-ng agent  --conf conf --conf-file conf/flume-kafkaSource-channel-hbaseSink.conf --name kafkaToHbaseagent -Dflume.root.logger=INFO,console

可以参考:https://blog.csdn.net/qianshangding0708/article/details/48088611/

flume-kafkaSource-channel-hbaseSink.conf

# ------------------- 定义数据流----------------------
# source的名字
agent.sources=kafkaSource
# channels的名字,建议按照type来命名
agent.channels=kafkaChannel
# sink的名字,建议按照目标来命名
agent.sinks=hbaseSink#-------- kafkaSource相关配置-----------------# 与Source绑定的channel,多个用空格分开
agent.sources.kafkaSource.channels=kafkaChannel
# 定义消息源类型
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
# Source使用的Kafka集群实例列表
agent.sources.kafkaSource.kafka.bootstrap.servers=192.168.183.150:9092,192.168.183.151:9092,192.168.183.152:9092
# 配置消费者组的id
agent.sources.kafkaSource.kafka.consumer.group.id=groupId_kafkaSource_flume
# 将要读取消息的目标 Kafka topic 列表,多个用逗号分隔
agent.sources.kafkaSource.kafka.topics=tpoic_sensors
# 消费超时时间,参照如下写法可以配置其他所有kafka的consumer选项。注意格式从kafka.xxx开始是consumer的配置属性
agent.sources.kafkaSource.batchDurationMillis = 2000#------- kafkaChannel相关配置-------------------------# channel类型
agent.channels.kafkaChannel.type = org.aprache.flume.channel.kafka.KafkaChannel
# channel使用的Kafka集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用支持。格式为hostname:port,多个用逗号分隔
agent.channels.kafkaChannel.kafka.bootstrap.servers=192.168.183.150:9092,192.168.183.151:9092,192.168.183.152:9092
# 指定channel使用的Kafka topic
agent.channels.kafkaChannel.kafka.topic=topic_kafkaChannel
# channel 用于向 Kafka 注册的消费者群组ID。 多个 channel 必须使用相同的 topic 和 group,以确保当一个Flume实例发生故障时,另一个实例可以获取数据。请注意,使用相同组ID的非channel消费者可能会导致数据丢失。
agent.channels.kafkaChannel.kafka.consumer.group.id=groupId_kafkaChannel_flume#---------hbaseSink 相关配置------------------# 指定sink需要使用的channel的名字,注意这里是channel
agent.sinks.hbaseSink.channel=kafkaChannel
# 指定sink类型。PS:如果使用RegexHbaseEventSerializer只能使用hbase类型
agent.sinks.hbaseSink.type=hbase
# agent.sinks.hbaseSink.type=asynchbase
# 指定hbase中的表名
agent.sinks.hbaseSink.table=sensors
# 指明column family
agent.sinks.hbaseSink.columnFamily=info
# 使用的serializer
agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
# 如果需要使用正则处理value可以使用以下的serializer
#agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 指定正则表达式,这里用的正则是匹配逗号分隔的字符串
#agent.sinks.hbaseSink.serializer.regex= ^([^,]+),([^,]+),([^,]+),([^,]+)$
# 指定在列族中对应的的colName
# agent.sinks.hbaseSink.serializer.colNames=c1,c2,c3
# 指定hbase所用的zk集合
agent.sinks.hbaseSink.zookeeperQuorum = 192.168.183.150:2181,192.168.183.151:2181,192.168.183.152:2181

大数据系列的其他文章:

大数据系列(一)之 ZooKeeper 分布式协调服务

大数据系列(二)之 hdfs 分布式文件系统详解

大数据系列(三)之 Yarn 资源调度框架详解

大数据系列(四)之 MapReduce过程及shuffle详解

大数据系列(五)之 Flume 数据传输

大数据系列(六)之 Spark 分布式计算框架


本文仅用于学习记录总结所用,配置方面有大量参考官方文档,以方便查阅,不喜勿喷。

Flume可以说是配置型框架,通过简单的配置实现数据的收集和发送,比较简单,所以建议大家学习Flume还是学会看官方文档。

附上:Flume官方文档:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

大数据系列(五)之 Flume 数据传输相关推荐

  1. hive退出命令_从零开始学习大数据系列(五十五)使用Hive命令行及内置服务

    [本文2000字左右,预计阅读需要15-20分钟] 让我们来回顾下之前用到的Hive用户的交互接口.Hive用户接口主要有三个:命令行(CLI),客户端(Client)和Web界面(WUI).$HIV ...

  2. Cris 玩转大数据系列之日志收集神器 Flume

    Cris 玩转大数据系列之日志收集神器 Flume Author:Cris 文章目录 Cris 玩转大数据系列之日志收集神器 Flume Author:Cris 1. Flume 概述 1.1 什么是 ...

  3. 多层数组如何遍历_带你从零学大数据系列之Java篇---第五章:数组

    温馨提示:如果想学扎实,一定要从头开始看凯哥的一系列文章(凯哥带你从零学大数据系列),千万不要从中间的某个部分开始看,知识前后是有很大关联,否则学习效果会打折扣. 系列文章第一篇是拥抱大数据:凯哥带你 ...

  4. 大数据系列 之 学习准备

    在学习大数据的过程中,需要具备的能力或者知识,在这里简单的罗列一下: 语言基础:需要会使用shell脚本.java和scala(这俩语言主要是用于日常代码和阅读源代码) 工具:IDE如eclipse或 ...

  5. 大数据系列(一)之hadoop介绍及集群搭建

    大数据系列(一)之hadoop介绍及集群搭建 文章最早发布来源,来源本人原创初版,同一个作者: https://mp.weixin.qq.com/s/fKuKRrpmHrKtxlCPY9rEYg 系列 ...

  6. Cris 玩转大数据系列之消息队列神器 Kafka

    Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 文章目录 Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 1. Kafka 概述 1.1 消息队 ...

  7. sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

    Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现. 1.Spark Streami ...

  8. 23篇大数据系列(二)scala基础知识全集(史上最全,建议收藏)

    作者简介: 蓝桥签约作者.大数据&Python领域优质创作者.管理多个大数据技术群,帮助大学生就业和初级程序员解决工作难题. 我的使命与愿景:持续稳定输出,赋能中国技术社区蓬勃发展! 大数据系 ...

  9. jdbc代码_凯哥带你从零学大数据系列之数据库篇---第三章:JDBC基础

    温馨提示:如果想学扎实,一定要从头开始看凯哥的一系列文章(凯哥带你从零学大数据系列),千万不要从中间的某个部分开始看,知识前后是有很大关联,否则学习效果会打折扣. 系列文章第一篇是拥抱大数据:凯哥带你 ...

最新文章

  1. 每年节省170万美元的文档预览费用,借助机器学习的DropBox有多强​?
  2. 机器学习与深度学习常见面试问题与答案
  3. 想让进程后台运行,试试Linux的nohup命令,3分钟学会。
  4. 信息基础---LDPCcodes随机矩阵构造java项目源代码
  5. html拖拽显示获取坐标,html界面元素拖拽实现[超简单]
  6. python执行效率有多低_python – Scapy的低性能
  7. PHP中4个包含文件方法的差异
  8. 在浏览器设置里能看到cookie, 页面调试Application里看不到
  9. 周鸿祎回应年会特等奖「免裁券」;微信放开 5000 人好友上限;Firefox 72 正式发布| 极客头条...
  10. favicon.ico在ie下面不显示的解决方法
  11. php数组实例,PHP数组实例总结及说明
  12. linux系统可以装win10吗,如何在Win10专业版中安装Linux系统?
  13. 【ArcGIS微课1000例】0019:什么是Shapefile文件?Shapefile文件之全解
  14. 思科交换机路由器破解密码
  15. IObit Uninstaller(电脑软件彻底卸载, 包含注册表) 彻底解决软件卸载不干净的问题
  16. linux ntfs 安装教程,Linux NTFS文件系统安装教程
  17. 每个开发人员现在应该下载的十种必备工具!
  18. mysql 联表查询 简书_mysql多表查询
  19. sql部分注入方式的学习
  20. 嵌入式 STM32 串口波特率生成器BRR的值计算笔记

热门文章

  1. std::any用法示例
  2. ppt编辑器android,演示文稿编辑器 PPTWork PPT 幻灯片_v1.7.5
  3. SharePoint网站迁移问题
  4. sysbench 1.0.6 mysql_sysbench mysql压测
  5. live2d碰撞_如何评价游戏《废墟战旗》?
  6. RabbitMQ 相关整合实战项目(完结)
  7. Arrays.asList详解和示例
  8. 如何删除服务里oracle,oracle rac如何删除服务及srvctl命令使用帮助介绍
  9. 【TCAX相关】小丸工具箱压制TCAS特效的操作步骤
  10. IT领域什么创业项目有前途