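Flume Custom Interceptors
Requirements
- Define two interceptors: one filters out invalid data, the other distinguishes log types.
- The ETL interceptor filters out logs whose timestamp is invalid or whose JSON data is incomplete.
- The log type interceptor separates startup logs from event logs so that they can be sent to different Kafka topics.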
Importing the dependency
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
ETL interceptor
package com.aura.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. Read the event body as a UTF-8 string
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        // 2. Pick the validator by log type; keep the event only if it passes
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }

        // 3. Validation failed: return null so the event is dropped
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Keep only the events that pass validation
        ArrayList<Event> validEvents = new ArrayList<>();
        for (Event event : events) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                validEvents.add(intercepted);
            }
        }
        return validEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
LogUtils utility class
package com.aura.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateEvent(String log) {
        // Format: server time | json
        // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}

        // 1. Split on the pipe character
        String[] logContents = log.split("\\|");

        // 2. Expect exactly two parts: server time and JSON
        if (logContents.length != 2) {
            return false;
        }

        // 3. Validate the server time: 13 digits (a millisecond timestamp).
        //    Trim first so that spaces around the pipe do not fail the length check.
        String serverTime = logContents[0].trim();
        if (serverTime.length() != 13 || !NumberUtils.isDigits(serverTime)) {
            return false;
        }

        // 4. Validate the JSON: it must start with '{' and end with '}'
        String json = logContents[1].trim();
        if (!json.startsWith("{") || !json.endsWith("}")) {
            return false;
        }
        return true;
    }

    public static boolean validateStart(String log) {
        // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"}
        if (log == null) {
            return false;
        }

        // Validate the JSON: it must start with '{' and end with '}'
        String json = log.trim();
        if (!json.startsWith("{") || !json.endsWith("}")) {
            return false;
        }
        return true;
    }
}
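The event-log check can be tried without Flume or commons-lang on the classpath. The following is a minimal stand-alone sketch, not the LogUtils class itself: the class name LogValidationSketch and the method isValidEventLine are made up for illustration, the digit check uses a regular expression instead of NumberUtils, and it splits on the first pipe only (an assumption, so that a pipe inside the JSON would not reject the line).

```java
/** Stand-alone sketch of the event-log check; hypothetical names, no Flume dependency. */
final class LogValidationSketch {

    /** Returns true when the line looks like "<13-digit timestamp>|<JSON object>". */
    static boolean isValidEventLine(String line) {
        if (line == null) {
            return false;
        }
        // Split on the first '|' only (assumption of this sketch).
        String[] parts = line.split("\\|", 2);
        if (parts.length != 2) {
            return false;
        }
        String timestamp = parts[0].trim();
        String json = parts[1].trim();
        // matches() must cover the whole string, so this means exactly 13 digits.
        if (!timestamp.matches("\\d{13}")) {
            return false;
        }
        // Same shallow JSON check as in the article: braces at both ends.
        return json.startsWith("{") && json.endsWith("}");
    }

    public static void main(String[] args) {
        String[] samples = {
            "1549696569054|{\"ap\":\"weather\",\"et\":[]}", // well-formed
            "15496965|{\"ap\":\"weather\"}",                // timestamp too short
            "1549696569054|{\"ap\":\"weather\"",            // JSON not closed
            "no separator here"                             // no pipe at all
        };
        for (String sample : samples) {
            System.out.println(isValidEventLine(sample) + "  " + sample);
        }
    }
}
```

Like the original, this only checks the outer braces; it does not parse the JSON, so a line with balanced outer braces but broken content would still pass.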
Log type interceptor
package com.aura.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Distinguish the log type from the body and record it in the header

        // 1. Read the event body as a UTF-8 string
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        // 2. Get the headers
        Map<String, String> headers = event.getHeaders();

        // 3. Determine the log type and set the "topic" header accordingly
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Tag every event in the batch
        ArrayList<Event> taggedEvents = new ArrayList<>();
        for (Event event : events) {
            taggedEvents.add(intercept(event));
        }
        return taggedEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
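One thing to watch in the type check is that log.contains("start") is true for any log that contains that substring anywhere, for example an event log whose payload mentions "restart". The following is a small stand-alone sketch of a stricter variant. It assumes that startup logs always carry the field "en":"start" written exactly as in the sample startup log, without extra whitespace; the class name LogTypeTaggingSketch and the sample event line are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone sketch of header tagging with a plain Map; hypothetical names, no Flume dependency. */
final class LogTypeTaggingSketch {

    // Assumption: startup logs contain this exact field, as in the sample startup log.
    static final String START_MARKER = "\"en\":\"start\"";

    /** Chooses the topic name from the log body. */
    static String topicFor(String log) {
        return log.contains(START_MARKER) ? "topic_start" : "topic_event";
    }

    /** Writes the chosen topic into the headers, as the interceptor does. */
    static Map<String, String> tag(Map<String, String> headers, String log) {
        headers.put("topic", topicFor(log));
        return headers;
    }

    public static void main(String[] args) {
        String startLog = "{\"action\":\"1\",\"en\":\"start\",\"uid\":\"993\"}";
        // Hypothetical event log whose payload happens to contain the substring "start".
        String eventLog = "1549696569054|{\"ap\":\"weather\",\"et\":[{\"en\":\"ad\",\"kv\":{\"action\":\"restart\"}}]}";

        System.out.println(tag(new HashMap<>(), startLog));
        System.out.println(tag(new HashMap<>(), eventLog));
        // The plain substring check would treat the event log as a startup log:
        System.out.println(eventLog.contains("start"));
    }
}
```

Whether the stricter marker is worth it depends on the data: if event payloads can never contain the substring "start", the simpler check behaves the same.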
Flume configuration file
# Load log files from the specified directory into Kafka channels.
# There are two Kafka channels: one holds startup logs, the other holds behavior (event) logs.

# Declare the source and channels
a1.sources = r1
a1.channels = c1 c2

# Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1 c2
# Position file used to resume reading after a restart
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 200

# Configure the interceptors
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.aura.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.aura.flume.interceptor.LogTypeInterceptor$Builder

# Channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
# Map each value of the topic header to a channel
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# Configure the channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
# Do not include Flume header information
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
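The selector part of the configuration ties the two interceptors to the channels: the value of the topic header decides which channel receives the event. The following stand-alone sketch illustrates that lookup with plain collections. It is not Flume's selector implementation, the class name MultiplexingSketch is hypothetical, and returning no channel for an unmapped value is an assumption of the sketch, since the configuration above sets no default channel.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch of header-based routing; hypothetical names, no Flume classes. */
final class MultiplexingSketch {

    // Mirrors selector.mapping.topic_start = c1 and selector.mapping.topic_event = c2.
    private static final Map<String, List<String>> MAPPING = new HashMap<>();
    static {
        MAPPING.put("topic_start", Collections.singletonList("c1"));
        MAPPING.put("topic_event", Collections.singletonList("c2"));
    }

    /** Returns the channel names chosen for an event with the given headers. */
    static List<String> channelsFor(Map<String, String> headers) {
        String value = headers.get("topic"); // mirrors selector.header = topic
        List<String> channels = MAPPING.get(value);
        // Assumption of this sketch: an unmapped or missing value selects no channel.
        return channels != null ? channels : Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "topic_start");
        System.out.println(channelsFor(headers));
        headers.put("topic", "topic_event");
        System.out.println(channelsFor(headers));
    }
}
```

Because LogTypeInterceptor always sets the topic header to one of the two mapped values, every event that survives the ETL interceptor has a matching channel.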