Flume NG 学习笔记(八)Interceptors(拦截器)测试
版权声明:本文为博主原创文章,未经博主允许不得转载。
目录(?)[+]
拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据
一、Event Serializers
file_roll sink 和hdfs sink 都支持EventSerializer接口
1.1、Body Text Serializer
Body TextSerializer,别名:text。这个拦截器将把事件的body部分写入到输出流中而不需要任何转换或者修改。事件的header将直接被忽略。
下面是官网配置:
Property Name |
Default |
Description |
appendNewline |
true |
Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons. |
下面是官网例子:appendNewline是选择是否加入到新行去。默认是true,而false 就是换行,一般我们都选择换行。
a1.sinks=k1
a1.sinks.k1.type=file_roll
a1.sinks.k1.channel=c1
a1.sinks.k1.sink.directory=/var/log/flume
a1.sinks.k1.sink.serializer=text
a1.sinks.k1.sink.serializer.appendNewline=false
下面是实际例子
因为是考虑Body TextSerializer的特性,他会忽略header的信息,因此我们这边要采用http source来接收定义的header 与body 的内容
[html] view plain copy
#配置文件:body_case15.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /tmp/logs
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline =false
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#敲命令
flume-ng agent -c conf -fconf/body_case15.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
curl -X POST -d '[{"headers":{"looklook1" : "looklook1 isheader","looklook2": "looklook2 isheader"},"body" : "hellolooklook5"}]'http://192.168.233.128:50000
#在启动源发送的代理终端查看console输出
数据已经输出,但只输出了hello looklook5,即BODY这块。
1.2、Avro Event Serializer
Avro Event Serializer别名:avro_event。这个拦截器将把事件序列化到一个Avro容器文件中。使用的模式和RPC Avro机制使用到的处理flume事件的机制一样。这个序列化器继承自AbstractAvroEventSerializer类。
官网例子
Property Name |
Default |
Description |
syncIntervalBytes |
2048000 |
Avro sync interval, in approximate bytes. |
compressionCodec |
null |
Avro compression codec. For supported codecs, see Avro’s CodecFactory docs. |
下面是官网例子
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer=avro_event
a1.sinks.k1.serializer.compressionCodec=snappy
例子这边就不演示了,因为和BodyText Serializer 差距不大。
二、Timestamp Interceptor
官网说Flume 可以在事件传输过程中对它进行修改与删除,而这个都是通过Interceptor进行实现的,实际都是往事件的header里插数据。而Timestamp Interceptor拦截器就是可以往event的header中插入关键词为timestamp的时间戳。
下面是官网配置
Property Name |
Default |
Description |
type |
– |
The component type name, has to be timestamp or the FQCN |
preserveExisting |
false |
If the timestamp already exists, should it be preserved - true or false |
以及官网例子
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels= c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
下面是测试例子
[html] view plain copy
#配置文件:timestamp_case16.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting= false
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = looklook5.
a1.sinks.k1.hdfs.fileType=DataStream
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
这里拿header作为文件夹目录名称。
#敲命令
flume-ng agent -c conf -f conf/timestamp_case16.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "TimestampInterceptor" | nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
查看hdfs生成的文件,可以看到timestamp已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。
三、Host Interceptor
该拦截器可以往event的header中插入关键词默认为host主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)
下面是官网配置
Property Name |
Default |
Description |
type |
– |
The component type name, has to be host |
preserveExisting |
false |
If the host header already exists, should it be preserved - true or false |
useIP |
true |
Use the IP Address if true, else use hostname. |
hostHeader |
host |
The header key to be used. |
以及官网例子
a1.sources=r1
a1.channels=c1
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=host
a1.sources.r1.interceptors.i1.hostHeader=hostname
下面是测试例子
[html] view plain copy
#配置文件:time_host_case17.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting= false
a1.sources.r1.interceptors.i1.type =timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader =hostname
a1.sources.r1.interceptors.i2.useIP = false
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}
a1.sinks.k1.hdfs.fileType=DataStream
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
增加一个拦截器,类型是host,h将hostname作为文件前缀。
#敲命令
flume-ng agent -c conf -f conf/time_host_case17.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "Time&hostInterceptor1" | nc 192.168.233.128 50000
echo "Time&hostInterceptor2" | nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
查看hdfs生成的文件,可以看到host已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。
四、Static Interceptor
Static Interceptor拦截器允许用户增加一个static的header并为所有的事件赋值。范围是所有事件。
官网配置
Property Name |
Default |
Description |
type |
– |
The component type name, has to be static |
preserveExisting |
true |
If configured header already exists, should it be preserved - true or false |
key |
key |
Name of header that should be created |
value |
value |
Static value that should be created |
其中参数key与value等于类似json格式里的"headers":{" key":" value"}
下面是官网例子
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels= c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=static
a1.sources.r1.interceptors.i1.key=datacenter
a1.sources.r1.interceptors.i1.value=NEW_YORK
以及实际的列子
[html] view plain copy
#配置文件:static_case18.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = looklook5
a1.sources.r1.interceptors.i1.value =looklook10
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#敲命令
flume-ng agent -c conf -f conf/static_case18.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "statInterceptor1" | nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
可以看到输出的header信息里自定义部分正确输出,body部分也输出正确。
五、Regex FilteringInterceptor
Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。
官网配置
Property Name |
Default |
Description |
type |
– |
The component type name has to be regex_filter |
regex |
”.*” |
Regular expression for matching against events |
excludeEvents |
false |
If true, regex determines events to exclude, otherwise regex determines events to include. |
excludeEvents 为true的时候为排除所有匹配正则表达式的数据。
下面是测试例子
[html] view plain copy
#配置文件:regex_filter_case19.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents =true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
我们对开头字母是数字的数据,全部过滤。
#敲命令
flume-ng agent -c conf -f conf/regex_filter_case19.conf-n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "a" | nc192.168.233.128 50000
echo "1222" |nc 192.168.233.128 50000
echo "a222" |nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
可以看出1222 被认为是无效的数据没有发出来。
Regex Filtering Interceptor测试成功。
转载于:https://blog.51cto.com/jackwxh/1906899
Flume NG 学习笔记(八)Interceptors(拦截器)测试相关推荐
- Vue学习笔记:axios 拦截器的用法
Vue学习笔记:axios 拦截器的用法 什么是axios 拦截器? 拦截器就是拦截每一次的请求和响应,然后进行相应的处理.请求拦截器,它可以统一在你发送请求前在请求体里加上token:响应拦截器,是 ...
- Flume NG 学习笔记(五)Sinks和Channel配置
一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. ...
- Struts2学习笔记04 之 拦截器
一.创建拦截器组件 1. 创建一个类,实现Interceptor接口,并实现intercept方法 2.注册拦截器 3.引用拦截器 二.拦截器栈 预置拦截器: 默认引用拦截器 拦截器调用顺序: Fil ...
- Flume的学习笔记
Flume的学习笔记 文章目录 Flume的学习笔记 1. Flume简介 1.1 Flume的基本概念 (1) 什么是Flume (2) Flume 目的 1.2 Flume 基本组件 (1) Fl ...
- 【Java Web开发学习】Spring MVC 拦截器HandlerInterceptor
[Java Web开发学习]Spring MVC 拦截器HandlerInterceptor 转载:https://www.cnblogs.com/yangchongxing/p/9324119.ht ...
- 安卓学习笔记之骚扰拦截
安卓学习笔记之骚扰拦截 1.监听电话状态 2.检测来电号码是否匹配拦截条件 3.若匹配,则挂断电话 实例代码: 获取TelephonyManager 对象,并监听来电状态 TelephonyManag ...
- 黑马程序员_java自学学习笔记(八)----网络编程
黑马程序员_java自学学习笔记(八)----网络编程 android培训. java培训.期待与您交流! 网络编程对于很多的初学者来说,都是很向往的一种编程技能,但是很多的初学者却因为很长一段时间无 ...
- OpenGL学习笔记(八):进一步理解VAO、VBO和SHADER,并使用VAO、VBO和SHADER绘制一个三角形
原博主博客地址:http://blog.csdn.net/qq21497936 本文章博客地址:http://blog.csdn.net/qq21497936/article/details/7888 ...
- Spring MVC学习(8)—HandlerInterceptor处理器拦截器机制全解
基于最新Spring 5.x,详细介绍了Spring MVC的HandlerInterceptor处理器拦截器机制,以及它的一系列拦截方法. 本次我们来学习Sring MVC的HandlerInter ...
最新文章
- 银行柜台基金买卖现长龙 业内支招宜用新方式
- Cesium Vue开发环境搭建
- NYOJ 300 hdu 2276 Kiki Little Kiki 2 (矩阵快速幂)
- linux下搭建cacti监控
- Javascript中的arguments数组对象
- java控制台高级_K9s Kubernetes的高级控制台
- matlab小波分析常用函数
- mysql 添加最高权限设置_mysql 添加用户并设置权限
- Tachyou alluxio初识
- 理工大学统考计算机在线作业,北京理工在线作业-现代远程学习技术-20210417121542.pdf-原创力文档...
- java settings文件夹_JAVA工具例大全--Setting文件读取配置参数
- opencart 添加新模型
- 使用电脑开发的,连个黑屏休眠都不会设置?
- 怎么学习PLC技术?
- EfficientFormer | 苹果手机实时推理的Transformer模型,登顶轻量化Backbone之巅
- 产品调研,如何避免「浮于表面」?
- iFunk超极本或出新,你最想知道什么
- codeforces 300B切题记录
- 计算机机房的安全知识有哪些,计算机机房消防安全管理制度
- win10-LTSC2019装机必备操作和软件备忘录