你的Parquet该升级了:IOException: totalValueCount == 0问题定位之旅
本文分享自华为云社区《你的Parquet该升级了:IOException: totalValueCount == 0问题定位之旅》,原文作者:wzhfy 。
1. 问题描述
使用Spark SQL进行ETL任务,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异常。
2. 初步分析
该表的结果是由两表join后生成。经分析,join的结果产生了数据倾斜,且倾斜key为null。Join后每个task写一个文件,所以partition key为null的那个task将大量的null值写入了一个文件,null值个数达到22亿。
22亿这个数字比较敏感,正好超过int最大值2147483647(21亿多)。因此,初步怀疑parquet在写入超过int.max个value时有问题。
【注】本文只关注大量null值写入同一个文件导致读取时报错的问题。至于该列数据产生如此大量的null是否合理,不在本文讨论范围之内。
3. Deep dive into Parquet (version 1.8.3,部分内容可能需要结合Parquet源码进行理解)
入口:Spark(Spark 2.3版本) -> Parquet
Parquet调用入口在Spark,所以从Spark开始挖掘调用栈。
InsertIntoHadoopFsRelationCommand.run()/SaveAsHiveFile.saveAsHiveFile() -> FileFormatWriter.write()
这里分几个步骤:
- 启动作业前,创建outputWriterFactory: ParquetFileFormat.prepareWrite()。这里会设置一系列与parquet写文件有关的配置信息。其中主要的一个,是设置WriteSupport类:ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]),ParquetWriteSupport是Spark自己定义的类。
- 在executeTask() -> writeTask.execute()中,先通过outputWriterFactory创建OutputWriter (ParquetOutputWriter):outputWriterFactory.newInstance()。
- 对于每行记录,使用ParquetOutputWriter.write(InternalRow)方法依次写入parquet文件。
- Task结束前,调用ParquetOutputWriter.close()关闭资源。
3.1 Write过程
在ParquetOutputWriter中,通过ParquetOutputFormat.getRecordWriter构造一个RecordWriter(ParquetRecordWriter),其中包含了:
- prepareWrite()时设置的WriteSupport:负责转换Spark record并写入parquet结构
- ParquetFileWriter:负责写入文件
ParquetRecordWriter中,其实是把write操作委托给了一个internalWriter(InternalParquetRecordWriter,用WriteSupport和ParquetFileWriter构造)。
现在让我们梳理一下,目前为止的大致流程为:
SingleDirectoryWriteTask/DynamicPartitionWriteTask.execute
-> ParquetOutputWriter.write -> ParquetRecordWriter.write -> InternalParquetRecordWriter.write
接下来,InternalParquetRecordWriter.write里面,就是三件事:
(1)writeSupport.write,即ParquetWriteSupport.write,里面分三个步骤:
- MessageColumnIO.MessageColumnIORecordConsumer.startMessage;
- ParquetWriteSupport.writeFields:写入一行中各个列的值,null值除外;
- MessageColumnIO.MessageColumnIORecordConsumer.endMessage:针对第二步中的missing fields写入null值。
ColumnWriterV1.writeNull -> accountForValueWritten:
1) 增加计数器valueCount (int类型)
2) 检查空间是否已满,需要writePage - 检查点1
(2)增加计数器recordCount(long类型)
(3)检查block size,是否需要flushRowGroupToStore - 检查点2
由于写入的值全是null,在1、2两个检查点的memSize都为0,所以不会刷新page和row group。导致的结果就是,一直在往同一个page里增加null值。而ColumnWriterV1的计数器valueCount是int类型,当超过int.max时,溢出,变为了一个负数。
因此,只有当调用close()方法时(task结束时),才会执行flushRowGroupToStore:
ParquetOutputWriter.close -> ParquetRecordWriter.close
-> InternalParquetRecordWriter.close -> flushRowGroupToStore
-> ColumnWriteStoreV1.flush -> for each column ColumnWriterV1.flush
由于valueCount溢出为负,此处也不会写page。
因为未调用过writePage,所以此处的totalValueCount一直为0。
ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage -> 累计totalValueCount
在write结束时,InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter -> for each column ColumnChunkPageWriter.writeToFileWriter:
- ParquetFileWriter.startColumn:totalValueCount赋值给currentChunkValueCount
- ParquetFileWriter.writeDataPages
- ParquetFileWriter.endColumn:currentChunkValueCount(为0)和其他元数据信息构造出一个ColumnChunkMetaData,相关信息最终会被写入文件。
3.2 Read过程
同样以Spark为入口,进行查看。
初始化阶段:ParquetFileFormat.BuildReaderWithPartitionValues -> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter -> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata -> ColumnChunkMetaData.get,其中包含valueCount(为0)。
读取时:VectorizedParquetRecordReader.nextBatch -> checkEndOfRowGroup:
1) ParquetFileReader.readNextRowGroup -> for each chunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())
由于getValueCount为0,所以pagesInChunk为空。
2)构造ColumnChunkPageReader:
由于page列表为空,所以totalValueCount为0,导致在构造VectorizedColumnReader时报了问题中的错误。
4. 解决方法:Parquet升级(version 1.11.1)
在新版本中,ParquetWriteSupport.write ->
MessageColumnIO.MessageColumnIORecordConsumer.endMessage ->
ColumnWriteStoreV1(ColumnWriteStoreBase).endRecord:
在endRecord中增加了每个page最大记录条数(默认2w条)的属性和检查逻辑,超出限制时会writePage,使得ColumnWriterV1的valueCount不会溢出(每次writePage后会清零)。
而对比老版本1.8.3中,ColumnWriteStoreV1.endRecord为空函数。
附:Parquet中的一个小trick
Parquet中为了节约空间,当一个long类型的值,在一定范围内时,会使用int来存储,其方法如下:
- 判断是否可以用int存储:
- 如果可以,用IntColumnChunkMetaData代替LongColumnChunkMetaData,构造时转换:
- 使用时,再转回来,IntColumnChunkMetaData.getValueCount -> intToPositiveLong():
普通的int范围是 -2^31 ~ (2^31 - 1),由于元数据信息(如valueCount等)都是非负整数,那么实际只能存储0 ~ (2^31 - 1) 范围的数。而用这种方法,可以表示0 ~ (2^32 - 1) 范围的数,表达范围也大了一倍。
附件:可用于复现的测试用例代码(依赖Spark部分类,可置于Spark工程中运行)
测试用例代码.txt 1.88KB
点击关注,第一时间了解华为云新鲜技术~
你的Parquet该升级了:IOException: totalValueCount == 0问题定位之旅相关推荐
- oracle 10.2 64位,Oracle 10.2.0.5 x64升级到11.2.0.3 x64
说明:11g数据库现在新部署的数量也很多的,对于10g数据库,现在整理一下10g到11g的升级过程.10.2.0.2以上版本才能升级到11.2.0.3版本. 升级说明:10.2.0.5(64)-> ...
- vCenter 5.5升级到vCenter 6.0实战指导
从vCenter 5.5升级到vCenter 6.0的过程是比较方便的,直接根据屏幕向导,一步一步完成就好了.不过这里有个小插曲,如何更改VC的独立磁盘模式,倒是有点小折腾.好了,下面让我们看下整个升 ...
- 为什么升级不了android版本,为啥你的手机无法升级到安卓7.0?原因都在这里!...
标签:安卓(179) 为什么你的手机无法升级到安卓7.0?这个问题可能很多人都在问吧,万年6.0都用烦了.最近,一加正式为旗下的一加3和一加3T推送氢OS 3.0正式版升级,该版本最大的特色就是系统内 ...
- AdminStudio 9.x优惠升级到AdminStudio 11.0的机会只剩三周
2012年1月26日,Flexera Software发布AdminStudio 11.0版本,同时宣布终止AdminStudio 9.0,9.01和9.5(End of Life,简称EOL),在3 ...
- 华为王成录:把安卓最核心部分换得差不多了 手机升级鸿蒙OS 2.0水到渠成
内容来自网易新闻 9月10日下午,华为在东莞松山湖举办了2020华为开发者大会,发布了鸿蒙OS 2.0.EMUI 11,并介绍HMS生态目前最新的成果和进展. 大会上,华为消费者业务CEO余承东表示, ...
- windows 2003 下oracle从10.2.0.1升级到10.2.0.4
方法一: 1. 完全安装10.2.0.1 2. 安装完成后,停止所有的oracle服务,可以通过停止oracle 的window services或者使用以下命令来实现. emctl stop dbc ...
- android——记录从android studio2.3升级到android studio3.0版本遇到的坑
今天手贱,升级到了AS3.0 ,结果遇到了坑.提示需要将gradle升级到3.5及以上版本.然后就开始动手升级.其中遇到了各种困难.首先,就是需要FQ下载.其次,就是配置后提示jcenter下的一个p ...
- Oracle 10g(10.2.0.4)升级到10.2.0.5.19
一.将数据库版本从10.2.0.4 升级到 10.2.0.5,再升级到10.2.0.5.19 (1) 备份等过程略过,一个老库的升级过程,记录之. (2) 一致性关闭数据库及监听 sqlplus ...
- Oracle 11.2.0.1 升级到 11.2.0.3 示例
Oracle 11.2.0.1 单实例升级到11.2.0.3. Oracle 升级的步骤都差不多. 先升级Oracle software,然后升级Oracle instance. Oracle 11. ...
最新文章
- 为什么神经网络的激活函数必须使用线性函数?
- 创业失败后,我决定开源所有产品代码
- 父与子的编程之旅:与小卡特一起学Python.pdf
- UA OPTI512R 傅立叶光学导论 采样定理例题
- 使用 Palette 让你的 UI 色彩与内容更贴合
- 命令行执行php脚本中的$argv和$argc配置方法
- 为提高研发和测试质量而规范Scrum项目需求描述
- c++string函数(一)——find、rfind详细用法
- ISA2006系列之三 详解防火墙的三种客户端(上)
- html怎样自动播放视频,html5如何实现自动播放视频?
- 比赛排行榜如何在LED大屏上实现自动实时滚动播报?
- python模拟登录的实现
- java se和ocjp_OCJP 考试题之七 - osc_sejhgcp0的个人空间 - OSCHINA - 中文开源技术交流社区...
- BearPi-IoT串口收发1-普通模式
- springBoot的shiro的简单项目部署
- MATLAB 数据分析方法(第2版)2.1 基本统计量与数据可视化
- 2020赚钱机会总结,拾元富另附10个副业赚钱必备的工具与平台,看看你到底错过了多少钱!
- 一段话中手机号中间四位做特殊处理
- 5G NR CSI-RS介绍(2)-- TRS
- 无线通信基础知识6:射频器件的基本参数2
热门文章
- 2016河北省职称计算机考试试题及答案,2016河北省职称计算机考试操作题答案.doc...
- 若依的框架怎么样_若依框架的功能代码
- catia创成式外形设计如何将两个面相合_汽车研发:车门铰链设计及布置要求解析!...
- caffe,caffe2 and pytorch
- A Spy in the Metro UVA - 1025
- Linux经常使用命令(八) - touch
- 【转载】gcc 使用中常用的参数及命令
- nodejs-express
- JSP开发环境配置问题解答
- ES 必备插件的安装