本文分享自华为云社区《你的Parquet该升级了:IOException: totalValueCount == 0问题定位之旅》,原文作者:wzhfy 。

1. 问题描述

使用Spark SQL进行ETL任务,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异常。

2. 初步分析

该表的结果是由两表join后生成。经分析,join的结果产生了数据倾斜,且倾斜key为null。Join后每个task写一个文件,所以partition key为null的那个task将大量的null值写入了一个文件,null值个数达到22亿。

22亿这个数字比较敏感,正好超过int最大值2147483647(21亿多)。因此,初步怀疑parquet在写入超过int.max个value时有问题。

【注】本文只关注大量null值写入同一个文件导致读取时报错的问题。至于该列数据产生如此大量的null是否合理,不在本文讨论范围之内。

3. Deep dive into Parquet (version 1.8.3,部分内容可能需要结合Parquet源码进行理解)

入口:Spark(Spark 2.3版本) -> Parquet

Parquet调用入口在Spark,所以从Spark开始挖掘调用栈。

InsertIntoHadoopFsRelationCommand.run()/SaveAsHiveFile.saveAsHiveFile() -> FileFormatWriter.write()

这里分几个步骤:

  1. 启动作业前,创建outputWriterFactory: ParquetFileFormat.prepareWrite()。这里会设置一系列与parquet写文件有关的配置信息。其中主要的一个,是设置WriteSupport类:ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]),ParquetWriteSupport是Spark自己定义的类。
  2. 在executeTask() -> writeTask.execute()中,先通过outputWriterFactory创建OutputWriter (ParquetOutputWriter):outputWriterFactory.newInstance()。
  3. 对于每行记录,使用ParquetOutputWriter.write(InternalRow)方法依次写入parquet文件。
  4. Task结束前,调用ParquetOutputWriter.close()关闭资源。

3.1 Write过程

在ParquetOutputWriter中,通过ParquetOutputFormat.getRecordWriter构造一个RecordWriter(ParquetRecordWriter),其中包含了:

  1. prepareWrite()时设置的WriteSupport:负责转换Spark record并写入parquet结构
  2. ParquetFileWriter:负责写入文件

ParquetRecordWriter中,其实是把write操作委托给了一个internalWriter(InternalParquetRecordWriter,用WriteSupport和ParquetFileWriter构造)。

现在让我们梳理一下,目前为止的大致流程为:

SingleDirectoryWriteTask/DynamicPartitionWriteTask.execute
-> ParquetOutputWriter.write -> ParquetRecordWriter.write -> InternalParquetRecordWriter.write

接下来,InternalParquetRecordWriter.write里面,就是三件事:

(1)writeSupport.write,即ParquetWriteSupport.write,里面分三个步骤:

  1. MessageColumnIO.MessageColumnIORecordConsumer.startMessage;
  2. ParquetWriteSupport.writeFields:写入一行中各个列的值,null值除外;
  3. MessageColumnIO.MessageColumnIORecordConsumer.endMessage:针对第二步中的missing fields写入null值。
    ColumnWriterV1.writeNull -> accountForValueWritten:
    1) 增加计数器valueCount (int类型)
    2) 检查空间是否已满,需要writePage - 检查点1

(2)增加计数器recordCount(long类型)

(3)检查block size,是否需要flushRowGroupToStore - 检查点2

由于写入的值全是null,在1、2两个检查点的memSize都为0,所以不会刷新page和row group。导致的结果就是,一直在往同一个page里增加null值。而ColumnWriterV1的计数器valueCount是int类型,当超过int.max时,溢出,变为了一个负数。

因此,只有当调用close()方法时(task结束时),才会执行flushRowGroupToStore:
ParquetOutputWriter.close -> ParquetRecordWriter.close
-> InternalParquetRecordWriter.close -> flushRowGroupToStore
-> ColumnWriteStoreV1.flush -> for each column ColumnWriterV1.flush

由于valueCount溢出为负,此处也不会写page。

因为未调用过writePage,所以此处的totalValueCount一直为0。
ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage -> 累计totalValueCount

在write结束时,InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter -> for each column ColumnChunkPageWriter.writeToFileWriter:

  1. ParquetFileWriter.startColumn:totalValueCount赋值给currentChunkValueCount
  2. ParquetFileWriter.writeDataPages
  3. ParquetFileWriter.endColumn:currentChunkValueCount(为0)和其他元数据信息构造出一个ColumnChunkMetaData,相关信息最终会被写入文件。

3.2 Read过程

同样以Spark为入口,进行查看。
初始化阶段:ParquetFileFormat.BuildReaderWithPartitionValues -> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter -> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata -> ColumnChunkMetaData.get,其中包含valueCount(为0)。

读取时:VectorizedParquetRecordReader.nextBatch -> checkEndOfRowGroup:
1) ParquetFileReader.readNextRowGroup -> for each chunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())

由于getValueCount为0,所以pagesInChunk为空。

2)构造ColumnChunkPageReader:

由于page列表为空,所以totalValueCount为0,导致在构造VectorizedColumnReader时报了问题中的错误。

4. 解决方法:Parquet升级(version 1.11.1)

在新版本中,ParquetWriteSupport.write ->
MessageColumnIO.MessageColumnIORecordConsumer.endMessage ->
ColumnWriteStoreV1(ColumnWriteStoreBase).endRecord:

在endRecord中增加了每个page最大记录条数(默认2w条)的属性和检查逻辑,超出限制时会writePage,使得ColumnWriterV1的valueCount不会溢出(每次writePage后会清零)。

而对比老版本1.8.3中,ColumnWriteStoreV1.endRecord为空函数。

附:Parquet中的一个小trick

Parquet中为了节约空间,当一个long类型的值,在一定范围内时,会使用int来存储,其方法如下:

  • 判断是否可以用int存储:

  • 如果可以,用IntColumnChunkMetaData代替LongColumnChunkMetaData,构造时转换:

  • 使用时,再转回来,IntColumnChunkMetaData.getValueCount -> intToPositiveLong():

普通的int范围是 -2^31 ~ (2^31 - 1),由于元数据信息(如valueCount等)都是非负整数,那么实际只能存储0 ~ (2^31 - 1) 范围的数。而用这种方法,可以表示0 ~ (2^32 - 1) 范围的数,表达范围也大了一倍。

附件:可用于复现的测试用例代码(依赖Spark部分类,可置于Spark工程中运行)

测试用例代码.txt 1.88KB

点击关注,第一时间了解华为云新鲜技术~

你的Parquet该升级了:IOException: totalValueCount == 0问题定位之旅相关推荐

  1. oracle 10.2 64位,Oracle 10.2.0.5 x64升级到11.2.0.3 x64

    说明:11g数据库现在新部署的数量也很多的,对于10g数据库,现在整理一下10g到11g的升级过程.10.2.0.2以上版本才能升级到11.2.0.3版本. 升级说明:10.2.0.5(64)-> ...

  2. vCenter 5.5升级到vCenter 6.0实战指导

    从vCenter 5.5升级到vCenter 6.0的过程是比较方便的,直接根据屏幕向导,一步一步完成就好了.不过这里有个小插曲,如何更改VC的独立磁盘模式,倒是有点小折腾.好了,下面让我们看下整个升 ...

  3. 为什么升级不了android版本,为啥你的手机无法升级到安卓7.0?原因都在这里!...

    标签:安卓(179) 为什么你的手机无法升级到安卓7.0?这个问题可能很多人都在问吧,万年6.0都用烦了.最近,一加正式为旗下的一加3和一加3T推送氢OS 3.0正式版升级,该版本最大的特色就是系统内 ...

  4. AdminStudio 9.x优惠升级到AdminStudio 11.0的机会只剩三周

    2012年1月26日,Flexera Software发布AdminStudio 11.0版本,同时宣布终止AdminStudio 9.0,9.01和9.5(End of Life,简称EOL),在3 ...

  5. 华为王成录:把安卓最核心部分换得差不多了 手机升级鸿蒙OS 2.0水到渠成

    内容来自网易新闻 9月10日下午,华为在东莞松山湖举办了2020华为开发者大会,发布了鸿蒙OS 2.0.EMUI 11,并介绍HMS生态目前最新的成果和进展. 大会上,华为消费者业务CEO余承东表示, ...

  6. windows 2003 下oracle从10.2.0.1升级到10.2.0.4

    方法一: 1. 完全安装10.2.0.1 2. 安装完成后,停止所有的oracle服务,可以通过停止oracle 的window services或者使用以下命令来实现. emctl stop dbc ...

  7. android——记录从android studio2.3升级到android studio3.0版本遇到的坑

    今天手贱,升级到了AS3.0 ,结果遇到了坑.提示需要将gradle升级到3.5及以上版本.然后就开始动手升级.其中遇到了各种困难.首先,就是需要FQ下载.其次,就是配置后提示jcenter下的一个p ...

  8. Oracle 10g(10.2.0.4)升级到10.2.0.5.19

    一.将数据库版本从10.2.0.4 升级到 10.2.0.5,再升级到10.2.0.5.19 (1) 备份等过程略过,一个老库的升级过程,记录之.   (2) 一致性关闭数据库及监听 sqlplus ...

  9. Oracle 11.2.0.1 升级到 11.2.0.3 示例

    Oracle 11.2.0.1 单实例升级到11.2.0.3. Oracle 升级的步骤都差不多. 先升级Oracle software,然后升级Oracle instance. Oracle 11. ...

最新文章

  1. 为什么神经网络的激活函数必须使用线性函数?
  2. 创业失败后,我决定开源所有产品代码
  3. 父与子的编程之旅:与小卡特一起学Python.pdf
  4. UA OPTI512R 傅立叶光学导论 采样定理例题
  5. 使用 Palette 让你的 UI 色彩与内容更贴合
  6. 命令行执行php脚本中的$argv和$argc配置方法
  7. 为提高研发和测试质量而规范Scrum项目需求描述
  8. c++string函数(一)——find、rfind详细用法
  9. ISA2006系列之三 详解防火墙的三种客户端(上)
  10. html怎样自动播放视频,html5如何实现自动播放视频?
  11. 比赛排行榜如何在LED大屏上实现自动实时滚动播报?
  12. python模拟登录的实现
  13. java se和ocjp_OCJP 考试题之七 - osc_sejhgcp0的个人空间 - OSCHINA - 中文开源技术交流社区...
  14. BearPi-IoT串口收发1-普通模式
  15. springBoot的shiro的简单项目部署
  16. MATLAB 数据分析方法(第2版)2.1 基本统计量与数据可视化
  17. 2020赚钱机会总结,拾元富另附10个副业赚钱必备的工具与平台,看看你到底错过了多少钱!
  18. 一段话中手机号中间四位做特殊处理
  19. 5G NR CSI-RS介绍(2)-- TRS
  20. 无线通信基础知识6:射频器件的基本参数2

热门文章

  1. 2016河北省职称计算机考试试题及答案,2016河北省职称计算机考试操作题答案.doc...
  2. 若依的框架怎么样_若依框架的功能代码
  3. catia创成式外形设计如何将两个面相合_汽车研发:车门铰链设计及布置要求解析!...
  4. caffe,caffe2 and pytorch
  5. A Spy in the Metro UVA - 1025
  6. Linux经常使用命令(八) - touch
  7. 【转载】gcc 使用中常用的参数及命令
  8. nodejs-express
  9. JSP开发环境配置问题解答
  10. ES 必备插件的安装