2019独角兽企业重金招聘Python工程师标准>>>

flume收集到的日志,采用ElasticSearch作为存储,运行一段时间后,发现每天早点八点才会创建索引文件,比北京时间晚了8个小时,导致0点到8点这段时间的数据并没有存储到今天的索引文件中,而是存储在前一天的索引中,当时以为数据丢失了,全文检索后发现数据被存在了前一天的索引文件中。首先确定系统的时间是准备的,基本定位到应该是flume自身创建索引的时候除了问题,查看源代码。

flume创建ElasticSearch索引文件的代码如下

if (indexRequestBuilderFactory == null) {indexRequestBuilder = client.prepareIndex(indexNameBuilder.getIndexName(event), indexType).setSource(serializer.getContentBuilder(event).bytes());
} else {indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest(client, indexNameBuilder.getIndexPrefix(event), indexType, event);
}

下面看indexNameBuilder.getIndexName(event)获取索引

@Overridepublic String getIndexName(Event event) {TimestampedEvent timestampedEvent = new TimestampedEvent(event);long timestamp = timestampedEvent.getTimestamp();return new StringBuilder(indexPrefix).append('-').append(fastDateFormat.format(timestamp)).toString();}
TimestampedEvent(Event base) {setBody(base.getBody());Map<String, String> headers = Maps.newHashMap(base.getHeaders());String timestampString = headers.get("timestamp");if (StringUtils.isBlank(timestampString)) {timestampString = headers.get("@timestamp");}if (StringUtils.isBlank(timestampString)) {this.timestamp = DateTimeUtils.currentTimeMillis();headers.put("timestamp", String.valueOf(timestamp ));} else {this.timestamp = Long.valueOf(timestampString);}setHeaders(headers);}

TimestampedEvent会先获取事件中的timestamp,如果事件中没有timestamp,就取当前的毫秒时间。

fastDateFormat会格式化timestamp,也就是生成索引后面的日期,如flume-2015-01-01,fastDateFormat默认会采用Etc/UTC时区,也就是会比北京时间晚8小时,北京时间早上8点,获取到的就是0点。

private FastDateFormat fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd",TimeZone.getTimeZone("Etc/UTC"));

另外可以通过配置文件来配置fastDateFormat的时区,我们采用即可

a1.sinks.k1.timeZone=Asia/Shanghai

使用ElasticSearch sink的时候,要加上上面这句话,这样就可以解决创建索引晚8小时的问题了。

转载于:https://my.oschina.net/u/2311010/blog/597754

flume建立ElasticSearch索引时间的问题相关推荐

  1. Elasticsearch索引(company)_Centos下CURL增删改

    目录 返回目录:http://www.cnblogs.com/hanyinglong/p/5464604.html 1.Elasticsearch索引说明 a. 通过上面几篇博客已经将Elastics ...

  2. Elasticsearch索引原理

    最近在参与一个基于Elasticsearch作为底层数据框架提供大数据量(亿级)的实时统计查询的方案设计工作,花了些时间学习Elasticsearch的基础理论知识,整理了一下,希望能对Elastic ...

  3. elasticsearch索引和映射

    目录 1. elasticsearch如何实现搜索 1.1 搜索实例 1.2 es中数据的类型 1.3 倒排索引 1.4 分析与分析器 1.4.1 什么是分析器 1.4.2 内置分析器种类 1.4.3 ...

  4. Elasticsearch 索引容量管理实践

    作者:gavinyao,腾讯 PCG 后台开发工程师 Elasticsearch 是目前大数据领域最热门的技术栈之一,腾讯云 Elasticsearch Service(ES)是基于开源搜索引擎 El ...

  5. sql server cdc 清理_基于CDC技术的ElasticSearch索引同步机制

    概述 ElasticSearch作为一个基于Lucene的搜索引擎被广泛应用于各种应用系统,比如电商.新闻类.咨询类网站.在使用ElasticSearch开发应用的过程中,一个非常重要的过程是将数据导 ...

  6. 干货 | Elasticsearch 索引设计实战指南

    题记 随着 Elastic 的上市,ELK Stack 不仅在 BAT 的大公司得到长足的发展,而且在各个中小公司都得到非常广泛的应用,甚至连"婚庆网站"都开始使用 Elastic ...

  7. 看完这篇文章,再也不怕 Elasticsearch 索引设计

    题记 随着 Elastic 的上市,ELK Stack 不仅在 BAT 的大公司得到长足的发展,而且在各个中小公司都得到非常广泛的应用,甚至连"婚庆网站"都开始使用 Elastic ...

  8. 大数据开发:ElasticSearch 索引设置

    提起ElasticSearch,大家首先会联想到的往往是其特殊的索引机制,带来的快速查询性能优势.前面我们也对ElasticSearch的索引机制做了简单的介绍,今天的大数据开发分享,我们来讲讲Ela ...

  9. Elasticsearch索引容量管理实践

    最新活动 包含文章发布时段最新活动,前往ES产品介绍页,可查找ES当前活动统一入口 Elasticsearch Service自建迁移特惠政策>> Elasticsearch Servic ...

  10. Elasticsearch索引生命周期管理方案

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 一.前言 在 Elasticsearch 的日常中,有很多 ...

最新文章

  1. Machine Learning | (8) Scikit-learn的分类器算法-随机森林(Random Forest)
  2. 使用MongoDB存储Docker日志(续)
  3. ngx_pagespeed加速nginx
  4. hyperopt中文文档:Recipes
  5. Java8函数式编程(1)--Principle
  6. 源码编译安装httpd服务
  7. Python自然语言处理学习笔记(41):5.2 标注语料库
  8. 如何删除所有已合并的Git分支?
  9. [转载] Python学习笔记 String类型常用方法
  10. [下载]Internet Explorer 9 预览版
  11. 翻译「C++ Rvalue References Explained」C++右值引用详解 Part3:右值引用
  12. 面试题:逗号表达式运用
  13. 简单解说思科命令大全
  14. python读取csv数据出错_使用Pandas在python中读取csv文件时出错
  15. 档案管理制度计算机管理制度,档案管理制度保密制度
  16. centos7磁盘分区格式化
  17. [经验教程]百度Robots检测:您的服务器配置有误,百度暂时无法连接您的服务器,请检查服务器的设置,确保您网站的服务器能被正常访问。
  18. 【玩转linux】head命令
  19. python爬虫二:网易博客的图片
  20. 大一下c语言笔记本电脑,大一新生上大学笔记本电脑推荐

热门文章

  1. 智能优化算法:金鹰优化算法-附代码
  2. 【LeetCode】【数组】题号:*661,图片平滑器
  3. 实习踩坑之路:使用@Value注解导致Parse attempt failed for value [08:00],时间转换出错?
  4. 如约而至!第二期 Flink 极客训练营上线啦
  5. 面试今日头条Android开发,结束时我问了面试官是否有女朋友,结果你猜?
  6. 3 缓存文件写入失败_分布式缓存数据库一致性问题
  7. java 读取wav采样数据_读取wav文件中的音频数据操作
  8. python调用hive与java调用区别_python 调用hive查询实现类似存储过程
  9. 集群服务器分布式iis_使用nginx实现分布式限流的方法
  10. c语言作业及参考答案,C语言试题及答案