Spark SQL之External DataSource外部数据源(二)源代码分析
上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的。
/** Spark SQL源代码分析系列文章*/
(Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077)
一、Sources包核心
Spark SQL在Spark1.2中提供了External DataSource API。开发人员能够依据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。
在Spark SQL源代码的org/spark/sql/sources文件夹下,我们会看到关于External DataSource的相关代码。
这里特别介绍几个:
1、DDLParser
专门负责解析外部数据源SQL的SqlParser。解析create temporary table xxx using options (key 'value', key 'value') 创建载入外部数据源表的语句。
protected lazy val createTable: Parser[LogicalPlan] =CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {case tableName ~ provider ~ opts =>CreateTableUsing(tableName, provider, opts)}
2、CreateTableUsing
一个RunnableCommand。通过反射从外部数据源lib中实例化Relation。然后注冊到为temp table。
private[sql] case class CreateTableUsing(tableName: String,provider: String, // org.apache.spark.sql.json options: Map[String, String]) extends RunnableCommand {def run(sqlContext: SQLContext) = {val loader = Utils.getContextOrSparkClassLoaderval clazz: Class[_] = try loader.loadClass(provider) catch { //do reflectioncase cnf: java.lang.ClassNotFoundException =>try loader.loadClass(provider + ".DefaultSource") catch {case cnf: java.lang.ClassNotFoundException =>sys.error(s"Failed to load class for data source: $provider")}}val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] //json包DefaultDataSourceval relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))//创建JsonRelationsqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注冊Seq.empty}
}
2、DataSourcesStrategy
在 Strategy 一文中。我已讲过Streategy的作用,用来Plan生成物理计划的。
这里提供了一种专门为了解析外部数据源的策略。
最后会依据不同的BaseRelation生产不同的PhysicalRDD。
不同的BaseRelation的scan策略下文会介绍。
private[sql] object DataSourceStrategy extends Strategy {def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>pruneFilterProjectRaw(l,projectList,filters,(a, f) => t.buildScan(a, f)) :: Nil......case l @ LogicalRelation(t: TableScan) =>execution.PhysicalRDD(l.output, t.buildScan()) :: Nilcase _ => Nil}
3、interfaces.scala
该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们仅仅需实现该接口就可以。
里面比較重要的trait RelationProvider 和 BaseRelation,下文会具体介绍。
4、filters.scala
该Filter定义了怎样在载入外部数据源的时候,就进行过滤。注意哦,是载入外部数据源到Table里的时候,而不是Spark里进行filter。
这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。
5、LogicalRelation
封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。
二、External DataSource注冊流程
该类是一个RunnableCommand,其run方法会直接运行创建表语句。
三、External DataSource解析流程
四、External Datasource Interfaces
那么久必须定义AvroRelation来继承BaseRelation。同一时候也要实现一个RelationProvider。
abstract class BaseRelation {def sqlContext: SQLContextdef schema: StructType
abstract class PrunedFilteredScan extends BaseRelation {def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
1、schema我们假设自己定义Relation,必须重写schema,就是我们必须描写叙述对于外部数据源的Schema。
trait RelationProvider {/*** Returns a new base relation with the given parameters.* Note: the parameters' keywords are case insensitive and this insensitivity is enforced* by the Map that is passed to the function.*/def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
五、External Datasource定义演示样例
private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(@transient val sqlContext: SQLContext)extends TableScan {private def baseRDD = sqlContext.sparkContext.textFile(fileName) //读取json fileoverride val schema =JsonRDD.inferSchema( // jsonRDD的inferSchema方法。能自己主动识别json的schema。和类型type。baseRDD,samplingRatio,sqlContext.columnNameOfCorruptRecord)override def buildScan() =JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) //这里还是JsonRDD,调用jsonStringToRow查询返回Row
}
private[sql] class DefaultSource extends RelationProvider {/** Returns a new base relation with the given parameters. */override def createRelation(sqlContext: SQLContext,parameters: Map[String, String]): BaseRelation = {val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)JSONRelation(fileName, samplingRatio)(sqlContext)}
}
原创文章。转载请注明:
转载自:OopsOutOfMemory盛利的Blog。作者: OopsOutOfMemory
本文链接地址:http://blog.csdn.net/oopsoom/article/details/42064075
注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,可是请保留本文作者署名和文章链接。如若须要用于商业目的或者与授权方面的协商,请联系我。
转载于:https://www.cnblogs.com/yangykaifa/p/6828063.html
Spark SQL之External DataSource外部数据源(二)源代码分析相关推荐
- Spark SQL External DataSource外部数据源
一:介绍 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html 随着Spark1.2的发布 ...
- Spark SQL External DataSource外部数据源操作流程
一:获取文件 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html spark本身 有测试 ...
- Spark SQL 源代码分析系列
从决定写Spark SQL文章的源代码分析,到现在一个月的时间,一个又一个几乎相同的结束很快,在这里也做了一个综合指数,方便阅读,下面是读取顺序 :) 第一章 Spark SQL源代码分析之核心流程 ...
- Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)
目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...
- Spark SQL操作外部数据源
目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...
- 「Spark从入门到精通系列」4.Spark SQL和DataFrames:内置数据源简介
来源 | Learning Spark Lightning-Fast Data Analytics,Second Edition 作者 | Damji,et al. 翻译 | 吴邪 大数据4年从业经 ...
- spark写表指定外部表_Spark SQL 之自定义删除外部表
Spark SQL 之自定义删除外部表 前言 Spark SQL 在删除外部表时, 本不能删除外部表的数据的. 本篇文章主要介绍如何修改 Spark SQL 源码实现在删除外部表的时候, 可以带额外选 ...
- Spark SQL玩起来
标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...
- Spark SQL: Relational Data Processing in Spark
Spark SQL: Relational Data Processing in Spark Spark SQL : Spark中关系型处理模块 说明: 类似这样的说明并非是原作者的内容翻译,而是本篇 ...
最新文章
- 林宙辰:实现机器学习科研从 0 到 1,没有什么比一个好的理论更加实用
- 优艾智合机器人科技_在全球最大工业机器人消费市场,优艾智合如何助力产业智能化?...
- linux 蓝牙脚本,arm linux串口蓝牙工具移植及使用(示例代码)
- 5下载的demo在哪_归类专业能力水平评价练习盘!快来下载呀
- 数据结构 二、向量(接口与实现and可扩容向量)
- 看看人家架构师那消息队列中间件玩的,那叫一个优雅!
- python datatime 平均值_python-熊猫时间序列:时间戳列的平均值
- c语言枚举如何当函数返回值,C语言学习五 — 数组与枚举
- 求解偏微分方程开源有限元软件deal.II学习--Step 1
- iOS:练习题中如何用技术去实现一个连线题
- linux查看nginx昅 电视,PHP实现查询汉字笔画、笔画排序、笔画统计
- array函数python_python中如何使用numpy.array函数创建数组?
- Laravel文档梳理3、CSRF保护
- 【腾讯TMQ】老司机教你如何优雅地完成一个小项目测试
- 常见互联网职业英文简写,你是哪个O?
- python 化学_rdkit 化学反应分子式高级功能
- bootstrap table设置列宽
- 【微信公众号】怎么办理信息系统安全等级保护备案证明?
- 2020大疆校招嵌入式B卷编程题
- Fabric - chaincode开发模式