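Requirements

  • Define two interceptors: one that filters out invalid data, and one that distinguishes log types.
  • The ETL interceptor filters out logs whose timestamp is invalid or whose JSON data is incomplete.
  • The log-type interceptor separates start logs from event logs, so that each kind can be sent to a different Kafka topic.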

Dependencies

 <dependencies>
     <dependency>
         <groupId>org.apache.flume</groupId>
         <artifactId>flume-ng-core</artifactId>
         <version>1.9.0</version>
     </dependency>
 </dependencies>

ETL interceptor

package com.aura.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1 Read the event body as a UTF-8 string
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        // 2 Choose the validation rule by log type: start log vs. event log
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }

        // 3 Validation failed: returning null drops the event
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> validEvents = new ArrayList<>();
        for (Event event : events) {
            Event checked = intercept(event);
            if (checked != null) {
                validEvents.add(checked);
            }
        }
        return validEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
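The batch method is where filtering actually takes effect: any event for which the single-event intercept returns null is left out of the returned list. The sketch below reproduces that contract without the Flume jar so it can run on its own. SimpleEvent and filterBatch are hypothetical stand-ins for org.apache.flume.Event and the method above, not Flume API, and the single-event check in main is a toy rule chosen only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for org.apache.flume.Event: only the body matters here.
final class SimpleEvent {
    final byte[] body;

    SimpleEvent(String text) {
        this.body = text.getBytes(StandardCharsets.UTF_8);
    }

    String text() {
        return new String(body, StandardCharsets.UTF_8);
    }
}

final class BatchFilterDemo {
    // Mirrors intercept(List<Event>): keep only events the single-event check did not turn into null.
    static List<SimpleEvent> filterBatch(List<SimpleEvent> batch, UnaryOperator<SimpleEvent> single) {
        List<SimpleEvent> kept = new ArrayList<>();
        for (SimpleEvent e : batch) {
            SimpleEvent result = single.apply(e);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Toy single-event check (assumption): accept bodies that end with '}'.
        UnaryOperator<SimpleEvent> check = e -> e.text().trim().endsWith("}") ? e : null;
        List<SimpleEvent> batch = List.of(
                new SimpleEvent("{\"en\":\"start\"}"),
                new SimpleEvent("{\"broken\":"),
                new SimpleEvent("1549696569054|{\"et\":[]}"));
        List<SimpleEvent> kept = filterBatch(batch, check);
        System.out.println(kept.size()); // 2
    }
}
```

Keeping the null check in one place (the batch method) means the single-event method can stay a pure "valid or null" decision.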

LogUtils utility class

package com.aura.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateEvent(String log) {
        // Format: server time | json
        // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}

        // 1 Split on the "|" separator
        String[] logContents = log.split("\\|");

        // 2 There must be exactly two parts: server time and JSON
        if (logContents.length != 2) {
            return false;
        }

        // 3 The server time must be a 13-digit millisecond timestamp
        String serverTime = logContents[0].trim();
        if (serverTime.length() != 13 || !NumberUtils.isDigits(serverTime)) {
            return false;
        }

        // 4 The JSON part must at least be wrapped in braces
        String json = logContents[1].trim();
        if (!json.startsWith("{") || !json.endsWith("}")) {
            return false;
        }
        return true;
    }

    public static boolean validateStart(String log) {
        // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"}
        if (log == null) {
            return false;
        }

        // The JSON must at least be wrapped in braces
        String json = log.trim();
        return json.startsWith("{") && json.endsWith("}");
    }
}
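The same rules can be exercised without the Flume runtime. The sketch below is a stdlib-only restatement of LogUtils, assuming the event format is a 13-digit server timestamp, a "|" separator, then the JSON, as in the sample comment. LogValidationSketch is a hypothetical class name, and isDigits is a hand-written replacement for commons-lang's NumberUtils.isDigits. Like the original, it only checks the outer braces of the JSON; it does not parse it.

```java
final class LogValidationSketch {
    // Hand-written stand-in for NumberUtils.isDigits: non-empty and every char is a digit.
    static boolean isDigits(String s) {
        return !s.isEmpty() && s.chars().allMatch(Character::isDigit);
    }

    // "Looks like" a JSON object: trimmed text starts with '{' and ends with '}'.
    static boolean looksLikeJsonObject(String s) {
        String t = s.trim();
        return t.startsWith("{") && t.endsWith("}");
    }

    // Event log: "<13-digit millisecond timestamp>|<json>".
    static boolean validateEvent(String log) {
        if (log == null) {
            return false;
        }
        String[] parts = log.split("\\|");
        if (parts.length != 2) {
            return false;
        }
        String serverTime = parts[0].trim();
        return serverTime.length() == 13 && isDigits(serverTime) && looksLikeJsonObject(parts[1]);
    }

    // Start log: a bare JSON object.
    static boolean validateStart(String log) {
        return log != null && looksLikeJsonObject(log);
    }

    public static void main(String[] args) {
        System.out.println(validateEvent("1549696569054|{\"ap\":\"weather\",\"et\":[]}")); // true
        System.out.println(validateEvent("15496965690|{\"et\":[]}"));                      // false: 11 digits
        System.out.println(validateEvent("1549696569054|{\"et\":["));                      // false: truncated JSON
        System.out.println(validateStart("{\"en\":\"start\",\"t\":\"1559551922019\"}"));     // true
    }
}
```

A brace check is cheap enough to run on every event; a full JSON parse would catch more malformed logs but costs more per event.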

Log type interceptor

package com.aura.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Classify the log type: read the body, write the result into the header
        // 1 Read the event body as a UTF-8 string
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        // 2 Get the event headers
        Map<String, String> headers = event.getHeaders();

        // 3 Set the "topic" header according to the log type
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> taggedEvents = new ArrayList<>();
        for (Event event : events) {
            taggedEvents.add(intercept(event));
        }
        return taggedEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
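This interceptor never drops events; it only writes a "topic" header, and the check is a plain substring test on the body. A stdlib-only sketch of that header rule follows. TopicTagSketch and tagTopic are hypothetical names, and the helper works on a plain Map rather than Flume's Event. Unlike the real interceptor, which mutates event.getHeaders() in place, the sketch returns a copy so it is easy to test.

```java
import java.util.HashMap;
import java.util.Map;

final class TopicTagSketch {
    static final String TOPIC_HEADER = "topic";

    // Same rule as LogTypeInterceptor: substring "start" -> topic_start, everything else -> topic_event.
    static Map<String, String> tagTopic(String body, Map<String, String> headers) {
        Map<String, String> out = new HashMap<>(headers); // keep existing headers such as "file"
        out.put(TOPIC_HEADER, body.contains("start") ? "topic_start" : "topic_event");
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> empty = new HashMap<>();
        System.out.println(tagTopic("{\"en\":\"start\"}", empty).get(TOPIC_HEADER));        // topic_start
        System.out.println(tagTopic("1549696569054|{\"et\":[]}", empty).get(TOPIC_HEADER)); // topic_event
    }
}
```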

Flume configuration file

# Tail log files from the specified directory into Kafka channels. There are two Kafka channels: one holds start logs, the other holds event (behavior) logs.

# Name the source and channels
a1.sources = r1
a1.channels = c1 c2

# Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1 c2
# Position file, used to resume reading from the last offset after a restart
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 200

# Configure the interceptors (applied in the listed order: i1, then i2)
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.aura.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.aura.flume.interceptor.LogTypeInterceptor$Builder

# Channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
# Map each value of the "topic" header to a channel
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# Configure the channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
# Write plain event bodies to Kafka, without the Flume headers
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
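With selector.type = multiplexing, the source reads the "topic" header of each event and hands the event to the channel listed under selector.mapping.<value>. Each KafkaChannel then writes to its own kafka.topic. The sketch below models only that lookup with plain maps, using the channel and topic names from this configuration. It is an illustration, not Flume's MultiplexingChannelSelector. In Flume an unmapped value would fall back to selector.default, which this configuration does not set, so the sketch routes such events to no channel.

```java
import java.util.List;
import java.util.Map;

final class MultiplexRoutingSketch {
    // selector.mapping.<header value> = <channel>, as in the configuration above.
    static final Map<String, String> HEADER_TO_CHANNEL = Map.of(
            "topic_start", "c1",
            "topic_event", "c2");

    // kafka.topic of each KafkaChannel.
    static final Map<String, String> CHANNEL_TO_KAFKA_TOPIC = Map.of(
            "c1", "topic_start",
            "c2", "topic_event");

    // Returns the channels an event is routed to; with no default configured, unmapped values go nowhere.
    static List<String> route(Map<String, String> headers) {
        String value = headers.get("topic");
        // Map.of maps reject get(null), so check for a missing header first.
        String channel = value == null ? null : HEADER_TO_CHANNEL.get(value);
        return channel == null ? List.of() : List.of(channel);
    }

    public static void main(String[] args) {
        for (String value : List.of("topic_start", "topic_event", "other")) {
            List<String> channels = route(Map.of("topic", value));
            if (channels.isEmpty()) {
                System.out.println(value + " -> (no channel)");
            }
            for (String ch : channels) {
                System.out.println(value + " -> " + ch + " -> Kafka topic " + CHANNEL_TO_KAFKA_TOPIC.get(ch));
            }
        }
    }
}
```

Because the KafkaChannel writes directly to Kafka, this agent needs no sink: choosing the channel is the same as choosing the Kafka topic.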
