什么是SparkSQL

SparkSQL是Spark用于结构化数据处理的模块

SparkSQL的原理

SparkSQL提供了两个编程抽象,DataFrame和DataSet

DataFrame

1)DataFrame是一种类似RDD的分布式数据集,类似于传统数据库中的二维表格。

2)DataFrame与RDD的主要区别在于,DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

注意: Spark SQL性能上比RDD要高。因为Spark SQL了解数据内部结构,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在Stage层面进行简单、通用的流水线优化。

DataSet

1.DataSet是分布式数据集。

2. DataSet是强类型的。比如可以有DataSet[Car],DataSet[User]。具有类型安全检查

3.DataFrame是DataSet的特例,type DataFrame = DataSet[Row] ,Row是一个类型,跟Car、User这些的类型一样,所有的表结构信息都用Row来表示。

Rdd、Dataframe、DataSet的关系

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算

(3)三者有许多共同的函数,如filter,排序等

(4)三者都会根据Spark的内存情况自动缓存运算

(5)三者都有分区的概念

三者转换图:

 //注意:在做转换之前,一定要导入隐式转换import spark.implicits._//RDD=>DF//普通RDD转换DF,需要手动补充列名val df01: DataFrame = dataRDD.toDF("name","age")//样例类RDD=>DF,自动会把样例类的属性名,作为列名val df: DataFrame = caseRDD.toDF()df01.show()df.show()//DF => RDD//DF转RDD,直接.rdd即可,但是需要注意的是DF不会保留原始数据的类型,类型统统为ROWval rdd01: RDD[Row] = df.rddval res: RDD[User] = rdd01.map(row => {User(row.getString(0), row.getLong(1))})//res.collect().foreach(println)//RDD=>DSval ds01: Dataset[(String, Long)] = dataRDD.toDS()val ds02: Dataset[User] = caseRDD.toDS()ds01.show()ds02.show()//DS => RDDval ds2rdd: RDD[User] = ds02.rddval ds2rdd02: RDD[(String, Long)] = ds01.rddds2rdd.collect().foreach(println)val df2df: DataFrame = df.toDF("name2","age2")df2df.show()val df2ds: Dataset[User] = df2df.as[User]df2ds.show()//DS=>DFval ds2df: DataFrame = df2ds.toDF("name","age")ds2df.show()

自定义函数

UDF 一进一出

    val df: DataFrame = spark.read.json("E:\\bigdata_study\\sparkSQL\\input\\user.json")//创建DataFrame临时视图df.createTempView("user")spark.udf.register("addname",(name:String)=>{"Name:" + name})spark.udf.register("double",(age:Long)=>{"double:" + age * 2})spark.sql("select addName(name),double(age) from user").show

UDAF 输入多行,返回一行

  def main(args: Array[String]): Unit = {// 1 创建上下文环境配置对象val conf: SparkConf = new SparkConf().setAppName("SparkSQLTest").setMaster("local[*]")// 2 创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = spark.read.json("E:\\bigdata_study\\sparkSQL\\input\\user.json")df.createTempView("user")spark.udf.register("myAvg",functions.udaf(new MyAvgUADF))spark.sql("select myAvg(age) from user").show()// 5 释放资源spark.stop()}
}
case class Buff(var sum:Long, var count:Double)class MyAvgUADF extends Aggregator[Long,Buff,Double]{override def zero: Buff = Buff(0L,0L)//buff在单个分区内的聚合方法override def reduce(buff: Buff, age: Long): Buff = {buff.sum += agebuff.count += 1buff}//多个buff在分区间的合并方法override def merge(b1: Buff, b2: Buff): Buff = {b1.sum += b2.sumb1.count += b2.countb1}override def finish(reduction: Buff): Double = {reduction.sum.toDouble/reduction.count}//序列化方法override def bufferEncoder: Encoder[Buff] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

加载数据

//特定加载
val df01: DataFrame = spark.read.json("E:\\bigdata_study\\sparkSQL\\input\\user.json")
val df02: DataFrame = spark.read.csv("E:\\bigdata_study\\sparkSQL\\input\\user.txt")//通用加载
val df03: DataFrame = spark.read.load("E:\\bigdata_study\\sparkSQL\\input\\user.json")

写出数据

    //特定
df.write.json("E:\\bigdata_study\\sparkSQL\\input\\out01")
df.write.csv("E:\\bigdata_study\\sparkSQL\\input\\out02")/*** 写出模式* 1.默认模式 存在即报错  如果目录不存在,正常写出;如果目录存在,报错!* 2.追加模式 追加写出   如果目录不存在,正常写出;如果目录存在,追加写出!* 3.覆盖模式 覆盖写出   如果目录不存在,正常写出;如果目录存在,删除该目录,重新创建同名目录,写出* 4.忽略模式 忽略写出   如果目录不存在,正常写出;如果目录存在,忽略本次操作,不报错! (慎用)*/
//通用
df.write.mode(SaveMode.Ignore).save("E:\\bigdata_study\\sparkSQL\\input\\out02")

Spark之SparkSQL相关推荐

  1. Hive on Spark与SparkSql的区别

    Hive on Spark与SparkSql的区别 hive on spark大体与SparkSQL结构类似,只是SQL引擎不同,但是计算引擎都是spark! 核心代码 #初始化Spark SQL # ...

  2. Spark操作sparkSql报错:metastore.ObjectStore: Version information found in metastore differs 2.3.0 from e

    Spark操作sparkSql报错:metastore.ObjectStore: Version information found in metastore differs 2.3.0 from e ...

  3. Spark之SparkSQL数据源

    SparkSQL数据源:parquet Json Mysql Hive: SparkSQL数据源 手动指定选项 Spark SQL的DataFrame接口支持多种数据源的操作.一个DataFrame可 ...

  4. Spark之SparkSQL理论篇

    Spark SQL 理论学习: 简介 Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用. 特点 1)易整合 2) ...

  5. Spark之SparkSQL实战

    DataFrames 基本操作和 DSL SQL风格 UDF函数 以及数据源: SparkSQL查询 Json数据准备 123 {"name":"Michael" ...

  6. 【Spark】sparksql中使用自定义函数

    代码中分别用面向对象和面向函数两种写法自定义了两个函数: low2Up: 小写转大写 up2Low: 大写转小写 import org.apache.spark.sql.types.StringTyp ...

  7. 大数据中spark跟sparksql写入es数据库

    作者:小涛 object Legend01 { private val logger: Logger = LoggerFactory.getLogger("Legend") def ...

  8. spark学习-SparkSQL一些函数的使用

    1.parallelizePairs余parallelize /** Distribute a local Scala collection to form an RDD. */def paralle ...

  9. Spark与sparkSql开发

    目录 一. 软件环境 二. 开发环境准备 1. scala编译环境搭建 2. IDEA扩展Scala支持插件

最新文章

  1. java debug体系为什么不能debug到jdk里所有的代码
  2. NKStartup的参数KData
  3. C和C++中读取不定数量的输入数据
  4. 05--MySQL自学教程:DDL(Data Definition Language:数据库定义语言)操作数据库(一)
  5. 把数据保存到cook_JavaScript数据存储 Cookie篇
  6. SAP CRM和SAP Hybris的订单日志
  7. Javascript之全局变量和局部变量部分讲解
  8. java值类型和引用类型 == 比较,Java中值类型和引用类型的比较与问题解决
  9. 详解SaaS产品的5类核心指标
  10. yum提示“Cannot retrieve metalink for repository: epel/x86_64” 解决方法
  11. 2.Node.js access_token的获取、存储及更新
  12. 大样本OLS模型假设及R实现
  13. 功能丰富强大的开源HEVC分析软件 “ Gitl HEVC Analyzer ”
  14. 小麦苗的常用代码--常用命令(仅限自己使用)--下
  15. hdu 4311 4312 Meeting point 曼哈顿距离之和最小
  16. CCID 设备通讯 (Windows 平台)
  17. 树莓派卸载系统自带应用增大硬盘空间
  18. 内存屏障 Memory Barriers
  19. ie8与ie9的区别
  20. 2012/10/03---生化危机

热门文章

  1. ECShop开源网店系统的代码写的怎么样?
  2. 大数据领域的杰出公司(国内外1)
  3. popcap资源管理
  4. python数据分析二一:图形化显示海地地震危机数据
  5. TT100K/BDD100K数据集格式转换
  6. 打造完美可随意安装的WinXP镜像
  7. 【成为博客专家】大数据面试题
  8. 【转】购买智能手机必须要知道的一些知识(cortex A8/A9/A5/A15 智能手机名称整理)...
  9. CA解扰 数字电视加密技术(EMM ECM)
  10. 各类免费API接口推荐,再也不怕找不到免费API了