Flume-NG源码阅读之Interceptor(原创)
有的时候希望通过Flume将读取的文件再细分存储,比如讲source的数据按照业务类型分开存储,具体一点比如类似:将source中web、wap、media等的内容分开存储;比如丢弃或修改一些数据。这时可以考虑使用拦截器Interceptor。
flume通过拦截器实现修改和丢弃事件的功能。拦截器通过定义类继承org.apache.flume.interceptor.Interceptor接口来实现。用户可以通过该节点定义规则来修改或者丢弃事件。Flume支持链式拦截,通过在配置中指定构建的拦截器类的名称。在source的配置中,拦截器被指定为一个以空格为间隔的列表。拦截器按照指定的顺序调用。一个拦截器返回的事件列表被传递到链中的下一个拦截器。当一个拦截器要丢弃某些事件时,拦截器只需要在返回事件列表时不返回该事件即可。若拦截器要丢弃所有事件,则其返回一个空的事件列表即可。
先解释一下一个重要对象Event:event是flume传输的最小对象,从source获取数据后会先封装成event,然后将event发送到channel,sink从channel拿event消费。event由头(Map<String, String> headers)和身体(body)两部分组成:Headers部分是一个map,body部分可以是String或者byte[]等。其中body部分是真正存放数据的地方,headers部分用于本节所讲的interceptor。
Flume-NG自带拦截器有多种:
1、HostInterceptor:使用IP或者hostname拦截;
2、TimestampInterceptor:使用时间戳拦截;
3、RegexExtractorInterceptor:该拦截器提取正则表达式匹配组,通过使用指定的正则表达式并追加匹配组作为事件的header。它还支持可插拔的serializers用于在添加匹配组作为事件header之前格式化匹配组;
4、RegexFilteringInterceptor:该拦截器会选择性地过滤事件。通过以文本的方式解析事件主体,用配置好的规则表达式来匹配文本。提供的正则表达式可以用于包含事件或排除事件;这个和上面的那个区别是这个会按照正则表达式选择性的让event通过,上面那个是提取event.body符合正则的内容作为headers的value。
5、StaticInterceptor:可以自定义event的header的value。
这些类都在org.apache.flume.interceptor包下。
这些interceptor都比较简单我们选取HostInterceptor来讲解interceptor的原理,以及如何自己定制interceptor。
这些interceptor都实现了org.apache.flume.interceptor.Interceptor接口,该接口有四个方法以及一个内部接口:
1、public void initialize()运行前的初始化,一般不需要实现(上面的几个都没实现这个方法);
2、public Event intercept(Event event)处理单个event;
3、public List<Event> intercept(List<Event> events)批量处理event,实际上市循环调用上面的2;
4、public void close()可以做一些清理工作,上面几个也都没有实现这个方法;
5、 public interface Builder extends Configurable 构建Interceptor对象,外部使用这个Builder来获取Interceptor对象。
如果要自己定制,必须要完成上面的2,3,5。
下面,我们来看看org.apache.flume.interceptor.HostInterceptor,其全部代码如下:
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flume.interceptor;import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.Map; import org.apache.flume.Context; import org.apache.flume.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import static org.apache.flume.interceptor.HostInterceptor.Constants.*;/*** Simple Interceptor class that sets the host name or IP on all events* that are intercepted.<p>* The host header is named <code>host</code> and its format is either the FQDN* or IP of the host on which this interceptor is run.*** Properties:<p>** preserveExisting: Whether to preserve an existing value for 'host'* (default is false)<p>** useIP: Whether to use IP address or fully-qualified hostname for 'host'* header value (default is true)<p>** hostHeader: Specify the key to be used in the event header map for the* host name. (default is "host") <p>** Sample config:<p>** <code>* agent.sources.r1.channels = c1<p>* agent.sources.r1.type = SEQ<p>* agent.sources.r1.interceptors = i1<p>* agent.sources.r1.interceptors.i1.type = host<p>* agent.sources.r1.interceptors.i1.preserveExisting = true<p>* agent.sources.r1.interceptors.i1.useIP = false<p>* agent.sources.r1.interceptors.i1.hostHeader = hostname<p>* </code>**/ public class HostInterceptor implements Interceptor {private static final Logger logger = LoggerFactory.getLogger(HostInterceptor.class);private final boolean preserveExisting;private final String header;private String host = null;/*** Only {@link HostInterceptor.Builder} can build me*/private HostInterceptor(boolean preserveExisting,boolean useIP, String header) {this.preserveExisting = preserveExisting;this.header = header;InetAddress addr;try {addr = InetAddress.getLocalHost();if (useIP) {host = addr.getHostAddress();} else {host = addr.getCanonicalHostName();}} catch (UnknownHostException e) {logger.warn("Could not get local host address. Exception follows.", e);}}@Overridepublic void initialize() {// no-op }/*** Modifies events in-place.*/@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();if (preserveExisting && headers.containsKey(header)) {return event;}if(host != null) {headers.put(header, host);}return event;}/*** Delegates to {@link #intercept(Event)} in a loop.* @param events* @return*/@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {// no-op }/*** Builder which builds new instances of the HostInterceptor.*/public static class Builder implements Interceptor.Builder {private boolean preserveExisting = PRESERVE_DFLT;private boolean useIP = USE_IP_DFLT;private String header = HOST;@Overridepublic Interceptor build() {return new HostInterceptor(preserveExisting, useIP, header);}@Overridepublic void configure(Context context) {preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);useIP = context.getBoolean(USE_IP, USE_IP_DFLT);header = context.getString(HOST_HEADER, HOST);}}public static class Constants {public static String HOST = "host";public static String PRESERVE = "preserveExisting";public static boolean PRESERVE_DFLT = false;public static String USE_IP = "useIP";public static boolean USE_IP_DFLT = true;public static String HOST_HEADER = "hostHeader";}}
Constants类是参数类及默认的一些参数:
Builder类是构造HostInterceptor对象的,它会首先通过configure(Context context)方法获取配置文件中interceptor的参数,然后方法build()用来返回一个HostInterceptor对象:
1、preserveExisting表示如果event的header中包含有本interceptor指定的header,是否要保留这个header,true则保留;
2、useIP表示是否使用本机IP地址作为header的value,true则使用IP,默认是true;
3、header是event的headers的key,默认是host。
HostInterceptor:
1、构造函数除了赋值外,还有就是根据useIP获取IP或者hostname;
2、intercept(Event event)方法是设置event的header的地方,首先是获取headers对象,然后如果同时满足preserveExisting==true并且headers.containsKey(header)就直接返回event,否则设置headers:headers.put(header, host)。
3、intercept(List<Event> events)方法是循环调用上述2的方法。
显然其他几个Interceptor也就类似这样。在配置文件中配置source的interceptor时,如果是自己定制的interceptor,则需要对type参数赋值:完整类名+¥Builder,比如com.MyInterceptor$Builder即可。
这样设置好headers后,就可以在后续的流转中通过selector实现细分存储。
欢迎大伙交流
感谢XX峰的flume用户指南的中文翻译。
转载于:https://www.cnblogs.com/lxf20061900/p/3664602.html
Flume-NG源码阅读之Interceptor(原创)相关推荐
- mybatis源码阅读(八) ---Interceptor了解一下
转载自 mybatis源码阅读(八) ---Interceptor了解一下 1 Intercetor MyBatis 允许你在已映射语句执行过程中的某一点进行拦截调用.默认情况下,MyBatis允许 ...
- Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行
在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRun ...
- mybatis源码阅读
说下mybatis执行一个sql语句的流程 执行语句,事务等SqlSession都交给了excutor,excutor又委托给statementHandler SimpleExecutor:每执行一次 ...
- SpringMVC源码阅读:过滤器
SpringMVC源码阅读:过滤器 目录 1.前言 2.源码分析 3.自定义过滤器 3.1 自定义过滤器继承OncePerRequestFilter 3.2 自定义过滤器实现Filter接口 4.过滤 ...
- Java struts 2 源码阅读入门
一 搭建源码阅读环境 首先新建一个struts 2 实例工程,并附着源码: 在Eclipse中新建一个动态web工程:完成后结构如下: 添加如下图的包:可以直接拖到lib文件夹:完成后如下: 新建一个 ...
- Flume-NG源码阅读之AvroSink
org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组 ...
- mybatis源码阅读(二):mybatis初始化上
转载自 mybatis源码阅读(二):mybatis初始化上 1.初始化入口 //Mybatis 通过SqlSessionFactory获取SqlSession, 然后才能通过SqlSession与 ...
- 【Dubbo源码阅读系列】服务暴露之本地暴露
在上一篇文章中我们介绍 Dubbo 自定义标签解析相关内容,其中我们自定义的 XML 标签 <dubbo:service /> 会被解析为 ServiceBean 对象(传送门:Dubbo ...
- MyBatis 源码阅读 -- 核心操作篇
核心操作包是 MyBatis 进行数据库查询和对象关系映射等工作的包.该包中的类能完成参数解析.数据库查询.结果映射等主要功能.在主要功能的执行过程中还会涉及缓存.懒加载.鉴别器处理.主键自增.插件支 ...
最新文章
- RabbitMQ队列监控
- centos6.5 php5.2,Linux中PHP安装与配置(CentOS-6.5:php-5.2.13)
- Scrapy框架中的crawlspider爬虫
- 大数据分析常用去重算法分析『HyperLogLog 篇』
- Spring mvc3的ajax
- memcached客户端_对比Memcached和Redis,谁才是适合你的缓存?
- 算法----------快乐数 (Java版本)
- Linux-鸟菜-2-主机规划与磁盘分区
- linux服务器搭建_基于LINUX系统的邮件服务器搭建和详细部署(POSTFIX)
- n卡eth挖矿设置_ETH2.0要来了,要不要布局显卡挖矿?
- 与或非逻辑 页面展示html,一种实现与非、或非门逻辑的忆阻器电路
- Robotium测试报告的生成方法(下)
- opencv的sift算法
- 仿美团外卖源码加自己做了个模拟数据加载的效果
- ASCII码与16进制转换表
- 金彩教育:选择关键词要参考哪些数据
- python关于类和对象说法正确的是_关于类和对象的关系,下列描述正确的是()。
- 微信小程序wx:for 循环中item的keng
- 20、中断和动态时钟显示
- 小程序云开发登陆流程
热门文章
- 千寻位置 开发demo_CICV2019:博世相对高精度定位与千寻绝对高精度定位
- 递归法:实现指数型枚举(二叉树递归)
- 块状树(bzoj 3720: Gty的妹子树)
- 循环矩阵的快速幂(bzoj 2510: 弱题)
- Kong API Gateway 配置文件详解
- Quartus17报错Top-level design entity “dff“ is undefined的解决办法
- 【POJ】2454.Jersey Politics
- JavaScript初阶(十)---------- 数组
- 新手学习算法----二叉树(将一个二叉查找树按照中序遍历转换成双向链表)
- zendstudio for mac破解版