Flume实战-解决零点漂移-时间戳拦截器
对于日志,flume sink写入HDFS时,如果按照时间生成文件,在没有明确指定时间的情况下,会读取服务器时间作为创建文件的依据,这会导致日志的实际生成日期与文件不符。
这种情况下,可以通过拦截器在flume事件头指定timestamp作为文件的创建依据。
所谓零点漂移,就是上述问题的具体表现。即在按天生成日志文件的情况下,一条23:59:59左右生成的日志发送到服务器后可能已经是第二天了,如果没有指定时间,会被写入第二天对应的文件中,这就是所谓的零点漂移。
要解决零点漂移的问题,通常是将日志中记录的日志创建时间提取出来,写入flume事件头的timestamp字段,有了这个字段,flume创建文件时,会依据这个字段创建文件,这种场景很类似spark、flink的事件事件和处理事件。
示例代码,如下:
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayList<Event> events = new ArrayList<>();@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}}
}
Flume实战-解决零点漂移-时间戳拦截器相关推荐
- 零点漂移 时间戳拦截器
零点漂移 原因:Sink端写入到HDFS时,如果按照时间生成文件,在未指定具体时间的情况下,会读取服务器时间作为创建文件的依据,从而导致日志产生错误 假设按天生成文件,一个文件在23:59:59产生, ...
- springboot 控制台输出错误信息_高级码农Spring Boot实战进阶之过滤器、拦截器的使用...
众所周知的Spring Boot是很优秀的框架,它的出现简化了新Spring应用的初始搭建以及开发过程,大大减少了代码量,目前已被大多数企业认可和使用.这个专栏将对Spring Boot框架从浅入深, ...
- 第1节 flume:15、flume案例二,通过自定义拦截器实现数据的脱敏
1.7.flume案例二 案例需求: 在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后再往hdfs上面保存 原始数据与处理之后的数据对比 图一 ...
- 二元一次方程式解决零点漂移
今天看到同事写的一个算法,在这里和大家分享一下: 针对压力传感器的零点漂移,AD采样值在零点附近来回波动,这样的AD值根本没办法使用,但是如果直接限制幅度,会造成正压起始点和负压起始点抬高,又要显示从 ...
- Flume拦截器实战案例
日志的采集和汇总 案例场景 A.B两台日志服务机器实时生产日志主要类型为access.log.nginx.log.web.log 现在要求: 把A.B 机器中的access.log.nginx.log ...
- 数据仓库:如何解决ODS数据零点漂移问题
本篇文章讲解的是从业务库同步数据至数仓导致的零点漂移,查看flume+kafka同步数据导致的零点漂移参考该文章:业务数据采集_零点漂移处理方法(Flume+Kafka+HDFS) 一.数据零点漂移概 ...
- Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)
转载自 Flume中的拦截器(Interceptor)介绍与使用(一) Flume中的拦截器(interceptor) 用户Source读取events发送到Sink的时候,在events heade ...
- 拦截 数据_大数据之六类Flume拦截器配置
时间戳拦截器 Timestamp.conf #1.定义agent名, source.channel.sink的名称a4.sources = r1a4.channels = c1a4.sinks = k ...
- flume拦截器及自定义拦截器
拦截器做什么呢? 时间拦截器 以时间拦截器为例.会在Event的header中添加一个属性进去,属性的key叫做timestamp, value是当前的毫秒值. 问题是写到header然后呢?有啥用呢 ...
最新文章
- 【免费CDN】俄罗斯DDOS-Guard 支持ssl 防御1.5T
- java高级----Thread之ScheduledExecutorService的使用
- 【bzoj2820】YY的GCD 莫比乌斯反演
- 工业级光纤收发器入网说明
- C++11 标准新特性:Defaulted 和 Deleted 函数
- OC之ARC环境中的循环strong问题
- 图论 —— 网络流 —— 费用流 —— 基于 Dijkstra 的费用流
- BookKeeper总结
- python画二维图_使用python绘制二维图形示例
- IOT(9)--- 基础知识
- Lintcode: Permutation Index
- windows下测试flask的例子tuorial报错flask KeyError: 'DATABASE'
- 入门机器学习(西瓜书+南瓜书)模型选择与评估总结(python代码实现)
- html滑动验证图片,纯js实现图片滑块验证
- 复变函数 —— 4. 什么是调和函数
- 身份证校验码计算:根据身份证前17位计算第18位校验码-c++实现
- 3dmax 视频全集
- tumblr图片批量下载
- Win10系统异常应该怎么修复
- windows系统用cmd命令开启WiFi共享功能
热门文章
- 利用指示器随机变量计算掷n次骰子总和的期望值
- 启动vscode不打开上次文件夹
- macOS下鼠标滚轮慢速滚动不起作用的问题解决
- 2013华为校园招聘面经
- 给 初学者 的十点忠诫
- 标准对比-UHAST/稳态湿热
- Jose Mourinho: 任何成功都来自处心积虑
- c语言中加减乘除英文单词,求一个计算加减乘除的C语言程序
- nessus安装成功后,出现“Establishing connection, please wait...”该怎么解决呀?
- 使用UltraISO制作U盘启动