文章目录

  • 简介
  • LogStash接入
  • ES-Hadoop接入
    • 重要配置
    • 批处理导入
    • 流数据写入
      • Dstream to ES
      • Structured Streaming to ES
    • 几个注意事项
      • 主键设置
      • 写入冲突
      • Exactly Once
      • ES写入性能

简介

Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,可以对大量数据进行快速的搜索和聚合分析。同时它能够水平扩展,每秒钟可处理海量事件,同时能够自动管理索引和查询在集群中的分布方式,以实现极其流畅的操作。而且其可以确保集群(和数据)的安全性和可用性。

使用Elasticsearch首先需要接入数据,而在大数据应用场景下,数据往往存储在hadoop集群中。如何将Hadoop集群中的数据方便地接入到Elasticsearch集群中是关键。

LogStash接入

Elasticsearch的数据接入,首先想到的是利用LogStash组件。LogStash是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到Elasticsearch。不过从LogStash组件官方支持的接入插件中并不支持HDFS输入。结合其他插件实现HDFS数据接入的三种方式:

  • 将数据导入到kafka集群中,然后使用LogStash的kafka插件接入;
  • 利用HDFS的Webhdfs接口服务读取数据,然后使用LogStash的http插件接入;
  • 利用FUSE安装HDFS,然后使用LogStash的文件插件接入。

ES-Hadoop接入

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。同时,也可以借助 ES 作为数据存储层,然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。具体可参考:官方文档。

重要配置

ES-Hadoop核心是通过 ES 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明:

  • es.nodes:需要连接的 es 节点(不需要配置全部master节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量;
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。

设置配置参数主要有两种方式:通过SparkConf的set接口进行设置;在调用save或load函数时动态传入。

批处理导入

在Scala中,只需导入org.elasticsearch.spark.sql包,即可使用saveToEs等方法接入DataFrame数据(RDD数据可导入org.elasticsearch.spark包,接口方法具有相同的签名):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
// elasticsearch-hadoop Spark package import
import org.elasticsearch.spark.sql._      ...// sc = existing SparkContext
val sqlContext = new SQLContext(sc)// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)//  create DataFrame
val people = sc.textFile("people.txt")    .map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert"),("es.mapping.id" , "name")
)
// Index the resulting DataFrame to Elasticsearch through the saveToEs method
people.saveToEs("spark/people", cfg)

默认情况下,elasticsearch-hadoop将忽略空值,而不是不写整个文档。 由于DataFrame应被视为结构化表格数据,因此可通过将es.spark.dataframe.write.null设置切换为true,将空值写入DataFrame对象的空值字段。

流数据写入

Dstream to ES

任何DStream都可以保存到Elasticsearch,只要其内容可以翻译成文档即可。 这意味着DStream类型需要是Map(Scala或Java),JavaBean或Scala的case class类。如果不是这种情况,需要在Spark中转换数据或插入自定义的ValueWriter类。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._import org.elasticsearch.spark.streaming._           ...val conf = ...
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))       val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")val rdd = sc.makeRDD(Seq(numbers, airports))
val microbatches = mutable.Queue(rdd)                val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert")
)
ssc.queueStream(microbatches).saveToEs("spark/docs", cfg) ssc.start()
ssc.awaitTermination()

注意:Streaming支持提供了特殊的优化,以便在运行具有非常小的处理窗口的作业时保留Spark执行程序上的网络资源。出于这个原因,应该更倾向于使用这种集成方式,而不是在DStream的foreachRDD返回的RDD上调用saveToEs。

Structured Streaming to ES

Spark Structured Streaming提供了内置于Spark SQL集成的统一流和批处理的接口。

import org.apache.spark.sql.SparkSession    ...val spark = SparkSession.builder().appName("EsStreamingExample")           .getOrCreate()// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)//  create DataFrame
val people = spark.readStream                   .textFile("/path/to/people/files/*")    .map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt))val cfg = Map(("es.nodes", "localhost"),("es.write.operation" , "upsert"),("es.mapping.id" , "name")
)
people.writeStream.option("checkpointLocation", "/save/location")  .format("es").start("spark/people", cfg)

Spark在基于批处理和基于流的数据集之间没有进行基于类型的区分,如果在基于流的数据集或DataFrame上调用这些方法,则会抛出非法参数异常。

几个注意事项

主键设置

向ES中插入数据时,如果没有指定主键(即_id)则会自动生成一个id。在数据发生变化或者数据重导时可能会导致赃数据,为了保证数据的可控性,应该在插入数据时显示指定记录的主键值(自定义生成方式)。

写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

  • 设置 es.write.operation 为 upsert(该方法要求设置记录唯一id),这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

  • 自定义冲突处理类,通过自定义类来处理相关错误,例如忽略冲突等:

      public class IgnoreConflictsHandler extends BulkWriteErrorHandler {public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {if (entry.getResponseCode() == 409) {StaticLog.warn("Encountered conflict response. Ignoring old data.");return HandlerResult.HANDLED;}return collector.pass("Not a conflict response code.");}}
    

Exactly Once

设置自定义主键id,并将写入模式设置为upsert,可以实现数据导入“Exactly Once”保证。异常数据经过流式处理后,保证结果数据中(并不能保证处理过程中),每条数据最多出现一次,且最少出现一次。

Streaming接口提供了Checkpoint功能,可以让程序再次启动时,从上一次异常退出的位置,重新开始计算。

ES写入性能

单个ES结点的写入速度大概是每秒1万行,增加Spark Streaming的计算能力,无法突破这个瓶颈。在写入数据量过大时会出现拒绝写入错误,因此在新业务上线时需要进行谨慎评估。可以通过增加集群节点来水平扩展,提高写入性能。

Elasticsearch数据接入相关推荐

  1. elasticsearch 数据类型_基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    来源;马蜂窝 一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存 ...

  2. 基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    一.为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数 ...

  3. 基于 MySQL Binlog 的 Elasticsearch 数据同步实践 原

    一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数据可以 ...

  4. 基于物联网的数据接入与数据存储的一些思考

    基于物联网的数据接入和数据存储 一.边缘计算 边缘计算是指靠近物或数据源头的一侧,采用网络.计算.存储.应用核心能力为一体的开放平台.网络边缘侧可以是从数据源到云计算中心之间的任意功能实体,这些实体搭 ...

  5. 本地日志数据实时接入到hadoop集群的数据接入方案

    1. 概述 本手册主要介绍了,一个将传统数据接入到Hadoop集群的数据接入方案和实施方法.供数据接入和集群运维人员参考. 1.1.  整体方案 Flume作为日志收集工具,监控一个文件目录或者一个文 ...

  6. Elasticsearch数据备份与恢复(基于HDFS)

    Elasticsearch数据备份与恢复(基于HDFS) 1.(所有机子上)安装es hdfs仓库插件repository-hdfs # repository-hdfs一定要和es版本匹配 # 在线 ...

  7. 阿里云 EMR Delta Lake 在流利说数据接入中的架构和实践

    简介: 为了消灭数据孤岛,企业往往会把各个组织的数据都接入到数据湖以提供统一的查询或分析.本文将介绍流利说当前数据接入的整个过程,期间遇到的挑战,以及delta在数据接入中产生的价值. 背景 流利说目 ...

  8. Android前端音视频数据接入GB28181平台意义

    技术背景 在我们研发Android平台GB28181前端音视频接入模块之前,业内听到最多的是,如何用Android或者Windows端,在没有国标IPC设备的前提下,模拟GB28181的信令和媒体流交 ...

  9. Android平台RTMP推流或轻量级RTSP服务(摄像头或同屏)编码前数据接入类型总结

    很多开发者在做Android平台RTMP推流或轻量级RTSP服务(摄像头或同屏)时,总感觉接口不够用,以大牛直播SDK为例 (Github) 我们来总结下,我们常规需要支持的编码前音视频数据有哪些类型 ...

最新文章

  1. ViewPager中Fragment的生命周期和FragmentPageAdapter与FragmentStatePageAdapter对其的影响
  2. HDU_oj_2050 折线分割平面
  3. python语言if语句-python的if语句
  4. 大话数据库连接池简史,你都用过几个?
  5. a b*c的C语言表达式为,在C语言的if语句中,用作判断的表达式为 ______
  6. clob字段怎么导出_Oracle 11g及12c+版本下为啥有些表不能exp导出?
  7. DecimalFormat很强大
  8. 作者:​张群(1988-),女,博士,中国电子技术标准化研究院设备与数据研究室副主任。...
  9. SRT协议应用于直播CDN,实现200ms以下的低延时、弱网传输
  10. pdo mysql连接类_PHP PDO-MYSQL:如何在不同类之间使用数据库连接
  11. 红外遥控c语言,NEC协议红外遥控器
  12. php代码实现文件下载,php实现文件下载的简单代码
  13. 高职对计算机课程要求,高职计算机课程对学生编程能力培养.doc
  14. Java的环境变量配置
  15. kron matlab_Matlab运用kron()函数计算Kronecker乘法
  16. 医学与计算机领域融合的发展前景,浅谈对超声医学的发展现状与前景之探究
  17. PS渐变羽化制作单车
  18. 跨语言rpc框架Thrift
  19. maven 打包 程序包org.project.entity不存在
  20. 加路由时提示Network is unreachable的一种解决方法

热门文章

  1. Pandas的时间序列数据-resample重采样(31)
  2. Python 魔法方法(pythonzho 的 Magic Methods 指南)
  3. 弘辽科技:淘宝店铺过户对个人有风险吗?有何风险?
  4. 修改个人csdn博客域名
  5. yzoj P2344 斯卡布罗集市 题解
  6. NOIP2003年普及组 T1 乒乓球
  7. ceph-mds文件系统操作指南
  8. 华为怎么把系统语言改成英语_怎么给华为手机更改语言?华为手机设置语言,涨知识了...
  9. 算法之路--朴素叶贝斯(七)
  10. 2019.7.8整理记录