2019独角兽企业重金招聘Python工程师标准>>>

spark整合elasticsearch两种方式

1.自己生成_id等元数据
2.使用ES默认生成

引入对应依赖

<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-spark-13_2.10</artifactId><version>5.0.1</version>
</dependency>

生成元数据方式

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import utils.PropertiesUtilsimport scala.collection.immutable
import scala.collection.mutable.ListBuffer
object Spark_ES_WithMeta {val buffer = new ListBuffer[Tuple2[String,immutable.Map[String,String]]]def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Custmer_Statistics").setMaster("local[2]")conf.set("es.nodes","rmhadoop01,rmhadoop02,rmhadoop03");conf.set("es.port","9200");conf.set("es.index.auto.create", "true");val sc = new SparkContext(conf)//读取本地文件val result = sc.textFile("C:/work/ideabench/SparkSQL/data/es/gd_py_corp_sharehd_info.txt").map(_.split("\\t")).foreach(d =>{if(PropertiesUtils.getStringByKey("gd_py_corp_sharehd_info").equals("one2many")){val map = Map("id"->d(0),"batch_seq_num"->d(1),"name"->d(2),"contributiveFund"->d(3),"contributivePercent"->d(4),"currency"->d(5),"contributiveDate"->d(6),"corp_basic_info_id"->d(7),"query_time"->d(8))buffer.append((d(0),map))//buffer}else if(PropertiesUtils.getStringByKey("gd_py_corp_sharehd_info").equals("one2one")){//Map(d(1) ->gd_py_corp_sharehd_info(d(0), d(1), d(2), d(3), d(4), d(5), d(6), d(7), d(8)))}} )sc.makeRDD(buffer).saveToEsWithMeta("spark/guofei_gd_py_corp_sharehd_info")}/*** 使用模板类描述表元数据信息**/case class gd_py_corp_sharehd_info(id:String,batch_seq_num:String,name:String,contributiveFund:String,contributivePercent:String,currency:String,contributiveDate:String,corp_basic_info_id:String,query_time:String)}

ES-UI界面

ES.png

使用ES默认元数据方式

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
object SparkSQL_ES {/*** 使用模板类描述表元数据信息* */case class gd_py_corp_sharehd_info(id:String,batch_seq_num:String,name:String,contributiveFund:String,contributivePercent:String,currency:String,contributiveDate:String,corp_basic_info_id:String,query_time:String)def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Custmer_Statistics").setMaster("local[2]")conf.set("es.nodes","192.168.20.128");conf.set("es.port","9200");conf.set("es.index.auto.create", "true");val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)//RDD隐式转换成DataFrameimport sqlContext.implicits._//读取本地文件val gd_py_corp_sharehd_infoDF = sc.textFile("C:/work/ideabench/SparkSQL/data/es/gd_py_corp_sharehd_info.txt").map(_.split("\\t")).map(d => gd_py_corp_sharehd_info(d(0), d(1), d(2), d(3), d(4), d(5), d(6), d(7), d(8))).toDF()//注册表gd_py_corp_sharehd_infoDF.registerTempTable("gd_py_corp_sharehd_info")/*** */val result = sqlContext.sql("select * from gd_py_corp_sharehd_info limit 10").toDF()result.saveToEs("spark/gd_py_corp_sharehd_info")}}

参考文章

官网:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

作者:MichaelFly
链接:https://www.jianshu.com/p/a5c669d0ceba
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

转载于:https://my.oschina.net/u/3346994/blog/1847221

Spark整合ElasticSearch相关推荐

  1. Spark 整合ElasticSearch

    Spark 整合ElasticSearch 因为做资料搜索用到了ElasticSearch,最近又了解一下 Spark ML,先来演示一个Spark 读取/写入 ElasticSearch 简单示例. ...

  2. es springboot 不设置id_原创 | 一篇解决Springboot 整合 Elasticsearch

    ElasticSearch 结合业务的场景,在目前的商品体系需要构建搜索服务,主要是为了提供用户更丰富的检索场景以及高速,实时及性能稳定的搜索服务. ElasticSearch是一个基于Lucene的 ...

  3. elasticsearch 9300端口连接不上_SpringBoot2.x系列教程54--SpringBoot整合ElasticSearch方式一...

    SpringBoot2.x系列教程54--NoSQL之SpringBoot整合ElasticSearch方式一 作者:一一哥 一. Elastic Search 1. Elastic Search简介 ...

  4. Spring Boot 整合 Elasticsearch,实现 function score query 权重分查询

    运行环境:JDK 7 或 8,Maven 3.0+ 技术栈:SpringBoot 1.5+,ElasticSearch 2.3.2 本文提纲 一.ES 的使用场景 二.运行 springboot-el ...

  5. Elasticsearch学习(3) spring boot整合Elasticsearch的原生方式

    前面我们已经介绍了spring boot整合Elasticsearch的jpa方式,这种方式虽然简便,但是依旧无法解决我们较为复杂的业务,所以原生的实现方式学习能够解决这些问题,而原生的学习方式也是E ...

  6. 七、SpringBoot整合elasticsearch集群

    @Author : By Runsen @Date : 2020/6/12 作者介绍:Runsen目前大三下学期,专业化学工程与工艺,大学沉迷日语,Python, Java和一系列数据分析软件.导致翘 ...

  7. phoenix+hbase+Spark整合,Spark处理数据操作phoenix入hbase,Spring Cloud整合phoenix

    1 版本要求 Spark版本:spark-2.3.0-bin-hadoop2.7 Phoenix版本:apache-phoenix-4.14.1-HBase-1.4-bin HBASE版本:hbase ...

  8. SpringBoot 2.x (12):整合Elasticsearch

    Elasticsearch:一个优秀的搜索引擎框架 搜索方面最基本的是SQL的like语句 进一步的有Lucene框架 后来有企业级的Solr框架 而Elasticsearch框架尤其适合于数据量特别 ...

  9. Elasticsearch实战篇——Spring Boot整合ElasticSearch

    2019独角兽企业重金招聘Python工程师标准>>> 当前Spring Boot很是流行,包括我自己,也是在用Spring Boot集成其他框架进行项目开发,所以这一节,我们一起来 ...

最新文章

  1. swift中使用core data
  2. NYOJ 643 发短信 暴力求解
  3. 光流(Optical Flow)简介
  4. ASP.NET MVC项目 解决session失效
  5. c创建python虚拟机_Docker-ce运用一:创建虚拟机
  6. 装饰器模式和代理模式的区别
  7. 密码学专题 非对称加密算法指令概述 RSA
  8. 漫步凸分析八——回收锥与无界
  9. [转]Linux下显示硬件信息--lshw
  10. Alex 的 Hadoop 菜鸟教程: 第21课 不只是在HBase中用SQL:Phoenix
  11. 海思hitool工具使用
  12. 惠康游戏手柄 WE-8400 Windows 10 驱动教程
  13. WPS安装自定义项安装程序出错问题
  14. 台式计算机大全,电脑品牌大全..3MT产品库
  15. Destroy销毁物体失败,Can't remove RectTransform because Image (Script) depends on it
  16. 2019保研回顾——西电计科到北理工计科
  17. SCI论文写作(一) | SCI论文的文献综述(Literature Review)部分
  18. Android:通过systrace进行性能分析及使用-详细
  19. 软件需求分析的工作步骤和流程
  20. 2001-2022年全国各城市风速数据(逐日、逐月、逐年)

热门文章

  1. 7种主流案例,告诉你调度器架构设计通用法则(干货!)
  2. 求解一元二次方程的简单c语言程序
  3. html记仇表情包源码,写小本本记仇表情包
  4. Whoops, looks like something went wrong.
  5. js简单判断身份证合法性以及身份证生日合法性
  6. 一款猥琐的PHP后门分析
  7. thinkphp的分页类
  8. Dubbo源码分析系列-深入RPC协议扩展
  9. python 内置函数 builtins_python学习笔记(七)——内置函数
  10. Go1.18 新特性:高效复制,strings, bytes 库新增 Clone 功能