一.引用

parquet 文件常见于 Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文件在 spark 场景下的存储,读取与使用中可能遇到的坑。

二.Parquet 加载方式

1.SparkSession.read.parquet

SparkSession 位于 org.apache.spark.sql.SparkSession 类下,除了支持读取 parquet 的列式文件外,SparkSession 也支持读取 ORC 列式存储文件,可以参考: Spark 读取 ORC FIle

    val conf = new SparkConf().setAppName("ParquetInfo").setMaster("local")val spark = SparkSession.builder.config(conf).getOrCreate()spark.read.parquet(path).foreach(row => {val head = row.getString(0)println(head)})

读取后会获取一个 Sql.DataFrame,支持常见的 sql 语法操作,如果不想使用 sql 才做也可以通过 .rdd 的方法得到 RDD[Row],随后遍历每个 partition 下的 Iterator[Row] 即可。

Tips:

后续可以执行 sql 操作,当然也支持初始化 SqlContext 调用 sql 方法,不过用 SparkSession 也可以搞定。

    val parquetFileDF = spark.read.parquet("path")parquetFileDF.createOrReplaceTempView("tableName")val resultDf = spark.sql("SELECT * FROM tableName")val sqlContext = new SQLContext(sc)sqkContext.sql("xxx")

2.SparkContext.HadoopFile

使用 hadoopFile 读取时需要指定对应的 K-V 以及 InputFormat 的格式,Parquet  文件对应的 K-V 为 Void-ArrayWritable,其 InputFormat 为: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat,获取 ArrayWritable 后通过索引可以获得 Writable。

    val sc = spark.sparkContextsc.setLogLevel("error")val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])parquetInfo.take(5).foreach(info => {val writable = info._2.get()val head = writable(0)println(writable.length + "\t" + head)})

 Tips:

需要在 SparkConf 中加入序列化的配置,否则 hadoopFile 方法会报错:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

writable 需要通过反序列化的方式才能再获取具体内容,所以这里推荐使用 SparkSession 的官方 api 读取,不过可以 RcFile SparkSession 暂不支持直接读取,所以可以用 sc.hadoopRdd 的方法读取同样列式存储的 RcFile 格式文件,可以参考: Spark 读取 RcFile

三.Parquet 存储方式

1.静态转换

Parquet -> Parquet,读取 parquet 生成 Sql.DataFrame 再转存,类似 RDD 的 transform:

    spark.read.parquet(path).write.mode(SaveMode.Overwrite).option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").format("parquet").save("/split")

2.RDD[T*] 转换

常规数据 RDD 可以通过加入 import sqlContext.implicits._ 隐式转换的方式由 RDD 转换为 sql.Dataframe,随后完成 parquet 的存储,下面掩饰一个 PairRDD 转换为 df 并存储的方法:

    import sqlContext.implicits._val commonStringRdd = sc.emptyRDD[(String, String)].toDF()commonStringRdd.write.mode(SaveMode.Overwrite).format("parquet").save("")

Tips:

SaveModel 分为 Append 追加、Overwrite 覆盖、ErrorIfExists 报错、Ignore 忽略四种模式,前两个比较好理解,后面两个前者代表如果地址已存在则报错,后者如果地址已存在则忽略且不影响原始数据。SaveModel 通过枚举 Enum 的方式实现:

详细的 RDD 转换 Sql.DataFrame 可以参考:Spark - RDD / ROW / sql.DataFrame 互转 。

3.RDD[Row] 转换

如果有生成的 RDD[Row] 就可以直接调用 sqlContext 将该 RDD 转换为 DataFrame。这里 TABLE_SCHEMA 可以看作是每一列数据的描述,类似 Hive 的 column 的信息,主要是字段名和类型,也可以添加额外的信息,sqlContext 将对应的列属性与 Row 一一匹配,如果 Schema 长度没有达到 Row 的总列数,则后续字段都只能读为 Null。

    val sqlContext = new SQLContext(sc)final val TABLE_SCHEME = StructType(Array(StructField("A", StringType),StructField("B", StringType),StructField("C", StringType),StructField("D", StringType),StructField("E", StringType),StructField("F", StringType),StructField("G", StringType),StructField("H", StringType)))val commonRowRdd = sc.emptyRDD[Row]sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEME).write.mode(SaveMode.Overwrite).format("parquet").save("/split")

Tips:

使用上述语法读取时可能会报错: Illegal pattern component: XXX ,这是因为内部 DataFormat 解析的问题,在代码中加入 .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") 即可。

spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)

四.Parquet 浅析

Parquet 由于其开源,支持多平台多系统以及高效的存储和编码方案,使得其非常适合大数据场景下的任务开发,下面简单看下他的两个特性,列式存储和元数据存储:

1.列式存储 - 更小的 IO

CSV 是最常见的行式存储,对于一些需要单独特征或列的场景,如果是 CSV 文件需要遍历整行并分割,最终获取目标元素,而 Parquet 方式通过列式存储,对于单独的特征可以直接访问,从而提高了执行的效率,减少了数据 IO。

CSV: A,B,C,D,E -> Split(",")[col]
Parquet: A B C D E -> getString(col)

2.元数据存储 - 更高的压缩比

Parquet 采用多种编码 encoding 方式,保证数据的高效存储和低空间

A.Run Length encoding

游程编码,当一行的多列数据有很多重复数据时,可以通过 "X重复了N次" 的记录方法,缩小记录的成本,虽然 N 可能很大,但存储成本很小:

[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1

B.Dictionary encoding

字典编码,顾名思义就是通过映射,保存重复过多的数据,例如 "0" -> "LongString":

[LongString, LongString, LongString] -> [0, 0, 0]

C.Delta encoding

增量编码,适用于 unix 时间戳,时间戳记录为 1970年1月1日以来的秒数,存储时间戳时可以直接减去初始时间戳,减少存储量,比如 1577808000 作为基准,则可以减少很多存储空间:

[1577808000, 1577808004, 1577808008] -> [0, 4, 8]

3.存储-压缩对比

    val st = System.currentTimeMillis()val pairInfo = (0 to 1000000).zipWithIndex.toArrayval format = "csv" // csv、json、parquetsc.parallelize(pairInfo).toDF("A", "B").write.mode(SaveMode.Overwrite).option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").save(s"./output/$format")val saveType = "gzip" // text、gzipsc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec])val cost = System.currentTimeMillis() - stprintln(s"耗时: $cost")

使用上述两种方法分别将 0 到 1000000 的数组存到对应文件,看一下存储的大小:

类型 Text Gzip Parquet CSV JSON
大小(MB) 15.8  4.6 8 13.8 23.8

相比于表格数据 CSV 和 JSON 存储,parquet 提供了更高的压缩比,Amazon S3 集群曾经对比过 CSV 与 parquet 的效率对比,使用 Parquet 可以缩减 87% 的大小,查询的速度快 34 倍 同时可以节省 99.7 的成本,所以在大数据量加经常需要个别列操作的场景下,Parquet 非常适合。

4.读取-效率对比

再分别读取上述文件:

    val csv = spark.read.csv(path + "/output/csv").rdd.count()val parquet = spark.read.parquet(path + "/output/parquet").rdd.count()val json = spark.read.json(path + "/output/json").count()val common = sc.textFile(path + "/output/common").count()val gz = sc.textFile(path + "/output/gzip").count()
类型 Text Gzip Parquet CSV JSON
耗时(ms) 1417 1448 4952 6870 6766

相比 CSV,JSON 是有优势的,但是相对于行数存储的 Text 和 Gzip,执行 count 类的行统计操作显然不是列式存储文件的强项,所以相差很多,如果是大数据下针对某个或几个字段统计,Parquet 会提供相比于行式存储文件更高的性能。

5.selectExpr

读取 Parquet 文件除了获取原始的字段内容外,也可以通过 selectExpr 操作获取更多额外的信息,方法位于 org.apache.spark.sql.functions 中,内部包含 collect_list 类似的聚合操作,也包含 count 类似的统计操作,还有 max、min、isnull 等等。

      spark.read.parquet(path).selectExpr("count('_c1')").rdd.foreach(row => {println(row.getLong(0))})

上述操作通过 selectExpr 获取了 count(_c1) 特征的数量,count Result:5383。

其中 _c1 为 Parquet 获取的 sql.DataFrame 的默认 schema,可以通过下述方法获取默认的 schema 信息:

      val schema = spark.read.parquet(path).schemaprintln(schema)

这里截取了一部分,特征名从 _c0 开始依次累加,默认为 _c0,_c1 ,如果自己定义了 schema 的 StructField ,使用 spark.read.schema().parqeut() 读出来的 sql.Dataframe 的 selectExpr 函数内操作使用的列名就要换成自己定义的名称,例如 _c1 我定义为 age,则上述写法要改为 count('age'),再使用 _c1 会报错。更详细的 schema 操作可以参考:Parquet 指定 schema

五.总结

Spark - Parquet 大致常用的内容就这些,SparkSession 集成了读取 parquet、orc 的 API 非常的便捷,有需要建议直接通过 API 读取而不是 HadoopRdd / HadoopFile 。最后想说 parquet 的命名确实很好玩,parquet 翻译为地板,而不定长的列名存储,如果通过平面展示也颇有地板的感觉。

Spark 实战 - 3.一文搞懂 parquet相关推荐

  1. 一文搞懂 Java 线程中断

    转载自   一文搞懂 Java 线程中断 在之前的一文<如何"优雅"地终止一个线程>中详细说明了 stop 终止线程的坏处及如何优雅地终止线程,那么还有别的可以终止线程 ...

  2. 一文搞懂AWS EC2, IGW, RT, NAT, SG 基础篇下

    B站实操视频更新 跟着拉面学习AWS--EC2, IGW, RT, NAT, SG 简介 长文多图预警,看结论可以直接拖到"总结"部分 本文承接上一篇文章介绍以下 AWS 基础概念 ...

  3. css股票曲线图图解,如何看懂股票曲线图,一文搞懂这些曲线所代表的含义!

    原标题:如何看懂股票曲线图,一文搞懂这些曲线所代表的含义! 在股票技术分析中,我们经常要分析K线图,分时图等等,有些新手在入门的时候常常把两者弄混,所以统称它们为曲线图,实际它们就是K线图和分时图了, ...

  4. 网络知识扫盲,一文搞懂 DNS

    在找工作面试的过程中,面试官非常喜欢考察基础知识,除了数据结构与算法之外,网络知识也是一个非常重要的考察对象. 而网络知识,通常是很抽象,不容易理解的,有很多同学就在这里裁了跟头.为了更好地通过面试, ...

  5. 一文搞懂MySQL数据库分库分表

    如果数据量过大,大家一般会分库分表.分库需要注意的内容比较少,但分表需要注意的内容就多了. 工作这几年没遇过数据量特别大的业务,那些过亿的数据,因为索引设置合理,单表性能没有影响,所以实战中一直没用过 ...

  6. 一文搞懂RNN(循环神经网络)

    基础篇|一文搞懂RNN(循环神经网络) https://mp.weixin.qq.com/s/va1gmavl2ZESgnM7biORQg 神经网络基础 神经网络可以当做是能够拟合任意函数的黑盒子,只 ...

  7. 一文搞懂 Python 的 import 机制

    一.前言 希望能够让读者一文搞懂 Python 的 import 机制 1.什么是 import 机制? 通常来讲,在一段 Python 代码中去执行引用另一个模块中的代码,就需要使用 Python ...

  8. python语言语句快的标记是什么_一文搞懂Python程序语句

    原标题:一文搞懂Python程序语句 程序流 Python 程序中常用的基本数据类型,包括: 内置的数值数据类型 Tuple 容器类型 String 容器类型 List 容器类型 自然的顺序是从页面或 ...

  9. 一文搞懂HMM(隐马尔可夫模型)-Viterbi algorithm

    ***一文搞懂HMM(隐马尔可夫模型)*** 简单来说,熵是表示物质系统状态的一种度量,用它老表征系统的无序程度.熵越大,系统越无序,意味着系统结构和运动的不确定和无规则:反之,,熵越小,系统越有序, ...

最新文章

  1. 常用MySQL函数存储过程_解析MySQL存储过程、常用函数代码
  2. RDKit | 基于分子指纹的相似性图
  3. 手机算通用计算机还是,电脑手机通用的便签是哪个?有人知道吗
  4. Externalizing Session State for a Spring Boot Application Using Spring-Session
  5. 注释 —— 《clean code》读后感
  6. 漂亮表格的CSS定义
  7. 火速围观!鹅厂中间件产品遭遇暴风吐槽
  8. oracle会话超时,Oracle EBS控制会话时间及超时
  9. HtmlParser提取网页中的纯文本信息
  10. [html] 制作页面时,前端如何适应各种异形屏?
  11. sendencpac文件能删吗_“手机刷短视频”躺赚?安装自动阅读文件,开机就能挣钱?靠谱吗?...
  12. Python基础(12)--模块
  13. html水滴掉下来越来越来越淡代码,水滴落到水面就消失了?没那么简单!看水滴如何翩翩起舞!...
  14. 写软件的需求分析全方位攻略
  15. 力扣-342 4的幂
  16. 【OCP题库-12c】最新CUUG OCP 071考试题库(69题)
  17. 了解HTTP和HTPS的S之差
  18. ZYNQ UltraScale MPSOC,使用PL端AXI_UART16550IP核,且在PS端控制下实现RS485通信-----轮询方式
  19. Visio 连线 取消自动附着,取消自动捕捉
  20. python中oct函数_Python中的oct() 函数是什么?

热门文章

  1. ios 微信h5支付取消或完成支付时不能返回原APP
  2. 基于java葡萄酒销售管理系统计算机毕业设计源码+系统+lw文档+mysql数据库+调试部署
  3. HEVC中什么是profile(档次)、level(水平)、tire(等级)
  4. Framework 全局监听屏幕点击事件 INPUT_EVENT_INJECTION
  5. 组策略最佳实践之“降龙十八掌
  6. windows系统之常用DOC命令汇总以及如何编写BAT脚本文件
  7. 给你打个QQ电话就能知道你在哪,你敢信?
  8. Win7专业版 下安装ArcGIS 9.3总结
  9. JQuery引用iCheck样式
  10. 视频教程-zabbix4.0原理到实战中部(基础架构之四)-Linux