Spark SQL 的数据源------通用的数据 加载/保存功能

Spark SQL支持通过DataFrame接口在各种数据源上进行操作。DataFrame可以使用关系变换进行操作,也可以用来创建临时视图。将DataFrame      注册为临时视图允许您对其数据运行SQL查询。本节介绍使用Spark Data Sources加载和保存数据的一般方法,然后介绍可用于内置数据源的特定选        项。

1, 常用的加载和保存功能。

最简单的形式,默认的数据源(parquet除非另有配置 spark.sql.sources.default)将用于所有的操作。

  val usersDF = spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

2,手动指定选项

您也可以手动指定将要使用的数据源以及您想要传递给数据源的其他选项。数据源通过其全名指定(即org.apache.spark.sql.parquet),但内置的来源,你也可以使用自己的短名称(json,parquet,jdbc,orc,libsvm,csv,text)。从任何数据源类型加载的数据框可以使用此语法转换为其他类型。

   val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

3, 直接在文件上运行SQL

您可以使用SQL直接查询该文件,而不是使用读取API将文件加载到DataFrame中并进行查询。

 val peopleDFCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("examples/src/main/resources/people.csv")val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

其中people.csv的数据为:

name;age;job
Jorge;30;Developer
Bob;32;Developer

4,保存模式

保存操作可以选择一个Save Mode,指定如何处理现有的数据(如果存在)。认识到这些保存模式不使用任何锁定数据而不是原子性的操作数据是很重要的。另外,执行时重写数据,数据在写出新数据之前将被删除。常见类型如下:

Scala/Java      Any Language Meaning
SaveMode.ErrorIfExists (default)   "error" (default)      如果数据已经存在,将DataFrame保存到数据源时,则预计会抛出异常。
SaveMode.Append "append" 如果data / table已经存在,将DataFrame保存到数据源时,则DataFrame的内容将被添加到现有数据中。
SaveMode.Overwrite "overwrite" 覆盖模式意味着将DataFrame保存到数据源时,如果data / table已经存在,则现有数据将被DataFrame的内容覆盖。
SaveMode.Ignore "ignore"                   忽略模式意味着,当将DataFrame保存到数据源时,如果数据已经存在,保存操作将不会保存DataFrame的内容,也不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL中的类似。

5,保存数据并持久化表

DataFrames也可以使用该saveAsTable 命令将其作为持久表保存到Hive Metastore中。请注意,现有的Hive部署对于使用此功能不是必需的。Spark将为您创建一个默认的本地Hive Metastore(使用Derby)。与createOrReplaceTempView命令不同的是, saveAsTable将实现DataFrame的内容并创建指向Hive Metastore中的数据的指针。即使您的Spark程序重新启动后,永久性表格仍然存在,只要您保持与同一Metastore的连接即可。用于持久表的DataFrame可以通过使用表的名称调用tablea方法来创建SparkSession。
        对于基于文件的数据源,例如文本,parquet,json等,您可以通过path选项指定一个自定义表格路径 ,例如df.write.option("path", "/some/path").saveAsTable("t")。当表被删除时,自定义表路径将不会被删除,表数据仍然存在。如果没有指定自定义表格路径,Spark会将数据写入仓库目录下的默认表格路径。当表被删除时,默认的表路径也将被删除。
       从Spark 2.1开始,持久数据源表具有存储在Hive Metastore中的每个分区元数据。这带来了几个好处:
           1) 由于Metastore只能返回查询所需的分区,因此不再需要发现第一个查询的所有分区。
           2) Hive DDL如ALTER TABLE PARTITION ... SET LOCATION现在可用于使用Datasource API创建的表。
请注意,创建外部数据源表(具有path选项的那些表)时,默认情况下不会收集分区信息。要同步Metastore中的分区信息,可以调用MSCK REPAIR TABLE。

6,Bucketing(分段), Sorting(排序) and Partitioning(分区)

对于基于文件的数据源,也可以对输出进行分类。分段和排序仅适用于持久表:

  peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

而分区则可以同时使用save和saveAsTable使用数据集API。

 usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

同时也可以对单个表使用分区和分区:

   peopleDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")

partitionBy创建一个目录结构,如“ Partition Discovery ”部分所述。因此,对基数高的柱子的适用性有限。相比之下 bucketBy,通过固定数量的桶分配数据,并且可以在大量唯一值无界时使用。

上述完整的例子代码如下:

 private def runBasicDataSourceExample(spark: SparkSession): Unit = {val usersDF = spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")val peopleDFCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("examples/src/main/resources/people.csv")val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")peopleDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")spark.sql("DROP TABLE IF EXISTS people_bucketed")spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")}

其中people.json测试数据如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

转载于:https://www.cnblogs.com/zhou-jun/p/10195711.html

spark SQL(三)数据源 Data Source----通用的数据 加载/保存功能相关推荐

  1. 第七章:在Spark集群上使用文件中的数据加载成为graph并进行操作(3)

    你可以调整graph的构造参数来指定partition的数量. 当数据加载完毕的时候整个web-Googel.txt就缓存进了内存之中,如下所示: 可以看到数据被缓存成了edges. 下面我们使用把m ...

  2. 第七章:在Spark集群上使用文件中的数据加载成为graph并进行操作(2)

    Spark-shell启动后我们可以在控制台看到起运行信息: 点击作业ID即可查看Spark shell运行信息: 下面我们就开始在集群上通过读取hdfs文件的方式来构建graph对象,首先要做的就是 ...

  3. Spark SQL来读取现有Hive中的数据

    Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等). Spark SQL的其中一个分支就是Spa ...

  4. Spark SQL(五)之数据加载与存储

    一.数据加载 (1)默认数据源(parquet) 最简单加载数据的方式,所有操作都使用默认数据源(parquet).如果指定默认数据源需要配置 spark.sql.sources.default参数. ...

  5. Spark SQL读取Oracle的number类型的数据时精度丢失问题

    Spark SQL读取Oracle的number类型的数据时精度丢失问题 在程序开发中,使用到了sparkSQL读取Oracle数据库,发现当sparkSQL读取Oracle的number类型字段时, ...

  6. Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源

    Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源 ​ 上文引出了Flink程序自定义数据源的方法,我们来再次回顾下. ​ Flink还提供了数据源接口(抽象类),我们实现该接口 ...

  7. Spark _25.plus _使用idea读取Hive中的数据加载成DataFrame/DataSet(四)

    对Spark _25 _读取Hive中的数据加载成DataFrame/DataSet(四) https://georgedage.blog.csdn.net/article/details/10309 ...

  8. Spark _25 _读取Hive中的数据加载成DataFrame/DataSet(四)

    由于Hive不在本地,操作略显麻烦.不过细心一点,分析错误,也还好,如果你搭建的hadoop是HA,需要多注意: 这里指出一个错误,如果你报了同类错误,可以参考:https://georgedage. ...

  9. 将CSV和SQL数据加载到Pandas中

    目录 介绍 从CSV加载数据 从SQLite加载数据 基本数据分析 总结 任何数据分析过程的第一步都是摄取数据集,评估数据集的清洁程度,并决定我们需要采取哪些措施来解决继承的问题. 下载CSV和数据库 ...

最新文章

  1. 20180517早课记录12-Hadoop
  2. Go 语言 XML处理
  3. 区块链BaaS云服务(11)招商银行ABS区块链平台
  4. 简单的文本设计就能影响游戏体验?游戏中提升玩家体验的小设计
  5. Promise对象的创建与使用
  6. python找不到csv文件_Python如何读取csv文件
  7. 队列的基本操作_算法与数据结构(五) 栈和队列
  8. 2022年中国即时配送行业趋势研究报告
  9. CentOS下MYSQL数据库的安装
  10. CS231n 课程笔记翻译
  11. 自动化的人肉搜索引擎即将出现?
  12. Recast源码解析(二):NavMesh导航网格生成原理(上)
  13. word转pdf转换器2015注册码
  14. 本地词库翻译php,有道词典词库(您也可以轻松翻译离线的有道词典词库)
  15. C#脚本引擎CS-Script
  16. 开篇记(好记性不如烂笔头)
  17. java如何连接与断开SQL server2008数据库
  18. vs2010+opencv2.4.9配置========重点说明
  19. Python3绘图库之rrdtool模块
  20. 论文中绘制神经网络工具汇总

热门文章

  1. ES group分组聚合的坑
  2. python-列表list和元组tuple
  3. lucene_indexWriter说明、索引库优化
  4. 导出EXCEL遇到问题
  5. ArcGIS for Android示例解析之离线地图-----LocalTiledLayer
  6. Oracl数据库中大数据的备份-1
  7. 和搜狗输入法快捷键冲突_这款输入法被调教多年不输搜狗,爱了奥里给!
  8. python3小游戏源代码_Python3制作仿“FlappyBird”小游戏|python3教程|python入门|python教程...
  9. true,false组合问题
  10. 计算机知识点小报,制作电脑小报的教案