ElasticSearch-hadoop saveToEs source code analysis

The call chain through the classes is:

EsSpark -> EsRDDWriter -> RestService -> RestRepository -> RestClient

Their respective roles:

  • EsSpark, the entry point for reading from and writing to ES
  • EsRDDWriter, calls RestService to create a PartitionWriter and writes the data to ES
  • RestService, responsible for creating the RestRepository and PartitionWriter
  • RestRepository, the high-level bulk abstraction; under the hood it relies on NetworkClient to issue the real HTTP bulk requests
  • RestClient, sends the actual bulk request (a PUT to the resource's bulk endpoint) and retries entries that were rejected

The source for each class is traced below:

https://github.com/elastic/elasticsearch-hadoop/blob/2.1/spark/core/main/scala/org/elasticsearch/spark/rdd/EsSpark.scala

    def saveToEs(rdd: RDD[_], resource: String) {
      saveToEs(rdd, Map(ES_RESOURCE_WRITE -> resource))
    }

    def saveToEs(rdd: RDD[_], resource: String, cfg: Map[String, String]) {
      saveToEs(rdd, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_WRITE -> resource))
    }

    def saveToEs(rdd: RDD[_], cfg: Map[String, String]) {
      CompatUtils.warnSchemaRDD(rdd, LogFactory.getLog("org.elasticsearch.spark.rdd.EsSpark"))

      if (rdd == null || rdd.partitions.length == 0) {
        return
      }

      val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)
      val config = new PropertiesSettings().load(sparkCfg.save())
      config.merge(cfg.asJava)

      rdd.sparkContext.runJob(rdd, new EsRDDWriter(config.save()).write _)
    }
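
For orientation, here is a minimal usage sketch showing how these entry points are typically reached. The `demo/docs` resource name and the `es.nodes` address are hypothetical; the `org.elasticsearch.spark._` import is what enriches RDDs with `saveToEs`, which delegates to the EsSpark methods above.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._ // enriches RDD with saveToEs, delegating to EsSpark

    object SaveToEsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("saveToEs-demo")
          .set("es.nodes", "localhost:9200") // hypothetical cluster address

        val sc = new SparkContext(conf)

        val docs = sc.makeRDD(Seq(
          Map("title" -> "doc-1", "views" -> 1),
          Map("title" -> "doc-2", "views" -> 2)
        ))

        // "demo/docs" ends up as the ES_RESOURCE_WRITE setting seen above
        docs.saveToEs("demo/docs")

        sc.stop()
      }
    }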

https://github.com/elastic/elasticsearch-hadoop/blob/2.1/spark/core/main/scala/org/elasticsearch/spark/rdd/EsRDDWriter.scala

    def write(taskContext: TaskContext, data: Iterator[T]) {
      val writer = RestService.createWriter(settings, taskContext.partitionId, -1, log)

      taskContext.addOnCompleteCallback(() => writer.close())

      if (runtimeMetadata) {
        writer.repository.addRuntimeFieldExtractor(metaExtractor)
      }

      while (data.hasNext) {
        writer.repository.writeToIndex(processData(data))
      }
    }
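
The key point in write is that runJob invokes it once per RDD partition, so every Spark task gets its own PartitionWriter, its own pinned node, and its own bulk buffer. A rough, self-contained sketch of that contract, with a stand-in function in place of the real writer:

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    object RunJobSketch {
      // stand-in for EsRDDWriter.write: invoked once per partition, on the executor
      def fakeWrite(taskContext: TaskContext, data: Iterator[String]): Unit = {
        // the real writer would open a PartitionWriter here and close it
        // via the on-complete callback, as in EsRDDWriter.write above
        data.foreach(r => println(s"partition ${taskContext.partitionId}: $r"))
      }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("runJob-sketch").setMaster("local[2]"))
        val rdd = sc.makeRDD(Seq("a", "b", "c"), numSlices = 2)
        sc.runJob(rdd, fakeWrite _) // one invocation per partition
        sc.stop()
      }
    }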

https://github.com/elastic/elasticsearch-hadoop/blob/2.1/mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java

    public static PartitionWriter createWriter(Settings settings, int currentSplit, int totalSplits, Log log) {
        Version.logVersion();

        InitializationUtils.discoverEsVersion(settings, log);
        InitializationUtils.discoverNodesIfNeeded(settings, log);
        InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
        InitializationUtils.filterNonDataNodesIfNeeded(settings, log);

        List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);

        // check invalid splits (applicable when running in non-MR environments) - in this case fall back to Random..
        int selectedNode = (currentSplit < 0) ? new Random().nextInt(nodes.size()) : currentSplit % nodes.size();

        // select the appropriate nodes first, to spread the load before-hand
        SettingsUtils.pinNode(settings, nodes.get(selectedNode));

        Resource resource = new Resource(settings, false);
        log.info(String.format("Writing to [%s]", resource));

        // single index vs multi indices
        IndexExtractor iformat = ObjectUtils.instantiate(settings.getMappingIndexExtractorClassName(), settings);
        iformat.compile(resource.toString());

        RestRepository repository = (iformat.hasPattern() ?
                initMultiIndices(settings, currentSplit, resource, log) :
                initSingleIndex(settings, currentSplit, resource, log));

        return new PartitionWriter(settings, currentSplit, totalSplits, repository);
    }
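
The load-spreading step is worth isolating: with N candidate nodes, partition i is pinned to node i mod N, and a negative split (the non-MR fallback) picks a random node. A standalone mirror of just that selection, with made-up addresses:

    import scala.util.Random

    // mirror of the selection in createWriter: negative split => random node,
    // otherwise round-robin by partition id
    def selectNode(nodes: Seq[String], currentSplit: Int): String = {
      val idx = if (currentSplit < 0) Random.nextInt(nodes.size) else currentSplit % nodes.size
      nodes(idx)
    }

    val nodes = Seq("10.0.0.1:9200", "10.0.0.2:9200", "10.0.0.3:9200") // hypothetical
    (0 until 5).foreach(split => println(s"partition $split -> ${selectNode(nodes, split)}"))
    // partitions 0..4 map to nodes 0, 1, 2, 0, 1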

https://github.com/elastic/elasticsearch-hadoop/blob/2.1/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java

    /**
     * Writes the objects to index.
     *
     * @param object object to add to the index
     */
    public void writeToIndex(Object object) {
        Assert.notNull(object, "no object data given");

        lazyInitWriting();
        doWriteToIndex(command.write(object));
    }

    private void doWriteToIndex(BytesRef payload) {
        // check space first
        if (payload.length() > ba.available()) {
            if (autoFlush) {
                flush();
            }
            else {
                throw new EsHadoopIllegalStateException(
                        String.format("Auto-flush disabled and bulk buffer full; disable manual flush or increase capacity [current size %s]; bailing out", ba.capacity()));
            }
        }

        data.copyFrom(payload);
        payload.reset();

        dataEntries++;
        if (bufferEntriesThreshold > 0 && dataEntries >= bufferEntriesThreshold) {
            if (autoFlush) {
                flush();
            }
            else {
                // handle the corner case of manual flush that occurs only after the buffer is completely full (think size of 1)
                if (dataEntries > bufferEntriesThreshold) {
                    throw new EsHadoopIllegalStateException(
                            String.format("Auto-flush disabled and maximum number of entries surpassed; disable manual flush or increase capacity [current size %s]; bailing out", bufferEntriesThreshold));
                }
            }
        }
    }
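
doWriteToIndex therefore has two flush triggers: the byte capacity of the backing buffer and the optional entry-count threshold; with auto-flush disabled, overrunning either is a hard error. A simplified model of that behavior (the class and field names are illustrative, not the real ones):

    import scala.collection.mutable.ArrayBuffer

    class BulkBuffer(capacityBytes: Int, entriesThreshold: Int, autoFlush: Boolean) {
      private val entries = ArrayBuffer.empty[Array[Byte]]
      private var usedBytes = 0

      def add(payload: Array[Byte]): Unit = {
        // space check first, as in doWriteToIndex
        if (usedBytes + payload.length > capacityBytes) {
          if (autoFlush) flush()
          else throw new IllegalStateException("auto-flush disabled and bulk buffer full")
        }
        entries += payload
        usedBytes += payload.length
        // entry-count threshold second
        if (entriesThreshold > 0 && entries.size >= entriesThreshold) {
          if (autoFlush) flush()
          else if (entries.size > entriesThreshold)
            throw new IllegalStateException("auto-flush disabled and entry threshold surpassed")
        }
      }

      def flush(): Unit = {
        println(s"flushing ${entries.size} entries / $usedBytes bytes")
        entries.clear(); usedBytes = 0
      }
    }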

    public void flush() {
        BitSet bulk = tryFlush();
        if (!bulk.isEmpty()) {
            throw new EsHadoopException(String.format("Could not write all entries [%s/%s] (maybe ES was overloaded?). Bailing out...", bulk.cardinality(), bulk.size()));
        }
    }

    public BitSet tryFlush() {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Sending batch of [%d] bytes/[%s] entries", data.length(), dataEntries));
        }

        BitSet bulkResult = EMPTY;

        try {
            // double check data - it might be a false flush (called on clean-up)
            if (data.length() > 0) {
                bulkResult = client.bulk(resourceW, data);
                executedBulkWrite = true;
            }
        } catch (EsHadoopException ex) {
            hadWriteErrors = true;
            throw ex;
        }

        // discard the data buffer, only if it was properly sent/processed
        //if (bulkResult.isEmpty()) {
        // always discard data since there's no code path that uses the in flight data
        discard();
        //}

        return bulkResult;
    }
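
The split between flush() and tryFlush() is the error contract: tryFlush() returns a java.util.BitSet whose set bits mark the positions of documents the cluster rejected, and flush() escalates a non-empty set into an EsHadoopException. A tiny illustration of that contract, with made-up positions:

    import java.util.BitSet

    val rejected = new BitSet()
    rejected.set(3) // pretend the 4th document of the batch was rejected
    rejected.set(7) // ...and the 8th

    // flush() would turn this non-empty set into an exception
    if (!rejected.isEmpty)
      println(s"${rejected.cardinality()} entries failed, at positions $rejected")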

https://github.com/elastic/elasticsearch-hadoop/blob/2.1/mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java

    public BitSet bulk(Resource resource, TrackingBytesArray data) {
        Retry retry = retryPolicy.init();
        int httpStatus = 0;

        boolean isRetry = false;

        do {
            // NB: dynamically get the stats since the transport can change
            long start = network.transportStats().netTotalTime;
            Response response = execute(PUT, resource.bulk(), data);
            long spent = network.transportStats().netTotalTime - start;

            stats.bulkTotal++;
            stats.docsSent += data.entries();
            stats.bulkTotalTime += spent;
            // bytes will be counted by the transport layer

            if (isRetry) {
                stats.docsRetried += data.entries();
                stats.bytesRetried += data.length();
                stats.bulkRetries++;
                stats.bulkRetriesTotalTime += spent;
            }

            isRetry = true;

            httpStatus = (retryFailedEntries(response, data) ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK);
        } while (data.length() > 0 && retry.retry(httpStatus));

        return data.leftoversPosition();
    }
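
Two details of this loop matter: retryFailedEntries trims data down to only the rejected documents, so each resend carries the leftovers rather than the full batch, and a partial failure is mapped to HTTP 503 purely so the pluggable Retry policy can decide whether to continue. A schematic of that control flow with stand-in names (send is assumed to shrink the pending batch on each call, as execute plus retryFailedEntries does):

    // schematic of RestClient.bulk's retry loop, not the real implementation
    def bulkWithRetry(send: () => Int, pendingBytes: () => Int, maxRetries: Int): Boolean = {
      var attempts = 0
      var status = send() // 200 if everything was accepted, 503 if some docs were rejected
      while (pendingBytes() > 0 && status == 503 && attempts < maxRetries) {
        attempts += 1
        status = send() // only the leftover (rejected) entries are resent
      }
      pendingBytes() == 0
    }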

Reposted from: https://www.cnblogs.com/bonelee/p/6054199.html
