实际这是很久之前的问题了,当时没时间记录

这里简单回顾

项目基于

数据架构不方便说太细,最精简的

somedata-> [kafka]->spark-stream->elasticsearch

在 spark-streaming 引用了elasticsearch-hadoop(实际用的是为支持upsert doc自已打包的,见elasticsearch-hadoop 扩展定制 官方包以支持 update upsert doc)

问题是somedata定入kafka 200w条,最后到elasticsearch 190w条,有10w条不见了,也不报任何错误,批处理任务都是成功的。

首先排入kafka的消费问题,基于kafka自已实现了一套offset偏移维护的机制,不可能在消费kafka这一步丢数

唯一可能的就是 elasticsearch-hadoop 写 elasticsearch 这一步了

class SparkDStreamFunctions(ds: DStream[_]) extends Serializable {def saveToEs(resource: String): Unit = { EsSparkStreaming.saveToEs(ds, resource) }def saveToEs(resource: String, cfg: Map[String, String]): Unit = { EsSparkStreaming.saveToEs(ds, resource, cfg) }def saveToEs(cfg: Map[String, String]): Unit = { EsSparkStreaming.saveToEs(ds, cfg) }}

写入es调用包内的saveToEs方法,scala Unit 类似java 的void 这个方法是无返回值的。这里看不出什么线索

隐约能感觉到问题在哪里。

elasticsearch 是以乐观锁,版本号来实现基本的事务控制

操作elasticsearch时,相信大部分人都遇到过版本冲突的问题,报错类似

{"error" : "VersionConflictEngineException[[website][2] [blog][1]:version conflict, current [2], provided [1]]","status" : 409
}

但saveToEs这个方法是没有返回值的????也就是说能保证不会碰到这个错误?

当然不是,查看源码后发现

saveToEs无返回值,不代表就这批数据就完全成功了

实际会打印错误日志,不过只是在对这个包开启debug后才会打印,默认的情况下是不开的。包的开发者们认为这种版本冲突的错,如果抛到顶层,让整个任务失败太小题大作了,因此也不会往外抛,只会对比较"大"的异常才会抛到顶层。

实际上 elasticsearch-hadoop 会在一批任务写入失败后,隔一段时间重试,重试几次后,直接跳过这组数据,这数据等于就丢弃了。(代码就不贴了,因为github上最新的代码和我当时排查时不一样,可能有变化,问题已经解决,这次回顾也没精力细究了,如果贴错了还误人子弟)

官方的配置文档

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

es.batch.write.retry.count (default 3)
Number of retries for a given batch in case Elasticsearch is overloaded and data is rejected. Note that only the rejected data is retried. If there is still data rejected after the retries have been performed, the Hadoop job is cancelled (and fails). A negative value indicates infinite retries; be careful in setting this value as it can have unwanted side effects.
es.batch.write.retry.wait (default 10s)
Time to wait between batch write retries that are caused by bulk rejections.

这两个参数就是重试相关的配置。

加了这3个参数后,就解决丢数的问题

"es.batch.write.retry.count" -> "-1",
"es.batch.write.retry.wait" -> "60s",
"es.batch.size.entries" -> "50"

es.batch.write.retry.count 表示无限重试,这个得谨慎着用最主要是改这个,我手里这套系统正好可以这么用。

es.batch.write.retry.wait 重试间隔由默认的10s改为60s,这个只是优化的

es.batch.size.entries也是优化的

es.batch.size.entries (default 1000)
Size (in entries) for batch writes using Elasticsearch bulk API - (0 disables it). Companion to es.batch.size.bytes, once one matches, the batch update is executed. Similar to the size, this setting is per task instance; it gets multiplied at runtime by the total number of Hadoop tasks running.

elasticsearch集群本身不提供权限控制,大部分架构都会在之前加个nginx

如果单个文档都很大的话,默认的1000个,可能会超过nginx 限制的单独http的body大小,nginx直接就让请求失败了,把这个数改小,是为了避免这种情况。

转载于:https://www.cnblogs.com/zihunqingxin/p/8632061.html

以elasticsearch-hadoop 向elasticsearch 导数,丢失数据的问题排查相关推荐

  1. ElasticSearch-Hadoop:从Hadoop到ElasticSearch索引产品视图计数和客户顶部搜索查询

    这篇文章涵盖了如何使用ElasticSearch-Hadoop从Hadoop系统读取数据并在ElasticSearch中对其进行索引. 它涵盖的功能是在最近n天中为每个客户的产品浏览量计数和热门搜索查 ...

  2. ElasticSearch-Hadoop:从Hadoop到ElasticSearch的产品视图计数索引和客户顶部搜索查询...

    这篇文章涵盖了如何使用ElasticSearch-Hadoop从Hadoop系统读取数据并在ElasticSearch中对其进行索引. 它涵盖的功能是在最近n天中为每个客户的产品浏览量计数和热门搜索查 ...

  3. 【Elasticsearch】了解Elasticsearch写入磁盘的数据

    文章目录 1.概述 1.从Elasticsearch路径说起 2.文件从哪里来? 3.节点数据 4.索引数据 5.分片数据 6.每个分片的 事务日志(Transaction Log) 7.Lucene ...

  4. 3. ElasticSearch分词器和聚合,数据的丰富和去重

    ElasticSearch分词器和聚合,数据的丰富和去重 1. analyzer 1.1. 什么是analysis? 1.2. 如何定义一个定制的分析器 1.3. 中文分词器 2. Aggregati ...

  5. 我的ElasticSearch集群部署总结--大数据搜索引擎你不得不知

    摘要:世上有三类书籍:1.介绍知识,2.阐述理论,3.工具书:世间也存在两类知识:1.技术,2.思想.以下是我在部署ElasticSearch集群时的经验总结,它们大体属于第一类知识"tec ...

  6. 详述 Elasticsearch 通过范围条件查询索引数据的方法

    文章目录 情景 查询方法 通过命令实现范围查询 通过 API 实现范围查询 情景 在使用 Elasticsearch 的时候,我们可能会遇到需要以范围为条件查询索引数据的需求.有两种方法可以实现我们的 ...

  7. ElasticSearch环境配置-尚硅谷大数据培训

    ElasticSearch(单节点)环境配置 // 通过Wget下载ElasticSearch安装包 [bigdata@linux ~]$wget https://artifacts.elastic. ...

  8. ElasticSearch简介及ElasticSearch部署、原理和使用介绍

    ElasticSearch简介及ElasticSearch部署.原理和使用介绍 第一章:elasticsearch简介 ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式 ...

  9. 【Elasticsearch入门】Elasticsearch集群管理

    1.集群节点监控 在 Elasticsearch的运行期间,一个很重要的方面就是监控.这使得系统管理员能够检测并预防可能性的问题,或至少知道失败时会发生什么. Elasticsearch提供了非常详细 ...

最新文章

  1. ios日历视图实现日期输入
  2. 国外论坛BCH关注度暴涨
  3. Matlab:利用Matlab实现布朗运动模拟
  4. 如何获取微信API的Access Token
  5. linux添加cmd命令行参数,Windows 终端命令行参数 | Microsoft Docs
  6. 长沙.NET技术社区·设计到实现
  7. 人体反应测试仪 c语言,人体反应速度测试仪毕业设计说明
  8. Linux内核驱动调试,Linux内核设备驱动之内核的调试技术笔记整理
  9. 一种内核到用户空间的高效数据传输技术
  10. SQL按字段分组取最大(小)值记录(重复记录)
  11. [python] 对于arcpy的简单使用。
  12. js 设置焦点 判断控件是否获得焦点 判断哪个控件获得焦点
  13. 在线编辑fckeditor3
  14. C++与Python混合编程
  15. python 实现日期计算器
  16. python中转义符的用法大全_Python中的各种转义符\n\r\t
  17. verilog语言实现FPGA板的交通信号灯
  18. TinyXML-2解析XML数据
  19. Uos统信系统 CA根证书搭建
  20. what is Differential steering and skid steering ?

热门文章

  1. 项目小结:日立OA系统(Asp.net)
  2. 【翻译】了解Ext JS 5的小部件
  3. tcpdump + wireshark 抓包组合
  4. centos7升级自带的php5.4版本到php5.6
  5. python中time模块常用功能
  6. 2018.8.18 servlet使用的会话跟踪除session外还有哪些方式
  7. PowerDesigner的安装
  8. 单元测试——第六周作业
  9. 注册服务(addService)
  10. 关于虚继承(在钻石继承体系中,一定要用虚继承!)