背景:最近需要根据parquet文件来解析schema信息,便参考学习了sparksql中infer parquet schema的相关代码

一、infer schema代码入口:

package位置:org.apache.spark.sql.execution.datasources.parquet

入口类:ParquetFileFormat是sparksql中paquert格式的data source,该类继承自FileFormat,类似的类还有OrcFileFormat,           AvroFileFormat 等

函数:

override def inferSchema(sparkSession: SparkSession,parameters: Map[String, String],files: Seq[FileStatus]): Option[StructType] = {ParquetUtils.inferSchema(sparkSession, parameters, files)
}

函数中调用了类ParquetUtils中的inferSchema方法,

类中提供了val shouldMergeSchemas = parquetOptions.mergeSchema来觉得infer出的schema是否需要进行merge,可以通过参数spark.sql.hive.convertMetastoreParquet来进行设置

继续往下看,ParquetUtils中通过调用ParquetFileFormat中的函数:

ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
mergeSchemasInParallel函数:
def mergeSchemasInParallel(parameters: Map[String, String],filesToTouch: Seq[FileStatus],sparkSession: SparkSession): Option[StructType] = {val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsStringval assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestampval reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`val converter = new ParquetToSparkSchemaConverter(assumeBinaryIsString = assumeBinaryIsString,assumeInt96IsTimestamp = assumeInt96IsTimestamp)readParquetFootersInParallel(conf, files, ignoreCorruptFiles).map(ParquetFileFormat.readSchemaFromFooter(_, converter))}SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)}

解析parquet schema的函数便是该方法,readParquetFootersInParallel函数负责读取到parquet的footer,其中readfooter方法便是调用parquet-hadoop方法中的api获取schema信息

private[parquet] def readParquetFootersInParallel(conf: Configuration,partFiles: Seq[FileStatus],ignoreCorruptFiles: Boolean): Seq[Footer] = {ThreadUtils.parmap(partFiles, "readingParquetFooters", 8) { currentFile =>try {// Skips row group information since we only need the schema.// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,// when it can't read the footer.Some(new Footer(currentFile.getPath(),ParquetFileReader.readFooter(conf, currentFile, SKIP_ROW_GROUPS)))} catch { case e: RuntimeException =>if (ignoreCorruptFiles) {logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)None} else {throw new IOException(s"Could not read footer for file: $currentFile", e)}}}.flatten}

上面的代码走读为通过parquet reader中的readFooter方法读取parquet文件的schema,

二、Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].

一个很重要的逻辑就是将parquet的MessageType转换为Spark SQL中的StructType

def readSchemaFromFooter(footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {val fileMetaData = footer.getParquetMetadata.getFileMetaDatafileMetaData.getKeyValueMetaData.asScala.toMap.get(ParquetReadSupport.SPARK_METADATA_KEY).flatMap(deserializeSchemaString).getOrElse(converter.convert(fileMetaData.getSchema))}

上述代码中的convert 方法便是该功能,该函数位于类:

org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

具体代码如下:

/*** Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].*/def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())private def convert(parquetSchema: GroupType): StructType = {val fields = parquetSchema.getFields.asScala.map { field =>field.getRepetition match {case OPTIONAL =>StructField(field.getName, convertField(field), nullable = true)case REQUIRED =>StructField(field.getName, convertField(field), nullable = false)case REPEATED =>// A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor// annotated by `LIST` or `MAP` should be interpreted as a required list of required// elements where the element type is the type of the field.val arrayType = ArrayType(convertField(field), containsNull = false)StructField(field.getName, arrayType, nullable = false)}}StructType(fields)}

本质上的逻辑便是将Parquet Type 转换为Spark SQL的DataType,具体逻辑不在贴代码,感兴趣的可以自己找到对应代码学习。

spark infer parquet schema相关推荐

  1. Spark读取Parquet格式的数据为Dataframe

    SaveMode指定文件保存时的模式: OverWrite 覆盖 Append 追加 ErrorIfExists 如果存在就报错 Ignore 如果存在就忽略 val spark = SparkSes ...

  2. spark之parquet

    列式存储布局(比如 Parquet)可以加速查询,因为它只检查所有需要的列并对它们的值执行计算,因此只读取一个数据文件或表的小部分数据.Parquet 还支持灵活的压缩选项,因此可以显著减少磁盘上的存 ...

  3. Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  4. Spark SQL将rdd转换为数据集-反射来推断Inferring the Schema Using Reflection

    一:解读 官网:https://spark.apache.org/docs/latest/sql-getting-started.html The first method uses reflecti ...

  5. Spark Parquet使用

    Spark SQL下的Parquet使用最佳实践和代码实战  分类: spark-sql(1)  一.Spark SQL下的Parquet使用最佳实践 1)过去整个业界对大数据的分析的技术栈的Pipe ...

  6. Spark操作外部数据源(RDBMS,Hive,HBase,Parquet)

    文章目录 一.Spark SQL 二.Spark on Hive 三.Hive on Spark 四.Spark读取Parquet文件 五.Spark连接HBase 1.Maven工程添加依赖 2.代 ...

  7. spark读取hive表异常,处理WARN HiveExternalCatalog: The table schema given by Hive metastore

    文章目录 1 问题概述 1.1 数据库表状况 1.2 问题背景 2 报错场景 2.1 修改Hive元数据信息 2.2 报错信息 2.3 其他现象 2.4 查看表结构时的发现 2.5 报错分析 2.6 ...

  8. Spark 实战 - 3.一文搞懂 parquet

    一.引用 parquet 文件常见于 Spark.Hive.Streamin.MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文 ...

  9. spark读取文件源码分析-2

    文章目录 1. job1产生时机源码分析 1. DataSoure.getOrInferFileFormatSchema() 2. ParquetFileFormat.inferSchema 1. 简 ...

  10. Spark SQL中的DataFrame

    在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是 ...

最新文章

  1. java类接口的区别_【Java基础】java接口和类的区别-瑶瑶吖的回答
  2. java读取jsp文件内容_使用Java读取Excel文件内容
  3. Proteus仿真STM32F103R6的寄存器版跑马灯程序(存储器宏定义)
  4. shell中的Here Document
  5. Java入门, 线程
  6. 【C++ grammar】对象和类(创建对象、对象拷贝、分离声明与实现)
  7. 矩阵连乘问题(c++)
  8. GBDT和XGBoost
  9. 英国电信移除华为设备后,多家运营商继续与华为合作,并达成20亿英镑协议...
  10. 【Java】面向对象基本特性-封装
  11. Datawhale编程学习之二叉树和堆(5)
  12. Json对象直接存取数据库
  13. NLP自然语言处理 之 jieba中文处理
  14. 高效能人士的七个习惯-第二章-阅读
  15. Mysql中LENGTH()函数
  16. 手把手教做无人驾驶算法(二十八)--Tube MPC与MPC区别
  17. piblic class 和class的区别
  18. 微信公众平台开发【发送消息】被动回复消息
  19. java杨戬的角色_非人哉:游戏角色反映了神仙们的爱好,杨戬太让人意外了
  20. 解决Windows 由于路径过长而无法删除文件的问题

热门文章

  1. containers matlab,Matlab 中实用数据结构之 containers.Map
  2. BugkuCTF –备份是个好习惯
  3. 推荐6款习惯养成APP,送给想要提升自己的人!
  4. Collaborative Evolutionary Reinforcement Learning
  5. 16岁天才开发的Summly获李嘉诚种子投资
  6. MATLAB——更换主题颜色
  7. cmd 一键清除系统垃圾
  8. 让孩子更快乐地学编程,一套积木就够了,长毛象AI百变编程积木套件体验
  9. C#往图片上面添加文字
  10. 某酒店App sign、appcode签名解析(一) 带壳分析 r0tracer