对于日志,flume sink写入HDFS时,如果按照时间生成文件,在没有明确指定时间的情况下,会读取服务器时间作为创建文件的依据,这会导致日志的实际生成日期与文件不符。

这种情况下,可以通过拦截器在flume事件头指定timestamp作为文件的创建依据。

所谓零点漂移,就是上述问题的具体表现。即在按天生成日志文件的情况下,一条23:59:59左右生成的日志发送到服务器后可能已经是第二天了,如果没有指定时间,会被写入第二天对应的文件中,这就是所谓的零点漂移。

要解决零点漂移的问题,通常是将日志中记录的日志创建时间提取出来,写入flume事件头的timestamp字段,有了这个字段,flume创建文件时,会依据这个字段创建文件,这种场景很类似spark、flink的事件事件和处理事件。

示例代码,如下:

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayList<Event> events = new ArrayList<>();@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}}
}

Flume实战-解决零点漂移-时间戳拦截器相关推荐

  1. 零点漂移 时间戳拦截器

    零点漂移 原因:Sink端写入到HDFS时,如果按照时间生成文件,在未指定具体时间的情况下,会读取服务器时间作为创建文件的依据,从而导致日志产生错误 假设按天生成文件,一个文件在23:59:59产生, ...

  2. springboot 控制台输出错误信息_高级码农Spring Boot实战进阶之过滤器、拦截器的使用...

    众所周知的Spring Boot是很优秀的框架,它的出现简化了新Spring应用的初始搭建以及开发过程,大大减少了代码量,目前已被大多数企业认可和使用.这个专栏将对Spring Boot框架从浅入深, ...

  3. 第1节 flume:15、flume案例二,通过自定义拦截器实现数据的脱敏

    1.7.flume案例二 案例需求: 在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后再往hdfs上面保存 原始数据与处理之后的数据对比 图一  ...

  4. 二元一次方程式解决零点漂移

    今天看到同事写的一个算法,在这里和大家分享一下: 针对压力传感器的零点漂移,AD采样值在零点附近来回波动,这样的AD值根本没办法使用,但是如果直接限制幅度,会造成正压起始点和负压起始点抬高,又要显示从 ...

  5. Flume拦截器实战案例

    日志的采集和汇总 案例场景 A.B两台日志服务机器实时生产日志主要类型为access.log.nginx.log.web.log 现在要求: 把A.B 机器中的access.log.nginx.log ...

  6. 数据仓库:如何解决ODS数据零点漂移问题

    本篇文章讲解的是从业务库同步数据至数仓导致的零点漂移,查看flume+kafka同步数据导致的零点漂移参考该文章:业务数据采集_零点漂移处理方法(Flume+Kafka+HDFS) 一.数据零点漂移概 ...

  7. Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)

    转载自 Flume中的拦截器(Interceptor)介绍与使用(一) Flume中的拦截器(interceptor) 用户Source读取events发送到Sink的时候,在events heade ...

  8. 拦截 数据_大数据之六类Flume拦截器配置

    时间戳拦截器 Timestamp.conf #1.定义agent名, source.channel.sink的名称a4.sources = r1a4.channels = c1a4.sinks = k ...

  9. flume拦截器及自定义拦截器

    拦截器做什么呢? 时间拦截器 以时间拦截器为例.会在Event的header中添加一个属性进去,属性的key叫做timestamp, value是当前的毫秒值. 问题是写到header然后呢?有啥用呢 ...

最新文章

  1. 【免费CDN】俄罗斯DDOS-Guard 支持ssl 防御1.5T
  2. java高级----Thread之ScheduledExecutorService的使用
  3. 【bzoj2820】YY的GCD 莫比乌斯反演
  4. 工业级光纤收发器入网说明
  5. C++11 标准新特性:Defaulted 和 Deleted 函数
  6. OC之ARC环境中的循环strong问题
  7. 图论 —— 网络流 —— 费用流 —— 基于 Dijkstra 的费用流
  8. BookKeeper总结
  9. python画二维图_使用python绘制二维图形示例
  10. IOT(9)--- 基础知识
  11. Lintcode: Permutation Index
  12. windows下测试flask的例子tuorial报错flask KeyError: 'DATABASE'
  13. 入门机器学习(西瓜书+南瓜书)模型选择与评估总结(python代码实现)
  14. html滑动验证图片,纯js实现图片滑块验证
  15. 复变函数 —— 4. 什么是调和函数
  16. 身份证校验码计算:根据身份证前17位计算第18位校验码-c++实现
  17. 3dmax 视频全集
  18. tumblr图片批量下载
  19. Win10系统异常应该怎么修复
  20. windows系统用cmd命令开启WiFi共享功能

热门文章

  1. 利用指示器随机变量计算掷n次骰子总和的期望值
  2. 启动vscode不打开上次文件夹
  3. macOS下鼠标滚轮慢速滚动不起作用的问题解决
  4. 2013华为校园招聘面经
  5. 给 初学者 的十点忠诫
  6. 标准对比-UHAST/稳态湿热
  7. Jose Mourinho: 任何成功都来自处心积虑
  8. c语言中加减乘除英文单词,求一个计算加减乘除的C语言程序
  9. nessus安装成功后,出现“Establishing connection, please wait...”该怎么解决呀?
  10. 使用UltraISO制作U盘启动