Spark整合ElasticSearch
2019独角兽企业重金招聘Python工程师标准>>>
spark整合elasticsearch两种方式
1.自己生成_id等元数据
2.使用ES默认生成
引入对应依赖
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-spark-13_2.10</artifactId><version>5.0.1</version>
</dependency>
生成元数据方式
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import utils.PropertiesUtilsimport scala.collection.immutable
import scala.collection.mutable.ListBuffer
object Spark_ES_WithMeta {val buffer = new ListBuffer[Tuple2[String,immutable.Map[String,String]]]def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Custmer_Statistics").setMaster("local[2]")conf.set("es.nodes","rmhadoop01,rmhadoop02,rmhadoop03");conf.set("es.port","9200");conf.set("es.index.auto.create", "true");val sc = new SparkContext(conf)//读取本地文件val result = sc.textFile("C:/work/ideabench/SparkSQL/data/es/gd_py_corp_sharehd_info.txt").map(_.split("\\t")).foreach(d =>{if(PropertiesUtils.getStringByKey("gd_py_corp_sharehd_info").equals("one2many")){val map = Map("id"->d(0),"batch_seq_num"->d(1),"name"->d(2),"contributiveFund"->d(3),"contributivePercent"->d(4),"currency"->d(5),"contributiveDate"->d(6),"corp_basic_info_id"->d(7),"query_time"->d(8))buffer.append((d(0),map))//buffer}else if(PropertiesUtils.getStringByKey("gd_py_corp_sharehd_info").equals("one2one")){//Map(d(1) ->gd_py_corp_sharehd_info(d(0), d(1), d(2), d(3), d(4), d(5), d(6), d(7), d(8)))}} )sc.makeRDD(buffer).saveToEsWithMeta("spark/guofei_gd_py_corp_sharehd_info")}/*** 使用模板类描述表元数据信息**/case class gd_py_corp_sharehd_info(id:String,batch_seq_num:String,name:String,contributiveFund:String,contributivePercent:String,currency:String,contributiveDate:String,corp_basic_info_id:String,query_time:String)}
ES-UI界面
ES.png
使用ES默认元数据方式
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
object SparkSQL_ES {/*** 使用模板类描述表元数据信息* */case class gd_py_corp_sharehd_info(id:String,batch_seq_num:String,name:String,contributiveFund:String,contributivePercent:String,currency:String,contributiveDate:String,corp_basic_info_id:String,query_time:String)def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Custmer_Statistics").setMaster("local[2]")conf.set("es.nodes","192.168.20.128");conf.set("es.port","9200");conf.set("es.index.auto.create", "true");val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)//RDD隐式转换成DataFrameimport sqlContext.implicits._//读取本地文件val gd_py_corp_sharehd_infoDF = sc.textFile("C:/work/ideabench/SparkSQL/data/es/gd_py_corp_sharehd_info.txt").map(_.split("\\t")).map(d => gd_py_corp_sharehd_info(d(0), d(1), d(2), d(3), d(4), d(5), d(6), d(7), d(8))).toDF()//注册表gd_py_corp_sharehd_infoDF.registerTempTable("gd_py_corp_sharehd_info")/*** */val result = sqlContext.sql("select * from gd_py_corp_sharehd_info limit 10").toDF()result.saveToEs("spark/gd_py_corp_sharehd_info")}}
参考文章
官网:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
作者:MichaelFly
链接:https://www.jianshu.com/p/a5c669d0ceba
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
转载于:https://my.oschina.net/u/3346994/blog/1847221
Spark整合ElasticSearch相关推荐
- Spark 整合ElasticSearch
Spark 整合ElasticSearch 因为做资料搜索用到了ElasticSearch,最近又了解一下 Spark ML,先来演示一个Spark 读取/写入 ElasticSearch 简单示例. ...
- es springboot 不设置id_原创 | 一篇解决Springboot 整合 Elasticsearch
ElasticSearch 结合业务的场景,在目前的商品体系需要构建搜索服务,主要是为了提供用户更丰富的检索场景以及高速,实时及性能稳定的搜索服务. ElasticSearch是一个基于Lucene的 ...
- elasticsearch 9300端口连接不上_SpringBoot2.x系列教程54--SpringBoot整合ElasticSearch方式一...
SpringBoot2.x系列教程54--NoSQL之SpringBoot整合ElasticSearch方式一 作者:一一哥 一. Elastic Search 1. Elastic Search简介 ...
- Spring Boot 整合 Elasticsearch,实现 function score query 权重分查询
运行环境:JDK 7 或 8,Maven 3.0+ 技术栈:SpringBoot 1.5+,ElasticSearch 2.3.2 本文提纲 一.ES 的使用场景 二.运行 springboot-el ...
- Elasticsearch学习(3) spring boot整合Elasticsearch的原生方式
前面我们已经介绍了spring boot整合Elasticsearch的jpa方式,这种方式虽然简便,但是依旧无法解决我们较为复杂的业务,所以原生的实现方式学习能够解决这些问题,而原生的学习方式也是E ...
- 七、SpringBoot整合elasticsearch集群
@Author : By Runsen @Date : 2020/6/12 作者介绍:Runsen目前大三下学期,专业化学工程与工艺,大学沉迷日语,Python, Java和一系列数据分析软件.导致翘 ...
- phoenix+hbase+Spark整合,Spark处理数据操作phoenix入hbase,Spring Cloud整合phoenix
1 版本要求 Spark版本:spark-2.3.0-bin-hadoop2.7 Phoenix版本:apache-phoenix-4.14.1-HBase-1.4-bin HBASE版本:hbase ...
- SpringBoot 2.x (12):整合Elasticsearch
Elasticsearch:一个优秀的搜索引擎框架 搜索方面最基本的是SQL的like语句 进一步的有Lucene框架 后来有企业级的Solr框架 而Elasticsearch框架尤其适合于数据量特别 ...
- Elasticsearch实战篇——Spring Boot整合ElasticSearch
2019独角兽企业重金招聘Python工程师标准>>> 当前Spring Boot很是流行,包括我自己,也是在用Spring Boot集成其他框架进行项目开发,所以这一节,我们一起来 ...
最新文章
- swift中使用core data
- NYOJ 643 发短信 暴力求解
- 光流(Optical Flow)简介
- ASP.NET MVC项目 解决session失效
- c创建python虚拟机_Docker-ce运用一:创建虚拟机
- 装饰器模式和代理模式的区别
- 密码学专题 非对称加密算法指令概述 RSA
- 漫步凸分析八——回收锥与无界
- [转]Linux下显示硬件信息--lshw
- Alex 的 Hadoop 菜鸟教程: 第21课 不只是在HBase中用SQL:Phoenix
- 海思hitool工具使用
- 惠康游戏手柄 WE-8400 Windows 10 驱动教程
- WPS安装自定义项安装程序出错问题
- 台式计算机大全,电脑品牌大全..3MT产品库
- Destroy销毁物体失败,Can't remove RectTransform because Image (Script) depends on it
- 2019保研回顾——西电计科到北理工计科
- SCI论文写作(一) | SCI论文的文献综述(Literature Review)部分
- Android:通过systrace进行性能分析及使用-详细
- 软件需求分析的工作步骤和流程
- 2001-2022年全国各城市风速数据(逐日、逐月、逐年)
热门文章
- 7种主流案例,告诉你调度器架构设计通用法则(干货!)
- 求解一元二次方程的简单c语言程序
- html记仇表情包源码,写小本本记仇表情包
- Whoops, looks like something went wrong.
- js简单判断身份证合法性以及身份证生日合法性
- 一款猥琐的PHP后门分析
- thinkphp的分页类
- Dubbo源码分析系列-深入RPC协议扩展
- python 内置函数 builtins_python学习笔记(七)——内置函数
- Go1.18 新特性:高效复制,strings, bytes 库新增 Clone 功能