前言

有个新需求说来比较简单,就是spark读取hive中的数据,处理完后入es,这里就是简单整理一下流程

流程

伪代码

object Credit_User_Model_To_Es {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(name = s"${this.getClass.getSimpleName}").config("es.index.auto.create", "true").config("es.nodes", "172.16.0.xxx:9200,172.16.0.xxx:9200,172.16.0.xxx:9200").enableHiveSupport().getOrCreate()import spark.sqlimport org.elasticsearch.spark.sql._sql(sqlText = "select * from ads.ads_credit_score_m").saveToEs("credit/doc")spark.stop()}
}

需要注意是:

  1. 两个配置
 .config("es.index.auto.create", "true").config("es.nodes", "172.16.0.xxx:9200,172.16.0.xxx:9200,172.16.0.xxx:9200")
  1. 导包
 import org.elasticsearch.spark.sql._
  1. 写入代码
.saveToEs("credit/doc")
  1. 读取代码
 spark.esDF("credit/doc")

提交代码

spark2-submit \
--class cn.unisk.es.Credit_User_Model_To_Es \
--master yarn \
--deploy-mode  cluster \
--executor-memory 64G \
--total-executor-cores 100 \
--jars /var/lib/hadoop-hdfs/elasticsearch-hadoop-7.3.1.jar \
/var/lib/hadoop-hdfs/cm.jar

需要注意的是:

  1. 当集群的各个节点没有elasticsearch-hadoop的jar包时,必须要在提交的时候手动写–jars,否则报错:
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.package$

因此如要提交的时候添加:

--jars /var/lib/hadoop-hdfs/elasticsearch-hadoop-7.3.1.jar \
  1. 第一次我下载了一个elasticsearch-hadoop-5.5.jar版本的jar包,然后报错:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Unsupported/Unknown Elasticsearch version 7.3.0

显然原因就是这个版本太低了,建议用7.3.0,于是我下载了一个7.3.1的jar包,发现终于成功了

Kibana查询一下数据

GET credit/_search
{"query": {"match_all": {}}
}

结果:

{"took" : 2,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 10000,"relation" : "gte"},"max_score" : 1.0,"hits" : [{"_index" : "credit","_type" : "doc","_id" : "Sx6SGm0BLr7Gr_l6AMkt","_score" : 1.0,"_source" : {"serv_number" : "18666666666","product_type" : "4","is_online" : "0","is_tencent" : "0","credit_scores" : 760,"statis_month" : "201907","pro_code" : "011"}},...

参考

官网

Spark 读写 Es相关推荐

  1. spark写入oracle 优化,spark读写数据库大表分区性能优化

    spark读写数据库大表分区性能优化:经常会遇到spark读写数据库再做分析,像mysql或oracle. 在数据量很大的情况下,如果只有一个worker一个excutor一个task,那你excut ...

  2. Spark读写HBase(主要讲解SHC的使用)

    前言 Spark读写HBase本身来说是没啥可以讲的,最早之前都是基于RDD的,网上的资料就太多了,可以参考: 参考链接1 参考链接2 其实都一样,后来有了Hortonworks公司的研发人员研发了一 ...

  3. Zeppelin上通过Spark读写mysql数据库

    Zeppelin上通过Spark读写mysql数据库 一.从mysql数据库获取数据 二.把处理后的数据再插入到mysql数据库 一.从mysql数据库获取数据 %spark val df = spa ...

  4. 【大数据开发】SparkSQL——RDD、DataFrame、DataSet相互转换、DSL常用方法、SQL风格语法、Spark读写操作、获取Column对象的方式

    take,takeAsList是Action操作 limit⽅法获取指定DataFrame的前n⾏记录,得到⼀个新的DataFrame对象.和take与head不同的是,limit⽅法不是Action ...

  5. Spark Streaming + ES构建美团App异常监控平台

    如果在使用App时遇到闪退,你可能会选择卸载App.到应用商店怒斥开发者等方式来表达不满.但App开发者也同样感到头疼,因为App Crash(崩溃)可能意味着:用户流失.营收下滑.为了降低崩溃率,进 ...

  6. spark读写Doris实现及Doris文档更新

    因为公司要处理流量数据,其中设计到了会话id的处理,从而需要用spark来实现这一功能. 而公司的数仓是基于Doris搭建的,这就涉及到了spark读写Doris,简单来说一下spark读写Doris ...

  7. scala中json与case class对象的转换, spark读取es json转换成case class

    ilinux_one scala中json与对象的转换 遇到的问题 因为要把spark从es读出来的json数据转换为对象,开始想用case class定义类型,通过fastjson做转换.如下 复制 ...

  8. Spark读写Hbase的二种方式对比

    作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 一.传统方式 这种方式就是常用的TableInputFormat和TableOutputForm ...

  9. Spark读写XML文件及注意事项

    最近有粉丝问浪尖spark 如何读写xml格式的文件,尤其是嵌套型的,spark本身是不支持xml格式文件读取的,但是databricks开源了一个jar,支持xml文件的读写,浪尖这里给大家介绍一下 ...

最新文章

  1. HDU 3826 Squarefree number:题目解答源码
  2. TCP/IP协议 和 如何实现 互联网上点对点的通信
  3. 轻松掌握Ajax.net系列教程二:部署Ajax Control Toolkit
  4. CF396B-On Sum of Fractions【数学】
  5. html 拖拽坐标,Html+css实现拖拽导航条
  6. MAC 更新 PHP 指南 以及 PHP常用命令示例
  7. Spring Boot中的Profile文件
  8. 做折线图_python的visvis库做折线图(line.py)代码详解
  9. 百万年薪的腾讯员工买得起深圳房子吗?
  10. Azure进阶攻略 | 你的程序也能察言观色?这个真的可以有!
  11. 财务自由之路读书笔记二
  12. ux.form.field.KindEditor 所见所得编辑器
  13. 如何连接一个Linux服务器
  14. 人工智能的优点是什么?AI有哪些优势?
  15. stm32 Ctext-M3内核最简单的多任务RTOS
  16. 国产游戏表 (来自游侠)
  17. 用户路径分析之利器“桑基图”
  18. 基于C51实现数码管的显示
  19. HDMI高清线怎么实现百米远距离传输
  20. 题目二:课程设计报告

热门文章

  1. 火狐浏览器怎么安装未经认证的附加组件
  2. Oracle数据库DBA必备基本技能
  3. docker每次都重新拉取远程镜像的问题
  4. 【教程】如何正确的写一个Lemon/Cena的SPJ(special judge)
  5. jQuery1.11源码分析(8)-----jQuery调用Sizzle引擎的相关API
  6. jquery tmpl 详解
  7. Seq2Seq中的Attention
  8. node.js 事件循环
  9. 配置管理系统和整体的变化对系统有什么区别和联系
  10. 关于Parse库的配置问题