Flume Common Components in Detail: Source
Flume ships with many concrete source, sink, and interceptor implementations; for the full reference, see the official user guide at http://flume.apache.org/FlumeUserGuide.html
I. netcat source
1. How it works
Starts a socket service listening on a port;
data received on the port is converted into events and written to the channel.
2. Configuration
a1.sources = s1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 44444
a1.sources.s1.channels = c1
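The mechanism above can be sketched in a few lines of Python: a socket server that turns each received line into an event and puts it on a queue standing in for the channel. This is only an illustration, not Flume's actual implementation; the `netcat_source` function and the event dict shape are invented here.

```python
import queue
import socket
import threading

def netcat_source(srv, channel):
    """Accept one connection and convert each received line into an
    'event' placed on the channel (a plain queue stands in for it)."""
    conn, _ = srv.accept()
    with conn:
        buf = b""
        while True:
            data = conn.recv(1024)
            if not data:
                break
            buf += data
            while b"\n" in buf:
                line, buf = buf.split(b"\n", 1)
                channel.put({"headers": {}, "body": line})

srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind(("127.0.0.1", 0))      # any free port; the config above uses 44444
srv.listen(1)
port = srv.getsockname()[1]

ch = queue.Queue()
t = threading.Thread(target=netcat_source, args=(srv, ch))
t.start()

with socket.create_connection(("127.0.0.1", port)) as cli:
    cli.sendall(b"hello\nworld\n")
t.join()
srv.close()
print(ch.get()["body"].decode())   # first event body: hello
```

In the real source, `telnet localhost 44444` plays the role of the test client.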
II. exec source
1. How it works
Starts a user-specified Linux shell command;
collects the command's standard output as the data, converting it into events written to the channel.
2. Parameters
| Property Name | Default | Description |
| --- | --- | --- |
| channels | – | The channel(s) this source writes to |
| type | – | The component type name: exec |
| command | – | The command to run, e.g. tail -F /path/file |
| shell | – | The shell used to run the command |
| restartThrottle | 10000 | Time (ms) to wait before restarting the command after it dies |
| restart | false | Whether to restart the command if it dies |
| logStdErr | false | Whether to also collect the command's stderr |
| batchSize | 20 | Number of events committed per batch |
| batchTimeout | 3000 | Time (ms) to wait before pushing an incomplete batch downstream |
| selector.type | replicating | The channel selector: replicating or multiplexing |
| selector.* | | Selector-specific parameters |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | Interceptor-specific parameters |
3. Configuration
a1.sources = s1
a1.sources.s1.channels = c1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /root/weblog/access.log
a1.sources.s1.batchSize = 100
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 100
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. Testing
1. Prepare a log file
2. Write a script that keeps appending data to the log file:
for i in {1..10000}; do echo ${i}--------------------------- >> access.log ; sleep 0.5; done
3. Create a Flume configuration file
4. Start the Flume agent
Note: deliberate kill-and-restart testing shows that the exec source does not record the offset of the data collected before a crash, so a restart can lose data!
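As a rough illustration of why that happens, here is a minimal Python sketch of the exec source's mechanism: it simply consumes a command's stdout and batches the lines into events, with no offset recorded anywhere, so anything read before a crash cannot be replayed. The `exec_source` function and the event shape are invented for this sketch.

```python
import subprocess

def exec_source(command, batch_size=20):
    """Run a shell command and batch its stdout lines into events.
    No offset is persisted anywhere, which is exactly why the real
    exec source may lose data when the agent is restarted."""
    proc = subprocess.Popen(command, shell=True,
                            stdout=subprocess.PIPE, text=True)
    batch = []
    for line in proc.stdout:
        batch.append({"headers": {}, "body": line.rstrip("\n")})
        if len(batch) >= batch_size:
            yield batch            # commit one batch to the channel
            batch = []
    if batch:
        yield batch
    proc.wait()

# printf stands in for a long-running tail -F here
batches = list(exec_source("printf 'a\\nb\\nc\\n'", batch_size=2))
print([e["body"] for b in batches for e in b])
```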
III. spooldir source
1. How it works
Watches a given directory; whenever new, not-yet-collected files appear there, their contents are collected, converted into events, and written to the channel.
Note: files dropped into the spooling directory must be immutable and uniquely named; otherwise the source will fail loudly!
2. Parameters
| Property Name | Default | Description |
| --- | --- | --- |
| channels | – | |
| type | – | The component type name, needs to be spooldir. |
| spoolDir | – | The directory from which to read files. |
| fileSuffix | .COMPLETED | Suffix appended to files once they are fully ingested |
| deletePolicy | never | Whether to delete ingested files: never or immediate |
| fileHeader | false | Whether to add the absolute path of the ingested file to the event headers |
| fileHeaderKey | file | Header key for the absolute path above |
| basenameHeader | false | Whether to add the file name to the event headers |
| basenameHeaderKey | basename | Header key for the file name above |
| includePattern | ^.*$ | Regex specifying which file names to ingest |
| ignorePattern | ^$ | Regex specifying which file names to skip; a file matching both includePattern and ignorePattern is skipped |
| trackerDir | .flumespool | Directory storing tracking metadata; may be an absolute path or relative to the spooling directory |
| trackingPolicy | rename | Progress-tracking policy, "rename" or "tracker_dir"; only takes effect when deletePolicy=never. "rename": finished files are renamed according to fileSuffix. "tracker_dir": an empty file with the same name is created in trackerDir |
| consumeOrder | oldest | Ingestion order: oldest, youngest or random. oldest and youngest cost some efficiency, since finding the oldest/youngest file requires scanning all files in the directory |
| pollDelay | 500 | Delay (in milliseconds) used when polling for new files. |
| recursiveDirectorySearch | false | Whether to monitor subdirectories for new files to read. |
| maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source starts at a low backoff and increases it exponentially each time the channel throws a ChannelException, up to this value. |
| batchSize | 100 | Number of events per batch transferred to the channel |
| inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
| decodeErrorPolicy | FAIL | What to do on a non-decodable character in the input file. FAIL: throw an exception and fail to parse the file. REPLACE: replace the unparseable character with the "replacement character", typically Unicode U+FFFD. IGNORE: drop the unparseable character sequence. |
| deserializer | LINE | The deserializer used to parse the file into events; defaults to parsing each line as an event. The specified class must implement EventDeserializer.Builder. |
| deserializer.* | | Varies per event deserializer. |
| bufferMaxLines | – | (Obsolete) This option is now ignored. |
| bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | Interceptor-specific parameters |
3. Configuration
a1.sources = s1
a1.sources.s1.channels = c1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/weblog
a1.sources.s1.batchSize = 200
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 100
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. Testing
bin/flume-ng agent -n a1 -c conf -f myconf/spooldir-mem-logger.conf -Dflume.root.logger=DEBUG,console
Note: unlike the exec source, the spooldir source is itself reliable — it records how far it had collected before a crash!
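One spooldir pass can be sketched in Python (the `spool_once` name is invented, and the real source additionally handles trackerDir, patterns, and deserializers): read each file that does not yet carry the completion suffix, emit its lines as events, then rename it so it is never read twice — that rename is what makes the source reliable across restarts.

```python
import os
import tempfile

def spool_once(spool_dir, file_suffix=".COMPLETED"):
    """One pass over the spooling directory: collect every file that
    has not been renamed yet, then mark it done with the suffix
    (the default trackingPolicy=rename behavior)."""
    events = []
    for name in sorted(os.listdir(spool_dir)):
        if name.endswith(file_suffix):
            continue                          # already ingested
        path = os.path.join(spool_dir, name)
        with open(path, encoding="utf-8") as f:
            for line in f:
                events.append({"headers": {"file": path},
                               "body": line.rstrip("\n")})
        os.rename(path, path + file_suffix)   # never read it again
    return events

spool_dir = tempfile.mkdtemp()
with open(os.path.join(spool_dir, "a.log"), "w") as f:
    f.write("one\ntwo\n")

print([e["body"] for e in spool_once(spool_dir)])  # first pass collects both lines
print(spool_once(spool_dir))                       # second pass finds nothing new
```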
IV. avro source
The avro source listens on a network port and accepts data, which must have been serialized with the Avro serialization framework.
Avro is a cross-language serialization framework.
Background: what are serialization and serialization frameworks?
Serialization turns a structured block of data (an object) into a flat (linear) binary sequence.
A serialization framework is ready-made software that converts objects into binary sequences according to a fixed strategy.
For example, the JDK has ObjectOutputStream,
Hadoop has Writable,
and Avro is a cross-platform serialization framework.
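Avro itself needs a schema and language bindings, but the core idea of any serialization framework — structured object in, flat byte sequence out, and back again — can be shown with Python's built-in pickle:

```python
import pickle

# a structured object (nested types, not a flat sequence)
record = {"user": "alice", "visits": [3, 5], "active": True}

blob = pickle.dumps(record)      # serialize: object -> linear byte sequence
restored = pickle.loads(blob)    # deserialize: bytes -> equivalent object

print(isinstance(blob, bytes))   # True: just a flat run of bytes
print(restored == record)        # True: the structure round-trips intact
```

Frameworks like Avro perform the same round trip, but with a schema-driven, language-neutral wire format, which is what makes them usable across languages.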
1. How it works
Starts a network service listening on a port and collects the Avro-serialized data streams received on it.
The source carries an Avro deserializer, so it can correctly deserialize the incoming binary stream, wrap the data in events, and write them to the channel.
2. Parameters
| Property Name | Default | Description |
| --- | --- | --- |
| channels | – | |
| type | – | The component type name: avro |
| bind | – | Hostname or IP address to listen on |
| port | – | Port to bind to |
| threads | – | Maximum number of worker threads for the service |
| selector.type | | |
| selector.* | | |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
| compression-type | none | Compression type, none or deflate; must match the compression of the incoming data |
| ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a "keystore" and a "keystore-password", either through component-level parameters (see below) or as global SSL parameters (see the SSL/TLS support section). |
| keystore | – | Path to a Java keystore file. If not specified here, the global keystore is used (if defined; otherwise a configuration error). |
| keystore-password | – | Password for the Java keystore. If not specified here, the global keystore password is used (if defined; otherwise a configuration error). |
| keystore-type | JKS | Type of the Java keystore, "JKS" or "PKCS12". If not specified here, the global keystore type is used (if defined; otherwise the default is JKS). |
| exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded in addition to the protocols specified. |
| include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols are the included protocols minus the excluded protocols. If empty, every supported protocol is included. |
| exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
| include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites are the included suites minus the excluded suites. If empty, every supported cipher suite is included. |
| ipFilter | false | Set this to true to enable IP filtering for Netty |
| ipFilterRules | – | Defines N Netty ipFilter pattern rules. |
3. Configuration
a1.sources = r1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 100
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. Testing
Start the agent:
bin/flume-ng agent -c ./conf -f ./myconf/avro-mem-logger.conf -n a1 -Dflume.root.logger=DEBUG,console
Use a client to send Avro-serialized data to the running source:
bin/flume-ng avro-client --host c703 --port 4141
5. Cascading agents with an avro sink and an avro source
(1) Requirements
(2) Configuration
Upstream config: vi exec-m-avro.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/logs/access.log
a1.sources.r1.batchSize = 100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h3
a1.sinks.k1.port = 4455
Downstream config: vi avro-m-log.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4455
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
(3) Testing
Start the downstream agent first: bin/flume-ng agent -n a1 -c conf/ -f avro-m-log.conf -Dflume.root.logger=INFO,console
Then start the upstream agent: bin/flume-ng agent -n a1 -c conf/ -f exec-m-avro.conf
Then run a script on h1 to simulate incoming data:
while true; do echo "hello" >> /tmp/logs/access.log; sleep 0.1; done
V. taildir source
1. How it works
Watches a set of files under specified directories; any new line written to any of them gets tailed.
It records the position reached in each file in a position file (JSON format) under a configurable path; if needed, you can edit that file by hand to make the source resume reading from any position you choose.
It never modifies the files it has finished collecting (no renaming, no deleting, etc.).
The taildir source updates the recorded offset only after the data has been successfully written to the channel. This mechanism guarantees no data is missed (lost), but duplicate data is possible!
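Those commit semantics can be sketched in Python (the `tail_once` function is invented; the real source tracks many files, inodes, and file groups): read new lines starting at the offset recorded in a JSON position file, and persist the new offset only after the lines are handed over. A crash between the two steps re-reads those lines — duplicates possible, loss not.

```python
import json
import os
import tempfile

def tail_once(path, position_file):
    """One taildir-style pass over a single file: read from the
    recorded offset, 'commit' the lines, then persist the new offset."""
    positions = {}
    if os.path.exists(position_file):
        with open(position_file) as f:
            positions = json.load(f)
    offset = positions.get(path, 0)
    with open(path) as f:
        f.seek(offset)
        lines = f.read().splitlines()
        new_offset = f.tell()
    # ... lines would be written to the channel here ...
    positions[path] = new_offset          # update the offset only after commit
    with open(position_file, "w") as f:
        json.dump(positions, f)
    return lines

d = tempfile.mkdtemp()
log = os.path.join(d, "access.log")
pos = os.path.join(d, "taildir_position.json")
with open(log, "w") as f:
    f.write("line1\nline2\n")
print(tail_once(log, pos))    # ['line1', 'line2']
with open(log, "a") as f:
    f.write("line3\n")
print(tail_once(log, pos))    # ['line3']
```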
2. Parameters
| Property Name | Default | Description |
| --- | --- | --- |
| channels | – | The channel(s) this source writes to |
| type | – | The component type name: TAILDIR |
| filegroups | – | Space-separated list of file-group names, e.g. g1 g2; each group points at a set of files to tail |
| filegroups.<filegroupName> | – | Absolute path of each file group; the file-name part may be a regular expression |
| positionFile | ~/.flume/taildir_position.json | Path of the file recording the offsets |
| headers.<filegroupName>.<headerKey> | – | Header value set with the given header key. Multiple headers can be specified per file group. |
| byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called 'byteoffset'. |
| skipToEnd | false | Whether to skip to EOF for files not recorded in the position file. |
| idleTimeout | 120000 | Time (ms) before closing an inactive file. If a closed file later gets new lines, the source detects this and reopens it |
| writePosInterval | 3000 | Interval (ms) at which offsets are written to the position file (every 3 s by default) |
| batchSize | 100 | Maximum number of events committed to the channel per batch |
| maxBatchCount | Long.MAX_VALUE | Maximum number of consecutive batches read from the same file; if one file is being written at high speed, it can starve the other files, so lower this value in that case |
| backoffSleepIncrement | 1000 | The increment for the time delay before reattempting to poll for new data, when the last attempt found none. |
| maxBackoffSleep | 5000 | The maximum time delay between reattempts to poll for new data, when the last attempt found none. |
| cachePatternMatching | true | Listing directories and applying the filename regex can be time-consuming for directories containing thousands of files. Caching the list of matching files (and the order in which they are consumed) improves performance; this requires the file system to track modification times with at least 1-second granularity. |
| fileHeader | false | Whether to add a header storing the absolute path of the file. |
| fileHeaderKey | file | Header key to use when adding the absolute path to the event headers. |
3. Configuration
a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /root/flumedata/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/weblog/access.log
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 100
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. Testing
bin/flume-ng agent -n a1 -c conf/ -f myconf/taildir-mem-logger.conf -Dflume.root.logger=DEBUG,console
Deliberate kill-and-restart testing shows this source really is reliable:
it never loses data, though in extreme cases it can produce duplicate data!
VI. kafka source
1. How it works
The Kafka source connects to Kafka with a Kafka consumer, reads messages, converts them into events, and writes them to the channel.
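The read-batch-commit loop can be sketched without a real Kafka cluster; any iterable stands in for the consumer here, and the two flush conditions mirror the batchSize and batchDurationMillis parameters below. The function name, the hard-coded topic header, and the event shape are all invented for this sketch.

```python
import time

def kafka_source_batches(messages, batch_size=1000, batch_duration_ms=1000):
    """Turn consumed messages into events and flush a batch to the
    channel when it reaches batch_size, or when batch_duration_ms has
    elapsed since the batch was started -- whichever comes first."""
    batch = []
    deadline = time.monotonic() + batch_duration_ms / 1000.0
    for body in messages:
        batch.append({"headers": {"topic": "TAOGE"}, "body": body})
        if len(batch) >= batch_size or time.monotonic() >= deadline:
            yield batch
            batch = []
            deadline = time.monotonic() + batch_duration_ms / 1000.0
    if batch:
        yield batch                # flush the trailing partial batch

batches = list(kafka_source_batches([b"m1", b"m2", b"m3"], batch_size=2))
print([len(b) for b in batches])   # [2, 1]
```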
2. Parameters
| Property Name | Default | Description |
| --- | --- | --- |
| channels | – | The channel(s) this source writes to |
| type | – | The component type name: org.apache.flume.source.kafka.KafkaSource |
| kafka.bootstrap.servers | – | Comma-separated list of Kafka brokers |
| kafka.consumer.group.id | flume | Kafka consumer group id |
| kafka.topics | – | Comma-separated list of Kafka topics to consume |
| kafka.topics.regex | – | Regex defining the set of topics to subscribe to; takes precedence over kafka.topics |
| batchSize | 1000 | Maximum number of messages per batch written to the channel |
| batchDurationMillis | 1000 | Maximum time (ms) before a batch is written to the channel |
| backoffSleepIncrement | 1000 | Initial and incremental wait time triggered when a Kafka topic appears empty |
| maxBackoffSleep | 5000 | Maximum wait time triggered when a Kafka topic appears empty |
| useFlumeEventFormat | false | By default, the Kafka message bytes go straight into the event body. Set to true to parse messages as Flume Avro binary events; used together with the same property on the Kafka Sink, or the parseAsFlumeEvent property on the Kafka Channel, this preserves any Flume headers set on the producing side. |
| setTopicHeader | true | Whether to store the topic of the message in a header |
| topicHeader | topic | Header key used to store the topic name when setTopicHeader is enabled |
| kafka.consumer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
| more consumer security props | | If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to Kafka security for additional properties that need to be set on the consumer. |
| Other Kafka Consumer Properties | – | Any Kafka consumer property can be set directly by prefixing it with kafka.consumer., e.g. kafka.consumer.auto.offset.reset |
3. Configuration
a1.sources = s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.channels = c1
a1.sources.s1.batchSize = 100
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092
a1.sources.s1.kafka.topics = TAOGE
a1.sources.s1.kafka.consumer.group.id = g1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 100
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. Testing
(1) First, prepare the topic in Kafka
# List the topics in the Kafka cluster:
bin/kafka-topics.sh --list --zookeeper c701:2181
# Create a new topic
bin/kafka-topics.sh --create --topic TAOGE --partitions 3 --replication-factor 2 --zookeeper c701:2181
# Show the topic's details
bin/kafka-topics.sh --describe --topic TAOGE --zookeeper c701:2181
# Console producer: write data into the topic
bin/kafka-console-producer.sh --broker-list c701:9092,c702:9092,c703:9092 --topic TAOGE
(2) Start the Flume agent to collect the Kafka data
bin/flume-ng agent -n a1 -c conf/ -f myconf/kfk-mem-logger.conf -Dflume.root.logger=INFO,console
Note:
the batch size a source writes into a channel must be <= the channel's transactionCapacity.
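That constraint is easy to sanity-check before starting an agent. Here is a small hypothetical helper over the flat property format (the function and property names are invented for illustration):

```python
def batch_fits_transaction(props, source, channel):
    """Return True if the source's batchSize does not exceed the
    channel's transactionCapacity -- a source commits one whole batch
    per channel transaction, so the batch must fit inside it."""
    batch = int(props.get(f"a1.sources.{source}.batchSize", 100))
    txn = int(props.get(f"a1.channels.{channel}.transactionCapacity", 100))
    return batch <= txn

props = {
    "a1.sources.s1.batchSize": "100",
    "a1.channels.c1.transactionCapacity": "100",
}
print(batch_fits_transaction(props, "s1", "c1"))   # True: 100 <= 100

props["a1.sources.s1.batchSize"] = "500"
print(batch_fits_transaction(props, "s1", "c1"))   # False: 500 > 100
```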