Spark 读写 Es
前言
有个新需求说来比较简单,就是spark读取hive中的数据,处理完后入es,这里就是简单整理一下流程
流程
伪代码
object Credit_User_Model_To_Es {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(name = s"${this.getClass.getSimpleName}").config("es.index.auto.create", "true").config("es.nodes", "172.16.0.xxx:9200,172.16.0.xxx:9200,172.16.0.xxx:9200").enableHiveSupport().getOrCreate()import spark.sqlimport org.elasticsearch.spark.sql._sql(sqlText = "select * from ads.ads_credit_score_m").saveToEs("credit/doc")spark.stop()}
}
需要注意是:
- 两个配置
.config("es.index.auto.create", "true").config("es.nodes", "172.16.0.xxx:9200,172.16.0.xxx:9200,172.16.0.xxx:9200")
- 导包
import org.elasticsearch.spark.sql._
- 写入代码
.saveToEs("credit/doc")
- 读取代码
spark.esDF("credit/doc")
提交代码
spark2-submit \
--class cn.unisk.es.Credit_User_Model_To_Es \
--master yarn \
--deploy-mode cluster \
--executor-memory 64G \
--total-executor-cores 100 \
--jars /var/lib/hadoop-hdfs/elasticsearch-hadoop-7.3.1.jar \
/var/lib/hadoop-hdfs/cm.jar
需要注意的是:
- 当集群的各个节点没有elasticsearch-hadoop的jar包时,必须要在提交的时候手动写–jars,否则报错:
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.package$
因此如要提交的时候添加:
--jars /var/lib/hadoop-hdfs/elasticsearch-hadoop-7.3.1.jar \
- 第一次我下载了一个elasticsearch-hadoop-5.5.jar版本的jar包,然后报错:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Unsupported/Unknown Elasticsearch version 7.3.0
显然原因就是这个版本太低了,建议用7.3.0,于是我下载了一个7.3.1的jar包,发现终于成功了
Kibana查询一下数据
GET credit/_search
{"query": {"match_all": {}}
}
结果:
{"took" : 2,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 10000,"relation" : "gte"},"max_score" : 1.0,"hits" : [{"_index" : "credit","_type" : "doc","_id" : "Sx6SGm0BLr7Gr_l6AMkt","_score" : 1.0,"_source" : {"serv_number" : "18666666666","product_type" : "4","is_online" : "0","is_tencent" : "0","credit_scores" : 760,"statis_month" : "201907","pro_code" : "011"}},...
参考
官网
Spark 读写 Es相关推荐
- spark写入oracle 优化,spark读写数据库大表分区性能优化
spark读写数据库大表分区性能优化:经常会遇到spark读写数据库再做分析,像mysql或oracle. 在数据量很大的情况下,如果只有一个worker一个excutor一个task,那你excut ...
- Spark读写HBase(主要讲解SHC的使用)
前言 Spark读写HBase本身来说是没啥可以讲的,最早之前都是基于RDD的,网上的资料就太多了,可以参考: 参考链接1 参考链接2 其实都一样,后来有了Hortonworks公司的研发人员研发了一 ...
- Zeppelin上通过Spark读写mysql数据库
Zeppelin上通过Spark读写mysql数据库 一.从mysql数据库获取数据 二.把处理后的数据再插入到mysql数据库 一.从mysql数据库获取数据 %spark val df = spa ...
- 【大数据开发】SparkSQL——RDD、DataFrame、DataSet相互转换、DSL常用方法、SQL风格语法、Spark读写操作、获取Column对象的方式
take,takeAsList是Action操作 limit⽅法获取指定DataFrame的前n⾏记录,得到⼀个新的DataFrame对象.和take与head不同的是,limit⽅法不是Action ...
- Spark Streaming + ES构建美团App异常监控平台
如果在使用App时遇到闪退,你可能会选择卸载App.到应用商店怒斥开发者等方式来表达不满.但App开发者也同样感到头疼,因为App Crash(崩溃)可能意味着:用户流失.营收下滑.为了降低崩溃率,进 ...
- spark读写Doris实现及Doris文档更新
因为公司要处理流量数据,其中设计到了会话id的处理,从而需要用spark来实现这一功能. 而公司的数仓是基于Doris搭建的,这就涉及到了spark读写Doris,简单来说一下spark读写Doris ...
- scala中json与case class对象的转换, spark读取es json转换成case class
ilinux_one scala中json与对象的转换 遇到的问题 因为要把spark从es读出来的json数据转换为对象,开始想用case class定义类型,通过fastjson做转换.如下 复制 ...
- Spark读写Hbase的二种方式对比
作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 一.传统方式 这种方式就是常用的TableInputFormat和TableOutputForm ...
- Spark读写XML文件及注意事项
最近有粉丝问浪尖spark 如何读写xml格式的文件,尤其是嵌套型的,spark本身是不支持xml格式文件读取的,但是databricks开源了一个jar,支持xml文件的读写,浪尖这里给大家介绍一下 ...
最新文章
- HDU 3826 Squarefree number:题目解答源码
- TCP/IP协议 和 如何实现 互联网上点对点的通信
- 轻松掌握Ajax.net系列教程二:部署Ajax Control Toolkit
- CF396B-On Sum of Fractions【数学】
- html 拖拽坐标,Html+css实现拖拽导航条
- MAC 更新 PHP 指南 以及 PHP常用命令示例
- Spring Boot中的Profile文件
- 做折线图_python的visvis库做折线图(line.py)代码详解
- 百万年薪的腾讯员工买得起深圳房子吗?
- Azure进阶攻略 | 你的程序也能察言观色?这个真的可以有!
- 财务自由之路读书笔记二
- ux.form.field.KindEditor 所见所得编辑器
- 如何连接一个Linux服务器
- 人工智能的优点是什么?AI有哪些优势?
- stm32 Ctext-M3内核最简单的多任务RTOS
- 国产游戏表 (来自游侠)
- 用户路径分析之利器“桑基图”
- 基于C51实现数码管的显示
- HDMI高清线怎么实现百米远距离传输
- 题目二:课程设计报告