下面的代码演示了通过Case Class进行表Schema定义的例子:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

(1)sql方法返回DataFrame

  def sql(sqlText: String): DataFrame = {DataFrame(this, parseSql(sqlText))}
  • 1
  • 2

其中parseSql(sqlText)方法生成相应的LogicalPlan得到,该方法源码如下:

//根据传入的sql语句,生成LogicalPlan
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
  • 1

ddlParser对象定义如下:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
  • 1

(2)然后调用DataFrame的apply方法

private[sql] object DataFrame {def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {new DataFrame(sqlContext, logicalPlan)}
}
  • 1

可以看到,apply方法参数有两个,分别是SQLContext和LogicalPlan,调用的是DataFrame的构造方法,具体源码如下:

//DataFrame构造方法,该构造方法会自动对LogicalPlan进行分析,然后返回QueryExecution对象
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {this(sqlContext, {val qe = sqlContext.executePlan(logicalPlan)//判断是否已经创建,如果是则抛异常if (sqlContext.conf.dataFrameEagerAnalysis) {qe.assertAnalyzed()  // This should force analysis and throw errors if there are any}qe})}
  • 1

(3)val qe = sqlContext.executePlan(logicalPlan) 返回QueryExecution, sqlContext.executePlan方法源码如下:

protected[sql] def executePlan(plan: LogicalPlan) =new sparkexecution.QueryExecution(this, plan)
  • 1

QueryExecution类中表达了Spark执行SQL的主要工作流程,具体如下

class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {@VisibleForTestingdef assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)lazy val withCachedData: LogicalPlan = {assertAnalyzed()sqlContext.cacheManager.useCachedData(analyzed)}lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)// TODO: Don't just pick the first one...lazy val sparkPlan: SparkPlan = {SparkPlan.currentContext.set(sqlContext)sqlContext.planner.plan(optimizedPlan).next()}// executedPlan should not be used to initialize any SparkPlan. It should be// only used for execution.lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)/** Internal version of the RDD. Avoids copies and has no schema *///调用toRDD方法执行任务将结果转换为RDDlazy val toRdd: RDD[InternalRow] = executedPlan.execute()protected def stringOrError[A](f: => A): String =try f.toString catch { case e: Throwable => e.toString }def simpleString: String = {s"""== Physical Plan ==|${stringOrError(executedPlan)}""".stripMargin.trim}override def toString: String = {def output =analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")s"""== Parsed Logical Plan ==|${stringOrError(logical)}|== Analyzed Logical Plan ==|${stringOrError(output)}|${stringOrError(analyzed)}|== Optimized Logical Plan ==|${stringOrError(optimizedPlan)}|== Physical Plan ==|${stringOrError(executedPlan)}|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}""".stripMargin.trim}
}
  • 1

可以看到,SQL的执行流程为
1.Parsed Logical Plan:LogicalPlan
2.Analyzed Logical Plan:
lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
3.Optimized Logical Plan:lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
4. Physical Plan:lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

可以调用results.queryExecution方法查看,代码如下:

scala> results.queryExecution
res1: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name)]
 'UnresolvedRelation [people], None== Analyzed Logical Plan ==
name: string
Project [name#0]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at createDataFrame at <console>:47== Optimized Logical Plan ==
Project [name#0]
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at createDataFrame at <console>:47== Physical Plan ==
TungstenProject [name#0]
 Scan PhysicalRDD[name#0,age#1]Code Generation: true
  • 1
  • 2

(4) 然后调用DataFrame的主构造器完成DataFrame的构造

class DataFrame private[sql](@transient val sqlContext: SQLContext,@DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable 
  • 1

(5)
当调用DataFrame的collect等方法时,便会触发执行executedPlan

  def collect(): Array[Row] = withNewExecutionId {queryExecution.executedPlan.executeCollect()}
  • 1

例如:

scala> results.collect
res6: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
  • 1

整体流程图如下:

Spark SQL 处理流程分析 (一)相关推荐

  1. Spark SQL运行流程及性能优化:RBO和CBO

    1 Spark SQL运行流程 1.1 Spark SQL核心--Catalyst Spark SQL的核心是Catalyst查询编译器,它将用户程序中的SQL/Dataset/DataFrame经过 ...

  2. Spark SQL Catalyst源代码分析之TreeNode Library

    /** Spark SQL源代码分析系列文章*/ 前几篇文章介绍了Spark SQL的Catalyst的核心执行流程.SqlParser,和Analyzer,本来打算直接写Optimizer的,可是发 ...

  3. Spark SQL Catalyst源代码分析Optimizer

    /** Spark SQL源代码分析系列*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将具体解说S ...

  4. 【华为云技术分享】大数据实践解析(下):Spark的读写流程分析

    摘要:本文通过简单的例子来解析,大数据实践中的Spark读写流程,内容主要聚焦于Spark中的高效并行读写以及在写过程中如何保证事务性. 导读: 众所周知,在大数据/数据库领域,数据的存储格式直接影响 ...

  5. MyBatis系列之浅谈SQL执行流程分析

    目录 独立使用Mybatis Mybatis执行流程 SqlSessionFactory\SqlSession MapperProxy Excutor 独立使用Mybatis 这篇文章主要以分析Myb ...

  6. TP5 框架 SQL 执行流程分析及 5.0.9 SQL 注入漏洞分析

    文章目录 SQL查询流程 TP 5.0.9 SQL注入 修复 SQL查询流程 TP5手册:https://www.kancloud.cn/manual/thinkphp5/118044 在分析 tp5 ...

  7. Spark SQL执行流程解析之 sql执行顺序

    一.Spark SQL模板解析 Spark Sql 关键字执行顺序跟Sql执行顺序类似: 1.先将from前两表做笛卡尔积加载进来形成虚拟表vt1, 2.on条件后为真的插入虚拟表形成vt2, 如果包 ...

  8. 深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇)

    原文地址:http://www.cnblogs.com/dongying/p/4142476.html 最近太忙了,一直没时间继续更新博客,今天忙里偷闲继续我的Mybatis学习之旅.在前九篇中,介绍 ...

  9. MySQL 文件结构、逻辑架构及 sql 执行流程分析作者:Java后端架构

    1.MySQL 文件说明 1.1 MySQL 文件夹文件 linux 服务器上 MySQL 安装好之后都有如下文件: auto.cnf:每一个 MySQL 实例都有一个唯一 ID 蓝色文件夹:表示数据 ...

最新文章

  1. 设置vue运行npm run dev时候,项目在浏览器自动打开页面的方法
  2. 调查:中国CIO在亚太拥最大战略影响力
  3. Hive学习之七《 Sqoop import 从关系数据库抽取到HDFS》
  4. Redis BitMap适应场景
  5. 课程升级 | 极速构建知识体系,即学即用 Serverless
  6. ie9怎么开兼容模式
  7. [css] 举例说说你对white-space属性的理解
  8. 64位linux安装mysql数据库吗_Linux下安装Mysql数据库
  9. linux配置java环境变量(转)
  10. Vim 编辑器底端 [noeol], [dos] 的含义
  11. python web框架 - Django
  12. Linux/Windows 文件交互读取转义字符变换
  13. jquery中的ajax方法详解
  14. 排查链接是否失效_【知了堂信安笔记】Linux入侵排查
  15. php实现微信登录详细教程,PHP开发微信授权登录操作教程
  16. 从师傅到伙伴:华为背后总是有IBM的影子
  17. Android 微信聊天页面
  18. 特斯拉中国召回近5万辆Model S和X,美国法务:中国驾驶员使用不当-1
  19. 一个TCP连接总是以1KB的最大段发送TCP段,发送方有足够多的数据要发送。当拥塞窗口为16KB时发生了超时,如果接下来的4个RTT(往返时间)时间内的TCP段的传输都是成功的,那么当第4个RTT时间
  20. module 'gensim' has no attribute 'corpora'

热门文章

  1. 满分最优解法:1007 素数对猜想 (20分)
  2. mysql事务用法介绍及储存引擎介绍(MyLSAM,Innodb)
  3. Android与服务器端数据交互(基于SOAP协议整合android+webservice)
  4. python写水仙花的作文_python自动打开浏览器下载zip,并且提取内容写入excel
  5. seo优化源码_武汉seo公司关键词SEO优化实战记录,1个月三个站在首页
  6. java 矩阵题目_一些数学分析不错的题目
  7. python交换两列的位置_如何更改 pandas dataframe 中两列的位置
  8. ad19pcb所有元件都在报错_PLC的线圈输出和置复位,可以混用吗?很多人都会犯这个错误!...
  9. sqlite like concat 怎么 替代_Joplin:真正的 Evernote 开源替代品
  10. java多线程爬虫_Java 多线程爬虫及分布式爬虫架构