Table of contents

  • Memory partitioning vs. disk partitioning
  • Simple example
  • partitionBy with repartition(5)
  • partitionBy with repartition(1)
  • Partitioning datasets with a max number of files per partition
  • Partitioning dataset with max rows per file
  • Partitioning dataset with max rows per file pre Spark 2.2
  • Small file problem
  • Conclusion

Spark writers let you partition data on disk with partitionBy. Some queries can run 50 to 100 times faster on a partitioned data lake, so partitioning is vital for certain queries.

Creating and maintaining partitioned data lakes is hard.

This blog post discusses how to use partitionBy and explains the challenges of partitioning production-sized datasets on disk. It also covers the different memory partitioning tactics that make partitionBy more efficient.

You'll need to master the concepts covered here to create partitioned data lakes on large datasets, especially if you're dealing with a high-cardinality or heavily skewed partition key.

Make sure to read Writing Beautiful Spark Code for a detailed overview of how to build production-grade partitioned lakes.

Memory partitioning vs. disk partitioning

coalesce() and repartition() change the number of memory partitions of a DataFrame.

partitionBy() is a DataFrameWriter method that specifies whether the data should be written to disk in nested folders. By default, Spark does not write data to disk in nested folders.

Memory partitioning is often independent of disk partitioning. In order to write the data out to disk properly, you'll almost always need to repartition the data in memory first.
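As a quick illustration (a minimal sketch, not from the original post; it assumes a DataFrame named df like the one defined in the example below), the in-memory partition count lives on the underlying RDD, while partitionBy only controls the folder layout that the writer produces:

// Memory partitions: changed with repartition()/coalesce(), visible on the RDD.
df.repartition(10).rdd.getNumPartitions // 10 in-memory partitions

// Disk partitions: controlled by the writer; one folder per partition key value.
// df.write.partitionBy("country").parquet("/hypothetical/output/path")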

Simple example

Suppose we have the following CSV file with first_name, last_name, and country columns:

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

Let's partition this data out to disk with country as the partition key. Let's create one file per disk partition.

import org.apache.spark.sql.functions.col

val path = new java.io.File("./src/main/resources/ss_europe/").getCanonicalPath
val df = spark.read.option("header", "true").option("charset", "UTF8").csv(path)
val outputPath = new java.io.File("./tmp/partitioned_lake1/").getCanonicalPath
df.repartition(col("country")).write.partitionBy("country").parquet(outputPath)

Here's what the data looks like on disk:

partitioned_lake1/
  country=Argentina/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=China/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=Russia/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet

Creating one file per disk partition is not going to work for production-sized datasets. Suppose the China partition contains 100 GB of data – we can't write all of that data out to a single file.

partitionBy with repartition(5)

Let's run repartition(5) to put each row of data in a separate memory partition before running partitionBy, and see how that impacts how the files get written out to disk.

val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath
df.repartition(5).write.partitionBy("country").parquet(outputPath)

Here's what the files look like on disk:

partitioned_lake2/
  country=Argentina/
    part-00003-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
  country=China/
    part-00000-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00004-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
  country=Russia/
    part-00001-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00002-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet

The partitionBy writer will write out files on disk for each memory partition. The maximum number of files written out is the number of unique countries multiplied by the number of memory partitions.

In this example, we have 3 unique countries * 5 memory partitions, so up to 15 files could get written out (if each memory partition had one Argentinian, one Chinese, and one Russian person). We only have 5 rows of data, so only 5 files are written in this example.
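As a quick sanity check (a hypothetical snippet, not from the original post; it assumes the df from the example above), the two factors in that upper bound can be read straight off the DataFrame:

// Upper bound on files written out: memory partitions * distinct partition key values.
val numMemoryPartitions = df.repartition(5).rdd.getNumPartitions // 5
val numUniqueCountries = df.select("country").distinct.count     // 3
println(numMemoryPartitions * numUniqueCountries)                // at most 15 files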

partitionBy with repartition(1)

If we repartition the data to one memory partition before partitioning on disk with partitionBy, then we’ll write out a maximum of three files. numMemoryPartitions * numUniqueCountries = maxNumFiles. 1 * 3 = 3.

Let’s take a look at the code.

val outputPath = new java.io.File("./tmp/partitioned_lake3/").getCanonicalPath
df.repartition(1).write.partitionBy("country").parquet(outputPath)

Here’s what the files look like on disk:

partitioned_lake3/
  country=Argentina/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
  country=China/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
  country=Russia/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet

Partitioning datasets with a max number of files per partition

Let’s use a dataset with 80 people from China, 15 people from France, and 5 people from Cuba. Here’s a link to the data.

Here’s what the data looks like:

person_name,person_country
a,China
b,China
c,China
...77 more China rows
a,France
b,France
c,France
...12 more France rows
a,Cuba
b,Cuba
c,Cuba
...2 more Cuba rows

Let’s create 8 memory partitions and scatter the data randomly across the memory partitions (we’ll write out the data to disk, so we can inspect the contents of a memory partition).

import org.apache.spark.sql.functions.rand

val outputPath = new java.io.File("./tmp/repartition_for_lake4/").getCanonicalPath
df.repartition(8, col("person_country"), rand).write.csv(outputPath)

Let’s look at one of the CSV files that is outputted:

p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba

This technique helps us set a maximum number of files per partition when creating a partitioned lake. Let’s write out the data to disk and observe the output.

val outputPath = new java.io.File("./tmp/partitioned_lake4/").getCanonicalPath
df.repartition(8, col("person_country"), rand).write.partitionBy("person_country").csv(outputPath)

Here’s what the files look like on disk:

partitioned_lake4/
  person_country=China/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 6 more files
  person_country=Cuba/
    part-00002-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00003-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 2 more files
  person_country=France/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 5 more files

Each disk partition will have up to 8 files. The data is split randomly in the 8 memory partitions. There won’t be any output files for a given disk partition if the memory partition doesn’t have any data for the country.

This is better, but still not ideal. We have four files for Cuba and seven files for France, so too many small files are being created.
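One way to see those file counts (a quick local check, not from the original post; it assumes the outputPath variable above and a local filesystem) is to list the part files in each person_country folder:

import java.io.File

// Count the part files written under each person_country=... directory.
new File(outputPath).listFiles.filter(_.isDirectory).sortBy(_.getName).foreach { dir =>
  val numFiles = dir.listFiles.count(_.getName.startsWith("part-"))
  println(s"${dir.getName}: $numFiles files") // e.g. person_country=Cuba: 4 files
}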

Let’s review the contents of our memory partition from earlier:

p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba

partitionBy will split up this particular memory partition into three files: one China file with 7 rows of data, one France file with one row of data, and one Cuba file with one row of data.

Partitioning dataset with max rows per file

Let’s write some code that’ll create partitions with ten rows of data per file. We’d like our data to be stored in 8 files for China, one file for Cuba, and two files for France.

We can use the maxRecordsPerFile option to output files with 10 rows.

val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df.repartition(col("person_country")).write.option("maxRecordsPerFile", 10).partitionBy("person_country").csv(outputPath)

This technique is particularly important for partition keys that are highly skewed. The number of inhabitants by country is a good example of a partition key with high skew. For example Jamaica has 3 million people and China has 1.4 billion people – we'll want ~467 times more files in the China partition than the Jamaica partition.
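For the record, the ~467 figure is just the population ratio (a trivial check, using the numbers quoted above):

println(1400000000.0 / 3000000.0) // ≈ 466.7, i.e. ~467x more files for China than Jamaica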

Partitioning dataset with max rows per file pre Spark 2.2

The maxRecordsPerFile option was added in Spark 2.2, so you’ll need to write your own custom solution if you’re using an earlier version of Spark.

import org.apache.spark.sql.functions.{col, rand}
import org.apache.spark.sql.types.IntegerType

val countDF = df.groupBy("person_country").count()
val desiredRowsPerPartition = 10
val joinedDF = df.join(countDF, Seq("person_country"))
  .withColumn("my_secret_partition_key", (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType))
val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF.repartition(col("person_country"), col("my_secret_partition_key"))
  .drop("count", "my_secret_partition_key")
  .write.partitionBy("person_country").csv(outputPath)

We calculate the total number of records per partition key and then create a my_secret_partition_key column rather than relying on a fixed number of partitions.

You should choose the desiredRowsPerPartition based on what will give you ~1 GB files. If you have a 500 GB dataset with 750 million rows, set desiredRowsPerPartition to 1,500,000.
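As a rough sketch of that sizing rule (a hypothetical helper, not part of the original post), the value can be derived from the dataset's total row count and size:

// Hypothetical helper: rows per file so that each output file lands near targetGBPerFile.
def pickDesiredRowsPerPartition(totalRows: Long, totalSizeGB: Double, targetGBPerFile: Double = 1.0): Long =
  math.max(1L, math.round(totalRows / totalSizeGB * targetGBPerFile))

pickDesiredRowsPerPartition(750000000L, 500.0) // 1500000, matching the example above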

Small file problem

Partitioned data lakes can quickly develop a small file problem when they’re updated incrementally. It’s hard to compact partitioned data lakes. As we’ve seen, it’s even hard to make a partitioned data lake!

Use the tactics outlined in this blog post to build your partitioned data lakes and start them off without the small file problem!

Conclusion

Partitioned data lakes can be much faster to query (when filtering on the partition keys) because they allow for massive data skipping.

Creating and maintaining partitioned data lakes is challenging, but the performance gains make them a worthwhile effort.
