Flume组件Source、Channel和Sink使用说明

  • Flume Sources
    • Avro Source
      • 配置范例
    • Thrift Source
      • 配置范例
    • Exec Source
      • 配置范例
    • JMS Source
      • 关于转化器
      • 配置范例
    • Spoolinng Directory Source
      • 配置范例
      • Event反序列化器
    • Taildir Source
      • 配置范例
    • Twitter 1% firehose Source(实验性)
      • 配置范例
    • Kafka Source
      • 配置范例
    • NetCat TCP Source
      • 配置范例
    • NetCat UDP Source
      • 配置范例
    • Sequence Generator Source
      • 配置范例
    • Syslog Sources
      • Syslog TCP Source
        • 配置范例
      • Multiport Syslog TCP Source
        • 配置范例
      • Syslog UDP Source
        • 配置范例
    • HTTP Source
      • 配置范例
    • Stress Source
      • 配置范例
    • Legacy Sources
      • Avro Legacy Source
        • 配置范例
      • Thrift Legacy Source
        • 配置范例
    • Custom Source
      • 配置范例
    • Scribe Source
      • 配置范例
  • Flume Sinks
    • HDFS Sink
      • 配置范例
    • Hive Sink
      • 序列化器
        • 配置范例
    • Logger Sink
      • 配置范例
    • Avro Sink
      • 配置范例
    • Thrift Sink
      • 配置范例
    • IRC Sink
      • 配置范例
    • File Roll Sink
      • 配置范例
    • Null Sink
      • 配置范例
    • HBaseSinks
      • HBaseSink
        • 配置范例
      • AsyncHBaseSink
        • 配置范例
    • MorphlineSolrSink
      • 配置范例
    • ElasticSearchSink
      • 配置范例
    • Kite Dataset Sink
    • Kafka Sink
      • 配置范例
    • HTTP Sink
      • 配置范例
    • Custom Sink
      • 配置范例
  • Flume Channels
    • Memory Channel
      • 配置范例
    • JDBC Channel
      • 配置范例
    • Kafka Channel
      • 配置范例
    • File Channel
      • 配置范例
    • Spillable Memory Channel
      • 配置范例
    • Pseudo Transaction Channel
    • Custom Channel
      • 配置范例

注:属性中用粗体标识的是必需属性

Flume Sources

Avro Source

Avro Source监听Avro端口接收从外部Avro客户端发送来的数据流。如果与上一层Agent的Avro Sink配合使用就组成了一个分层的拓扑结构。

属性 默认值 解释
channel - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:avro
bind - 监听的服务器名hostname或者ip
port - 监听的端口
threads - 生成的最大工作线程数量
selecto.type 可选值:replicating或multiplexing,分别表示:复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的selector.type值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器的相关属性
compression-type none 可选值:none或deflate。这个类型必须跟Avro Source相匹配
ssl false 设置为true可启用SSL加密,如果为true必须指定下面的keystore和keystore-password
keystore - SSL加密使用的Java keystore文件路径
keystore-password - Java keystore的密码
keystore-type JKS Java keystore的类型,可选值有JKS、PKCS12
exclude-protocols SSLv3 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
ipFilter false 设置为true可启用ip过滤(netty方式的avro)
ipFilterRulers - netty ipFilter的配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Thrift Source

监听Thrift端口,从外部的Thrift客户端接收数据流。如果从上一层的Flume Agent的Thrift Sink串联后就创建了一个多层级的Flume架构(同Avro Source一样,只不过是协议不同而已)。Thrift Source可以通过配置让它以安全模式(kerberos authentication)运行,具体的配置看下表。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:thrift
bind - 监听的 hostname 或 IP 地址
port - 监听的端口
threads - 生成的最大工作线程数量
selector.type 可选值:replicating或multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
ssl false 设置为true可启用SSL加密,如果为true必须指定下面的keystore和keystore-password
keystore - SSL加密使用的Java keystore文件路径
keystore-password - Java keystore的密码
keystore-type JKS Java keystore的类型. 可选值有JKS、PKCS12
exclude-protocols SSLv3 排除支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
kerberos false 设置为true,开启kerberos 身份验证。在kerberos模式下,成功进行身份验证需要 agent-principal 和 agent-keytab 。 安全模式下的Thrift仅接受来自已启用kerberos且已成功通过kerberos KDC验证的Thrift客户端的连接
agent-principal - 指定Thrift Source使用的kerberos主体用于从kerberos KDC进行身份验证
agent-keytab - Thrift Source与Agent主体结合使用的keytab文件位置,用于对kerberos KDC进行身份验证

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Exec Source

这个source在启动时运行给定的Unix命令,并期望该进程在标准输出连续生成数据(stderr信息会被丢弃,除非属性logStdErr设置为true)。

如果进程因任何原因退出,则source也会退出并且不会继续生成数据。综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果,而date这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date这种命令)只会产生单个Event并退出。

注意:cat [named pipe]和tail -F [file]都能持续地输出内容,那些不能持续输出内容的命令不可以。这里注意一下cat命令后面接的参数是命名管道(named pipe)不是文件。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:exec
command - 所使用的系统命令,一般是cat或者tail
shell - 设置用于运行命令的shell。例如/bin /sh -c。仅适用于依赖shell功能更的命令,如通配符、后退标记、管道等。
restartThrottle 10000 尝试重新启动之前等待的时间(毫秒)
restart false 如果执行命令线程挂掉,是否重启
logStdErr false 是否会记录命令的stderr内容
batchSize 20 读取并向channel发送数据时单次发送的最大数量
batchTimeout 3000 向下游推送数据时,单次批量发送Event的最大等待时间(毫秒),如果等待了batchTimeout毫秒后未达到一次批量发送数量,则仍然执行发送操作
selector.type replicating 可选值:replicating或multiplexing,分别表示:复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的selector.type值不同而不同
interceptors - 该source所使用的的拦截器,多个空格分开
interceptors。* 拦截器相关的属性配置

警告:ExecSource相比于其他异步source的问题在于,如果无法将Event放入Channel中,ExecSource无法保证客户端知道它。在这种情况下数据会丢失。例如:最常见的用法是用tail -F [file]这种,应用程序负责向磁盘写入日志文件,Flume会用tail命令从日志文件尾部读取,将每行作为一个Event发送。这里有一个明显的问题:如果channel满了然后无法继续发送Event,会发生什么?由于种种原因,Flume无法向输出日志文件的应用程序指示它需要保留日志或某些Event尚未发送。总之你需要回到:当使用ExecSource等单向异步接口时,你的应用程序永远无法保证数据已经被成功接收!作为此警告的延伸,此source传递Event时没有交付保证。为了获得更强的可靠性保证,请考虑使用Spooling Directory Source、Taildir Source或通过SDK直接与Flume集成。

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

shell属性是用来配置执行命令的shell(比如Bash或者Powershell)。command会作为参数传递给shell执行,这使得command可以使用shell中的特性,例如通配符、后退标记、管道、循环、条件等。如果没有shell配置,将直接调用command配置命令。shell通常配置的值有:“/bin/sh -c”、“/bin/ksh -c”、“cmd /c”、“powershell -Command”等

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

JMS Source

JMS Source是一个可以从JMS的队列或者topic中读取消息的组件,按理说JMS Source作为有个JMS的应用该是能够与任意的JMS消息队列无缝衔接的,可事实上目前仅在ActiveMQ上做了测试。JMS Source支持配置batch size、message selector、user/pass和Eve不同数据的转换器(converter)。注意所使用的JMS队列的jar包需要在Flume实例的classpath中,建议放在专门的插件目录plugins.d下面,或者启动时候用-classpath指定,或者编辑flume-env.sh文件的FLUME_CLASSPATH来设置。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:jms
initialcontextFactor - 初始上下文工厂类,比如:org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory - 连接工厂应显示为的JNDI名称
providerURL - JMS的连接URL
destinationName - 目的地名称
destinationType - 目的地类型,queue或topic
messageSelector - 创建消费者时使用的消息选择器
userName - 连接JMS队列时的用户名
passwordFile - 连接JMS队列时的密码文件,注意是文件名不是密码的明文
batchSie 100 消费JMS消息时单次发送的Event数量
converter.type DEFAULT 用来转换JMS消息为Event的转换器类,参考下面参数
converter.* - 转换器相关的属性
converter.charset UTF-8 转换器把JMS的文本消息转换为byte arrays时候使用的编码,默认转换器的专属参数
createDurableSubscription false 是否创建持久化订阅。持久化订阅只能在destinationType=topic时使用。如果为true,则必须设置clientld和durableSubscriptionName
clientId - 连接创建后立即给JMS客户端设置标识符。持久化订阅必配参数
durableSubscriptionName - 用于标识持久化订阅的名称。持久化订阅必配参数

关于转化器

JMS source可以配置插入式的转换器,尽管默认的转换器已经足够应付大多数场景了,默认的转换器可以把字节、文本、对象消息转换为Event。不管哪种类型消息中的属性都会作为headers被添加到Event中。

  • 字节消息:JMS消息中的字节会被拷贝到Event的body中,注意转换器处理的单个消息大小不能超过2GB。

  • 文本消息:JMS消息中的文本会被转为byte array拷贝到Event的body中。默认的编码是UTF-8,可自行配置编码。

  • 对象消息:对象消息会被写出到封装在ObjectOutputStream中ByteArrayOutputStream里面,得打的array被复制到Event的body。

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

Spoolinng Directory Source

这个Source允许你把手机的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。

与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:

  • 1.如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
  • 2.如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。

为了避免上述问题,生成新文件的时候文件名加上时间戳是个不错的办法。

尽管有这个Source的可靠性保证,但是仍然存在这样的情况,某些下游故障发生时会出现重复Event的情况。这与其他Flume组件提供的保证是一致的。

属性名 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:spooldir
spooldir - Flume Source监控的文件夹目录,该目录下的文件会被Flume收集
fileSuffix .COMPLETED 被Flume收集完成的文件被重名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED
deletePolicy never 是否删除已完成收集的文件,可选值:never或immediate
fileHeader false 是否添加文件的绝对路径名(绝对路径+文件名)到header中
fileHeaderKey file 添加绝对路径名到header里面所使用的的key(配合上面的fileHeader一起使用)
basenameHeader false 是否添加文件名(只是文件名,不包含路径)到header中
basenameHeaderKey basename 添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用)
includePattern ^.*$ 指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高
ignorePattern ^$ 指定要忽略的文件名称正则表达式。它可以跟includePattern一起使用,如果一个文件被ignorePattern和includePattern两个正则都匹配到,这个文件会被忽略
trackerDir .flumespool 用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建
consumeOrder oldest 设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有:oldest、youngest和random。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集
pollDelay 500 Flume监视目录内新文件产生的时间间隔,单位:毫秒
recursiveDirectorySearch false 是否收集子目录下的日志文件
maxBackoff 4000 等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止
batchSize 100 每次批量传输到channel时的size大小
decodeErrorPolicy FAIL 当从文件读取时遇到不可解析的字符时如何处理。FAIL:抛出异常,解析文件失败;REPLACE:替换掉这些无法解析的字符,通常是用U+FFFD;IGNORE:忽略无法解析的字符。
deserializer LINE 指定一个文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口
deserializer.* 解析器的相关属性,根据解析器不同而不同
bufferMaxLines - (已废弃)
bufferMaxLineLength 5000 (已废弃)每行的最大长度。改用deserializer.maxLineLength代替
selector.type replicating 可选值:replicating或multiplexing,分别表示:复制、多路复用。
selector.* channel选择器的相关属性,具体属性根据设定的selector.type值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.channels = ch-1
a1.sources = src-1a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

Event反序列化器

下面是Flume内置的一些反序列化工具

  • LINE

这个反序列化器会把文本数据的每行解析成一个Event

属性 默认值 解释
deserializer.maxLineLength 2048 每个Event数据所包含的最大字符数,如果一行文本字符数超过这个配置就会被截断,剩下的字符会出现再后面的Event数据里
deserializer.outputCharset UTF-8 解析Event所使用的编码

提示:deserializer.maxLineLength的默认值是2048,这个数值对于日志行来说有点小,如果实际使用中日志每行字符可能超过2048,超出的部分会被截断,千万记得根据自己的日志长度调大这个值。

  • AVRO

这个反序列化器能够读取avro容器文件,并在文件中为每个Avro记录生成一个Event。每个Event都会在header中记录它的模式。Event的body是二进制的avro记录内容,不包括模式和容器文件元素的其余部分。

注意如果Spooling Directory Source发生了重新把一个Event放入channel的情况(比如,通道已满导致重试),则它将重置并从最新的Avro容器文件同步点重试。为了减少此类情况下的潜在Event重复,请在Avro输入文件中更频繁地写入同步标记。

属性 默认值 解释
deserializer.schemaType HASH 如何表示模式。 默认或者指定为 HASH 时,会对Avro模式进行哈希处理,并将哈希值存储在Event header中以“flume.avro.schema.hash”这个key。 如果指定为 LITERAL ,则会以JSON格式的模式存储在Event header中以“flume.avro.schema.literal”这个key。 与HASH模式相比,使用LITERAL模式效率相对较低。
  • BlobDeserializer
    这个反序列化器可以反序列化一些答的二进制文件,一个文件解析成一个Event,例如pdf或者jpg文件等。

注意:这个解析器不太适合解析太大的文件,因为被反序列化的操作是在内存里面进行的。

属性 默认值 解释
deserializer - 这个解析器没有别名缩写,需要填类的全限定名: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength 100000000 每次请求的最大读取和缓冲的字节数,默认这个值大概是95.36MB

Taildir Source

注解:Taildir Source目前只是个预览版本,还不能运行在Windows系统上

Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。

Taildir Source是可靠的,即使发生文件轮换(注1)也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。

Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。

文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。

Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。

提示:注1:文件轮换(file rotate)是英文直译。通常系统会自动丢弃日志文件中时间久远的日志,一般按照日志文件大小或时间来自动分割或丢弃的机制。

属性名 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:TAILDIR
filegroups - 被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔
filegroups.<filegroupName> - 被监控文件夹的绝对路径。正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名
positionFile ~/.flume/taildir_position.json 用来设定一个记录每个文件的绝对路径和最近一次读取位置inode的文件,这个文件可以是JSON格式
headers.<filegroupName>.<headerKey> - 给某个文件组下的Event添加一个固定的键值对到header中,值就是value。一个文件组可以配置多个键值对
byteOffsetHeader false 是否把读取数据行的字节偏移量记录到Event的header里面,这个header的key是byteoffset
skipToEnd false 如果在positionFile里面没有记录某个文件的读取位置,是否直接跳到文件末尾开始读取
idleTimeout 120000 关闭非活动文件的超时时间(毫秒)。如果被关闭的文件重新写入了新的数据行,会被重新打开
writePoslnterval 3000 向positionFile记录文件的读取位置的间隔时间(毫秒)
batchSize 100 一次读取数据行和写入channel的最大数量,通常使用默认值就很好
backoffSleepIncrement 1000 在最后一次尝试未发现任何新数据时,重新尝试轮询新数据之前的时间延迟增量(毫秒)
maxBackoffSleep 5000 每次重新尝试轮询新数据是的最大时间延迟(毫秒)
cachePatternMatching true 对于包含数千个文件的目录,列出目录并应用文件名正则表达式模式可能非常耗时。缓存匹配文件列表可以提高性能。消耗文件的顺序也将被缓存。要求文件系统支持以至少秒级跟踪修改时间。
fileHeader false 是否在header里面存储文件的绝对路径
fileHeaderKey file 文件的绝对路径存储到header里面使用的key

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true

Twitter 1% firehose Source(实验性)

警告:这个Source纯粹是实验性的,之后的版本可能会有改动,使用中任何风险请自行承担。

这个Source通过流API连接到1%的样本twitter信息流并下载这些tweet,将它们转换为Avro格式,并将Avro Event发送到下游Flume。使用者需要有Twitter开发者账号、访问令牌和密钥。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:org.apache.flume.source.twitter.TwitterSource
consumerKey - OAuth consumer key
consumerSecret - OAuth consumer secret
accessToken - OAuth access token
accessTokenSecret - OAuth token secret
maxBatchSize 1000 每次获取twitter数据的数据集大小,简单说就是一次取多少
maxBatchDurationMills 1000 每次批量获取数据的最大等待时间(毫秒)

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200

Kafka Source

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息,如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是: org.apache.flume.source.kafka.KafkaSource
Kafka.bootstrap.servers - Source使用Kafka集群实例列表
kafka.consumer.group.id flume 消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组
kafka.topics - 将要读取消息的目标kafka topic列表,多个用逗号分隔
kafka.topics.regex - 会被Kafka Source订阅的topic集合的正则表达式。这个参数比kafka.topics拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置
batchSize 1000 一批写入channel的最大消息数
batchDurationMills 1000 一个批次写入channel之前的最大等待时间(毫秒)。达到等待时间或者数量达到batchSize都会触发写操作
backoffSleepIncrement 1000 当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。
maxBackoffSleep 5000 Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些
useFlumeEventFormat false 默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。
setTopicHeader true 当设置为true时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。
topicHeader topic 如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。
migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。
kafka.consumer.security.protocol PLAINTEXT 设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXT、SASL_SSL 和 SSL有关安全设置的其他信息,请参见下文。
more consumer security props 如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置
Other Kafka Consumer Properties - 其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset

注解:Kafka Source覆盖了两个Kafka消费者的参数:auto.commit.enable这个参数被设置成了false,Kafka Source会提交每一个批处理。Kafka Source保证至少一次消息回复策略。Source启动时可以存在重复项,Kafka Source还提供了key.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

已经弃用的一些属性:

属性名 默认值 解释
topic - 改用kafka.topics
groupId flume 改用kafka.consumer.group.id
zookeepperConnect - 自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接

配置范例

通过逗号分隔的topic列表进行topic订阅的示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

正则表达式topic订阅的示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

NetCat TCP Source

这个Source十分像nc -k -l [host] [port]这个命令,监听一个指定的端口,把从该端口收到的TCP协议的文本数据按行转换为Event,它能识别的是带换行符的文本数据,同其他Source一样,解析成功的Event数据会发送到channel中。

常见的系统日志都是逐行输出的,Flume的各种Source接收数据也基本上以行为单位进行解析和处理。不论是NetCat TCP Source,还是其他的读取文本类型的Source比如:Spooling Directory Source、Taildir Source、Exec Source等也都是一样的。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:netcat
bind - 要监听的hostname或者IP地址
port - 监听的端口
max-line-length 512 每行解析成?Event消息体的最大字节数
ack-every-event true 对收到的每一行数据用“OK”做出响应
selector.type replicating 可选值:replicating或multiplexing,分别表示:复制、多路复用。
selector.* channel选择器的相关属性,具体属性根据设定的selector.type值不同而不同
interceptors - 该source所使用的的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

NetCat UDP Source

看名字也看得出,跟 NetCat TCP Source 是一对亲兄弟,区别是监听的协议不同。这个source就像是 nc -u -k -l [host] [port]命令一样, 监听一个端口然后接收来自于这个端口上UDP协议发送过来的文本内容,逐行转换为Event发送到channel。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:netcatudp
bind - 要监听的 hostname 或者IP地址
port - 监听的端口
remoteAddressHeader - UDP消息源地址(或IP)被解析到Event的header里面时所使用的key名称
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Sequence Generator Source

这个Source是一个序列式的Event生成器,从它启动就开始生成,总共会生成totalEvents个。它并不是一个日志收集器,它通常是用来测试用的。它在发送失败的时候会重新发送失败的Event到channel, 保证最终发送到channel的唯一Event数量一定是 totalEvents 个。

提示:记住Flume的设计原则之一就是传输过程的『可靠性』,上面说的失败重试以及最终的数量问题,这是毫无疑问的。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:seq
selector.type 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* replicating channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置
batchSize 1 每次请求向channel发送的 Event 数量
totalEvents Long.MAX_VALUE 这个Source会发出的Event总数,这些Event是唯一的

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

Syslog Sources

这个Source是从syslog读取日志并解析为 Event,同样也分为TCP协议和UDP协议的,TCP协议的Source会按行(\n)来解析成 Event,UDP协议的Souce会把一个消息体解析为一个 Event。

Syslog TCP Source

提示:这个Syslog TCP Source在源码里面已经被@deprecated了,推荐使用 Multiport Syslog TCP Source 来代替。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是: syslogtcp
host - 要监听的hostname或者IP地址
port - 要监听的端口
eventSize 2500 每行数据的最大字节数
keepFields none 是否保留syslog消息头中的一些属性到Event中,可选值 all 、none 或自定义指定保留的字段。如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。 也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 true 和 false,建议改用 all 和 none 了。
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

Multiport Syslog TCP Source

这是一个增强版的 Syslog TCP Source ,它更新、更快、支持监听多个端口。因为支持了多个端口,port参数已经改为了ports。这个Source使用了Apache mina(一个异步通信的框架,同netty类似)来实现。 提供了对RFC-3164和许多常见的RFC-5424格式消息的支持。 支持每个端口配置不同字符集。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是:multiport_syslogtcp
host - 要监听的hostname或者IP地址
ports - 一个或多个要监听的端口,多个用空格分开
eventSize 2500 解析成Event的每行数据的最大字节数
keepFields none 是否保留syslog消息头中的一些属性到Event中,可选值 all 、none 或自定义指定保留的字段,如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。 也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 true 和 false ,建议改用 all 和 none 了。
portHeader - 如果配置了这个属性值,端口号会被存到每个Event的header里面用这个属性配置的值当key。这样就可以在拦截器或者channel选择器里面根据端口号来自定义路由Event的逻辑。
charset.default UTF-8 解析syslog使用的默认编码
charset.port.<port> - 针对具体某一个端口配置编码
batchSize 100 每次请求尝试处理的最大Event数量,通常用这个默认值就很好。
readBufferSize 1024 内部Mina通信的读取缓冲区大小,用于性能调优,通常用默认值就很好。
numProcessors (自动分配) 处理消息时系统使用的处理器数量。 默认是使用Java Runtime API自动检测CPU数量。 Mina将为每个检测到的CPU核心生成2个请求处理线程,这通常是合理的。
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* - channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

Syslog UDP Source

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是: syslogudp
host - 要监听的hostname或者IP地址
port - 要监听的端口
keepFields false 设置为true后,解析syslog时会保留Priority, Timestamp and Hostname这些属性到Event的消息体中(查看源码发现,实际上保留了priority、version、timestamp、hostname这四个字段在消息体的前面)
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

HTTP Source

这个Source从HTTP POST 和 GET请求里面解析 Event,GET方式目前还只是实验性的。把HTTP请求解析成Event是通过配置一个“handler”来实现的,这个“handler”必须实现 HTTPSourceHandler 接口, 这个接口其实就一个方法,收到一个HttpServletRequest后解析出一个 Event 的List。从一次请求解析出来的若干个Event会以一个事务提交到channel, 从而在诸如『文件channel』的一些channel上提高效率。如果handler抛出异常,这个HTTP的响应状态码是400。如果channel满了或者无法发送Event到channel,此时会返回HTTP状态码503(服务暂时不可用)。

在一个POST请求中发送的所有 Event 视为一个批处理,并在一个事务中插入到 channel。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type 组件类型,这个是: http
port - 要监听的端口
bind 0.0.0.0 要监听的hostname或者IP地址
handler org.apache.flume.source.http.JSONHandler 所使用的handler,需填写handler的全限定类名
handler.* - handler的一些属性配置
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置
enableSSL false 设置为true启用SSL,HTTP Source不支持SSLv3协议
excludeProtocols SSLv3 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
keystore keystore 文件的位置
keystorePassword Keystore 的密码

提示:Flume里面很多组件都明确表示强制不支持SSLv3协议,是因为SSLv3协议的不安全,各大公司很早就表示不再支持了。

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props

Stress Source

StressSource 是一个内部负载生成Source的实现,对于压力测试非常有用 。可以配置每个Event的大小(headers为空)、也可以配置总共发送Event数量以及发送成功的Event最大数量。

提示:它跟 Sequence Generator Source 差不多,都是用来测试用的。

属性 默认值 解释
type - 组件类型,这个是: org.apache.flume.source.StressSource
size 500 每个Event的大小。单位:字节(byte)
maxTotalEvents -1 总共会发送的Event数量
maxSuccessfulEvents -1 发送成功的Event最大数量
batchSize 1 每次请求发送Event的数量

配置范例

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

Legacy Sources

Legacy Sources可以让Flume1.x版本的Agent接收来自于Flume0.9.4版本的Agent发来的Event,可以理解为连接两个版本Flume的一个“桥”。接收到0.9.4版本的Event后转换为1.x版本的Event然后发送到 channel。0.9.4版本的Event属性(timestamp, pri, host, nanos, etc)会被转换到1.xEvent的header中。Legacy Sources支持Avro和Thrift RPC两种方式连接。具体的用法是1.x的Agent可以使用 avroLegacy 或者 thriftLegacy source,然后0.9.4的Agent需要指定sink的host和端口为1.x的 Agent。

注解:1.x和0.9.x的可靠性保证有所不同。Legacy Sources并不支持0.9.x的E2E和DFO模式。唯一支持的是BE(best effort,尽力而为),尽管1.x的可靠性保证对于从0.9.x传输过来并且已经存在channel里面的Events是有效的。

提示:虽然数据进入了Flume 1.x的channel之后是适用1.x的可靠性保证,但是从0.9.x到1.x的时候只是BE保证,既然只有BE的保证,也就是说Legacy Sources不算是可靠的传输。对于这种跨版本的部署使用行为要慎重。

Avro Legacy Source

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是: org.apache.flume.source.avroLegacy.AvroLegacySource
host - 要监听的hostname或者IP地址
port - 要监听的端口
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

Thrift Legacy Source

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是: org.apache.flume.source.thriftLegacy.ThriftLegacySource
host - 要监听的hostname或者IP地址
port - 要监听的端口
selector.type 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* replicating channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

Custom Source

你可以自己写一个Source接口的实现类。启动Flume时候必须把你自定义Source所依赖的其他类配置进Agent的classpath内。custom source在写配置文件的type时候填你的全限定类名。

提示:如果前面章节的那些Source都无法满足你的需求,你可以写一个自定义的Source,与你见过的其他框架的自定义组件写法如出一辙,实现个接口而已,然后把你写的类打成jar包,连同依赖的jar包一同配置进Flume的classpath。 后面章节中的自定义Sink、自定义Channel等都是一样的步骤,不会再赘述。

属性 默认值 解释
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个填你自己Source的全限定类名
selector.type replicating 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

Scribe Source

提示:这里先说一句,Scribe是Facebook出的一个实时的日志聚合系统,我在之前没有听说过也没有使用过它, 从Scribe项目的Github文档里面了解到它在2013年就已经停止更新和支持了,貌似现在已经没有新的用户选择使用它了,所以Scribe Source这一节了解一下就行了。

Scribe 是另外一个类似于Flume的数据收集系统。为了对接现有的Scribe可以使用ScribeSource ,它是基于Thrift 的兼容传输协议,如何部署Scribe请参考Facebook提供的文档。

属性 默认值 解释
type - 组件类型,这个是: org.apache.flume.source.scribe.ScribeSource
port 1499 Scribe 的端口
maxReadBufferBytes 16384000 Thrift 默认的FrameBuffer 大小
workerThreads 5 Thrift的线程数
selector.type 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

配置范例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

Flume Sinks

HDFS Sink

这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本

以下是支持的转义符:

转义符 解释
%{host} Event header中key为host的值。这个host可以是任意的key,只要header中有就能读取,比如%{aabc}将读取header中key为aabc的值
%t 毫秒值的时间戳(同 System.currentTimeMillis() 方法)
%a 星期的缩写(Mon、Tue等)
%A 星期的全拼(Monday、 Tuesday等)
%b 月份的缩写(Jan、 Feb等)
%B 月份的全拼(January、February等)
%c 日期和时间(Thu Feb 14 23:05:25 2019)
%d 月份中的天(00到31)
%e 月份中的天(1到31)
%D 日期,与%m/%d/%y相同 ,例如:02/09/19
%H 小时(00到23)
%I 小时(01到12)
%j 年中的天数(001到366)
%k 小时(0到23),注意跟 %H的区别
%m 月份(01到12)
%n 月份(1到12)
%M 分钟(00到59)
%p am或者pm
%s unix时间戳,是秒值。比如2019/2/14 18:15:49的unix时间戳是:1550139349
%S 秒(00到59)
%y 一年中的最后两位数(00到99),比如1998年的%y就是98
%Y 年(2010这种格式)
%z 数字时区(比如:-0400)
%[localhost] Agent实例所在主机的hostname
%[IP] Agent实例所在主机的IP
%[FQDN] Agent实例所在主机的规范hostname

注意:%[localhost], %[IP] 和 %[FQDN]这三个转义符实际上都是用java的API来获取的,在一些网络环境下可能会获取失败。

正在打开的文件会在名称末尾加上“.tmp”的后缀。文件关闭后,会自动删除此扩展名。这样容易排除目录中的那些已完成的文件。

注解:对于所有与时间相关的转义字符,Event header中必须存在带有“timestamp”键的属性(除非 hdfs.useLocalTimeStamp 设置为 true )。快速自动添加此时间戳的一种方法是使用时间戳添加拦截器 。

属性名 默认值 解释
channel 与 Sink 连接的 channel
type 组件类型,这个是: hdfs
hdfs.path
hdfs.filePrefix FlumeData
hdfs.fileSuffix Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefix Flume正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix .tmp
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize 1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout 0 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize 100 向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.codeC 压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop` 、 ``snappy
hdfs.fileType SequenceFile 文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
hdfs.maxOpenFiles 5000 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。
hdfs.writeFormat Writable 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。
hdfs.callTimeout 10000 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
hdfs.threadsPoolSize 10 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.rollTimerPoolSize 1 每个HDFS Sink实例调度定时文件滚动的线程数
hdfs.kerberosPrincipal 用于安全访问 HDFS 的 Kerberos 用户主体
hdfs.kerberosKeytab 用于安全访问 HDFS 的 Kerberos keytab 文件
hdfs.proxyUser 代理名
hdfs.round false 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
hdfs.roundValue 1 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
hdfs.roundUnit second 向下舍入的单位,可选值: second 、 minute 、 hour
hdfs.timeZone Local Time 解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai
hdfs.useLocalTimeStamp false 使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)
hdfs.closeTries 0 开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。
hdfs.retryInterval 180 连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。
serializer TEXT Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。
serializer.* 根据上面 serializer 配置的类型来根据需要添加序列化器的参数

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

上面的例子中时间戳会向前一个整10分钟取整。比如,一个 Event 的 header 中带的时间戳是11:54:34 AM, June 12, 2012,它会保存的 HDFS 路径就是/flume/events/2012-06-12/1150/00。

Hive Sink

此Sink将包含分隔文本或JSON数据的 Event 直接流式传输到 Hive表或分区上。 Event 使用 Hive事务进行写入, 一旦将一组 Event 提交给Hive,它们就会立即显示给Hive查询。 即将写入的目标分区既可以预先自己创建,也可以选择让 Flume 创建它们,如果没有的话。 写入的 Event 数据中的字段将映射到 Hive表中的相应列。

属性 默认值 解释
channel 与 Sink 连接的 channel
type 组件类型,这个是: hive
hive.metastore Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database Hive 数据库名
hive.table Hive表名
hive.partition 逗号分隔的要写入的分区信息。 比如hive表的分区是(continent: string, country :string, time : string), 那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。
hive.txnsPerBatchAsk 100 Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。 Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。
heartBeatInterval 240 发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。
autoCreatePartitions true Flume 会自动创建必要的 Hive分区以进行流式传输
batchSize 15000 写入一个 Hive事务中最大的 Event 数量
maxOpenConnections 500 允许打开的最大连接数。如果超过此数量,则关闭最近最少使用的连接。
callTimeout 10000 Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。
serializer 序列化器负责解析 Event 中的字段并把它们映射到 Hive表中的列,选择哪种序列化器取决于 Event 中的数据格式,支持的序列化器有:DELIMITED 和 JSON
round false 是否启用时间戳舍入机制
roundUnit minute 舍入值的单位,可选值:second 、 minute 、 hour
roundValue 1 舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位) 例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10; 例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30;
timeZone Local Time 应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等
useLocalTimeStamp false 替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp )

序列化器

JSON :处理UTF8编码的 Json 格式(严格语法)Event,不需要配置。 JSON中的对象名称直接映射到Hive表中具有相同名称的列。 内部使用 org.apache.hive.hcatalog.data.JsonSerDe ,但独立于 Hive表的 Serde 。 此序列化程序需要安装 HCatalog。

DELIMITED: 处理简单的分隔文本 Event。 内部使用 LazySimpleSerde,但独立于 Hive表的 Serde。

属性 默认值 解释
serializer.delimiter , (类型:字符串)传入数据中的字段分隔符。 要使用特殊字符,请用双引号括起来,例如“\t”
serializer.fieldnames 从输入字段到Hive表中的列的映射。 指定为Hive表列名称的逗号分隔列表(无空格),按顺序标识输入字段。 要跳过字段,请保留未指定的列名称。 例如, ‘time,ip,message’表示输入映射到hive表中的 time,ip 和 message 列的第1,第3和第4个字段。
serializer.serdeSeparator Ctrl-A (类型:字符)自定义底层序列化器的分隔符。如果 serializer.fieldnames 中的字段与 Hive表列的顺序相同,则 serializer.delimiter 与 serializer.serdeSeparator 相同, 并且 serializer.fieldnames 中的字段数小于或等于表的字段数量,可以提高效率,因为传入 Event 正文中的字段不需要重新排序以匹配 Hive表列的顺序。 对于’\t’这样的特殊字符使用单引号,要确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将本参数也设置为相同的字符。

以下是支持的转义符:

转义符 解释
%{host} Event header中 key 为 host 的值。这个 host 可以是任意的 key,只要 header 中有就能读取,比如%{aabc}将读取 header 中 key 为 aabc 的值
%t 毫秒值的时间戳(同 System.currentTimeMillis() 方法)
%a 星期的缩写(Mon、Tue等)
%A 星期的全拼(Monday、 Tuesday等)
%b 月份的缩写(Jan、 Feb等)
%B 月份的全拼(January、February等)
%c 日期和时间(Thu Feb 14 23:05:25 2019)
%d 月份中的天(00到31)
%D 日期,与%m/%d/%y相同 ,例如:02/09/19
%H 小时(00到23)
%I 小时(01到12)
%j 年中的天数(001到366)
%k 小时(0到23),注意跟 %H 的区别
%m 月份(01到12)
%M 分钟(00到59)
%p am 或者 pm
%s unix时间戳,是秒值。比如:2019/4/1 15:12:47 的unix时间戳是:1554102767
%S 秒(00到59)
%y 一年中的最后两位数(00到99),比如1998年的%y就是98
%Y 年(2010这种格式)
%z 数字时区(比如:-0400)

注解:对于所有与时间相关的转义字符,Event header 中必须存在带有“timestamp”键的属性(除非 useLocalTimeStamp 设置为 true )。快速添加此时间戳的一种方法是使用 时间戳添加拦截器 ( TimestampInterceptor)。

配置范例

假设Hive表如下:

create table weblogs ( id int , msg string )partitioned by (continent string, country string, time string)clustered by (id) into 5 bucketsstored as orc;

配置范例:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

以上配置会将时间戳向下舍入到最后10分钟。 例如,将时间戳标头设置为2019年4月1日下午15:21:34且“country”标头设置为“india”的Event将评估为分区(continent =’asia’,country =’india’,time =’2019-04-01-15-20’。序列化程序配置为接收包含三个字段的制表符分隔的输入并跳过第二个字段。

Logger Sink

使用INFO级别把Event内容输出到日志中,一般用来测试、调试使用。这个 Sink 是唯一一个不需要额外配置就能把 Event 的原始内容输出的Sink,参照 输出原始数据到日志 。

提示:在输出原始数据到日志 一节中说过,通常在Flume的运行日志里面输出数据流中的原始的数据内容是非常不可取的,所以 Flume 的组件默认都不会这么做。但是总有特殊的情况想要把 Event 内容打印出来,就可以借助这个Logger Sink了。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: logger
maxBytesToLog 16 Event body 输出到日志的最大字节数,超出的部分会被丢弃

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Avro Sink

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为Avro Event发送到指定的主机/端口上。Event 从 channel 中批量获取,数量根据配置的 batch-size 而定。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: avro.
hostname 监听的服务器名(hostname)或者 IP
port 监听的端口
batch-size 100 每次批量发送的 Event 数
connect-timeout 20000 第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout 20000 请求超时时间,单位:毫秒
reset-connection-interval none 重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
compression-type none 压缩类型。可选值: none 、 deflate 。压缩类型必须与上一级Avro Source 配置的一致
compression-level 6 Event的压缩级别 0:不压缩,1-9:进行压缩,数字越大,压缩率越高
ssl false 设置为 true 表示Sink开启 SSL 下面的 truststore 、 truststore-password 、 truststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs )
trust-all-certs false 如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。
truststore 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为Oracle JRE中的“jssecacerts”或“cacerts”)。
truststore-password 上面配置的信任库的密码
truststore-type JKS Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型
exclude-protocols SSLv3 要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。
maxIoWorkers 2 * 机器上可用的处理器核心数量 I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

Thrift Sink

这个Sink可以作为 Flume 分层收集特性的下半部分。发送到此Sink的 Event 将转换为 Thrift Event 发送到指定的主机/端口上。Event 从 channel 中获取批量获取,数量根据配置的 batch-size 而定。 可以通过启用 kerberos 身份验证将 Thrift Sink 以安全模式启动。如果想以安全模式与 Thrift Source 通信,那么 Thrift Sink 也必须以安全模式运行。 client-principal 和 client-keytab 是 Thrift Sink 用于向 kerberos KDC 进行身份验证的配置参数。 server-principal 表示此Sink将要以安全模式连接的 Thrift Source 的主体。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: thrift.
hostname 远程 Thrift 服务的主机名或 IP
port 远程 Thrift 的端口
batch-size 100 一起批量发送 Event 数量
connect-timeout 20000 第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout 20000 请求超时时间,单位:毫秒
reset-connection-interval none 重置连接到下一跳之前的时间量(秒)。 这将强制 Thrift Sink 重新连接到下一跳。 允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
ssl false 设置为 true 表示Sink开启 SSL。下面的 truststore 、 truststore-password 、 truststore-type 就是开启 SSL 后使用的参数
truststore 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为 Oracle JRE 中的“jssecacerts”或“cacerts”)。
truststore-password 上面配置的信任库的密码
truststore-type JKS Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型
exclude-protocols SSLv3 要排除的以空格分隔的SSL/TLS 协议列表
kerberos false 设置为 true 开启 kerberos 身份验证。在 kerberos 模式下,需要 client-principal 、 client-keytab 和 server-principal 才能成功进行身份验证并与启用了 kerberos 的 Thrift Source 进行通信。
client-principal - Thrift Sink 用来向 kerberos KDC 进行身份验证的 kerberos 主体。
client-keytab - Thrift Sink 与 client-principal 结合使用的 keytab 文件路径,用于对 kerberos KDC 进行身份验证。
server-principal Thrift Sink 将要连接到的 Thrift Source 的 kerberos 主体。

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

IRC Sink

IRC sink 从连接的 channel 获取消息然后将这些消息中继到配置的 IRC 目标上。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: irc
hostname 要连接的服务器名(hostname )或 IP
port 6667 要连接的远程服务器端口
nick 昵称
user 用户名
password 密码
chan 频道
name 真实姓名
splitlines false 是否分割消息后进行发送
splitchars \n 行分隔符如果上面 splitlines 设置为 true ,会使用这个分隔符把消息体先进行分割再逐个发送,如果你要在配置文件中配置默认值,那么你需要一个转义符, 像这样:“ \n”

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

File Roll Sink

把 Event 存储到本地文件系统。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: file_roll.
sink.directory Event 将要保存的目录
sink.pathManager DEFAULT 配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: default 、 rolltime。default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;注:prefix 和 extension 如果没有配置则不会附带
sink.pathManager.extension 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名
sink.pathManager.prefix 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀
sink.rollInterval 30 表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。
sink.serializer TEXT 配置 Event 序列化器,可选值有:text 、 header_and_text 、 avro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。 text 只会把 Event 的 body 的文本内容序列化; header_and_text 会把 header 和 body 内容都序列化。
batchSize 100 每次请求批处理的 Event 数

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

Null Sink

丢弃所有从 channel 读取到的 Event。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: null.
batchSize 100 每次批处理的 Event 数量

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

HBaseSinks

HBaseSink

此Sink将数据写入 HBase。 Hbase 配置是从classpath中遇到的第一个 hbase-site.xml 中获取的。 配置指定的 HbaseEventSerializer 接口的实现类用于将 Event 转换为 HBase put 或 increments。 然后将这些 put 和 increments 写入 HBase。 该Sink提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。 如果 Hbase 无法写入某些 Event,则Sink将重试该事务中的所有 Event。

这个Sink支持以安全的方式把数据写入到 HBase。为了使用安全写入模式,运行 Flume 实例的用户必须有写入 HBase 目标表的写入权限。可以在配置中指定用于对 KDC 进行身份验证的主体和密钥表。 Flume 的 classpath 中的 hbase-site.xml 必须将身份验证设置为 kerberos(有关如何执行此操作的详细信息,请参阅HBase文档)。

Flume提供了两个序列化器。第一个序列化器是 SimpleHbaseEventSerializer ( org.apache.flume.sink.hbase.SimpleHbaseEventSerializer ) ,它把 Event body 原样写入到HBase,并可选增加HBase列,这个实现主要就是提供个例子。 第二个序列化器是 RegexHbaseEventSerializer ( org.apache.flume.sink.hbase.RegexHbaseEventSerializer ) ,它把 Event body 按照给定的正则进行分割然后写入到不同的列中。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: asynchbase
table 要写入的 Hbase 表名
zookeeperQuorum Zookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值
znodeParent /hbase ZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml 中 zookeeper.znode.parent 的值。
columnFamily 要写入的 Hbase 列族
batchSize 100 每个事务写入的 Event 数量
coalesceIncrements false 每次提交时,Sink是否合并多个 increment 到一个cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能。
timeout 60000 Sink为事务中所有 Event 等待来自 HBase 响应的超时时间(毫秒)
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer 序列化器
serializer.* 序列化器的一些属性

如果配置文件中没有提供这些参数配置,Sink就会从 classpath 中第一个 hbase-site.xml 中读取这些需要的配置信息。

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

AsyncHBaseSink

这个Sink使用异步模型将数据写入 HBase。这个Sink使用 AsyncHbaseEventSerializer 这个序列化器来转换 Event 为 HBase 的 put 和 increment,然后写入到 HBase。 此Sink使用 Asynchbase API 来写入 HBase。该Sink提供与 HBase 相同的一致性保证,HBase 是当前行的原子性。 如果 Hbase 无法写入某些 Event,则Sink将重试该事务中的所有 Event。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: asynchbase
table 要写入的 Hbase 表名
zookeeperQuorum Zookeeper 节点(host:port格式,多个用逗号分隔),hbase-site.xml 中属性 hbase.zookeeper.quorum 的值
znodeParent /hbase ZooKeeper 中 HBase 的 Root ZNode 路径,hbase-site.xml 中 zookeeper.znode.parent 的值。
columnFamily 要写入的 Hbase 列族
batchSize 100 每个事务写入的 Event 数量
coalesceIncrements false 每次提交时,Sink是否合并多个 increment 到一个cell。如果有限数量的 cell 有多个 increment ,这样可能会提供更好的性能。
timeout 60000 Sink为事务中所有 Event 等待来自 HBase 响应的超时时间(毫秒)
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer 序列化器
serializer.* 序列化器的一些属性

如果配置文件中没有提供这些参数配置,Sink就会从 classpath 中第一个 hbase-site.xml 中读取这些需要的配置信息

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

MorphlineSolrSink

此Sink从 Flume的 Event 中提取数据,对其进行转换,并将其近乎实时地加载到 Apache Solr 服务器中,后者又向最终用户或搜索应用程序提供查询服务。

此Sink非常适合将原始数据流式传输到 HDFS(通过HDFS Sink)并同时提取、转换并将相同数据加载到 Solr(通过MorphlineSolrSink)的使用场景。特别是,此Sink可以处理来自不同数据源的任意异构原始数据,并将其转换为对搜索应用程序有用的数据模型。

ETL 功能可使用 morphline 的配置文件进行自定义,该文件定义了一系列转换命令,用于将 Event 从一个命令传递到另一个命令。

Morphlines 可以看作是 Unix 管道的演变,其中数据模型被推广为使用通用记录流,包括任意二进制有效载荷。 morphline 命令有点像 Flume 拦截器。 Morphlines 可以嵌入到 Flume 等 Hadoop 组件中。

用于解析和转换一组标准数据格式(如日志文件,Avro,CSV,文本,HTML,XML,PDF,Word,Excel等)的命令是开箱即用的,还有其他自定义命令和解析器用于其他数据格式可以作为插件添加到 morphline。可以索引任何类型的数据格式, 并且可以生成任何类型的 Solr 模式的任何 Solr 文档,也可以注册和执行任何自定义 ETL 逻辑。

Morphlines 操纵连续的数据流。数据模型可以描述如下:数据记录是一组命名字段,其中每个字段具有一个或多个值的有序列表。值可以是任何Java对象。也就是说,数据记录本质上是一个哈希表, 其中每个哈希表条目包含一个 String 键和一个 Java 对象列表作为值。 (该实现使用 Guava 的 ArrayListMultimap,它是一个 ListMultimap)。请注意,字段可以具有多个值,并且任何两个记录都不需要使用公共字段名称。

此Sink将 Flume Event 的 body 填充到 morphline 记录的 _attachment_body 字段中,并将 Flume Event 的 header 复制到同名的记录字段中。然后命令可以对此数据执行操作。

支持路由到 SolrCloud 集群以提高可伸缩性。索引负载可以分布在大量 MorphlineSolrSinks 上,以提高可伸缩性。可以跨多个 MorphlineSolrSinks 复制索引负载以实现高可用性, 例如使用 Flume的负载均衡特性。 MorphlineInterceptor 还可以帮助实现到多个 Solr 集合的动态路由(例如,用于多租户)。

老规矩,morphline 和 solr 的 jar 包需要放在 Flume 的 lib 目录中。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
morphlineFile morphline 配置文件的相对或者绝对路径,例如:/etc/flume-ng/conf/morphline.conf
morphlineId null 如果 morphline 文件里配置了多个 morphline 实例,可以用这个参数来标识 morphline 作为一个可选名字
batchSize 1000 单个事务操作的最大 Event 数量
batchDurationMillis 1000 事务的最大超时时间(毫秒)。达到这个时间或者达到 batchSize 都会触发提交事物。
handlerClass org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl 实现了 org.apache.flume.sink.solr.morphline.MorphlineHandler 接口的实现类的全限定类名
isProductionMode false 重要的任务和大规模的生产系统应该启用这个模式,这些系统需要在发生不可恢复的异常时不停机来获取信息。未知的 Solr 架构字段相关的错误、损坏或格式错误的解析器输入数据、解析器错误等都会产生不可恢复的异常。
recoverableExceptionClasses org.apache.solr.client.solrj.SolrServerException 以逗号分隔的可恢复异常列表,这些异常往往是暂时的,在这种情况下,可以进行相应地重试。 比如:网络连接错误,超时等。当 isProductionMode 标志设置为 true 时,使用此参数配置的可恢复异常将不会被忽略,并且会进行重试。
isIgnoringRecoverableExceptions false 如果不可恢复的异常被意外错误分类为可恢复,则应启用这个标志。 这使得Sink能够取得进展并避免永远重试一个 Event。

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

ElasticSearchSink

这个Sink把数据写入到 elasticsearch 集群,就像 logstash 一样把 Event 写入以便 Kibana 图形接口可以查询并展示。

必须将环境所需的 elasticsearch 和 lucene-core jar 放在 Flume 安装的 lib 目录中。 Elasticsearch 要求客户端 JAR 的主要版本与服务器的主要版本匹配,并且两者都运行相同的 JVM 次要版本。如果版本不正确,会报 SerializationExceptions 异常。 要选择所需的版本,请首先确定 elasticsearch 的版本以及目标群集正在运行的 JVM 版本。然后选择与主要版本匹配的 elasticsearch 客户端库。 0.19.x客户端可以与0.19.x群集通信; 0.20.x可以与0.20.x对话,0.90.x可以与0.90.x对话。确定 elasticsearch 版本后, 读取 pom.xml 文件以确定要使用的正确 lucene-core JAR 版本。运行 ElasticSearchSink 的 Flume 实例程序也应该与目标集群运行的次要版本的 JVM 相匹配。

所有的 Event 每天会被写入到新的索引,名称是-yyyy-MM-dd的格式,其中可以自定义配置。Sink将在午夜 UTC 开始写入新索引。

默认情况下,Event 会被 ElasticSearchLogStashEventSerializer 序列化器进行序列化。可以通过 serializer 参数配置来更改序和自定义列化器。这个参数可以配置 org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer 或 org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory 接口的实现类,ElasticSearchEventSerializer 现在已经不建议使用了,推荐使用更强大的后者。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames 逗号分隔的hostname:port列表,如果端口不存在,则使用默认的9300端口
indexName flume 指定索引名称的前缀。比如:默认是“flume”,使用的索引名称就是 flume-yyyy-MM-dd 这种格式。也支持 header 属性替换的方式,比如%{lyf}就会用 Event header 中的属性名为 lyf 的值。
indexType logs 文档的索引类型。默认为 log,也支持 header 属性替换的方式,比如%{lyf}就会用 Event header 中的属性名为 lyf 的值。
clusterName elasticsearch 要连接的 ElasticSearch 集群名称
batchSize 100 每个事务写入的 Event 数量
ttl TTL 以天为单位,设置了会导致过期文档自动删除,如果没有设置,文档将永远不会被自动删除。 TTL 仅以较早的整数形式被接受, 例如 a1.sinks.k1.ttl = 5并且还具有限定符 ms (毫秒), s (秒), m (分钟), h (小时), d (天)和 w (星期)。 示例a1.sinks.k1.ttl = 5d 表示将TTL设置为5天。 点击 http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ 了解更多信息。
serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer 序列化器必须实现 ElasticSearchEventSerializer 或 ElasticSearchIndexRequestBuilderFactory 接口,推荐使用后者。
serializer.* 序列化器的一些属性配置

注解:使用 header 替换可以方便地通过 header 中的值来动态地决定存储 Event 时要时候用的 indexName 和 indexType。使用此功能时应谨慎,因为 Event 提交者可以控制 indexName 和 indexType。 此外,如果使用 elasticsearch REST 客户端,则 Event 提交者可以控制所使用的URL路径。

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

Kite Dataset Sink

这是一个将 Event 写入到 Kite 的实验性的Sink。这个Sink会反序列化每一个 Event body,并将结果存储到 Kite Dataset。它通过按URI加载数据集来确定目标数据集。

唯一支持的序列化方式是 avro,并且必须在在 Event header 中传递数据的结构,使用 flume.avro.schema.literal 加 json 格式的结构信息表示,或者用 flume.avro.schema.url 加一个能够获取到结构信息的URL(比如hdfs:/…这种)。 这与使用deserializer.schemaType = LITERAL的 Log4jAppender 和 Spooling Directory Source 的 avro 反序列化器兼容。

注解:1、flume.avro.schema.hash 这个 header 不支持; 2、在某些情况下,在超过滚动间隔后会略微发生文件滚动,但是这个延迟不会超过5秒钟,大多数情况下这个延迟是可以忽略的。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri 要打开的数据集的 URI
kite.repo.uri 要打开的存储库的 URI( 不建议使用 ,请改用 kite.dataset.uri )
kite.dataset.namespace 将写入记录的数据集命名空间( 不建议使用 ,请改用 kite.dataset.uri )
kite.dataset.name 将写入记录的数据集名称( 不建议使用 ,请改用 kite.dataset.uri )
kite.batchSize 100 每批中要处理的记录数
kite.rollInterval 30 释放数据文件之前的最长等待时间(秒)
kite.flushable.commitOnBatch true 如果为 true,Flume 在每次批量操作 kite.batchSize 数据后提交事务并刷新 writer。 此设置仅适用于可刷新数据集。 如果为 true,则可以将具有提交数据的临时文件保留在数据集目录中。 需要手动恢复这些文件,以使数据对 DatasetReaders 可见。
kite.syncable.syncOnBatch true Sink在提交事务时是否也将同步数据。 此设置仅适用于可同步数据集。 同步操作能保证数据将写入远程系统上的可靠存储上,同时保证数据已经离开Flume客户端的缓冲区(也就是 channel)。 当 thekite.flushable.commitOnBatch 属性设置为 false 时,此属性也必须设置为 false。
kite.entityParser avro 将 Flume Event 转换为 kite 实体的转换器。取值可以是 avro 或者 EntityParser.Builder 接口实现类的全限定类名
kite.failurePolicy retry 发生不可恢复的异常时采取的策略。例如 Event header 中缺少结构信息。默认采取重试的策略。 其他可选的值有: save ,这样会把 Event 原始内容写入到 kite.error.dataset.uri 这个数据集。还可以填自定义的处理策略类的全限定类名(需实现 FailurePolicy.Builder 接口)
kite.error.dataset.uri 保存失败的 Event 存储的数据集。当上面的参数 kite.failurePolicy 设置为 save 时,此参数必须进行配置。
auth.kerberosPrincipal 用于 HDFS 安全身份验证的 Kerberos 用户主体
auth.kerberosKeytab Kerberos 安全验证主体的 keytab 本地文件系统路径
auth.proxyUser HDFS 操作的用户,如果与 kerberos 主体不同的话

Kafka Sink

这个 Sink 可以把数据发送到 Kafka topic上。目的就是将 Flume 与 Kafka 集成,以便基于拉的处理系统可以处理来自各种 Flume Source 的数据。目前支持 Kafka 0.9.x 发行版。

Flume1.8 不再支持 Kafka 0.9.x(不包括0.9.x)以前的版本。

属性 默认值 解释
type 组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers Kafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔
kafka.topic default-flume-topic 用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性, Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换, 比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。
flumeBatchSize 100 一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。
kafka.producer.acks 1 在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。 设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。
useFlumeEventFormat false 默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。 与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。
defaultPartitionId 指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发
partitionIdHeader 设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。 如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID
allowTopicOverride true 如果设置为 true,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。
topicHeader topic 与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称
kafka.producer.security.protocol PLAINTEXT 设置使用哪种安全协议写入 Kafka。可选值:SASL_PLAINTEXT 、 SASL_SSL 和 SSL, 有关安全设置的其他信息,请参见下文。
more producer security props 如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置
Other Kafka Producer Properties 其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:kafka.producer.linger.ms

注解:Kafka Sink使用 Event header 中的 topic 和其他关键属性将 Event 发送到 Kafka。 如果 header 中存在 topic,则会将Event发送到该特定 topic,从而覆盖为Sink配置的 topic。 如果 header 中存在指定分区相关的参数,则Kafka将使用相关参数发送到指定分区。 header中特定参数相同的 Event 将被发送到同一分区。 如果为空,则将 Event 会被发送到随机分区。 Kafka Sink 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

配置范例

下面给出 Kafka Sink 的配置示例。Kafka 生产者的属性都是以 kafka.producer 为前缀。Kafka 生产者的属性不限于下面示例的几个。此外,可以在此处包含您的自定义属性,并通过作为方法参数传入的Flume Context对象在预处理器中访问它们。

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

HTTP Sink

HTTP Sink 从 channel 中获取 Event,然后再向远程 HTTP 接口 POST 发送请求,Event 内容作为 POST 的正文发送。

错误处理取决于目标服务器返回的HTTP响应代码。 Sink的 退避 和 就绪 状态是可配置的,事务提交/回滚结果以及Event是否发送成功在内部指标计数器中也是可配置的。

状态代码不可读的服务器返回的任何格式错误的 HTTP 响应都将产生 退避 信号,并且不会从 channel 中消耗该Event。

属性 默认值 解释
channel 与 Sink 绑定的 channel
type 组件类型,这个是: http.
endpoint 将要 POST 提交数据接口的绝对地址
connectTimeout 5000 连接超时(毫秒)
requestTimeout 5000 一次请求操作的最大超时时间(毫秒)
contentTypeHeader text/plain HTTP请求的Content-Type请求头
acceptHeader text/plain HTTP请求的Accept 请求头
defaultBackoff true 是否默认启用退避机制,如果配置的 backoff.CODE 没有匹配到某个 http 状态码,默认就会使用这个参数值来决定是否退避
defaultRollback true 是否默认启用回滚机制,如果配置的 rollback.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否回滚
defaultIncrementMetrics false 是否默认进行统计计数,如果配置的 incrementMetrics.CODE 没有匹配到某个 http 状态码,默认会使用这个参数值来决定是否参与计数
backoff.CODE 配置某个 http 状态码是否启用退避机制(支持200这种精确匹配和2XX一组状态码匹配模式)
rollback.CODE 配置某个 http 状态码是否启用回滚机制(支持200这种精确匹配和2XX一组状态码匹配模式)
incrementMetrics.CODE 配置某个 http 状态码是否参与计数(支持200这种精确匹配和2XX一组状态码匹配模式)

注意:backoff, rollback 和 incrementMetrics 的 code 配置通常都是用具体的HTTP状态码,如果2xx和200这两种配置同时存在,则200的状态码会被精确匹配,其余200~299(除了200以外)之间的状态码会被2xx匹配。

提示:Flume里面好多组件都有这个退避机制,其实就是下一级目标没有按照预期执行的时候,会执行一个延迟操作。比如向HTTP接口提交数据发生了错误触发了退避机制生效,系统等待30秒再执行后续的提交操作, 如果再次发生错误则等待的时间会翻倍,直到达到系统设置的最大等待上限。通常在重试成功后退避就会被重置,下次遇到错误重新开始计算等待的时间。

任何空的或者为 null 的 Event 不会被提交到HTTP接口上。

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

Custom Sink

你可以自己写一个 Sink 接口的实现类。启动 Flume 时候必须把你自定义 Sink 所依赖的其他类配置进 classpath 内。custom source 在写配置文件的 type 时候填你的全限定类名。

属性 默认值 解释
channel 与 Sink 绑定的 channe
type 组件类型,这个填你自定义class的全限定类名

配置范例

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

Flume Channels

channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。

Memory Channel

内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。

属性 默认值 解释
type 组件类型,这个是: memory
capacity 100 内存中存储 Event 的最大数
transactionCapacity 100 source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)
keep-alive 3 添加或删除一个 Event 的超时时间(秒)
byteCapacityBufferPercentage 20 指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比
byteCapacity Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

提示:举2个例子来帮助理解最后两个参数吧:
两个例子都有共同的前提,假设JVM最大的可用内存是100M(或者说JVM启动时指定了-Xmx=100m)。
例子1: byteCapacityBufferPercentage 设置为20, byteCapacity 设置为52428800(就是50M),此时内存中所有 Event body 的总大小就被限制为50M *(1-20%)=40M,内存channel可用内存是50M。
例子2: byteCapacityBufferPercentage 设置为10, byteCapacity 不设置,此时内存中所有 Event body 的总大小就被限制为100M * 80% *(1-10%)=72M,内存channel可用内存是80M。

配置范例

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

JDBC Channel

JDBC Channel会通过一个数据库把Event持久化存储。目前只支持Derby。这是一个可靠的channel,非常适合那些注重可恢复性的流使用。

属性 默认值 解释
type 组件类型,这个是: jdbc
db.type DERBY 使用的数据库类型,目前只支持 DERBY.
driver.class org.apache.derby.jdbc.EmbeddedDriver 所使用数据库的 JDBC 驱动类
driver.url (constructed from other properties) JDBC 连接的 URL
db.username “sa” 连接数据库使用的用户名
db.password 连接数据库使用的密码
connection.properties.file JDBC连接属性的配置文件
create.schema true 如果设置为 true ,没有数据表的时候会自动创建
create.index true 是否创建索引来加快查询速度
create.foreignkey true 是否创建外键
transaction.isolation “READ_COMMITTED” 面向连接的隔离级别,可选值: READ_UNCOMMITTED , READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ
maximum.connections 10 数据库的最大连接数
maximum.capacity 0 (unlimited) channel 中存储 Event 的最大数
sysprop.* 针对不同DB的特定属性
sysprop.user.home Derby 的存储主路径

配置范例

a1.channels = c1
a1.channels.c1.type = jdbc

Kafka Channel

将 Event 存储到Kafka集群(必须单独安装)。Kafka提供了高可用性和复制机制,因此如果Flume实例或者 Kafka 的实例挂掉,能保证Event数据随时可用。 Kafka channel可以用于多种场景:

  • 与source和sink一起:给所有Event提供一个可靠、高可用的channel。

  • 与source、interceptor一起,但是没有sink:可以把所有Event写入到Kafka的topic中,来给其他的应用使用。

  • 与sink一起,但是没有source:提供了一种低延迟、容错高的方式将Event发送的各种Sink上,比如:HDFS、HBase、Solr。

由于依赖于该版本附带的Kafka客户端,Flume1.8需要Kafka 0.9或更高版本。 与之前的Flume版本相比,channel的配置发生了一些变化。

配置参数组织如下:

  • 通常与channel相关的配置值应用于channel配置级别,比如:a1.channel.k1.type =

  • 与Kafka相关的配置值或Channel运行的以“kafka.”为前缀(这与CommonClient Configs类似),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。 这与hdfs sink的运行方式没有什么不同

  • 特定于生产者/消费者的属性以kafka.producer或kafka.consumer为前缀

  • 可能的话,使用Kafka的参数名称,例如:bootstrap.servers 和 acks

当前Flume版本是向下兼容的,但是第二个表中列出了一些不推荐使用的属性,并且当它们出现在配置文件中时,会在启动时打印警告日志。

属性 默认值 解释
type 组件类型,这个是: org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers channel使用的Kafka集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用支持。格式为hostname:port,多个用逗号分隔
kafka.topic flume-channel
kafka.consumer.group.id flume channel 用于向 Kafka 注册的消费者群组ID。 多个 channel 必须使用相同的 topic 和 group,以确保当一个Flume实例发生故障时,另一个实例可以获取数据。请注意,使用相同组ID的非channel消费者可能会导致数据丢失。
parseAsFlumeEvent true 是否以avro基准的 Flume Event 格式在channel中存储Event。 如果是Flume的Source向channel的topic写入Event则应设置为true; 如果其他生产者也在向channel的topic写入Event则应设置为false。 通过使用 flume-ng-sdk 中的 org.apache.flume.source.avro.AvroFlumeEvent 可以在Kafka之外解析出Flume source的信息。
migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过 kafka.consumer.auto.offset.reset 配置如何处理偏移量。
pollTimeout 500 消费者调用poll()方法时的超时时间(毫秒) https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId 指定channel中所有Event将要存储的分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被Kafka生产者的分发程序分发,包括key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发。
partitionIdHeader 从Event header中读取要存储Event到目标Kafka的分区的属性名。 如果设置了,生产者会从Event header中获取次属性的值,并将消息发送到topic的指定分区。 如果该值表示的分区无效,则Event不会存入channel。如果该值有效,则会覆盖 defaultPartitionId 配置的分区ID。
kafka.consumer.auto.offset.reset latest 当Kafka中没有初始偏移量或者当前偏移量已经不在当前服务器上时(比如数据已经被删除)该怎么办。 earliest:自动重置偏移量到最早的位置; latest:自动重置偏移量到最新的位置; none:如果没有为消费者的组找到任何先前的偏移量,则向消费者抛出异常; else:向消费者抛出异常。
kafka.producer.security.protocol PLAINTEXT 设置使用哪种安全协议写入Kafka。可选值: SASL_PLAINTEXT 、 SASL_SSL 和 SSL 有关安全设置的其他信息,请参见下文。
kafka.consumer.security.protocol PLAINTEXT 与上面的相同,只不过是用于消费者。
more producer/consumer security props 如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者、消费者增加安全相关的参数配置

注解:由于channel是负载均衡的,第一次启动时可能会有重复的Event出现。

配置范例

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

File Channel

属性 默认值 解释
type 组件类型,这个是: file.
checkpointDir ~/.flume/file-channel/checkpoint 记录检查点的文件的存储目录
useDualCheckpoints false 是否备份检查点文件。如果设置为 true , backupCheckpointDir 参数必须设置。
backupCheckpointDir 备份检查点的目录。 此目录不能与数据目录或检查点目录 checkpointDir 相同
dataDirs ~/.flume/file-channel/data 逗号分隔的目录列表,用于存储日志文件。 在不同物理磁盘上使用多个目录可以提高文件channel的性能
transactionCapacity 10000 channel支持的单个事务最大容量
checkpointInterval 30000 检查点的时间间隔(毫秒)
maxFileSize 2146435071 单个日志文件的最大字节数。这个默认值约等于2047MB
minimumRequiredSpace 524288000 最小空闲空间的字节数。为了避免数据损坏,当空闲空间低于这个值的时候,文件channel将拒绝一切存取请求
capacity 1000000 channel的最大容量
keep-alive 3 存入Event的最大等待时间(秒)
use-log-replay-v1 false (专家)是否使用老的回放逻辑 (Flume默认是使用v2版本的回放方法,但是如果v2版本不能正常工作可以考虑通过这个参数改为使用v1版本,v1版本是从Flume1.2开始启用的,回放是指系统关闭或者崩溃前执行的校验检查点文件和文件channel记录是否一致程序)
use-fast-replay false (专家)是否开启快速回放(不适用队列)
checkpointOnClose true channel关闭时是否创建检查点文件。开启次功能可以避免回放提高下次文件channel启动的速度
encryption.activeKey 加密数据所使用的key名称
encryption.cipherProvider 加密类型,目前只支持:AESCTRNOPADDING
encryption.keyProvider key类型,目前只支持:JCEKSFILE
encryption.keyProvider.keyStoreFile keystore 文件路径
encrpytion.keyProvider.keyStorePasswordFile keystore 密码文件路径
encryption.keyProvider.keys 所有key的列表,包含所有使用过的加密key名称
encyption.keyProvider.keys.*.passwordFile 可选的秘钥密码文件路径

注解:默认情况下,文件channel使用默认的用户主目录内的检查点和数据目录的路径(说的就是上面的checkpointDir参数的默认值)。 如果一个Agent中有多个活动的文件channel实例,而且都是用了默认的检查点文件, 则只有一个实例可以锁定目录并导致其他channel初始化失败。 因此,这时候有必要为所有已配置的channel显式配置不同的检查点文件目录,最好是在不同的磁盘上。 此外,由于文件channel将在每次提交后会同步到磁盘,因此将其与将Event一起批处理的sink/source耦合可能是必要的,以便在多个磁盘不可用于检查点和数据目录时提供良好的性能。

配置范例

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Spillable Memory Channel

这个channel会将Event存储在内存队列和磁盘上。 内存队列充当主存储,内存装满之后会存到磁盘。 磁盘存储使用嵌入的文件channel进行管理。 当内存队列已满时,其他传入Event将存储在文件channel中。 这个channel非常适用于需要高吞吐量存储器channel的流,但同时需要更大容量的文件channel,以便更好地容忍间歇性目的地侧(sink)中断或消费速率降低。 在这种异常情况下,吞吐量将大致降低到文件channel速度。 如果Agent程序崩溃或重新启动,只有存储在磁盘上的Event能恢复。 这个channel目前是实验性的,不建议用于生产环境

提示:这个channel的机制十分像Windows系统里面的「虚拟内存」。兼顾了内存channel的高吞吐量和文件channel的可靠、大容量优势。

属性 默认值 解释
type 组件类型,这个是: SPILLABLEMEMORY
memoryCapacity 10000 内存队列存储的Event最大数量。如果设置为0,则会禁用内存队列。
overflowCapacity 100000000 磁盘(比如文件channel)上存储Event的最大数量,如果设置为0,则会禁用磁盘存储
overflowTimeout 3 当内存占满时启用磁盘存储之前等待的最大秒数
byteCapacityBufferPercentage 20 指定Event header所占空间大小与channel中所有Event的总大小之间的百分比
byteCapacity 内存中最大允许存储Event的总字节数。 默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算Event的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。 注意,当你在一个Agent里面有多个内存channel的时候,而且碰巧这些channel存储相同的物理Event(例如:这些channel通过复制机制(复制选择器)接收同一个source中的Event), 这时候这些Event占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。
avgEventSize 500 估计进入channel的Event的平均大小(单位:字节)
see file channel 可以使用除“keep-alive”和“capacity”之外的任何文件channel属性。 文件channel的“keep-alive”由Spillable Memory Channel管理, 而channel容量则是通过使用 overflowCapacity 来设置。

如果达到 memoryCapacity 或 byteCapacity 限制,则内存队列被视为已满。

配置范例

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

禁用内存channel,只使用磁盘存储(就像文件channel那样)的例子:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

禁用掉磁盘存储,只使用内存channel的例子:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

Pseudo Transaction Channel

警告 这个伪事务 channel 仅用于单元测试目的,不适用于生产用途。

属性 默认值 解释
type 组件类型,这个是: org.apache.flume.channel.PseudoTxnMemoryChannel
capacity 50 channel中存储的最大Event数
keep-alive 3 添加或删除Event的超时时间(秒)

Custom Channel

可以自己实现Channel接口来自定义一个channel,启动时这个自定义channel类以及依赖必须都放在flume Agent的classpath中。

属性 默认值 解释
type 你自己实现的channel类的全限定类名,比如:org.example.myCustomChannel

配置范例

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

大数据——Flume组件Source、Channel和Sink具体使用相关推荐

  1. 大数据各组件安装(数据中台搭建)

    文章目录 一.基础环境配置(三台机器都操作) 1.修改主机名: 2.关闭防火墙: 3.关闭Selinux: 4.文件描述符配置: 5.关闭 THP: 6.自定义 JDK 安装: 6.1 删除默认ope ...

  2. Flume多source,多sink组合框架搭建

    Flume多source,多sink组合框架搭建 文章目录 Flume多source,多sink组合框架搭建 一.实验目的 二.实验原理 三.实验环境 四.实验内容 五.实验步骤 总结 一.实验目的 ...

  3. 大数据flume日志采集系统详解

    一.flume介绍 flume 是一个cloudera提供的 高可用高可靠,分布式的海量日志收集聚合传输系统.Flume支持日志系统中定制各类数据发送方,用于收集数据.同时flume提供对数据进行简单 ...

  4. 大数据———Flume与Kafka整合

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Flume 1.8.0 http://flume.apache.org/download.html Kafka 2.11 http: ...

  5. 如何基于java代理对大数据缓存组件返回的数据进行脱敏和阻断

    如何基于java代理对大数据缓存组件返回的数据进行脱敏和阻断 背景 架构拓扑图 实现方式对比 UDF方案 优点: 缺点: 改写返回结果方案 优点: 缺点: 说明 实现 默认处理方式 redis报文解析 ...

  6. 【大数据入门笔记系列】第一节 大数据常用组件

    [大数据入门笔记系列]第一节 大数据常用组件 大数据释义 大数据组件 跳转 大数据释义 近些年来,坊间一直流传着这样的言论:"大数据时代,人人都在裸奔".对于外行人来说,对于&qu ...

  7. 【Hadoop大数据平台组件搭建系列(一)】——Zookeeper组件配置

    简介 本篇介绍Hadoop大数据平台组件中的Zookeeper组件的搭建 使用软件版本信息 zookeeper-3.4.14.tar.gz Zookeeper安装 解压Zookeeper安装包至目标目 ...

  8. 大数据常用组件官网地址

    大数据常用组件官网地址 数据采集传输 Flume 官网:https://flume.apache.org/ 下载地址:https://flume.apache.org/download.html Ka ...

  9. spark 动态预加载数据_热门大数据引擎/组件概要

    热门大数据引擎/组件概要 TeraData 老牌数仓公司,已经上市十几年,数仓领导者地位(from Gartner),目前在向云端发力.主要提供一体机,MPP架构,运行稳定,之前工行用的是TD的系统, ...

最新文章

  1. eclipse java参数类型_JAVA第二天笔记--eclipse使用/数据类型转换
  2. file协议访问linux,Mozilla Firefox for Android 'file'协议未授权访问漏洞(CVE-2014-1501)
  3. Nginx大规模并发原理
  4. 设计模式之代理:手动实现动态代理,揭秘原理实现
  5. Java学习笔记(二)Java基本语法
  6. Python -- 大小写转换
  7. 【体系结构】Oracle的各种文件及其重要性
  8. 2022最新酷盒iApp源码V7.8版+内置超多功能
  9. 批量修改字幕文件中的时间,c语言实现
  10. 多智能体强化学习入门Qmix
  11. java中将zip文件解压到指定目录下
  12. 2019年平安夜,祝福大总结
  13. IoT产品安全基线(一)硬件安全
  14. 江南大学计算机拟录取名单,江南大学2018年法律硕士拟录取名单公示
  15. 未成年人勿进 谨以献给1980~1990出生的人(二)
  16. IGMP协议(IGMPv1、IGMPv2、IGMPv3)
  17. HDOJ 最小长方形 1859
  18. 怎样利用python写游戏辅助_怎样才能写游戏辅助?
  19. 并发编程合集(1)上下文切换详解、死锁及解决方案详解
  20. 小白 uBuntu20.04 2 安装TP Link TL-WDN5200 无线网卡驱动 亲测有效

热门文章

  1. 制药机械设备远程监控及故障预警维护管理系统
  2. web 自动化测试(入门篇)
  3. java 去除引号_java如何用replaceAll去除字符串中的引号
  4. python一键扣图_Python实例:一键批量抠图
  5. 32位eclipse使用64位jdk问题
  6. 怎样查看Eclipse是32位还是64位
  7. 多模块初始化解决方案
  8. 安装python发生的报错
  9. 【Linux/Unix】Linux中的seq命令
  10. 将web网站转为App