1.1 Avro Source

监听Avro端口,从Avro client streams接收events。要求属性是粗体字。利用Avro Source可以实现多级流动、扇出流、扇入流等效果。另外也可以接受通过flume提供的Avro客户端发送的日志信息。

!channels  –

!type  –   类型名称,"AVRO"

!bind  –   需要监听的主机名或IP

!port  –   要监听的端口

threads    –   工作线程最大线程数

selector.type

selector.*

interceptors  –   空格分隔的拦截器列表

interceptors.*

compression-type  none   压缩类型,可以是“none”或“default”,这个值必须和AvroSource的压缩格式匹配

sslfalse  是否启用ssl加密,如果启用还需要配置一个“keystore”和一个“keystore-password”。

keystore   –   为SSL提供的java密钥文件所在路径。

keystore-password–   为SSL提供的java密钥文件 密码。

keystore-typeJKS密钥库类型可以是“JKS”或“PKCS12”。

exclude-protocolsSSLv3  空格分隔开的列表,用来指定在SSL / TLS协议中排除。SSLv3将总是被排除除了所指定的协议。

ipFilter   false  如果需要为netty开启ip过滤,将此项设置为true

ipFilterRules–   定义netty的ip过滤设置表达式规则

agent a1例子:

ipFilterRules例子:

ipFilterRules=allow:ip:127.*,  allow:name:localhost,deny:ip:*   

编写配置文件  修改上面给出的配置文件,除了Source部分配置不同,其余部分都一样。不同的地方如下:

    #描述/配置Sourcea1.sources.r1.type  =  avroa1.sources.r1.bind  =  0.0.0.0a1.sources.r1.port  =  44444

启动flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console

通过flume提供的avro客户端向指定机器指定端口发送日志信息:

./flume-ng avro-client --conf ../conf --host 0.0.0.0 --port 44444 --filename ../mydata/log1.txt

会发现确实收集到日志。

1.2 Thrift Source

监听Thrift端口和从外部Thrift client streams接收events。要求属性为粗体字:

agent a1 例子:

1.3 Exec Source

Exec Source在启动时运行一个Unix命令行,并期望这过程在标准输出上连续生产数据。要求属性为粗体字:

agent a1例子:

'shell'配置被用来通过一个命令shell调用‘command’。

1.4 JMS Source

JMS Source从JMS目标(如队列或者主题)读取消息。JMS应用程序应该可以与任何JMS提供程序一起工作,但是只能使用ActiveMQ进行测试。要求属性是粗体字。

agent a1例子:

1.5 Spooling Directory Source

该source让你通过放置被提取文件在磁盘”spooling“目录下这一方式,提取数据。该source将会监控指定目录的新增文件,当新文件出现时解析event。event解析逻辑是可插入的。当一个给定文件被全部读取进channel之后,它被重命名,以标识为已完成(或者可选择deleted)。

要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错。

!channels  –

!type  –   类型,需要指定为"spooldir"

!spoolDir  –   读取文件的路径,即"搜集目录"

fileSuffix.COMPLETED对处理完成的文件追加的后缀

agent-1例子:

案例:

编写配置文件  修改上面给出的配置文件,除了Source部分配置不同,其余部分都一样。不同的地方如下:

#描述/配置Source
a1.sources.r1.type  = spooldir
a1.sources.r1.spoolDir=/home/park/work/apache-flume-1.6.0-bin/mydata

启动flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console

向指定目录中传输文件,发现flume收集到了该文件,将文件中的每一行都作为日志来处理

1.6 Taildir Source

注意:该source不能用于windows。

agent a1例子:

1.7 Twitter 1% firehose Source(试验)

1.8 Kafka Source

Kafka Source是Apache Kafka消费者,从Kfaka topics读取消息。如果你有多个Kafka source在跑,你可以配置它们在相同的Consumer Group,以使它们每个读取topics独特的分区。

以逗号分隔的topic列表进行topic订阅的例子:

通过正则表达式进行topic订阅的例子:

安全和Kafka Source

Kafka 0.9.0支持SASL/GSSAPI 或者 SSL 协议。

设置 kafka.consumer.security.protocol的值:

①SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption

②SASL_SSL - Kerberos or plaintext authentication with data encryption

③SSL - TLS based encryption with optional authentication.

TLS和Kafka Source

带有服务端认证和数据加密配置的例子:

注意:属性ssl.endpoint.identification.algorithm没有定义,因此没有hostname验证,为了是hostname验证,可以设置属性:

如果要求有客户端认证,在Flume agent配置中添加下述配置。每个Flume agent必须有它的客户端凭证,以便被Kafka brokers信任。

如果keystore和key使用不用的密码保护,那么ssl.key.password属性需要提供出来:

Kerberos和Kafka Soure

kerberos配置文件可以在flume-env.sh通过JAVA_OPTS指定:

使用SASL_PLAINTEST的安全配置示例:

使用SASL_SSL的安全配置示例:

JAAS文件实例(暂时没看懂):

1.9 NetCat TCP Source

netcat source监听一个给定的端口,然后把接收到的数据每一行转换成一个event。要求属性是粗体字。

!channels–

!type–   类型名称,需要被设置为"netcat"

!bind–   指定要绑定到的ip或主机名。

!port–   指定要绑定到的端口号

max-line-length   512单行最大字节数

agent a1示例:

1.10 NetCat UDP Source

netcat source监听一个给定的端口,然后把text文件的每一行转换成一个event。要求属性是粗体字。

agent a1的示例:

1.11 Sequence Generator Source

一个简单的序列生成器可以不断生成events,带有counter计数器,从0开始,以1递增,在totalEvents停止。当不能发送events到channels时会不断尝试。

agent a1示例:

1.12 Syslog Sources

读取系统日志,并生成Flume events。UDP source以整条消息作为一个简单event。TCP source以新一行”n“分割的字符串作为一个新的event。

1.12.1 Syslog TCP Source

原始的,可靠的Syslog TCP source。

agent a1的syslog TCP source示例:

1.12.2 Multiport Syslog TCP Source

这是一个新的,更快的,多端口的Syslog TCP source版本。注意ports配置替代port。

agent a1的multiport syslog TCP source示例:

1.12.3 Syslog UDP Source

agent a1的syslog UDP source示例:

1.13 HTTP Source

HTTP Source接受HTTP的GET和POST请求作为Flume的事件,其中GET方式应该只用于试验。

该Source需要提供一个可插拔的"处理器"来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口,该处理器接受一个HttpServletRequest对象,并返回一个Flume Envent对象集合。

从一个HTTP请求中得到的事件将在一个事务中提交到通道中。因此允许像文件通道那样对通道提高效率。

如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。

如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。

!type    类型,必须为"HTTP"

!port–   监听的端口

bind   0.0.0.0    监听的主机名或ip

handler      org.apache.flume.source.http.JSONHandler处理器类,需要实现HTTPSourceHandler接口

handler.*  –   处理器的配置参数

selector.type

selector.*

interceptors  –

interceptors.*

enableSSL  false  是否开启SSL,如果需要设置为true。注意,HTTP不支持SSLv3。

excludeProtocols  SSLv3  空格分隔的要排除的SSL/TLS协议。SSLv3总是被排除的。

keystore      密钥库文件所在位置。

keystorePassword Keystore 密钥库密码

agent a1的http source示例:

Handler属性有两种,一是JSONHandler,一是BlobHandler。

BlobHandler用于处理请求参数带有比较大的对象(Binary Large Object),如PDF或者JPG。

案例2:

编写配置文件  修改上面给出的配置文件,除了Source部分配置不同,其余部分都一样。不同的地方如下:

1
2
3
#描述/配置Source
    a1.sources.r1.type  = http
    a1.sources.r1.port  = 66666

启动flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template6.conf --name a1 -Dflume.root.logger=INFO,console

通过命令发送HTTP请求到指定端口:

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://0.0.0.0:6666

1.14 Stress Source

StressSource 是内部负载生成source的实现,这对于压力测试是非常有用的。它允许用户配置Event有效载荷的大小。

agent a1的示例:

1.15 Legacy Sources

legacy sources允许Flume 1.x agent接收来自Flume 0.9.4 agents的events。

legacy source 支持Avro和Thrift RPC 连接。为了使用两个Flume 版本搭建的桥梁,你需要开始一个带有avroLegacy或者thriftLegacy source的Flume 1.x agent。0.9.4agent应该有agent Sink指向1.x agent的host/port。

1.15.1 Avro Legacy Source

agent a1的示例:

1.15.2 Thrift Legacy Source

agent a1的示例:

1.16 Custom Source(自定义Source)

自定义Source是你实现Source接口。当启动Flume agent时,一个自定义source类和它依赖项必须在agent的classpath中。

agent a1的示例:

1.17 Scrible Source

Scribe是另一种类型的提取系统。采用现有的Scribe提取系统,Flume应该使用基于Thrift的兼容传输协议的ScribeSource。

agent a1示例:

转载于:https://www.cnblogs.com/duanxz/p/9157465.html

flume-source相关推荐

  1. Flume Source

    一 Avro Source 监听avro 端口和从外部的Avro 客户端接受event.当与其他的flumeagent内嵌的AvroSink成对时,他能创建分层集合. 1.1Required Prop ...

  2. Flume Source 实例

    Avro Source 监听avro端口,接收外部avro客户端数据流.跟前面的agent的Avro Sink可以组成多层拓扑结构. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 ...

  3. Flume的Avro Sink和Avro Source研究之一: Avro Source

    问题 : Avro Source提供了怎么样RPC服务,是怎么提供的? 问题 1.1 Flume Source是如何启动一个Netty Server来提供RPC服务. 由GitHub上avro-rpc ...

  4. Flume之Source

    Source Flume内置了大量的Sourece,其中Avro Source(集群).Thrift Source.Spooling Directory Source(目录).Kafka Source ...

  5. flume avro java_flume之Avro Source和Avro Sink

    一.Avro Souce介绍 Flume主要的RPC Source是Avro Source Avro Source被设计为高扩展的RPC服务器端,能从其他的Flume Agent的Avro Sink或 ...

  6. Flume的开发之 自定义 source 自定义 sink 自定义拦截器

    一:开发相关 加入 jar 包依赖: <dependency> <groupId>org.apache.flume</groupId> <artifactId ...

  7. Flume常用组件详解之Source

    Flume常用组件详解:Source Flume支持众多的source.sink.拦截器等组件具体实现,详细手册可参考官方文档http://flume.apache.org/FlumeUserGuid ...

  8. 大数据——Flume组件Source、Channel和Sink具体使用

    Flume组件Source.Channel和Sink使用说明 Flume Sources Avro Source 配置范例 Thrift Source 配置范例 Exec Source 配置范例 JM ...

  9. flume学习(十一):如何使用Spooling Directory Source

    最近在弄一个信令数据汇聚的事情,主要目的是把FTP上的信令数据汇聚到HDFS上去存储. 逻辑是这样的:把FTP服务器上的文件下载到一台主机上,然后SCP到另外一台主机上的Spooling Direct ...

  10. flume avro java_Flume的Avro Sink和Avro Source研究之一: Avro Source

    问题 : Avro Source提供了怎么样RPC服务,是怎么提供的? 问题 1.1 Flume Source是如何启动一个Netty Server来提供RPC服务. 由GitHub上avro-rpc ...

最新文章

  1. underscorejs之 _.indexBy(list, iteratee, [context])
  2. python之路_Python之路【第二篇】:Python基础(一)
  3. O-R mapping工具
  4. html 查找添加联系人,使用phonegap查找联系人的实现方法
  5. iphone-common-codes-ccteam源代码 CCUINavigationBar.h
  6. Uncaught SyntaxError: Unexpected token export
  7. 网上收集总结一下mssql( 部分)
  8. distinct性能问题_Mysql性能优化:如何给字符串加索引?
  9. 【Python分子动力学】
  10. 通信中的“交织”技术
  11. Xshell 颜色配置
  12. 前端基础总结--CSS
  13. 牛客网刷题java之(斐波那契数列)一只青蛙一次可以跳上1级台阶,也可以跳上2级。求该青蛙跳上一个n级的台阶总共有多少种跳法(先后次序不同算不同的结果)。
  14. 法国电影《蝴蝶》Le Papillon主题曲
  15. js 去除最后一个逗号
  16. mysql数据库命中率_Oracle数据库关于命中率的查询语句总结
  17. 徽州臭鳜鱼渐成“网红年货”
  18. sm2多端加密解密,java,js,android,ios实战
  19. ntp 服务端配置(/etc/ntp.conf配置详解) -小白实操记录
  20. 在钉钉小程序中使用可视化图表插件F2

热门文章

  1. CentOS7升级Git版本
  2. K8S部署工具:KubeOperator系统设置
  3. win10开启telnet客户端
  4. helm部署hadoop并指定namespace和名称的命令
  5. k8s部署nfs-client-provisioner完整实践版(亲测有效)
  6. JVM调优:打印所有-XX非标参数命令
  7. double处理arithmeticexception为什么不报错_为什么工业废气处理设备的价格不一样?...
  8. h3c GR5200路由器上如何设置公网ip可以访问
  9. Ubuntu下安装谷歌浏览器(Google chrome)报错
  10. QML中类似QMap的用法