Spark sql读写hive需要hive相关的配置,所以一般将hive-site.xml文件放到spark的conf目录下。代码调用都是简单的,关键是源码分析过程,spark是如何与hive交互的。

1. 代码调用

读取hive代码

SparkSession sparkSession = SparkSession.builder().appName("read_hive").enableHiveSupport().getOrCreate();
Dataset<Row> data = sparkSession.sql(sqlText); //select 语句即可 data就是读取的表数据集

写hive代码

SparkSession sparkSession = SparkSession.builder().appName("write_hive").enableHiveSupport().getOrCreate();
/*初始化要写入hive表的数据集
可以是读取文件 sparkSession.read().text/csv/parquet()
或者读取jdbc表sparkSession.read().format("jdbc").option(...).load()
*/
Dataset<Row> data = xxx;
data.createOrReplaceTempView("srcTable"); //创建临时表
sparkSession.sql("insert into tablex select c1,c2... from srcTable") //将临时表数据写入tablex表

注意如果是写parquet格式的表,要使hivesql也能访问,则需要在SparkSession上加个配置项 .config("spark.sql.parquet.writeLegacyFormat", true)。这样hivesql才能访问,不然会报错。

2. 源码相关的分析

spark sql与hive相关的源码就在以下目录:

对于spark sql的执行流程这里不再介绍,整体架构就是:

读写hive的关键操作就是enableHiveSupport()方法,在里面会首先检查是否已经加载了hive的类,然后设置配置项spark.sql.catalogImplementation值为hive。这样在Sparksession初始化SessionState对象时,根据配置获取到的就是hive相关的HiveSessionStateBuilder,然后调用build创建hive感知的SessionState。

/*** Enables Hive support, including connectivity to a persistent Hive metastore, support for* Hive serdes, and Hive user-defined functions.** @since 2.0.0*/def enableHiveSupport(): Builder = synchronized {if (hiveClassesArePresent) {config(CATALOG_IMPLEMENTATION.key, "hive")} else {throw new IllegalArgumentException("Unable to instantiate SparkSession with Hive support because " +"Hive classes are not found.")}}/*** State isolated across sessions, including SQL configurations, temporary tables, registered* functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].* If `parentSessionState` is not null, the `SessionState` will be a copy of the parent.** This is internal to Spark and there is no guarantee on interface stability.** @since 2.2.0*/@InterfaceStability.Unstable@transientlazy val sessionState: SessionState = {parentSessionState.map(_.clone(this)).getOrElse {val state = SparkSession.instantiateSessionState(SparkSession.sessionStateClassName(sparkContext.conf),self)initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }state}}/*** Helper method to create an instance of `SessionState` based on `className` from conf.* The result is either `SessionState` or a Hive based `SessionState`.*/private def instantiateSessionState(className: String,sparkSession: SparkSession): SessionState = {try {// invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`val clazz = Utils.classForName(className)val ctor = clazz.getConstructors.headctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()} catch {case NonFatal(e) =>throw new IllegalArgumentException(s"Error while instantiating '$className':", e)}}private def sessionStateClassName(conf: SparkConf): String = {conf.get(CATALOG_IMPLEMENTATION) match {case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAMEcase "in-memory" => classOf[SessionStateBuilder].getCanonicalName}}

SessionState的创建通过BaseSessionStateBuilder.build()来创建

/*** Build the [[SessionState]].*/def build(): SessionState = {new SessionState(session.sharedState,conf,experimentalMethods,functionRegistry,udfRegistration,() => catalog,sqlParser,() => analyzer,() => optimizer,planner,streamingQueryManager,listenerManager,() => resourceLoader,createQueryExecution,createClone)}
}

hive感知的SessionState是通过HiveSessionStateBuilder来创建的。HiveSessionStateBuilder继承BaseSessionStateBuilder,即相应的catalog/analyzer/planner等都会被HiveSessionStateBuilder重写的变量或方法代替。
下面将分析HiveSessionCatalog/Analyzer/SparkPlanner

HiveSessionCatalog
SessionCatalog只是一个代理类,只提供调用的接口,真正与底层系统交互的是ExternalCatalog。而在hive场景下,HiveSessionCatalog继承于SessionCatalog,HiveExternalCatalog继承于ExternalCatalog。

可以看以下类说明:

/*** An internal catalog that is used by a Spark Session. This internal catalog serves as a* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary* views and functions of the Spark Session that it belongs to.** This class must be thread-safe.*/
class SessionCatalog(val externalCatalog: ExternalCatalog,globalTempViewManager: GlobalTempViewManager,functionRegistry: FunctionRegistry,conf: SQLConf,hadoopConf: Configuration,parser: ParserInterface,functionResourceLoader: FunctionResourceLoader) extends Logging {/*** Interface for the system catalog (of functions, partitions, tables, and databases).** This is only used for non-temporary items, and implementations must be thread-safe as they* can be accessed in multiple threads. This is an external catalog because it is expected to* interact with external systems.** Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.*/
abstract class ExternalCatalogextends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {import CatalogTypes.TablePartitionSpec

在HiveExternalCatalog 中,对数据库、数据表、数据分区和注册函数等信息的读取与操作都通过 HiveClient 完成, Hive Client 是用来与 Hive 进行交互的客户端,在 Spark SQL 中是定义了各种基本操作的接口,具体实现为 HiveClientimpl 对象。然而在实际场景中,因为历史遗留的原因,往往会涉及多种Hive版本,为了有效地支持不同版本,Spark SQL HiveClient的实现由HiveShim通过适配Hive 版本号(HiveVersion)来完成。
在HiveExternalCatalog 中有创建HiveClient的操作,但是最终是调用了IsolatedClientLoader来创建。一般spark sql只会通过HiveClient来访问Hive中的类,为了更好的隔离,IsolatedClientLoader 将不同的类分成3种,不同种类的加载和访问规则各不相同:
-共享类(Shared classes):包括基本的Java、Scala Logging和Spark 中的类。这些类通过当前上下文的 ClassLoader 加载,调用 HiveClient 返回的结果对于外部来说是可见的。
-Hive类(Hive classes):通过加载 Hive 的相关 Jar 包得到的类。默认情况下,加载这些类的ClassLoader 和加载共享类的 ClassLoader 并不相同,因此,无法在外部访问这些类
-桥梁类(Barrier classes):一般包括 HiveClientlmpl和Shim 类,在共享类与 Hive 类之间起到了桥梁的作用,Spark SQL 能够通过这个类访问 Hive 中的类。每个新的 HiveClientlmpl实例都对应一个特定的 Hive 版本。

Analyzer
逻辑执行计划,有着特定于hive的分析规则。
在hive场景中,比基础的多了ResolveHiveSerdeTable、DetermineTableStats、RelationConversions、HiveAnalysis规则。

SparkPlanner
物理执行计划,有着特定于hive的策略。
在hive场景中,比基础的多了HiveTableScans, Scripts策略。

HiveTableScans最终对应的节点HiveTableScanExec,执行hive表的scan操作,分区属性和
晒筛选谓词都可以下推到这里。
Spark sql经过Catalyst的解析,最终转化成的物理执行计划,与hive相关的TreeNode主要就是HiveTableScanExec(读数据)和InsertIntoHiveTable(写数据)。下面主要介绍下这两个类的实现原理。
HiveTableScanExec
HiveTableScanExec的构造方法参数中比较重要的有两个,
Relation(HiveTableRelation), partitionPruningPred(Seq[Expression])
relation中有着hive表相关的信息,而partitionPruningPred中有着hive分区相关的谓词。
读取是由hadoopReader(HadoopTableReader)来进行的,不是分区表则执行
hadoopReader.makeRDDForTable,是分区表则执行hadoopReader.makeRDDForPartitionedTable。
makeRDDForTable里根据hive表的数据目录位置创建HadoopRDD,再调用
HadoopTableReader.fillObject将原始的Writables数据转化成Rows。

InsertIntoHiveTable

InsertIntoHiveTable的执行流程就是获取到HiveExternalCatalog、hadoop相关的配置、hive

表信息、临时写入的目录位置等,然后调用processInsert方法插入,最终再删除临时写入位

置。processInsert方法里会依次调用saveAsHiveFile将RDD写到临时目录文件中,然后再调

用HiveExternalCatalog的loadTable方法(HiveClient.loadTable -> HiveShim.loadTable -> Hive.loadTable即最终会通过反射调用Hive的loadTable方法)将临时写入目录位置的文件

加载到hive表中。

在上面读写的过程中,就会涉及到Sparksql Row与Hive数据类型的映射。该转换功能主要
就是由HiveInspectors来实现。

spark sql读写hive的过程相关推荐

  1. spark sql读取hive底层_[大数据]spark sql读写Hive数据不一致

    在大数据公司中,任何一家公司都不会只使用一个框架吧?! skr,skr~~ 那我们今天就来聊一段 Hive 与 Spark的爱恨情仇 就像 在一些场景中,需要将外部的数据导入到Hive表中,然后再对这 ...

  2. Hive on Spark和Spark sql on Hive,你能分的清楚么

    摘要:结构上Hive On Spark和SparkSQL都是一个翻译层,把一个SQL翻译成分布式可执行的Spark程序. 本文分享自华为云社区<Hive on Spark和Spark sql o ...

  3. 使用Spark SQL读取Hive上的数据

    Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等).Spark SQL的其中一个分支就是Spar ...

  4. spark sql on hive初探

    前一段时间由于shark项目停止更新,sql on spark拆分为两个方向,一个是spark sql on hive,另一个是hive on spark.hive on spark达到可用状态估计还 ...

  5. spark sql读取hive底层_scala – 从一个hive表中读取并使用spark sql写回来

    我正在使用Spark SQL读取Hive表并将其分配给 scala val val x = sqlContext.sql("select * from some_table") 然 ...

  6. Spark SQL整合Hive

    Spark SQL官方释义 Spark SQL is Apache Spark's module for working with structured data. 一.使用Spark SQL访问Hi ...

  7. spark基础之Spark SQL和Hive的集成以及ThriftServer配置

    如果希望Maven编译Spark时支持Hive,需要给定-Phive -Phive-thriftserver.比如比如:mvn -Pyarn -Phadoop-2.6 -Dhadoop.version ...

  8. Spark SQL 与 Hive 的第一场会师

    "你好,一杯热美式,加 2 份shot, 1 份焦糖,谢谢" L 跨进汇智国际中心大厦的 Starbucks, 拿着 iPhone 对着点餐机轻轻一扫,对黑带服务员小妹抛出一个笑脸 ...

  9. Spark SQL操作Hive表

    Spark SQL支持从Hive存储中读写数据.然而,Hive存在很多的依赖,而这些依赖又不包含在默认的各类Spark发型版本中.如果将Hive的依赖放入classpath中,Spark将自动加载它们 ...

最新文章

  1. 1.3亿突触、数万神经元,谷歌、哈佛发布史上最强三维人脑地图
  2. Java8新特性Stream API与Lambda表达式详解(1)
  3. ROSA 2012 Enterprise Linux Server 发布
  4. integer比较_每日一题:Integer、int 的区别
  5. 浏览器工作原理(四):浏览器事件解读
  6. 整数点与Pick定理
  7. Java对象转换成Map
  8. RabbitMQ——work queue
  9. SpringBoot接收数组参数
  10. 取消全部呼叫转移代码_中国移动的卡取消呼叫转移的快捷方式是什么?
  11. 关闭安卓系统导航栏右下角自动旋转按钮
  12. ASP.NET网页打印
  13. c语言vc是什么意思,这个VC语句是什么意思
  14. ijkplayer播放器架构从原型到升级
  15. 大数据平台数据管控整体解决方案(48页PPT附下载)
  16. clearcasse 命令
  17. redmine 的安装步骤
  18. ES6 Set() 数组去重
  19. TP5中 save操作 不能foreach循环操作 循环就只有最后一次有效 解决方案
  20. 哈里斯鹰优化算法初步了解笔记 1

热门文章

  1. LED高精度体重秤方案规格书
  2. C# 图书管理系统【含 源代码+数据库】
  3. linux 允许非root用户使用1024以下端口
  4. 【PHP入门】一、注释与变量及2022phpstorm最新版激活码
  5. Linux的bg和fg命令
  6. 天啦噜!在家和爱豆玩“剪刀石头布”,阿里工程师如何办到?
  7. 教你用vmware打开镜像扩展名为.vmdk的硬盘文件!!!
  8. Python输入多行数据
  9. Wdindows WSL(linux子系统)常用命令
  10. K8s部署Heapster踩坑记录