一 Avro Source

监听avro 端口和从外部的Avro 客户端接受event。当与其他的flumeagent内嵌的AvroSink成对时,他能创建分层集合。

1.1Required Properties

channels: 绑定的channel

type: 类型 avro

bind:监听的主机名或IP

port: 监听的端口

1.2Optional Properties

threads:最大工作者线程数

selector.type:

selector.*

interceptors:空格分割的拦截器列表

compression-type: none|deflate

ssl:是否启用ssl

keystore:java keystore 文件路径,如果启用ssl,这个必须配置

keystore-password:java keystore密码

keystore-type:java keystore类型,可以是JKS|PKCS12

exclude-protocols:排除的协议列表,默认值SSLv3

ipFilter:是否启用netty的ipFilter功能

ipFilter.rules:过滤ip的规则,格式如下:

<allow|deny>:<ip|name>:<pattern>

举个例子:

ipFilter.rules=allow:ip:127.*,allow:name:localhost,deny:ip:*

二 Thrift Source

监听thrift 端口和从外部的thrift客户端接受event. 当与其他的flumeagent内嵌的ThriftSink成对时,他能创建分层集合。

2.1Required Properties

channels: 绑定的channel

type: 类型 thrift

bind:监听的主机名或IP

port: 监听的端口

2.2Optional Properties

threads:最大工作者线程数

selector.type:

selector.*

interceptors:空格分割的拦截器列表

三 Exec Source

运行一个给定 Unix命令,然后连续产生数据到标准输出,如果有错误,会被丢弃,除非你显示的设置logStdErr属性为true,如果这个进程退出了,那么source将不会在产生数据。

3.1Required Properties

channels: 绑定的channel

type: 类型 exec

command: 要执行的命令

3.2 Optional Properties

shell: 调用shell运行命令,一般情况下,如果命令依赖于shell特征诸如通配符,反勾号,管道等。 /bin/sh –c

restartThrottle: 重启需要等到的时间

restart:进程死掉是否应该重启

logStdErr:错误是否应该被作为数据

batchSize:每一次发送到Channel最大行数

batchTimeout: 如果没有达到buffer值,push 数据应该等待的时间

selector.type:replicating|multiplexing

interceptors:空格分割的拦截器列表

注意:Shell指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。

例子:

agent.sources= src1

agent.channels= channel1

agent.sinks= sink1

#For each one of the sources, the type is defined

agent.sources.src1.type= exec

agent.sources.src1.command= tail -f /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log

agent.sources.src1.shell= /bin/sh -c

agent.sources.src1.batchSize= 20

agent.sources.src1.batchTimeout= 3000

四 JMS Source

JMS从目的地读消息。

4.1Required Properties

channels: 绑定的channel

type: 类型 jms

initialContextFactory: 实例化JMS 上下文工厂,诸如

org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactory: 连接工厂的JNDI名字

providerURL:JMS provider URL

destinationName: 目的地名字

destinationType: queue|topic

4.2Optional Properties

messageSelector: 创建消费者使用的Message Selector

userName: destination/provider用户名

passwordFile:destination/provider密码

batchSize: 在一个批次消费的消息数量,默认100

converter.type: 转换消息成flume event的转换类的class

converter.*     –    Converter properties.

converter.charset: 默认UTF-8

注意:JMS Source允许使用Converters 插件,尽管默认的converter已经能胜任大部分目标。默认的能转换BytesMessage,Text Message,以及Object Message成Flume event,在任何情况下,这message的属性都需要加到FluemEvent的header里

BytesMessage:

BytesMessage复制到FlumeEvent body里.每一个消息不能转换不超过2G

TextMessage:被转成字节数组,然后拷贝到FlumeEventbody里,默认字符集UTF-8

ObjectMessage:对象写入到ByteArrayOutputStream,然后数组拷贝到FlumeEvent

例子:

a1.sources= r1

a1.channels= c1

a1.sources.r1.type= jms

a1.sources.r1.channels= c1

a1.sources.r1.initialContextFactory= org.apache.activemq.jndi.ActiveMQInitialContextFactory

a1.sources.r1.connectionFactory= GenericConnectionFactory

a1.sources.r1.providerURL= tcp://mqserver:61616

a1.sources.r1.destinationName= BUSINESS_DATA

a1.sources.r1.destinationType= QUEUE

五 Spooling Directory Source

我们可以在某一个目录下去拿数据,这个目录会不断的产生日志,spoolingdirectory source 能够监视该目录的新的文件,然后将该文件转换成event,然后被解析或者被转转换的文件我们还可以设置一个标记,标志他已经被转换或者被解析

它和ExecSource相比较而言,可以保证数据的完整性,即使flume重启或者被杀掉

推荐我们应该尽量保证文件名字唯一,最好使用时间戳

5.1Required Properties

channels: 绑定的channel

type: 类型spooldir

spoolDir: 从哪儿读取文件

5.2Optional Properties

fileSuffix: 当一个文件完成解析或者转换,应该被加上的后缀,默认是COMPLETED

deletePolicy:删除策略,never|immediate

fileHeader:是否添加一个header存储文件的绝对路径

fileHeaderKey: 如果启用fileHeader,应该使用的key

basenameHeader:是否添加一个header存储文件名

basenameHeaderKey:如果启用basenameHeader,应该使用的key

ignorePattern: 哪些文件可以被忽略,默认是没有^$,这里应该是一个正在表达式

trackerDir: 存储处理文件相关的元数据的目录,如果不是绝对路径,那么将是spoolDir的相对路径

consumeOrder: 转换文件的顺序oldest|youngest|random

maxBackoff:如果Channel已经满了,那么该Source连续尝试写入该Channel的最长时间(单位:毫秒)

batchSize:批量传输到Channel的粒度

inputCharset:反序列化使用的字符集

decodeErrorPolicy:在文件中有不可解析的字符时的解析策略。FAIL: 抛出一个异常,并且不能解析该文件。REPLACE:取代不可

解析的字符,通常用UnicodeU+FFFD. IGNORE: 丢弃不可能解析字符序列。

deserializer使用指定的反序列化方式,如果自定义反序列化,必须实现EventDeserializer.Builder

selector.type:replicating|multiplexing

interceptors:空格分割的拦截器列表

例子:

agent.sources= src1

agent.sources.src1.type= spooldir

agent.sources.src1.spoolDir= /opt/data/logs/spool

agent.sources.src1.fileSuffix= .DONE

agent.sources.src1.ignorePattern= ^(.)*\\.log$

六 Kafka Source

Kafka消费者从kafkatopic读取数据。

6.1Required Properties

channels: 绑定的channel

type:org.apache.flume.source.kafka.KafkaSource

zookeeperConnect: Kafka集群的ZooKeeperURI

groupId:   consumer group 的唯一标识符

topic: 要读取的topic

6.2Optional Properties

batchSize:每一次写入channel的最大消息数量,默认1000

例子:

tier1.sources.source1.type= org.apache.flume.source.kafka.KafkaSource

tier1.sources.source1.channels= channel1

tier1.sources.source1.zookeeperConnect= localhost:2181

tier1.sources.source1.topic= test1

tier1.sources.source1.groupId= flume

tier1.sources.source1.kafka.consumer.timeout.ms= 100

七 NetCat Source

监听指定端口的的event,比如我们可以用telnet

7.1Required Properties

channels: 绑定的channel

type:netcat

bind:hostname 或者 ip

port:监听的端口

7.2Optional Properties

max-line-length:每一个eventbody里面 每一行最大长度

interceptors:空格分割的拦截器列表

八 Http Source

通过HTTPGET 或者POST接收FlumeEvent. HTTP 请求转换成flumeevent 是通过可插拔的handler来实现的,handler必须实现HTTPSourceHandler接口,handler携带HttpServletRequest然后返回flumeevent的一个列表。最后处理的event通过一个事务提交给Channel.如果hadnder错误,抛出异常,source返回HTTP 状态400

如果channel满了,source不能添加,source返回503

8.1Required Properties

type:http

port:绑定的端口

8.2Optional Properties

bind:监听的主机名或者IP地址

handler:handlerclass 比如org.apache.flume.source.http.JSONHandler.

handler.*配置handler参数

selector.type:replicating|multiplexing

interceptors:空格分割的拦截器列表

enableSSL:是否启用SSL

keystore:如果启用SSL, 指定keystore文件路径

九 taildir source

我们知道ExecSource可以动态读取一个文件;SpoolingDirectory Source 可以动态读取一个文件夹,但是需要对读取过的文件重命名。

那先在既需要动态读取文件夹,又需要动态读取文件,怎么办呢?

Flume1.7新出来一个Taildirsource, 可以按照行读取文件夹中内容,并且提供数据恢复机制。

9.1下载源码

gitclone https://github.com/apache/flume.git

切换到flume-1.7这个分之,然后进行gitpull

9.2编译

第一步:复制flume-ng-sources\flume-taildir-source到某一个目录,

第二步:导入依赖包,有的可能只有cloudera源才有,所以在pom.xml加入:

<repositories>

<repository>

<id>cloudera</id>

<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

</repository>

</repositories>

第三步骤:将pom.xml中1.7版本修改为1.5-cdh5.3.6(这个取决于自己的cdh版本)

中间可能还有一些小错误,可以自己随手解决掉。

9.3导出jar包

Runas > maven build > goals输入:clean package然后target文件夹应该会生成一个jar包

9.4放入当前flume环境中

放入flumelib目录

如何配置呢?

看源码或者看示例代码

9.5Required Properties

channels:绑定的channel

type: 类名称org.apache.flume.source.taildir. TaildirSource

filegroups:空格分隔的文件组列表,每一个文件组指向一个被tail了的文件集合

filegroups.<filegroupName>:文件组的绝对路径

9.6Optional Properties

positionFile:记录inode,文件绝对路径和每一次tail的最后位置的json格式的文件,默认位置~/.flume/taildir_position.json

headers.<filegroupName>.<headerKey>:

idleTimeout:关闭非活动文件,默认时间120000,2分钟

writePosInterval:在positionfile 写入每一个文件lastposition的时间间隔

batchSize:读取的行数,默认是100

backoffSleepIncrement:

maxBackoffSleep:

cachePatternMatching:

fileHeader:是否添加header存储文件绝对路径

fileHeaderKey:fileHeader启用时,使用的key

例子:

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile =/var/log/flume/taildir_position.json

a1.sources.r1.filegroups = f1 f2

a1.sources.r1.filegroups.f1 =/var/log/test1/example.log

a1.sources.r1.headers.f1.headerKey1 = value1

a1.sources.r1.filegroups.f2 =/var/log/test2/.*log.*

a1.sources.r1.headers.f2.headerKey1 = value2

a1.sources.r1.headers.f2.headerKey2 =value2-2

a1.sources.r1.fileHeader = true

Flume Source相关推荐

  1. Flume Source 实例

    Avro Source 监听avro端口,接收外部avro客户端数据流.跟前面的agent的Avro Sink可以组成多层拓扑结构. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 ...

  2. Flume的Avro Sink和Avro Source研究之一: Avro Source

    问题 : Avro Source提供了怎么样RPC服务,是怎么提供的? 问题 1.1 Flume Source是如何启动一个Netty Server来提供RPC服务. 由GitHub上avro-rpc ...

  3. Flume之Source

    Source Flume内置了大量的Sourece,其中Avro Source(集群).Thrift Source.Spooling Directory Source(目录).Kafka Source ...

  4. flume avro java_flume之Avro Source和Avro Sink

    一.Avro Souce介绍 Flume主要的RPC Source是Avro Source Avro Source被设计为高扩展的RPC服务器端,能从其他的Flume Agent的Avro Sink或 ...

  5. Flume的开发之 自定义 source 自定义 sink 自定义拦截器

    一:开发相关 加入 jar 包依赖: <dependency> <groupId>org.apache.flume</groupId> <artifactId ...

  6. Flume常用组件详解之Source

    Flume常用组件详解:Source Flume支持众多的source.sink.拦截器等组件具体实现,详细手册可参考官方文档http://flume.apache.org/FlumeUserGuid ...

  7. 大数据——Flume组件Source、Channel和Sink具体使用

    Flume组件Source.Channel和Sink使用说明 Flume Sources Avro Source 配置范例 Thrift Source 配置范例 Exec Source 配置范例 JM ...

  8. flume学习(十一):如何使用Spooling Directory Source

    最近在弄一个信令数据汇聚的事情,主要目的是把FTP上的信令数据汇聚到HDFS上去存储. 逻辑是这样的:把FTP服务器上的文件下载到一台主机上,然后SCP到另外一台主机上的Spooling Direct ...

  9. flume avro java_Flume的Avro Sink和Avro Source研究之一: Avro Source

    问题 : Avro Source提供了怎么样RPC服务,是怎么提供的? 问题 1.1 Flume Source是如何启动一个Netty Server来提供RPC服务. 由GitHub上avro-rpc ...

最新文章

  1. 树状数组 区间update/query
  2. linux编程课后作业,Unix/Linux 编程实践教程第三章习题
  3. 在javascript中,如何判断一个被多次encode 的url 已经被decode到原来的格式?
  4. Linux TCP server系列(4)-浅谈listen与大并发TCP连接
  5. LwIP移植准备工作
  6. 27 FI配置-财务会计-外币评估-定义评估方范围
  7. so库调用java函数_linux下so动态库调用主程序函数
  8. Python提取Word文档中所有超链接地址和文本
  9. 1.根据MAC地址抓包
  10. Pytorch 的迁移学习的理解
  11. struts1(转)
  12. 转换整形数字为16进制字符串
  13. win10计算机变成了英文,Win10系统中自带的Office(Word,Excel)突然变成英文怎么变回中文...
  14. Mac中Homebrew下载指定版本软件的方法
  15. linux添加变色龙引导,u盘启动盘制作win7变色龙引导工具
  16. 电子邮箱邮件安全使用技巧,公司电子邮件安全使用总结
  17. JZOJ 3337. 【NOI2013模拟】wyl8899的TLE【暴力】
  18. cf. (E) Thematic Contests
  19. 蓝桥杯(Java) 回文日期
  20. 跑步时戴什么耳机好、推荐几款专业跑步的耳机

热门文章

  1. cad无法加载arx文件_CAD文件损坏?无法打开?试试这8个方法吧
  2. php 数组按个数分组,如何在PHP中基于内部数组键对数组进行分组?
  3. c++ fork 进程时 共享内存_尚学堂百战程序员:Python多进程与共享内存
  4. 鸟哥linux教学怎么样,鸟哥关于学习Linux的一些建议
  5. linux ojvm补丁安装,打补丁PSU
  6. python计算N维数据的笛卡尔积
  7. 解决Ubuntu make 命令 sudo: make: command not found
  8. 上传docker到阿里云镜像仓库
  9. docker导入与导出容器
  10. python使用新线程执行目标函数