转载自 Flume中的拦截器(Interceptor)介绍与使用(一)

Flume中的拦截器(interceptor)

用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用.

Flume-ng 1.6中目前提供了以下拦截器:

Timestamp Interceptor;
Host Interceptor;
Static Interceptor;
UUID Interceptor;
Morphline Interceptor;
Search and Replace Interceptor;
Regex Filtering Interceptor;
Regex Extractor Interceptor;

本文对常用的几种拦截器进行学习和介绍,并附上使用示例。

本文中使用的Source为TaildirSource,就是监控一个文件的变化,将内容发送给Sink,具体可参考《Flume中的TaildirSource》.

Source配置如下:

#-->设置sources名称
agent_lxw1234.sources = sources1
#--> 设置channel名称
agent_lxw1234.channels = fileChannel
#--> 设置sink 名称
agent_lxw1234.sinks = sink1# source 配置
agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource
agent_lxw1234.sources.sources1.positionFile = /tmp/flume/agent_lxw1234_position.json
agent_lxw1234.sources.sources1.filegroups = f1
agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234_.*.log
agent_lxw1234.sources.sources1.batchSize = 100
agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000
agent_lxw1234.sources.sources1.maxBackoffSleep = 5000
agent_lxw1234.sources.sources1.channels = fileChannel

Flume Source中使用拦截器的相关配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1 i2
agent_lxw1234.sources.sources1.interceptors.i1.type = host
agent_lxw1234.sources.sources1.interceptors.i1.useIP = false
agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost
agent_lxw1234.sources.sources1.interceptors.i2.type = timestamp

对一个Source可以使用多个拦截器。

一、Timestamp Interceptor

时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多。比如在使用HDFS Sink时候,根据events的时间戳生成结果文件,hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d

hdfs.filePrefix = log_%Y%m%d_%H

会根据时间戳将数据写入相应的文件中。

但可以用其他方式代替(设置useLocalTimeStamp = true)。

二、Host Interceptor

主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)。

根据上面的Source,拦截器的配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = host
agent_lxw1234.sources.sources1.interceptors.i1.useIP = false
agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost# sink 1 配置
agent_lxw1234.sinks.sink1.type = hdfs
agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d
agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{agentHost}
agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log
agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream
agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
agent_lxw1234.sinks.sink1.hdfs.rollCount = 0
agent_lxw1234.sinks.sink1.hdfs.rollSize = 0
agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600
agent_lxw1234.sinks.sink1.hdfs.batchSize = 500
agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10
agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0
agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1
agent_lxw1234.sinks.sink1.channel = fileChannel

该配置用于将source的events保存到HDFS上hdfs://cdh5/tmp/lxw1234的目录下,文件名为lxw1234_<主机名>.log

三、Static Interceptor

静态拦截器,用于在events header中加入一组静态的key和value。

根据上面的Source,拦截器的配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = static
agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true
agent_lxw1234.sources.sources1.interceptors.i1.key = static_key
agent_lxw1234.sources.sources1.interceptors.i1.value = static_value# sink 1 配置
agent_lxw1234.sinks.sink1.type = hdfs
agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234
agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{static_key}
agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log
agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream
agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
agent_lxw1234.sinks.sink1.hdfs.rollCount = 0
agent_lxw1234.sinks.sink1.hdfs.rollSize = 0
agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600
agent_lxw1234.sinks.sink1.hdfs.batchSize = 500
agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10
agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0
agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1
agent_lxw1234.sinks.sink1.channel = fileChannel

看看最终Sink在HDFS上生成的文件结构:

四、UUID Interceptor

UUID拦截器,用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。根据上面的source,拦截器的配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent_lxw1234.sources.sources1.interceptors.i1.headerName = uuid
agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true
agent_lxw1234.sources.sources1.interceptors.i1.prefix = UUID_# sink 1 配置
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel

运行后在日志中查看header信息:

五、Morphline Interceptor

Morphline拦截器,该拦截器使用Morphline对每个events数据做相应的转换。关于Morphline的使用,可参考

http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

后续再研究这块。

Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)相关推荐

  1. Hadoop生态Flume(四)拦截器(Interceptor)介绍与使用(2)

    转载自 Flume中的拦截器(Interceptor)介绍与使用(二) lume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header ...

  2. 总结 拦截器(Interceptor) 和 过滤器(Filter)的区别

    一.前言 拦截器(Interceptor) 和 过滤器(Filter)的区别是面试中常问的问题,也是开发中容易被大家混淆的问题,在此总结下,希望对大家有所帮助. 二.Filter 介绍 2.1.概念 ...

  3. 从零开始SpringCloud Alibaba实战(59)——过滤器filter、拦截器interceptor、和AOP的区别与联系及应用

    文章目录 前言 过滤器 拦截器 过滤器与拦截器的区别 AOP(面向切面) 三者使用场景 Filter过滤器 Interceptor拦截器 Spring AOP拦截器 Filter与Intercepto ...

  4. 过滤器(Filter)和拦截器(Interceptor)的区别

    来自:http://www.cnblogs.com/luoyun/archive/2013/01/04/2844274.html 过滤器(Filter)和拦截器(Interceptor)的区别 Fil ...

  5. spring过滤器Filter 、 拦截器Interceptor 、 切片Aspect 详解

    springboot 过滤器Filter vs 拦截器Interceptor vs 切片Aspect 详解 1 前言 最近接触到了过滤器和拦截器,网上查了查资料,这里记录一下,这篇文章就来仔细剖析下过 ...

  6. Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行

    在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRun ...

  7. Struts2拦截器(Interceptor)原理详解

    1.    理解拦截器 1.1.    什么是拦截器: 拦截器,在AOP(Aspect-Oriented Programming)中用于在某个方法或字段被访问之前,进行拦截然后在之前或之后加入某些操作 ...

  8. struts2学习笔记--拦截器(Interceptor)和登录权限验证Demo

    理解 Interceptor拦截器类似于我们学过的过滤器,是可以在action执行前后执行的代码.是我们做web开发是经常使用的技术,比如权限控制,日志.我们也可以把多个interceptor连在一起 ...

  9. Springmvc中的拦截器interceptor及与过滤器filter的区别

    一.Springmvc中的拦截器概述及与过滤器filter的区别 1).Springmvc中的拦截器interceptor用于对控制器controller进行预处理和后处理的技术; 2).可以定义拦截 ...

最新文章

  1. 分享HTML 5的参考手册,演讲稿,电子书和教程
  2. POJ - 2142 The Balance(扩展欧几里得)
  3. lucene 索引出错 no segments* file found in org.apache.lucene.store.MMapDirectory
  4. zookeeper 日志查看_每天使用的注册中心zookeeper,流量暴涨怎么办?
  5. javascript 嵌入python_通过Python将区块链数据嵌入Javascript,这是正确的方法吗?
  6. 诸多研究生的一个通病:对导师过度依赖!
  7. 7 centos 时钟跟物理机同步_centos7上使用chrony自动同步时间
  8. 【原创】为什么 Redis 重启后没有正确恢复之前的内存数据
  9. 【vue2.0进阶】轻松理解Vuex的3个核心概念
  10. Hadoop的环境搭建
  11. Android应用开发之使用Socket进行大文件断点上传续传
  12. 《程序员修炼之道》笔记(五)
  13. 如何在 Ubuntu 上转换图像、音频和视频格式
  14. 多空对比(DKDB)指标
  15. 微信小程序 选项卡demo
  16. UCI机器学习库和一些相关算法
  17. 查看计算机各程序运行时间,查看电脑运行时间_查看电脑运行时间命令
  18. 2021年 阿里云商标注册申请的相关详情及分类介绍
  19. SAP_ABAP_采购价格条件报表
  20. centos7 默认中文字体_centos7安装中文宋体

热门文章

  1. python开发一个自己的技术网站_手把手教你写网站:Python WEB开发技术实战
  2. 函数求值需要运行所有线程_JavaScript函数式编程(二)
  3. java 8 stream 性能_java8中parallelStream性能测试及结果分析
  4. php如何判断二维数组为空,PHP判断数组为空的具体方式
  5. [JavaWeb-HTML]HTML标签_图片标签
  6. [Java基础]Stream流的常见生成方式
  7. 数位dp总结 之 从入门到模板(stO)
  8. 二叉搜索树(创建,插入,删除):基础篇,适合新手观看。
  9. android elevation 白色,Android Elevation
  10. python 进行一元线性回归并输出相关结果_Python实现一元线性回归实战