文章目录

  • 背景
  • 一些尝试
    • MergeTable
    • 性能优化
  • 后记

对 spark 任务数据落地(HDFS) 碎片文件过多的问题的优化实践及思考。

背景

此文是关于公司在 Delta Lake 上线之前对Spark任务写入数据产生碎片文件优化的一些实践。

  • 形成原因
    数据在流转过程中经历 filter/shuffle 等过程后,开发人员难以评估作业写出的数据量。即使使用了 Spark 提供的AE功能,目前也只能控制 shuffle read 阶段的数据量,写出数据的大小实际还会受压缩算法及格式的影响,因此在任务运行时,对分区的数据评估非常困难。

    • shuffle 分区过多过碎,写入性能会较差且生成的小文件会非常多。
    • shuffle 分区过少过大,则写入并发度可能会不够,影响任务运行时间。
  • 不利影响
    在产生大量碎片文件后,任务数据读取的速度会变慢(需要寻找读入大量的文件,如果是机械盘更是需要大量的寻址操作),同时会对 hdfs namenode 内存造成很大的压力。

在这种情况下,只能让业务/开发人员主动的合并下数据或者控制分区数量,提高了用户的学习及使用成本,往往效果还非常不理想。
既然在运行过程中对最终落地数据的评估如此困难,是否能将该操作放在数据落地后进行?对此我们进行了一些尝试,希望能自动化的解决/缓解此类问题。

一些尝试

大致做了这么一些工作:

  1. 修改 Spark FileFormatWriter 源码,数据落盘时,记录相关的 metrics,主要是一些分区/表的记录数量和文件数量信息。
  2. 在发生落盘操作后,会自动触发碎片文件检测,判断是否需要追加合并数据任务。
  3. ​实现一个 MergeTable 语法用于合并表/分区碎片文件,通过系统或者用户直接调用。

第1和第2点主要是平台化的一些工作,包括监测数据落盘,根据采集的 metrics 信息再判断是否需要进行 MergeTable 操作,下文是关于 MergeTable 的一些细节实现。

MergeTable

功能:

  1. 能够指定表或者分区进行合并
  2. 合并分区表但不指定分区,则会递归对所有分区进行检测合并
  3. ​指定了生成的文件数量,就会跳过规则校验,直接按该数量进行合并

语法:

merge table [表名] [options (fileCount=合并后文件数量)]  --非分区表
merge table [表名] PARTITION (分区信息) [options (fileCount=合并后文件数量)] --分区表

碎片文件校验及合并流程图​:

性能优化

对合并操作的性能优化

  1. 只合并碎片文件
    如果设置的碎片阈值是128M,那么只会将该表/分区内小于该阈值的文件进行合并,同时如果碎片文件数量小于一定阈值,将不会触发合并,这里主要考虑的是合并任务存在一定性能开销,因此允许系统中存在一定量的小文件​。

  2. 分区数量及合并方式
    定义了一些规则用于计算输出文件数量及合并方式的选择,获取任务的最大并发度 maxConcurrency 用于计算数据的分块大小,再根据数据碎片文件的总大小选择合并(coalesce/repartition)方式。

    • 开启 dynamicAllocation
      maxConcurrency = spark.dynamicAllocation.maxExecutors * spark.executor.cores
    • 未开启 dynamicAllocation
      maxConcurrency = spark.executor.instances * spark.executor.cores

    以几个场景为例对比优化前后​的性能:
    场景1:最大并发度100,碎片文件数据100,碎片文件总大小100M,如果使用 coalesce(1),将会只会有1个线程去读/写数据,改为 repartition(1),则会有100个并发读,一个线程顺序写。性能相差100X。

    场景2:最大并发度100,碎片文件数量10000,碎片文件总大小100G,如果使用 repartition(200),将会导致100G的数据发生 shuffle,改为 coalesce(200),则能在保持相同并发的情况下避免 200G数据的IO。

    场景3:最大并发度200,碎片文件数量10000,碎片文件总大小50G,如果使用 coalesce(100),会保存出100个500M文件,但是会浪费一半的计算性能,改为 coalesce(200),合并耗时会下降为原来的50%。

    上述例子的核心都是在充分计算资源的同时避免不必要的IO。

  3. 修复元数据
    因为 merge 操作会修改数据的创建及访问时间,所以在目录替换时需要将元数据信息修改到 merge 前的一个状态,该操作还能避免冷数据扫描的误判。最后还要调用 refresh table 更新表在 spark 中的状态缓存。​

  4. commit 前进行校验
    在最终提交前对数据进行校验,判断合并前后数据量是否发生变化(从数据块元数据中直接获取数量,避免发生IO),存在异常则会进行回滚,放弃合并操作。​

数据写入后,自动合并效果图:

后记

收益
该同步合并的方式已经在我们的线上稳定运行了1年多,成功的将平均文件大小从150M提升到了270M左右,提高了数据读取速度,与此同时 Namenode 的内存压力也得到了极大缓解。

​对 MergeTable 操作做了上述的相关优化后,根据不同的数据场景下,能带来数倍至数十倍的性能提升。

缺陷
因为采用的是同步合并的方式,由于没有事务控制,所以在合并过程中数据不可用,这也是我们后来开始引入 D​elta Lake 的一个原因。

Spark 小文件合并优化实践相关推荐

  1. 关于hive on spark的distribute by和group by使用以及小文件合并问题

    欢迎关注交流微信公众号:小满锅 问题导言 最近在使用hive时,发现一些任务的因为使用mapreduce的缘故,跑的太慢了,才几十个G的数据就经常跑一个多小时,于是有了切换spark的想法. 但是刚刚 ...

  2. 大数据教程(10.6)自定义inputFormat(小文件合并)

    2019独角兽企业重金招聘Python工程师标准>>> 上一篇文章分析了运营商流量日志解析增强的实现,至此,mapreduce的组件中除了inputFormat全都自定义写过了!博主 ...

  3. 京东快递小程序分包优化实践

    前言- 随着项目规模增大,小程序分包优化是必须要面对的问题.分包不合理,不仅影响项目迭代和上线计划,还关乎用户体验,甚至因此导致 C 端用户流失.本文主要介绍京东快递小程序分包过程中踩过的坑,以及小程 ...

  4. MapReduce将小文件合并成大文件,并设置每个切片的大小的案例

    测试代码: package cn.toto.bigdata.combinefile;import java.io.IOException;import org.apache.hadoop.conf.C ...

  5. Hive insert into小文件问题优化解决

    Hive insert into小文件问题优化解决 insert into table hhl values ('1','11'); insert into table hhl values ('1' ...

  6. Hadoop HDFS 小文件合并

    HDFS的小文件合并 由于Hadoop擅长存储大文件,因为大文件的元数据信息比较少,如果Hadoop集群当中有大量的小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力 ...

  7. MapReduce处理小文件合并

    一:小文件合并几种方式: 1. 在数据采集的时候,客户端就将小文件或小批数据合成大文件再上传HDFS 2. 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并 3. 在mapred ...

  8. spark 实现HDFS小文件合并

    一.首先使用sparksql读取需要合并的数据.当然有两种情况, 一种是读取全部数据,即需要合并所有小文件. 第二种是合并部分数据,比如只查询某一天的数据,只合并某一个天分区下的小文件. val df ...

  9. hdfs合并块_hdfs 小文件合并 问题

    玩过大数据的人,肯定都遇到过小文件问题.这也是玩大数据,必须跨过去的一个坎,要不然,大数据玩不转. 一,怎么定义小文件 hadoop1,默认存储64M.hadoop2,hadoop3默认是128M,当 ...

  10. Flink从入门到精通100篇(十五)-Flink SQL FileSystem Connector 分区提交与自定义小文件合并策略 ​

    前言 本文先通过源码简单过一下分区提交机制的两个要素--即触发(trigger)和策略(policy)的实现,然后用合并小文件的实例说一下自定义分区提交策略的方法. PartitionCommitTr ...

最新文章

  1. (转)小小的研究了一下linux下的”注册表“ gconf-editor
  2. Kmeans++、Mini-Batch Kmeans、Bisecting Kmeans、K中心点(K-Medoids)算法、K众数聚类、核K均值聚类
  3. python简单代码input-【python系统学习05】input函数——实现人机交互
  4. 计算机应用技术教程的答案,计算机应用技术教程第3章办公自动化答案
  5. JavaScript新发展 CoffeeScript Dart Rust Node.js
  6. 使用容器服务支持开发者快速搭建小程序后端服务
  7. Java集合 -- HashSet 和 HashMap
  8. 详解OTT与IPTV的不同之处
  9. antdesignpro 重定向到登录页面_URL重定向跳转绕过
  10. 清华大学计算机学科馆员,清华大学图书馆 学科馆员工作
  11. php 取整十整百,php取整数的方法与实例总结
  12. 黑马程序员-JavaSE核心知识-01Java介绍
  13. Ubuntu14.04安装文泉驿字体
  14. vr视频制作软件哪个好?Nibiru creator是企业的标配
  15. 使用Matlab对大地测量学中的大地坐标和大地空间直角坐标进行相互转换
  16. HCNE的110个知识点
  17. oracle数据库rank over用法,Oracle 中rank() over()的用法
  18. 阿里香港轻量服务器加上BBR会发生什么?
  19. Android无埋点数据收集SDK关键技术解析
  20. uniApp 生命周期【应用生命周期 和 页面生命周期】

热门文章

  1. 品牌笔记本主板刷BIOS升级NVME ssd启动系统
  2. 写一篇简单的微信接入
  3. Dummy variable (变量dummy化)
  4. SAP项目上的疑难杂症-(制品区分)如何处理?
  5. Java软件工程师面试常见问题集锦之一
  6. Vue中报如下错误Uncaught (in promise) NavigationDuplicated解决方案
  7. 方向键按键转发,模仿笔记本Fn按键
  8. csv用excel打开后乱码?
  9. 八、T100库存管理系统之月结管理
  10. python数学公式编辑工具_Karl的良心佳软推荐 篇二:目前最好用的数学公式编辑神器——Mathpix Snipping Tool...