Spark 整合ElasticSearch

因为做资料搜索用到了ElasticSearch,最近又了解一下 Spark ML,先来演示一个Spark 读取/写入 ElasticSearch 简单示例。(spark 读取ElasticSearch中数据)

环境:IDEA2016,JDK8,windows10,安装的 ElasticSearch6.3.2 和 spark-2.3.1-bin-hadoop2.7,使用mvn package 将程序打成jar包,采用spark-submit提交给spark执行。

先在ElasticSearch中创建一个索引用来演示。因为是文本数据,因此采用ik分词。可参考:elasticsearch-ik

  • 创建索引:PUT /index_ik_test

  • 设置mapping 及相应的分词器,这里指定 content 字段为 ElasticSearch 的text 类型,并使用ik_max_word 分词模式

    POST index_ik_test/fulltext/_mapping
    {
    "properties": {
    "content":{
    "type": "text",
    "analyzer": "ik_max_word",
    "search_analyzer": "ik_max_word"
    }
    }
    }

  • 存几篇文档到ElasticSearch中

    POST index_ik_test/fulltext/1
    {"content":"其中有两个人受伤了"}

  • ik 分词器有两种分词模式:ik_max_wordik_smart。可通过如下方式查看一下这两者的区别:

    GET index_ik_test/_analyze
    {
    "text": ["其中国家投资了500万"],
    "tokenizer": "ik_smart"
    }

    分词结果:其中、国家、投资、了、500万

    GET index_ik_test/_analyze
    {
    "text": ["其中国家投资了500万"],
    "tokenizer": "ik_max_word"
    }

    分词结果:其中、中国、国家、投资、了、500、万

  • 使用GET index_ik_test/_mapping可查看索引的配置信息

    {
    "index_ik_test": {
    "mappings": {
    "fulltext": {
    "properties": {
    "content": {
    "type": "text",
    "analyzer": "ik_max_word"
    }
    }
    }
    }
    }
    }

好,现在ElasticSearch中有数据了,现在看怎么基于Spark读取ElasticSearch中的数据。

IDEA2016中新建一个Maven工程,当然也可以用SpringBoot工程,但是这里的是单纯的Maven Project。

ElasticSearch官方提供了elasticsearch-hadoop来供Spark访问ElasticSearch。具体可参考:官方文档es for spark。

官方提供了elasticsearch-hadoopmaven 依赖,这个依赖包括了:ElasticSearch for Hadoop MR、ElasticSearch for Hadoop Hive、ElasticSearch for Hadoop Spark。如果只用到了Spark,也可以只添加ElasticSearch for spark依赖。具体可参考:(这个链接)[https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html]

<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-spark-20_2.10</artifactId><version>6.3.2</version>
</dependency>

创建spark运行上下文时需要spark-sql_2.11依赖,可参考:spark 官方文档quick start。

To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.

在本文的示例中,添加了下面3个maven依赖:

<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop</artifactId><version>6.3.2</version>
</dependency>
<!-- Spark dependency -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.3.1</version>
</dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>22.0</version>
</dependency>

下面来直接看示例代码:

向ElasticSearch中写入数据

  • spark配置连接ElasticSearch。可参考:elasticsearch-hadoop-master,我们采用的是:Configure the connector to run in WAN mode

    SparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]").set("es.index.auto.create", "true").set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200").set("es.nodes.wan.only", "true");
  • 将数据写入到ElasticSearch

    JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
    JavaEsSpark.saveToEs(javaRDD, elasticIndex);

从ElasticSearch查询数据

    JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中国").values();for (Map<String, Object> item : searchRdd.collect()) {item.forEach((key, value)->{System.out.println("search key:" + key + ", search value:" + value);});}

使用?q=中国作为查询条件。整个完整示例代码如下:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;import java.util.Map;import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;/*** Created by Administrator on 2018/8/28.*/
public class EsSparkTest {public void writeEs() {String elasticIndex = "spark/docs";//https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-nativeSparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]").set("es.index.auto.create", "true").set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200").set("es.nodes.wan.only", "true");SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapterMap<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));JavaEsSpark.saveToEs(javaRDD, elasticIndex);}public void readEs() {SparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]").set("es.index.auto.create", "true").set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200").set("es.nodes.wan.only", "true");SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapterJavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中国").values();for (Map<String, Object> item : searchRdd.collect()) {item.forEach((key, value)->{System.out.println("search key:" + key + ", search value:" + value);});}sparkSession.stop();}
}

DemoApplication.java 入口main类

public class DemoApplication {public static void main(String[] args) {new EsSparkTest().readEs();}
}

IDEA菜单栏:view ---> window tools --->maven projects 打开maven 侧边栏。直接双击package打包。

$rz -bey esdemo-1.0-SNAPSHOT.jar 将打成的jar包上传到部署spark服务器上,使用如下命令提交运行:

~/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class DemoApplication esdemo-1.0-SNAPSHOT.jar

--class 是类的全路径名。如果执行过程中抛出ClassNotFoundException异常,要看一下pom.xml中指定的依赖是否在Spark安装目录下的 jars/ 目录下(比如事先把Guava jar 和 elasticsearch-hadoop-6.3.2.jar 上传到 jars/目录下)。最终执行readEs()方法查询得到的文档如下:

因为 content 字段采用的是ik_max_word分词模式,因此文本其中国家投资了500万 分词结果中包含了 中国,从而使得这篇document被查询到了。

后期补充:

在使用Spark 查询ElasticSearch中数据时,由于ElasticSearch索引user中定义了一个日期字段,如下:

    "created": {"type": "date","format": "yyyy-MM-dd HH:mm:ss"}

导致Spark执行下面语句查询

JavaRDD<Map<String, Object>> searchRdd = JavaEsSpark.esRDD(jsc, "user/profile", "?q=test").values();
for (Map<String, Object> item : searchRdd.collect()) {item.forEach((key, value)->{System.out.println("search key:" + key + ", search value:" + value);});
}

反序列化构建日期对象时,报错:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot invoke method public org.joda.time.DateTime org.joda.time.format.DateTimeFormatter.parseDateTime(java.lang.String)
at org.elasticsearch.hadoop.util.ReflectionUtils.invoke(ReflectionUtils.java:93)
at org.elasticsearch.hadoop.util.DateUtils$JodaTime.parseDate(DateUtils.java:105)
at org.elasticsearch.hadoop.util.DateUtils.parseDate(DateUtils.java:122)
at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.parseDate(JdkValueReader.java:424)
at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.date(JdkValueReader.java:412)
at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.readValue(JdkValueReader.java:88)
at org.elasticsearch.hadoop.serialization.ScrollReader.parseValue(ScrollReader.java:789)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:739)
... 31 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.elasticsearch.hadoop.util.ReflectionUtils.invoke(ReflectionUtils.java:91)
... 38 more
Caused by: java.lang.IllegalArgumentException: Invalid format: "2018-10-08 19:00:41" is malformed at " 19:00:41"
at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)
... 43 more

这应该是我索引中定义的日期格式是yyyy-MM-dd HH:mm:ss,而org.joda.time.format.DateTimeFormatter默认使用的日期格式不同导致的,但是又不知道在哪里指定日期格式进行Format,所以真的是又遇到了个坑……

如下测试,joda 是支持如下格式的日期格式的:

        String pattern = "yyyy-MM-dd HH:mm:ss";String aTime = "2018-10-08 19:00:41";DateTimeFormatter format = DateTimeFormat.forPattern(pattern);DateTime dateTime = format.parseDateTime(aTime);//no error

spark2.3中依赖的:joda的版本如下:

~/spark-2.3.1-bin-hadoop2.7/jars$ ls | grep joda
joda-time-2.9.3.jar

Spark 整合ElasticSearch相关推荐

  1. Spark整合ElasticSearch

    2019独角兽企业重金招聘Python工程师标准>>> spark整合elasticsearch两种方式 1.自己生成_id等元数据 2.使用ES默认生成 引入对应依赖 <de ...

  2. es springboot 不设置id_原创 | 一篇解决Springboot 整合 Elasticsearch

    ElasticSearch 结合业务的场景,在目前的商品体系需要构建搜索服务,主要是为了提供用户更丰富的检索场景以及高速,实时及性能稳定的搜索服务. ElasticSearch是一个基于Lucene的 ...

  3. elasticsearch 9300端口连接不上_SpringBoot2.x系列教程54--SpringBoot整合ElasticSearch方式一...

    SpringBoot2.x系列教程54--NoSQL之SpringBoot整合ElasticSearch方式一 作者:一一哥 一. Elastic Search 1. Elastic Search简介 ...

  4. Spring Boot 整合 Elasticsearch,实现 function score query 权重分查询

    运行环境:JDK 7 或 8,Maven 3.0+ 技术栈:SpringBoot 1.5+,ElasticSearch 2.3.2 本文提纲 一.ES 的使用场景 二.运行 springboot-el ...

  5. Elasticsearch学习(3) spring boot整合Elasticsearch的原生方式

    前面我们已经介绍了spring boot整合Elasticsearch的jpa方式,这种方式虽然简便,但是依旧无法解决我们较为复杂的业务,所以原生的实现方式学习能够解决这些问题,而原生的学习方式也是E ...

  6. 七、SpringBoot整合elasticsearch集群

    @Author : By Runsen @Date : 2020/6/12 作者介绍:Runsen目前大三下学期,专业化学工程与工艺,大学沉迷日语,Python, Java和一系列数据分析软件.导致翘 ...

  7. phoenix+hbase+Spark整合,Spark处理数据操作phoenix入hbase,Spring Cloud整合phoenix

    1 版本要求 Spark版本:spark-2.3.0-bin-hadoop2.7 Phoenix版本:apache-phoenix-4.14.1-HBase-1.4-bin HBASE版本:hbase ...

  8. SpringBoot 2.x (12):整合Elasticsearch

    Elasticsearch:一个优秀的搜索引擎框架 搜索方面最基本的是SQL的like语句 进一步的有Lucene框架 后来有企业级的Solr框架 而Elasticsearch框架尤其适合于数据量特别 ...

  9. Elasticsearch实战篇——Spring Boot整合ElasticSearch

    2019独角兽企业重金招聘Python工程师标准>>> 当前Spring Boot很是流行,包括我自己,也是在用Spring Boot集成其他框架进行项目开发,所以这一节,我们一起来 ...

最新文章

  1. 如何在OpenCV中为InRange阈值选择颜色的最佳HSV值
  2. UML总结—时序图(Sequence Diagram)和协作图(Collaboration Diagram)
  3. JAVA面试题解惑系列(四)——final、finally和finalize的区别
  4. webshpere缓存--JSP
  5. 某8位微型计算机地址总线为16位,微机原理试题和答案
  6. mysql技术innodb存储引擎读后感_《Mysql技术内幕-InnoDB存储引擎》读书笔记 (一)...
  7. 如何正确地使用Java的@deprecated标注
  8. 数学分析中的基本定理
  9. 松下计划摆脱对特斯拉依赖 与其他汽车制造商合作
  10. SpringBoot应用监控——Actuator安全隐患及解决方案
  11. ORACLE 批量实例分析
  12. idea背景颜色修改
  13. linux如何解除密码锁屏图案大全,忘记锁屏图案密码的六种解决办法
  14. STM32F103C8T6基于HAL库移植uC/OS-III
  15. day7-字典和集合作业
  16. 让我带你一起了解一下 ls -l 命令输出的内容都有哪些意义,以及文件权限如何调整
  17. Android 腾讯地图 选点定位,仿微信发送位置
  18. 【数据库】用户管理---君权神授
  19. 神经网络预测指标是什么,神经网络怎么预测数据
  20. Tomcat双向SSL认证及CA数字证书安装和配置QQ即时通信协议窥探

热门文章

  1. CSS网页布局垂直居中整理
  2. Impala操作审计
  3. pl/sql配置连接远程数据库oracle,本地没有安装oracle数据库的情况下
  4. Salesforce 社区可泄露业务敏感信息
  5. Valak 6个月上演“变身戏法”,紧盯 Exchange 服务器窃取企业数据
  6. JavaSE复习(二)集合
  7. 中台之上(一):重视业务架构,不要让“业务的归业务、技术的归技术”
  8. 周鸿祎内部讲话:大公司要创新,就必须做小
  9. 数据中心高速需求 推动光通信迈向100Gbps
  10. 最简单的 RabbitMQ 监控方法 - 每天5分钟玩转 OpenStack(158)