spark infer parquet schema
背景:最近需要根据parquet文件来解析schema信息,便参考学习了sparksql中infer parquet schema的相关代码
一、infer schema代码入口:
package位置:org.apache.spark.sql.execution.datasources.parquet
入口类:ParquetFileFormat是sparksql中paquert格式的data source,该类继承自FileFormat,类似的类还有OrcFileFormat, AvroFileFormat 等
函数:
override def inferSchema(sparkSession: SparkSession,parameters: Map[String, String],files: Seq[FileStatus]): Option[StructType] = {ParquetUtils.inferSchema(sparkSession, parameters, files)
}
函数中调用了类ParquetUtils中的inferSchema方法,
类中提供了val shouldMergeSchemas = parquetOptions.mergeSchema来觉得infer出的schema是否需要进行merge,可以通过参数spark.sql.hive.convertMetastoreParquet来进行设置
继续往下看,ParquetUtils中通过调用ParquetFileFormat中的函数:
ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
mergeSchemasInParallel函数:
def mergeSchemasInParallel(parameters: Map[String, String],filesToTouch: Seq[FileStatus],sparkSession: SparkSession): Option[StructType] = {val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsStringval assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestampval reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`val converter = new ParquetToSparkSchemaConverter(assumeBinaryIsString = assumeBinaryIsString,assumeInt96IsTimestamp = assumeInt96IsTimestamp)readParquetFootersInParallel(conf, files, ignoreCorruptFiles).map(ParquetFileFormat.readSchemaFromFooter(_, converter))}SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)}
解析parquet schema的函数便是该方法,readParquetFootersInParallel函数负责读取到parquet的footer,其中readfooter方法便是调用parquet-hadoop方法中的api获取schema信息
private[parquet] def readParquetFootersInParallel(conf: Configuration,partFiles: Seq[FileStatus],ignoreCorruptFiles: Boolean): Seq[Footer] = {ThreadUtils.parmap(partFiles, "readingParquetFooters", 8) { currentFile =>try {// Skips row group information since we only need the schema.// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,// when it can't read the footer.Some(new Footer(currentFile.getPath(),ParquetFileReader.readFooter(conf, currentFile, SKIP_ROW_GROUPS)))} catch { case e: RuntimeException =>if (ignoreCorruptFiles) {logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)None} else {throw new IOException(s"Could not read footer for file: $currentFile", e)}}}.flatten}
上面的代码走读为通过parquet reader中的readFooter方法读取parquet文件的schema,
二、Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
一个很重要的逻辑就是将parquet的MessageType转换为Spark SQL中的StructType
def readSchemaFromFooter(footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {val fileMetaData = footer.getParquetMetadata.getFileMetaDatafileMetaData.getKeyValueMetaData.asScala.toMap.get(ParquetReadSupport.SPARK_METADATA_KEY).flatMap(deserializeSchemaString).getOrElse(converter.convert(fileMetaData.getSchema))}
上述代码中的convert 方法便是该功能,该函数位于类:
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter
具体代码如下:
/*** Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].*/def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())private def convert(parquetSchema: GroupType): StructType = {val fields = parquetSchema.getFields.asScala.map { field =>field.getRepetition match {case OPTIONAL =>StructField(field.getName, convertField(field), nullable = true)case REQUIRED =>StructField(field.getName, convertField(field), nullable = false)case REPEATED =>// A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor// annotated by `LIST` or `MAP` should be interpreted as a required list of required// elements where the element type is the type of the field.val arrayType = ArrayType(convertField(field), containsNull = false)StructField(field.getName, arrayType, nullable = false)}}StructType(fields)}
本质上的逻辑便是将Parquet Type 转换为Spark SQL的DataType,具体逻辑不在贴代码,感兴趣的可以自己找到对应代码学习。
spark infer parquet schema相关推荐
- Spark读取Parquet格式的数据为Dataframe
SaveMode指定文件保存时的模式: OverWrite 覆盖 Append 追加 ErrorIfExists 如果存在就报错 Ignore 如果存在就忽略 val spark = SparkSes ...
- spark之parquet
列式存储布局(比如 Parquet)可以加速查询,因为它只检查所有需要的列并对它们的值执行计算,因此只读取一个数据文件或表的小部分数据.Parquet 还支持灵活的压缩选项,因此可以显著减少磁盘上的存 ...
- Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)
目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...
- Spark SQL将rdd转换为数据集-反射来推断Inferring the Schema Using Reflection
一:解读 官网:https://spark.apache.org/docs/latest/sql-getting-started.html The first method uses reflecti ...
- Spark Parquet使用
Spark SQL下的Parquet使用最佳实践和代码实战 分类: spark-sql(1) 一.Spark SQL下的Parquet使用最佳实践 1)过去整个业界对大数据的分析的技术栈的Pipe ...
- Spark操作外部数据源(RDBMS,Hive,HBase,Parquet)
文章目录 一.Spark SQL 二.Spark on Hive 三.Hive on Spark 四.Spark读取Parquet文件 五.Spark连接HBase 1.Maven工程添加依赖 2.代 ...
- spark读取hive表异常,处理WARN HiveExternalCatalog: The table schema given by Hive metastore
文章目录 1 问题概述 1.1 数据库表状况 1.2 问题背景 2 报错场景 2.1 修改Hive元数据信息 2.2 报错信息 2.3 其他现象 2.4 查看表结构时的发现 2.5 报错分析 2.6 ...
- Spark 实战 - 3.一文搞懂 parquet
一.引用 parquet 文件常见于 Spark.Hive.Streamin.MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文 ...
- spark读取文件源码分析-2
文章目录 1. job1产生时机源码分析 1. DataSoure.getOrInferFileFormatSchema() 2. ParquetFileFormat.inferSchema 1. 简 ...
- Spark SQL中的DataFrame
在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是 ...
最新文章
- java类接口的区别_【Java基础】java接口和类的区别-瑶瑶吖的回答
- java读取jsp文件内容_使用Java读取Excel文件内容
- Proteus仿真STM32F103R6的寄存器版跑马灯程序(存储器宏定义)
- shell中的Here Document
- Java入门, 线程
- 【C++ grammar】对象和类(创建对象、对象拷贝、分离声明与实现)
- 矩阵连乘问题(c++)
- GBDT和XGBoost
- 英国电信移除华为设备后,多家运营商继续与华为合作,并达成20亿英镑协议...
- 【Java】面向对象基本特性-封装
- Datawhale编程学习之二叉树和堆(5)
- Json对象直接存取数据库
- NLP自然语言处理 之 jieba中文处理
- 高效能人士的七个习惯-第二章-阅读
- Mysql中LENGTH()函数
- 手把手教做无人驾驶算法(二十八)--Tube MPC与MPC区别
- piblic class 和class的区别
- 微信公众平台开发【发送消息】被动回复消息
- java杨戬的角色_非人哉:游戏角色反映了神仙们的爱好,杨戬太让人意外了
- 解决Windows 由于路径过长而无法删除文件的问题
热门文章
- containers matlab,Matlab 中实用数据结构之 containers.Map
- BugkuCTF –备份是个好习惯
- 推荐6款习惯养成APP,送给想要提升自己的人!
- Collaborative Evolutionary Reinforcement Learning
- 16岁天才开发的Summly获李嘉诚种子投资
- MATLAB——更换主题颜色
- cmd 一键清除系统垃圾
- 让孩子更快乐地学编程,一套积木就够了,长毛象AI百变编程积木套件体验
- C#往图片上面添加文字
- 某酒店App sign、appcode签名解析(一) 带壳分析 r0tracer