1.通过RDD+case class创建DataFrame

package com.doit.spark.day10import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object CaseClassCreateDataFrame {def main(args: Array[String]): Unit = {//构建SparkSessionval sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()//使用RDD读取切分数据,放入case class中val lineRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\WoBo\\Desktop\\user.txt")val userRDD: RDD[User] = lineRDD.map(x => {val arr: Array[String] = x.split(",")val name: String = arr(0)val age: Int = arr(1).toIntval fv: Double = arr(2).toDoubleUser(name, age, fv)})//通过RDD创建DataFrameval dataFrame: DataFrame = sparkSession.createDataFrame(userRDD)//创建好DataFrame之后,就能通过RDD获取到表格结构信息和表内数据dataFrame.printSchema() //打印表结构dataFrame.show()  //展示表内数据,默认show前20行}
}
case class User(name:String,age:Int,fv :Double)

2.通过RowRDD+StructType创建DataFrame

package com.doit.spark.day10import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object DataStructTypeCreateDataFrame {def main(args: Array[String]): Unit = {//构建SparkSessionval sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()//使用RDD读取切分数据,放入Row中val lineRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\WoBo\\Desktop\\user.txt")val rowRDD: RDD[Row] = lineRDD.map(x => {val arr: Array[String] = x.split(",")val name: String = arr(0)val age: Int = arr(1).toIntval fv: Double = arr(2).toDoubleRow(name, age, fv)})// new StructType ,它里面装的是一个装字段描述信息的List集合val structType = new StructType()//参数一:字段名,参数二:字段类型,参数三:字段是否可以为null(false表示不能,true表示可以).add("name",DataTypes.StringType,false).add("age",DataTypes.IntegerType,true).add("fv",DataTypes.DoubleType,true)//或者使用如下方式创建//StructType(List(StructField("name", DataTypes.StringType, false), StructField("age", DataTypes.IntegerType, false), StructField("fv", DataTypes.DoubleType, false)))//将rowRDD和structType联系起来,创建出一张完整的表val dataFrame: DataFrame = sparkSession.createDataFrame(rowRDD, structType)//创建好DataFrame之后,就能通过RDD获取到表格结构信息和表内数据dataFrame.printSchema() //打印表结构dataFrame.show()  //展示表内数据,默认show前20行}
}

3.RDD+toDF创建DataFrame

package com.doit.spark.day10import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RDDtoDFCreateDataFrame {def main(args: Array[String]): Unit = {//构建SparkSessionval sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()//使用RDD读取切分数据,放入元组中val lineRDD: RDD[String] = sparkSession.sparkContext.textFile("C:\\Users\\WoBo\\Desktop\\user.txt")val tupleRDD: RDD[(String, Int, Double)] = lineRDD.map(x => {val arr: Array[String] = x.split(",")val name: String = arr(0)val age: Int = arr(1).toIntval fv: Double = arr(2).toDouble(name, age, fv)})//要RDD使用toDF需要先导入隐式转换,但元组内最多只能装22个字段,所以要装很多字段,还是推荐使用case classimport sparkSession.implicits._val dataFrame: DataFrame = tupleRDD.toDF("name", "age", "fv")dataFrame.printSchema()dataFrame.show()}
}

4.读取JSON格式的数据,生成DataFrame

package com.doit.spark.day10import org.apache.spark.sql.{DataFrame, SparkSession}object ReadJSONCreateDataFrame {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()val dataFrame: DataFrame = sparkSession.read.json("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.json")//或者使用第二种方式读取,这种方式虽然繁琐一点,但是更灵活,文件类型可以通过参数传进代码中val dataFrame1: DataFrame = sparkSession.read.format("json").load("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.json")dataFrame1.createTempView("v_user")dataFrame1.printSchema()//查询所有脏数据sparkSession.sql("select * from v_user where _corrupt_record is NOT NULL").show()}
}

在读取json格式的数据时,在处理数据之前,就触发一次Action,并进行全表检索,目的是生成表的schema信息,但json类型的数据,每一行的字段个数可能不一样,所以需要全表检索,最终将出现过的字段都会生成表的字段,没有该字段的行赋予null,读取json类型的数据时,还会自动推导数据的类型.
如果json数据中存在脏数据,会专门生成一个字段(_corrupt_record),将所有的脏数据都放入该字段中.

+--------------------+----+-------+------+------+--------+
|     _corrupt_record| age|     fv|gender|height|    name|
+--------------------+----+-------+------+------+--------+
|                null|  18|9999.99|  null|  null| laozhao|
|                null|  28| 999.99|  null|  null| laoduan|
|                null|  20| 999.99|  null|  null|nianhang|
|{"name": "heihei"...|null|   null|  null|  null|    null|
|                null|  20| 999.99|  male|  null|    nana|
|                null|null|   null|  null| 180.2|    test|
|                null|  18|9999.99|  null|  null| laozhao|
|                null|  28| 999.99|  null|  null| laoduan|
|                null|  20| 999.99|  null|  null|nianhang|
root|-- _corrupt_record: string (nullable = true)|-- age: long (nullable = true)|-- fv: double (nullable = true)|-- gender: string (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)
+--------------------+----+----+------+------+----+
|     _corrupt_record| age|  fv|gender|height|name|
+--------------------+----+----+------+------+----+
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
|{"name": "heihei"...|null|null|  null|  null|null|
+--------------------+----+----+------+------+----+

5.读取CSV格式的数据,生成DataFrame

package com.doit.spark.day10import org.apache.spark.sql.{DataFrame, SparkSession}object ReadCSVCreateDataFrame {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()//当数据中没有表头时val dataFrame: DataFrame = sparkSession.read.option("inferSchema", "true") //自动推断数据类型,不指定为true的话,读取到的数据都是String类型的.csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.csv").toDF("name", "age", "fv")//当数据中有表头时val dataFrame1: DataFrame = sparkSession.read.option("header", "true") //读取数据的第一行作为表的字段名.option("inferSchema", "true") //自动推断数据类型.csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.csv")dataFrame.printSchema()dataFrame.show()}
}

读取csv文件时,读取表头会触发一次Action,如果自动推导数据类型还会再触发一次Action,而且会全表检索,因为它需要判断后面的每行数据的每个字段是否都是同一数据类型的,那么这样自动推导数据类型就会十分浪费运行速度,所以最佳的方式为手动指定StructType

package com.doit.spark.day10import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}object ReadCSVCreateDataFrame {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()val structType = new StructType()//参数一:字段名,参数二:字段类型,参数三:字段是否可以为null(false表示不能,true表示可以).add("name",DataTypes.StringType,false).add("age",DataTypes.DoubleType,true).add("fv",DataTypes.DoubleType,true)val dataFrame2: DataFrame = sparkSession.read.option("header", "true").schema(structType).csv("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.csv")dataFrame2.printSchema()dataFrame2.show()}
}

当数据有表头时,可以使用.option(“header”, “true”),将表头读取掉

csv数据中,当某行多一个字段时,会自动忽略该行,少字段时,会补null

6.读取parquet格式的数据,生成DataFrame

首先parquet格式的数据它是一种列式存储文件,它比普通格式的文件,更加紧凑,且支持压缩,当我们写sql进行数据查询时,查询某个字段不用全表检索,大大提高了检索速度,且它自带schema信息(包括字段名称,字段类型,字段索引等信息),parquet格式的数据是spark最喜欢的文件格式

但parquet文件不能手动创建,只能系统生成:

package com.doit.spark.day10import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object CreateParquetDataFrame {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()val dataFrame: DataFrame = sparkSession.read.json("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.json")//通过读取json格式数据创建好的DataFrame,创建parquet文件的数据dataFrame.write.parquet("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.parquet")//写出数据时,可以采用多种方式写出//追加写入dataFrame.write.mode(SaveMode.Append).parquet("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.parquet")//删除原文件,生成新文件,相当于覆盖写入dataFrame.write.mode(SaveMode.Overwrite)//如果原文件存在,就不写入,且不报错dataFrame.write.mode(SaveMode.Ignore)//再读取刚刚创建好的parquet数据val dataFrame1: DataFrame = sparkSession.read.parquet("D:\\每日总结\\视频\\spark\\spark-day10\\资料\\user.parquet")dataFrame1.printSchema()dataFrame1.show()}
}

7.读写JDBC中的数据

package com.doit.spark.day10import java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object JDBCAndDataFrame {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName(this.getClass().getSimpleName).master("local[*]").getOrCreate()val properties = new Properties()properties.setProperty("user","root")properties.setProperty("password","123456")//通过jdbc,mysql数据库中的表,创建DataFrame,参数一:jdbc的url,参数二:表名,参数三:用户名和密码的配置信息//如果jdbc的路径写错,和RDD不一样在读取数据之前就会报错,因为DataFrame是强类型的,RDD是弱类型的(在真正处理数据之前不会去路径中读取数据)//注意指定编码格式为UTF-8val dataFrame: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/db_demo3?characterEncoding=UTF-8", "student", properties)//读取jdbc中的数据,会将jdbc中的表结构,字段信息,数据类型等一并读取过来,且是在Driver端获取的Schema信息dataFrame.printSchema()dataFrame.show()//往jdbc中写数据,当表不存在时,会自动创建表dataFrame.write.mode(SaveMode.Append)  //追加写入.jdbc("jdbc:mysql://localhost:3306/db_demo3?characterEncoding=UTF-8", "student", properties)}
}

大数据之spark_spark SQL的建表语句相关推荐

  1. SQL Server 怎样使用SQL输出建表语句

    在一般的数据库系统中可使用 表名右键--[Script Table as]--[Create To]--XX 输出建表SQL语句,但在某些SQL Server系统中可能客户仅提供数据库查询权限,导致无 ...

  2. mysql日期维表sql文件_《MySQL必知必会》笔记(SQL练习+建表语句)

    站在巨人的肩上 Standing On Shoulders Of Giants 部分转自:https://www.jianshu.com/p/294502893128 https://blog.csd ...

  3. hive建表语句_Hive数据如何同步到MaxCompute之实践讲解

    摘要:本次分享主要介绍 Hive数据如何迁移到MaxCompute.MMA(MaxCompute Migration Assist)是一款MaxCompute数据迁移工具,本文将为大家介绍MMA工具的 ...

  4. 数据库建表原则,SQL数据库建表前期优化,SQL数据库操作优化,数据库命名规范...

    2019独角兽企业重金招聘Python工程师标准>>> 关键字: 数据库建表原则 ·1. 原始单据与实体之间的关系 可以是一对一.一对多.多对多的关系.在一般情况下,它们是一对一的关 ...

  5. 大数据平台回归SQL

    先说观点:因为还没找到更好的. 接下来说原因,首先来看看大数据平台都在干什么. 原因 结构化数据计算仍是重中之重 大数据平台主要是为了应对海量数据存储和分析的需求,海量数据存储的确不假,除了生产经营产 ...

  6. 05_ClickHouse、MergeTree系列引擎概述与存储结构、建表模板、建表语句、MergeTree设置、建表示例、数据存储、数据片段(data part)

    2.MergeTree系列引擎概述与存储结构 2.1.建表模板 2.2.建表语句 2.3.MergeTree设置 2.4.建表示例 2.5.数据存储 2.6.数据片段(data part) 2.Mer ...

  7. 解决方案 -SQL脚本建表产生ORA-00942错误

    解决方案 -SQL脚本建表产生ORA-00942错误 参考文章: (1)解决方案 -SQL脚本建表产生ORA-00942错误 (2)https://www.cnblogs.com/sh086/p/83 ...

  8. 大数据课程——Spark SQL

    大数据课程--Spark SQL   实验内容以及要求 现有一份汽车销售记录(文件名:Cars.csv),销售记录包括时间.地点.邮政编码.车辆类型等信息,每条记录信息包含39项数据项.按步骤完成如下 ...

  9. SQL代码建表时引用外键,有红线提示引用了无效的表

    SQL代码建表时引用外键,有红线提示引用了无效的表 解决:应该先建被引用的外键的表,再建要引用外键的表. 通俗讲就是,A这个表要用外键,就得先建好含有外键的B表,就是顺序问题

  10. SQL server 建表时的一些知识 常用的Sql函数

    5.11 SQL server  建表时的一些知识    SQL server 中 bit 类型的非零自动为 1 (如果是字母的话则报错) 一个新的存储过程    declare @sql nvarc ...

最新文章

  1. 2022-2028年中国塑料编织品的制造行业市场竞争态势及投资方向分析报告
  2. Android之jdbc的学习
  3. 11.6 mpstat:CPU信息统计
  4. c语言扫描图片的坐标,tc 如何在指定坐标处 输出bmp图片??
  5. jQuery.Callbacks之demo
  6. 评论:IBM大型机能靠云计算挽回颓势吗?
  7. potala(5)——Unit Test and Cache
  8. 带音效的计算机软件,普通电脑WIN7上安装杜比音效增强软件Dolby Home TheaterV4教程...
  9. 低频声音功率放大器电子设计报告
  10. 【应用统计学】简单随机抽样的区间估计和样本容量的确定
  11. 真-vue使用jsonp跨域
  12. n分频器 verilog_verilog 语言实现任意分频
  13. DNW的详细配置及使用过程
  14. 欢迎使用CSDN-markdown编辑器123213
  15. win10无限蓝屏_windows10系统蓝屏无限重启命令解决教程
  16. win+linux双系统实现efi共存(即通过linux启动界面切换系统)
  17. yolov5 nms 源码理解
  18. 漫画:什么是 “幼态持续” ?
  19. bzoj4605 崂山白花蛇草水 权值线段树套kd树
  20. Esp32 spi slave配置

热门文章

  1. 马哥linux脚本,马哥全套linux运维教程
  2. 宝塔linux怎么运行war,宝塔Linux面板在线解压WAR压缩文件
  3. python期权价格计算器_使用Python自带GUI tkinter编写一个期权价格计算器
  4. 过采样 Oversampling
  5. linux怎么把dos改成unix_Linux命令之dos2unix – 将DOS格式文本文件转换成UNIX格式
  6. 用python写一个地铁线路图_python制作一线城市地铁运行动态图
  7. 硬件设计——DC-DC转换器
  8. 【c++算法】《c/c++实现SM4加密解密算法》
  9. Android一键清空内存,教你一键深度清理手机垃圾,瞬间释放几个G,再也不怕内存不够了...
  10. 基于Easy CHM和VS的帮助文档制作