2021年大数据Spark(三十二):SparkSQL的External DataSource
目录
External DataSource
数据源与格式
text 数据
json 数据
csv 数据
parquet 数据
jdbc 数据
加载/保存数据-API
Load 加载数据
Save 保存数据
保存模式(SaveMode)
案例演示
External DataSource
在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:
在Spark 2.4版本中添加支持Image Source(图像数据源)和Avro Source。
数据源与格式
数据分析处理中,数据可以分为结构化数据、非结构化数据及半结构化数据。
1)、结构化数据(Structured)
结构化数据源可提供有效的存储和性能。例如,Parquet和ORC等柱状格式使从列的子集中提取值变得更加容易。
基于行的存储格式(如Avro)可有效地序列化和存储提供存储优势的数据。然而,这些优点通常以灵活性为代价。如因结构的固定性,格式转变可能相对困难。
2)、非结构化数据(UnStructured)
相比之下,非结构化数据源通常是自由格式文本或二进制对象,其不包含标记或元数据以定义数据的结构。
报纸文章,医疗记录,图像,应用程序日志通常被视为非结构化数据。这些类型的源通常要求数据周围的上下文是可解析的。
3)、半结构化数据(Semi-Structured)
半结构化数据源是按记录构建的,但不一定具有跨越所有记录的明确定义的全局模式。每个数据记录都使用其结构信息进行扩充。
半结构化数据格式的好处是,它们在表达数据时提供了最大的灵活性,因为每条记录都是自我描述的。但这些格式的主要缺点是它们会产生额外的解析开销,并且不是特别为ad-hoc(特定)查询而构建的。
text 数据
SparkSession加载文本文件数据,提供两种方法,返回值分别为DataFrame和Dataset,前面【WordCount】中已经使用,下面看一下方法声明:
可以看出textFile方法底层还是调用text方法,先加载数据封装到DataFrame中,再使用as[String]方法将DataFrame转换为Dataset,实际中推荐使用textFile方法,从Spark 2.0开始提供。
无论是text方法还是textFile方法读取文本数据时,一行一行的加载数据,每行数据使用UTF-8编码的字符串,列名称为【value】。
json 数据
实际项目中,有时处理数据以JSON格式存储的,尤其后续结构化流式模块:StructuredStreaming,从Kafka Topic消费数据很多时间是JSON个数据,封装到DataFrame中,需要解析提取字段的值。以读取github操作日志JSON数据为例,数据结构如下:
1)、操作日志数据使用GZ压缩:2015-03-01-11.json.gz,先使用json方法读取。
2)、使用textFile加载数据,对每条JSON格式字符串数据,使用SparkSQL函数库functions中自带get_json_obejct函数提取字段:id、type、public和created_at的值。
函数:get_json_obejct使用说明
示例代码:
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** SparkSQL读取JSON格式文本数据*/
object SparkSQLJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通过装饰模式获取实例对象,此种方式为线程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 从LocalFS上读取json格式数据(压缩)val jsonDF: DataFrame = spark.read.json("data/input/2015-03-01-11.json.gz")//jsonDF.printSchema()jsonDF.show(5, truncate = true)println("===================================================")val githubDS: Dataset[String] = spark.read.textFile("data/input/2015-03-01-11.json.gz")//githubDS.printSchema() // value 字段名称,类型就是StringgithubDS.show(5,truncate = true)// TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数import org.apache.spark.sql.functions._// 获取如下四个字段的值:id、type、public和created_atval gitDF: DataFrame = githubDS.select(get_json_object($"value", "$.id").as("id"),get_json_object($"value", "$.type").as("type"),get_json_object($"value", "$.public").as("public"),get_json_object($"value", "$.created_at").as("created_at"))gitDF.printSchema()gitDF.show(10, truncate = false)// 应用结束,关闭资源spark.stop()}
}
运行结果:
csv 数据
在机器学习中,常常使用的数据存储在csv/tsv文件格式中,所以SparkSQL中也支持直接读取格式数据,从2.0版本开始内置数据源。关于CSV/TSV格式数据说明:
SparkSQL中读取CSV格式数据,可以设置一些选项,重点选项:
1)、分隔符:sep
默认值为逗号,必须单个字符
2)、数据文件首行是否是列名称:header
默认值为false,如果数据文件首行是列名称,设置为true
3)、是否自动推断每个列的数据类型:inferSchema
默认值为false,可以设置为true
官方提供案例:
当读取CSV/TSV格式数据文件首行是否是列名称,读取数据方式(参数设置)不一样的 。
第一点:首行是列的名称,如下方式读取数据文件
// TODO: 读取TSV格式数据val ratingsDF: DataFrame = spark.read// 设置每行数据各个字段之间的分隔符, 默认值为 逗号.option("sep", "\t")// 设置数据文件首行为列名称,默认值为 false.option("header", "true")// 自动推荐数据类型,默认值为false.option("inferSchema", "true")// 指定文件的路径.csv("datas/ml-100k/u.dat")ratingsDF.printSchema()ratingsDF.show(10, truncate = false)
第二点:首行不是列的名称,如下方式读取数据(设置Schema信息)
// 定义Schema信息val schema = StructType(StructField("user_id", IntegerType, nullable = true) ::StructField("movie_id", IntegerType, nullable = true) ::StructField("rating", DoubleType, nullable = true) ::StructField("timestamp", StringType, nullable = true) :: Nil)// TODO: 读取TSV格式数据val mlRatingsDF: DataFrame = spark.read// 设置每行数据各个字段之间的分隔符, 默认值为 逗号.option("sep", "\t")// 指定Schema信息.schema(schema)// 指定文件的路径.csv("datas/ml-100k/u.data")mlRatingsDF.printSchema()mlRatingsDF.show(5, truncate = false)
将DataFrame数据保存至CSV格式文件,演示代码如下:
示例代码
/*** 将电影评分数据保存为CSV格式数据*/mlRatingsDF// 降低分区数,此处设置为1,将所有数据保存到一个文件中.coalesce(1).write// 设置保存模式,依据实际业务场景选择,此处为覆写.mode(SaveMode.Overwrite).option("sep", ",")// TODO: 建议设置首行为列名.option("header", "true").csv("datas/ml-csv-" + System.nanoTime())
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** SparkSQL 读取CSV/TSV格式数据:* i). 指定Schema信息* ii). 是否有header设置*/
object SparkSQLCsv {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通过装饰模式获取实例对象,此种方式为线程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._/*** 实际企业数据分析中* csv\tsv格式数据,每个文件的第一行(head, 首行),字段的名称(列名)*/// TODO: 读取CSV格式数据val ratingsDF: DataFrame = spark.read// 设置每行数据各个字段之间的分隔符, 默认值为 逗号.option("sep", "\t")// 设置数据文件首行为列名称,默认值为 false.option("header", "true")// 自动推荐数据类型,默认值为false.option("inferSchema", "true")// 指定文件的路径.csv("data/input/rating_100k_with_head.data")ratingsDF.printSchema()ratingsDF.show(10, truncate = false)println("=======================================================")// 定义Schema信息val schema = StructType(StructField("user_id", IntegerType, nullable = true) ::StructField("movie_id", IntegerType, nullable = true) ::StructField("rating", DoubleType, nullable = true) ::StructField("timestamp", StringType, nullable = true) :: Nil)// TODO: 读取CSV格式数据val mlRatingsDF: DataFrame = spark.read// 设置每行数据各个字段之间的分隔符, 默认值为 逗号.option("sep", "\t")// 指定Schema信息.schema(schema)// 指定文件的路径.csv("data/input/rating_100k.data")mlRatingsDF.printSchema()mlRatingsDF.show(10, truncate = false)println("=======================================================")/*** 将电影评分数据保存为CSV格式数据*/mlRatingsDF// 降低分区数,此处设置为1,将所有数据保存到一个文件中.coalesce(1).write// 设置保存模式,依据实际业务场景选择,此处为覆写.mode(SaveMode.Overwrite).option("sep", ",")// TODO: 建议设置首行为列名.option("header", "true").csv("data/output/ml-csv-" + System.currentTimeMillis())// 关闭资源spark.stop()}}
parquet 数据
SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据,通过参数【spark.sql.sources.default】设置,默认值为【parquet】。
示例代码:
直接load加载parquet数据和指定parquet格式加载数据。
运行程序结果:
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}/*** SparkSQL读取Parquet列式存储数据*/
object SparkSQLParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通过装饰模式获取实例对象,此种方式为线程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 从LocalFS上读取parquet格式数据val usersDF: DataFrame = spark.read.parquet("data/input/users.parquet")usersDF.printSchema()usersDF.show(10, truncate = false)println("==================================================")// SparkSQL默认读取文件格式为parquetval df = spark.read.load("data/input/users.parquet")df.printSchema()df.show(10, truncate = false)// 应用结束,关闭资源spark.stop()}
}
jdbc 数据
回顾在SparkCore中读取MySQL表的数据通过JdbcRDD来读取的,在SparkSQL模块中提供对应接口,提供三种方式读取数据:
方式一:单分区模式
方式二:多分区模式,可以设置列的名称,作为分区字段及列的值范围和分区数目
方式三:高度自由分区模式,通过设置条件语句设置分区数据及各个分区数据范围
当加载读取RDBMS表的数据量不大时,可以直接使用单分区模式加载;当数据量很多时,考虑使用多分区及自由分区方式加载。
从RDBMS表中读取数据,需要设置连接数据库相关信息,基本属性选项如下:
演示代码如下:
// 连接数据库三要素信息val url: String = "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"val table: String = "db_shop.so"// 存储用户和密码等属性val props: Properties = new Properties()props.put("driver", "com.mysql.cj.jdbc.Driver")props.put("user", "root")props.put("password", "123456")// TODO: 从MySQL数据库表:销售订单表 so// def jdbc(url: String, table: String, properties: Properties): DataFrameval sosDF: DataFrame = spark.read.jdbc(url, table, props)println(s"Count = ${sosDF.count()}")sosDF.printSchema()sosDF.show(10, truncate = false)
可以使用option方法设置连接数据库信息,而不使用Properties传递,代码如下:
// TODO: 使用option设置参数val dataframe: DataFrame = spark.read.format("jdbc").option("driver", "com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true").option("user", "root").option("password", "123456").option("dbtable", "db_shop.so").load()dataframe.show(5, truncate = false)
加载/保存数据-API
SparkSQL提供一套通用外部数据源接口,方便用户从数据源加载和保存数据,例如从MySQL表中既可以加载读取数据:load/read,又可以保存写入数据:save/write。
由于SparkSQL没有内置支持从HBase表中加载和保存数据,但是只要实现外部数据源接口,也能像上面方式一样读取加载数据。
Load 加载数据
在SparkSQL中读取数据使用SparkSession读取,并且封装到数据结构Dataset/DataFrame中。
DataFrameReader专门用于加载load读取外部数据源的数据,基本格式如下:
SparkSQL模块本身自带支持读取外部数据源的数据:
总结起来三种类型数据,也是实际开发中常用的:
第一类:文件格式数据
文本文件text、csv文件和json文件
第二类:列式存储数据
Parquet格式、ORC格式
第三类:数据库表
关系型数据库RDBMS:MySQL、DB2、Oracle和MSSQL
Hive仓库表
官方文档:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html
此外加载文件数据时,可以直接使用SQL语句,指定文件存储格式和路径:
Save 保存数据
SparkSQL模块中可以从某个外部数据源读取数据,就能向某个外部数据源保存数据,提供相应接口,通过DataFrameWrite类将数据进行保存。
与DataFrameReader类似,提供一套规则,将数据Dataset保存,基本格式如下:
SparkSQL模块内部支持保存数据源如下:
所以使用SpakrSQL分析数据时,从数据读取,到数据分析及数据保存,链式操作,更多就是ETL操作。当将结果数据DataFrame/Dataset保存至Hive表中时,可以设置分区partition和分桶bucket,形式如下:
保存模式(SaveMode)
将Dataset/DataFrame数据保存到外部存储系统中,考虑是否存在,存在的情况下的下如何进行保存,DataFrameWriter中有一个mode方法指定模式:
通过源码发现SaveMode时枚举类,使用Java语言编写,如下四种保存模式:
第一种:Append 追加模式,当数据存在时,继续追加;
第二种:Overwrite 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
第三种:ErrorIfExists 存在及报错;
第四种:Ignore 忽略,数据存在时不做任何操作;
实际项目依据具体业务情况选择保存模式,通常选择Append和Overwrite模式。
案例演示
package cn.it.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 先准备一个df/ds,然后再将该df/ds的数据写入到不同的数据源中,最后再从不同的数据源中读取*/
object DataSourceDemo{case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.准备环境-SparkSession和DFval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val lines: RDD[String] = sc.textFile("data/input/person.txt")val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))import spark.implicits._val personDF: DataFrame = personRDD.toDFpersonDF.show(6,false)/*+---+--------+---+|id |name |age|+---+--------+---+|1 |zhangsan|20 ||2 |lisi |29 ||3 |wangwu |25 ||4 |zhaoliu |30 ||5 |tianqi |35 ||6 |kobe |40 |+---+--------+---+*///2.将personDF写入到不同的数据源personDF.write.mode(SaveMode.Overwrite).json("data/output/json")personDF.write.mode(SaveMode.Overwrite).csv("data/output/csv")personDF.write.mode(SaveMode.Overwrite).parquet("data/output/parquet")val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","root")personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)println("写入成功!")//personDF.write.text("data/output/text")//会报错, Text data source supports only a single column, and you have 3 columns.personDF.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json1")//personDF.repartition(1)//3.从不同的数据源读取数据val df1: DataFrame = spark.read.json("data/output/json")val df2: DataFrame = spark.read.csv("data/output/csv").toDF("id_my","name","age")val df3: DataFrame = spark.read.parquet("data/output/parquet")val df4: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)df1.show()df2.show()df3.show()df4.show()}
}
2021年大数据Spark(三十二):SparkSQL的External DataSource相关推荐
- 2021年大数据Spark(十二):Spark Core的RDD详解
目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...
- 2021年大数据Kafka(十二):❤️Kafka配额限速机制❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka配额限速机制 限制producer端的速率 限制c ...
- 2021年大数据HBase(十二):Apache Phoenix 二级索引
全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix 二级索引 一.索引分类 ...
- 2021年大数据Hive(十二):Hive综合案例!!!
全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive综合案例 一.需求描述 二.项目表的字段 三.进 ...
- 2021年大数据Hadoop(十二):HDFS的API操作
2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 HDFS的API操作 ...
- 2021年大数据ELK(十二):Elasticsearch编程(环境准备)
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch编程 一.环境准备 1.准备IDEA项目结构 2.准 ...
- 2021年大数据Spark(十九):Spark Core的共享变量
目录 共享变量 广播变量 累加器 案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...
- 2021年大数据Spark(十五):Spark Core的RDD常用算子
目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 聚合函数算子 Scala集合中的聚合函数 ...
- 2021年大数据Spark(十四):Spark Core的RDD操作
目录 RDD的操作 函数(算子)分类 Transformation函数 Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...
- 2021年大数据Flink(十二):流批一体API Transformation
目录 Transformation 官网API列表 基本操作-略 map flatMap keyBy filter sum reduce 代码演示 合并-拆分 union和connect split. ...
最新文章
- Unity中GameObject API解析
- 【C 语言】二级指针内存模型 ( 指针数组 | 二维数组 | 自定义二级指针 | 将 一、二 模型数据拷贝到 三 模型中 并 排序 )
- python selenium框架_基于python+selenium的框架思路
- 动画狗奔跑gif图片_常用的GIF制作工具,自媒体人常用,你还不会做表情包
- uboot: RTL8201 100M PHY驱动代码
- 软件需求分析文档模板_小议管理软件需求分析
- 6条shell小技巧,让脚本显得不再业余
- IOT(27)---国内物联网平台的发展、技术架构演进
- 从.fig文件中提取数据
- AgileEAS.NET平台开发实例-药店系统-视频教程系列-索引
- java中基本类型占了几个字节(byte、char等)
- 74HC138(三八译码器)74HC573(锁存器)74HC02(或非门)
- 【ESP32】VSCode+Arduino+Platformio 如何使用ESP32上的PSRAM
- 【使用QGIS入库将shp数据导入postgis、postgres数据库】
- VisualStudio2019 安装时下载不动或者显示下载失败
- 腾讯数据分析师内训课程!
- 计算机excel操作教程,Excel操作教程 -电脑资料
- java加载tensorflow训练的PB模型记录
- zenmap使用方法
- token放在cookie中和放在请求头中的区别