Flume常用组件详解Source

Flume支持众多的source、sink、拦截器等组件具体实现,详细手册可参考官方文档http://flume.apache.org/FlumeUserGuide.html

一、netcat source

1、工作机制

启动一个socket服务,监听一个端口;

将端口上收到的数据,转成event写入channel;

2、配置文件

a1.sources = s1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 44444

a1.sources.s1.channels = c1

二、exec source

1、工作机制

启动一个用户所指定的linux shell命令;

采集这个linux shell命令的标准输出,作为收集到的数据,转为event写入channel;

2、参数详解

channels

本source要发往的channel

type

本source的类别名称:exec

command

本source所要运行的linux命令,比如: tail -F /path/file

shell

指定运行上述命令所用shell

restartThrottle

10000

命令die了以后,重启的时间间隔

restart

false

命令die了以后,是否要重启

logStdErr

false

是否收集命令的错误输出stderr

batchSize

20

提交的event批次大小

batchTimeout

3000

发往下游没完成前,等待的时间

selector.type

replicating

指定channel选择器:replicating or multiplexing

selector.*

选择器的具体参数

interceptors

指定拦截器

interceptors.*

指定的拦截器的具体参数

3、配置文件

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

4、启动测试

1.准备一个日志文件

2.写一个脚本模拟往日志文件中持续写入数据

for i in {1..10000}; do echo ${i}--------------------------- >> access.log ; sleep 0.5; done

3.创建一个flume自定义配置文件

4.启动flume采集

注意:通过人为破坏测试,发现这个exec source,不会记录宕机前所采集数据的偏移量位置,重启后可能会造成数据丢失!

三、spooldir source

1、工作机制

监视一个指定的文件夹,如果文件夹下有没采集过的新文件,则将这些新文件中的数据采集,并转成event写入channel;

注意:spooling目录中的文件必须是不可变的,而且是不能重名的!否则,source会loudly fail!

2、参数详解

Property Name

Default

Description

channels

type

The component type name, needs to be spooldir.

spoolDir

The directory from which to read files from.

fileSuffix

.COMPLETED

采集完成的文件,添加什么后缀名

deletePolicy

never

是否删除采完的文件: never or immediate

fileHeader

false

是否将所采集文件的绝对路径添加到header中

fileHeaderKey

file

上述header的key名称

basenameHeader

false

是否将文件名添加到header

basenameHeaderKey

basename

上述header的key名称

includePattern

^.*$

指定需要采集的文件名的正则表达式

ignorePattern

^$

指定要排除的文件名的正则表达式

如果一个文件名即符合includePattern又匹配ignorePattern,则该文件不采

trackerDir

.flumespool

记录元数据的目录所在路径,可以用绝对路径也可以用相对路径(相对于采集目录)

trackingPolicy

rename

采集进度跟踪策略,有两种: “rename”和 “tracker_dir”. 本参数只在deletePolicy=never时才生效

“rename”- 采完的文件根据filesuffix重命名

“tracker_dir” - 采完的文件会在trackerDir目录中生成一个同名的空文件

consumeOrder

oldest

采集顺序: oldest, youngest and random.

oldest和youngest情况下,可能会带来一定效率的损失;(需要对文件夹中所有文件进行一次扫描以寻找最old或最young的)

pollDelay

500

Delay (in milliseconds) used when polling for new files.

recursiveDirectorySearch

false

Whether to monitor sub directories for new files to read.

maxBackoff

4000

The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.

batchSize

100

一次传输到channel的event条数(一批)

inputCharset

UTF-8

Character set used by deserializers that treat the input file as text.

decodeErrorPolicy

FAIL

What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence.

deserializer

LINE

Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implementEventDeserializer.Builder.

deserializer.*

Varies per event deserializer.

bufferMaxLines

(Obselete) This option is now ignored.

bufferMaxLineLength

5000

(Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.

selector.type

replicating

replicating or multiplexing

selector.*

Depends on the selector.type value

interceptors

Space-separated list of interceptors

interceptors.*

3、配置文件

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = spooldir

a1.sources.s1.spoolDir = /root/weblog

a1.sources.s1.batchSize = 200

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

4、启动测试

bin/flume-ng agent -n a1 -c conf -f myconf/spooldir-mem-logger.conf -Dflume.root.logger=DEBUG,console

注意:spooldir source 与exec source不同,spooldir source本身是可靠的!会记录崩溃之前的采集位置!

四、avro source

Avro source 是通过监听一个网络端口来接受数据,而且接受的数据必须是使用avro序列化框架序列化后的数据;

Avro是一种序列化框架,跨语言的;

扩展:什么是序列化,什么是序列化框架?

序列化: 是将一个有复杂结构的数据块(对象)变成扁平的(线性的)二进制序列

序列化框架: 一套现成的软件,可以按照既定策略,将对象转成二进制序列

比如: jdk就有: ObjectOutputStream

hadoop就有: Writable

跨平台的序列化框架: avro

1、工作机制

启动一个网络服务,监听一个端口,收集端口上收到的avro序列化数据流!

该source中拥有avro的反序列化器,能够将收到的二进制流进行正确反序列化,并装入一个event写入channel!

2、参数详解

Property Name

Default

Description

channels

type

本source的别名: avro

bind

要绑定的地址

port

要绑定的端口号

threads

服务的最大线程数

selector.type

selector.*

interceptors

Space-separated list of interceptors

interceptors.*

compression-type

none

压缩类型:跟发过来的数据是否压缩要匹配:none | deflate

ssl

false

Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).

keystore

This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).

keystore-password

The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).

keystore-type

JKS

The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).

exclude-protocols

SSLv3

Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.

include-protocols

Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols.

exclude-cipher-suites

Space-separated list of cipher suites to exclude.

include-cipher-suites

Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites.

ipFilter

false

Set this to true to enable ipFiltering for netty

ipFilterRules

Define N netty ipFilter pattern rules with this config.

3、配置文件

a1.sources = r1

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

4、启动测试

启动agent:

bin/flume-ng agent -c ./conf -f ./myconf/avro-mem-logger.conf -n a1 -Dflume.root.logger=DEBUG,console

用一个客户端去给启动好的source发送avro序列化数据:

bin/flume-ng avro-client --host c703 --port 4141

​​​​5、利用avro source和avro sink实现agent级联

(1)需求说明

(2)配置文件

上游配置文件: vi  exec-m-avro.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/logs/access.log

a1.sources.r1.batchSize = 100

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.trasactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = h3

a1.sinks.k1.port = 4455

游配置文件:vi  avro-m-log.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4455

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.trasactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = logger​​​​​

(3)启动测试

先启动下游:bin/flume-ng agent -n a1 -c conf/ -f avro-m-log.conf -Dflume.root.logger=INFO,console

再启动上游:bin/flume-ng agent -n a1 -c conf/ -f exec-m-avro.conf

然后写一个脚本在h1上模拟生成数据

while true

do

echo "hello "  >> /tmp/logs/access.log

sleep 0.1

done

五、taildir source

1、工作机制

监视指定目录下的一批文件,只要某个文件中有新写入的行,则会被tail到。

它会记录每一个文件所tail到的位置,记录到一个指定的positionfile保存目录中,格式为json(如果需要的时候,可以人为修改,就可以让source从任意指定的位置开始读取数据)

它对采集完成的文件,不会做任何修改(比如重命名,删除…..)

taildir source会把读到的数据成功写入channel后,再更新记录偏移量。这种机制,能保证数据不会漏采(丢失),但是有可能会产生数据重复!

2、参数详解

Property Name

Default

Description

channels

所要写往的channel

type

本source的别名: TAILDIR.

filegroups

空格分割的组名,每一组代表着一批文件

g1 g2

filegroups.<filegroupName>

每个文件组的绝路路径,文件名可用正则表达式

positionFile

~/.flume/taildir_position.json

记录偏移量位置的文件所在路径

headers.<filegroupName>.<headerKey>

Header value which is the set with header key. Multiple headers can be specified for one file group.

byteOffsetHeader

false

Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.

skipToEnd

false

Whether to skip the position to EOF in the case of files not written on the position file.

idleTimeout

120000

关闭非活动文件的时延。如果被关闭的这个文件又在某个时间有了新增行,会被此source检测到,并重新打开

writePosInterval

3000

3s 记录一次偏移量到positionfile

batchSize

100

提交event到channel的批次最大条数

maxBatchCount

Long.MAX_VALUE

控制在一个文件上连续读取的最大批次个数(如果某个文件正在被高速写入,那就应该让这个参数调为最大值,以让source可以集中精力专采这个文件)

backoffSleepIncrement

1000

The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.

maxBackoffSleep

5000

The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.

cachePatternMatching

true

Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.

fileHeader

false

Whether to add a header storing the absolute path filename.

fileHeaderKey

file

Header key to use when appending absolute path filename to event header.

3、配置文件

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/flumedata/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.ri.maxBatchCount = 1000

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

4、启动测试

bin/flume-ng agent -n a1 -c conf/ -f myconf/taildir-mem-logger.conf -Dflume.root.logger=DEBUG,console

经过人为破坏测试,发现, this source还是真正挺reliable的!

不会丢失数据,但在极端情况下可能会产生重复数据!

六、kafka source

1、工作机制

Kafka source的工作机制:就是用kafka consumer连接kafka,读取数据,然后转换成event,写入channel

2、参数详解

Property Name

Default

Description

channels

数据发往的channel

type

本source的名称:

org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers

Kafka broker服务器列表,逗号分隔

kafka.consumer.group.id

flume

Kafka消费者组id

kafka.topics

Kafka消息主题列表,逗号隔开

kafka.topics.regex

用正则表达式来指定一批topic;本参数的优先级高于kafka.topics

batchSize

1000

写入channel的event 批,最大消息条数

batchDurationMillis

1000

批次写入channel的最大时长

backoffSleepIncrement

1000

Kafka Topic 显示为空时触发的初始和增量等待时间。

maxBackoffSleep

5000

Kafka Topic 显示为空时触发的最长等待时间

useFlumeEventFormat

false

默认情况下,event 将从Kafka Topic 直接作为字节直接进入event 主体。设置为true以读取event 作为Flume Avro二进制格式。与Kafka Sink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。

setTopicHeader

true

是否要往header中加入一个kv:topic信息

topicHeader

topic

应上面开关的需求,加入kv:topic =>topic名称

kafka.consumer.security.protocol

PLAINTEXT

Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.

more consumer security props

If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on consumer.

Other Kafka Consumer Properties

本source,允许直接配置任意的kafka消费者参数,格式如下:

For example: kafka.consumer.auto.offset.reset

(就是在消费者参数前加统一前缀: kafka.consumer.)

3、配置文件

a1.sources = s1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.s1.channels = c1

a1.sources.s1.batchSize = 100

a1.sources.s1.batchDurationMillis = 2000

a1.sources.s1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092

a1.sources.s1.kafka.topics = TAOGE

a1.sources.s1.kafka.consumer.group.id = g1

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​4、启动测试

(1)首先,操作kafka,准备好topic

# 查看当前kafka集群中的topic:

bin/kafka-topics.sh  --list --zookeeper c701:2181

# 创建一个新的topic

bin/kafka-topics.sh  --create --topic TAOGE --partitions 3 --replication-factor 2 --zookeeper c701:2181

# 查看topic的详细信息

bin/kafka-topics.sh --describe --topic TAOGE --zookeeper c701:2181

# 控制台生产者,向topic中写入数据

bin/kafka-console-producer.sh --broker-list c701:9092,c702:9092,c703:9092 --topic TAOGE

(2) 启动flume agent来采集kafka中的数据

bin/flume-ng agent -n a1 -c conf/ -f myconf/kfk-mem-logger.conf  -Dflume.root.logger=INFO,console

注意:

Source往channel中写入数据的批次大小  <=  channel的事务控制容量大小

Flume常用组件详解之Source相关推荐

  1. Android Material 常用组件详解(六)—— Progress indicators、Slider 使用详解

    1.Progress indicators Progress indicators是自带动画效果的Progress.进度指示器向用户通知正在进行的进程的状态,例如加载应用程序,提交表单或保存更新. 类 ...

  2. Unity零基础到进阶 ☀️| 音频源Audio Source组件 详解

    [Unity3D组件使用指南]AudioSource组件 详解 一.组件介绍 二.组件属性面板 三.代码操作组件 四.组件常用方法示例 五.组件相关扩展 1.在Unity中听不到声音的问题 总结

  3. Linux常用命令详解(最全)

    Linux命令目录 Linux命令集合 系统信息 关机 (系统的关机.重启以及登出 ) 文件和目录 文件搜索 挂载一个文件系统 磁盘空间 用户和群组 文件的权限 - 使用 "+" ...

  4. Streamsets组件详解

    Streamsets优化详解 一.Origin类组件详解 二.Processor类组件详解 三.Destination类组件详解 四.Executor类组件使用详解 一.Origin类组件详解 Ama ...

  5. Samtools(CentOS Linux)安装及常用命令详解

    序列比对(将测序reads与已知序列信息的基因或基因组进行比对)是高通量测序数据分析中最重要的一环,无论是转录组还是重测序都是基于比对结果来进行后续各项分析的,比对结果格式比较常见的是sam和bam文 ...

  6. ReactNative ViewPageAndroid组件详解

    源码传送门 在我们开发Android的时候,ViewPage这个控件的使用频率还是很高的,最简单的就是制作引导页,应用程序的主界面等,在ReactNative开发中实现该功能的组件是ViewPageA ...

  7. QT:常用函数详解--常用操作记录(个人笔记)

    QT:常用函数详解(个人笔记) PS:一下内容个人笔记,要求自己看懂,随笔,阅读体验会很差很差! Qt setContentsMargins()函数 函数原型:void QLayout::setCon ...

  8. logback 常用配置详解(二) appender

    详细整理了logback常用配置 不是官网手册的翻译版,而是使用总结,旨在更快更透彻的理解其配置 logback 简介 logback常用配置详解(一)<configuration> an ...

  9. Android应用开发—Intent组件详解

    转载自:Android中Intent组件详解 Intent是不同组件之间相互通讯的纽带,封装了不同组件之间通讯的条件. Intent本身是定义为一个类别(Class),一个Intent对象表达一个目的 ...

最新文章

  1. ThinkPad L440 FN键设置
  2. 二叉树的前序,中序,后序,层序遍历的递归和非递归实现
  3. python测试用例图_pytest以函数形式的测试用例
  4. 安全测试-抓包工具BurpSuite
  5. 捕获计算机屏幕++方法,如何在Windows 10计算机上录制屏幕以及如何捕获计算机的音频...
  6. C#(.net)中的一次连接数据库执行多条sql语句
  7. java商品新增怎麽弄_添加新商品时如何初始化计数器 - java
  8. 配置Server Side TAF
  9. 网络爬虫之Java基础篇(Ⅱ)
  10. android中常用正则表达式
  11. 【C++】获取二维数组的行和列
  12. 批量删除多台linux服务器文件
  13. dcp7080d怎么加墨粉_兄弟7080打印机怎么加粉
  14. 猫眼IPO后,在线票务平台或抛弃补贴战,未来看电影又贵了?
  15. 老鹰教小鹰飞翔的故事
  16. 万能通用!权限系统就该这么设计
  17. python爬虫爬取深交所数据
  18. 如何借助分布式存储 JuiceFS 加速 AI 模型训练
  19. 计算机性能指标ppt,计算机网络—评价网络的性能指标知识讲解.ppt
  20. 数据分析方法:非正态数据转化成正态数据

热门文章

  1. python商品销售情况数据分析_用python分析小红书销售情况
  2. 从事IT5年的历程--续(学习经)
  3. python猜数游戏续_python猜数游戏续改编
  4. 【读书札记】《北大授课》中华文化四十七讲 余秋雨
  5. scrapy爬取微信公众号内容,多管道储存,orm数据储存
  6. kindle touch 花屏无反应
  7. 基于matlab 宗晓萍,宗晓萍 - 河北大学 - 电子信息工程学院
  8. 学习ESP8266_11_系统软件定时器
  9. 数据库原理-几种数据模型
  10. 基于javaee的养老保险管理系统