Spark writers allow data to be partitioned on disk with partitionBy. Some queries can run 50 to 100 times faster on a partitioned data lake, so partitioning is vital for certain queries.



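You need to master the concepts covered here to create partitioned data lakes on large datasets, especially if you are dealing with a high-cardinality or heavily skewed partition key.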

Memory partitioning vs. disk partitioning

coalesce() and repartition() change the memory partitions of a DataFrame.
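As a minimal sketch of how these two methods affect memory partitions, the snippet below checks the partition count after each call. The local SparkSession and the `spark.range(100)` data are assumptions made for illustration; in spark-shell a `spark` session already exists.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only (assumption).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("memory-partitions")
  .getOrCreate()

// Hypothetical data: 100 numbered rows.
val df = spark.range(100).toDF("id")

// repartition() performs a full shuffle and can raise or lower the partition count.
val five = df.repartition(5)
val fivePartitions = five.rdd.getNumPartitions

// coalesce() merges existing partitions without a full shuffle, so it can only lower the count.
val two = five.coalesce(2)
val twoPartitions = two.rdd.getNumPartitions

println(s"after repartition(5): $fivePartitions, after coalesce(2): $twoPartitions")
```

Neither call touches the disk layout; they only change how rows are grouped in memory before a write.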

partitionBy() is a DataFrameWriter method that specifies whether the data should be written to disk in folders. By default, Spark does not write data to disk in nested folders.

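Memory partitioning is often independent of disk partitioning. To write data to disk properly, you will almost always need to repartition the data in memory first.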


Suppose we have the following CSV file with first_name, last_name, and country columns:


Let’s partition this data on disk with country as the partition key. Let’s create one file per partition.

val path = new java.io.File("./src/main/resources/ss_europe/").getCanonicalPath
val df = spark.read.option("header", "true").option("charset", "UTF8").csv(path)
val outputPath = new java.io.File("./tmp/partitioned_lake1/").getCanonicalPath
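The write step itself is not shown above. One way to get a single file per disk partition is sketched below: repartition on the partition column first, so that every row for a country sits in the same memory partition, then call partitionBy. The sample rows, the temporary output directory, and the CSV output format are assumptions made so the sketch is self-contained.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("one-file-per-partition")
  .getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the CSV file (names are made up).
val people = Seq(
  ("Ana", "Lopez", "Argentina"),
  ("Wei", "Chen", "China"),
  ("Li", "Wang", "China"),
  ("Ivan", "Petrov", "Russia"),
  ("Olga", "Ivanova", "Russia")
).toDF("first_name", "last_name", "country")

// Temporary output location (assumption; the target folder must not exist yet).
val lakePath = Files.createTempDirectory("partitioned_lake").resolve("lake").toString

people
  .repartition(col("country")) // all rows for a country land in one memory partition
  .write
  .partitionBy("country")      // one country=... folder per distinct value
  .csv(lakePath)

// Count the part files inside each country=... folder.
val filesPerCountry: Map[String, Int] = new File(lakePath)
  .listFiles()
  .filter(_.isDirectory)
  .map(dir => dir.getName -> dir.listFiles().count(_.getName.startsWith("part-")))
  .toMap

filesPerCountry.foreach { case (folder, n) => println(s"$folder: $n file(s)") }
```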



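Creating one file per disk partition will not work for production-sized datasets. Suppose the China partition contains 100 GB of data: we cannot write all of that data to a single file.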

partitionBy with repartition(5)


val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath



The partitionBy writer will write out files on disk for each memory partition. The maximum number of files written out is the number of unique countries multiplied by the number of memory partitions.

In this example, we have 3 unique countries * 5 memory partitions, so up to 15 files could get written out (if each memory partition had one Argentinian, one Chinese, and one Russian person). We only have 5 rows of data, so only 5 files are written in this example.
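The bound described here can be written as a small helper. This is a sketch in plain Scala; the function name is hypothetical, and capping the result at the row count is an interpretation of the remark that 5 rows can produce at most 5 files. It assumes maxRecordsPerFile is not set.

```scala
// Upper bound on the files written by partitionBy:
// each memory partition writes at most one file per distinct key it holds,
// and every file written contains at least one row.
def maxNumFiles(numMemoryPartitions: Int, numUniqueKeys: Int, numRows: Long): Long =
  math.min(numMemoryPartitions.toLong * numUniqueKeys, numRows)

// 5 memory partitions, 3 countries, 5 rows: the row count is the tighter limit.
val boundForFiveRows = maxNumFiles(5, 3, 5)

// Same layout with plenty of rows: 5 * 3 = 15 is the limit.
val boundForManyRows = maxNumFiles(5, 3, 1000)

println(s"5 rows: at most $boundForFiveRows files; 1000 rows: at most $boundForManyRows files")
```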

partitionBy with repartition(1)

If we repartition the data to one memory partition before partitioning on disk with partitionBy, then we’ll write out a maximum of three files. numMemoryPartitions * numUniqueCountries = maxNumFiles. 1 * 3 = 3.

Let’s take a look at the code.

val outputPath = new java.io.File("./tmp/partitioned_lake3/").getCanonicalPath
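The write calls for the repartition(5) and repartition(1) cases are not shown. The sketch below runs both against a small stand-in dataset and counts the files produced. The sample rows, the helper names, the temporary directories, and the CSV format are all assumptions made for illustration.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("partitionBy-file-counts")
  .getOrCreate()
import spark.implicits._

// Hypothetical rows: 5 people across 3 countries (names are made up).
val people = Seq(
  ("Ana", "Lopez", "Argentina"),
  ("Wei", "Chen", "China"),
  ("Li", "Wang", "China"),
  ("Ivan", "Petrov", "Russia"),
  ("Olga", "Ivanova", "Russia")
).toDF("first_name", "last_name", "country")

// Counts part files under a directory, recursing into the country=... folders.
def countPartFiles(dir: File): Int =
  dir.listFiles().map { f =>
    if (f.isDirectory) countPartFiles(f)
    else if (f.getName.startsWith("part-")) 1
    else 0
  }.sum

// Writes the data with the given number of memory partitions and returns the file count.
def filesWritten(numMemoryPartitions: Int): Int = {
  val out = Files.createTempDirectory("partitioned_lake").resolve("lake").toString
  people.repartition(numMemoryPartitions).write.partitionBy("country").csv(out)
  countPartFiles(new File(out))
}

val filesWithFive = filesWritten(5) // somewhere between 3 and 5 files for this data
val filesWithOne = filesWritten(1)  // a single memory partition writes one file per country

println(s"repartition(5): $filesWithFive files, repartition(1): $filesWithOne files")
```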

Here’s what the files look like on disk:


Partitioning datasets with a max number of files per partition

Let’s use a dataset with 80 people from China, 15 people from France, and 5 people from Cuba.

Here’s what the data looks like:

...77 more China rows
...12 more France rows
...2 more Cuba rows

Let’s create 8 memory partitions and scatter the data randomly across the memory partitions (we’ll write out the data to disk, so we can inspect the contents of a memory partition).

import org.apache.spark.sql.functions.{col, rand}
val outputPath = new java.io.File("./tmp/repartition_for_lake4/").getCanonicalPath
df.repartition(8, col("person_country"), rand()).write.csv(outputPath)

Let’s look at one of the CSV files that is outputted:


This technique helps us set a maximum number of files per partition when creating a partitioned lake. Let’s write out the data to disk and observe the output.

val outputPath = new java.io.File("./tmp/partitioned_lake4/").getCanonicalPath
df.repartition(8, col("person_country"), rand()).write.partitionBy("person_country").csv(outputPath)

Here’s what the files look like on disk:

partitioned_lake4/
  person_country=China/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 6 more files
  person_country=Cuba/
    part-00002-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00003-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 2 more files
  person_country=France/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 5 more files

Each disk partition will have up to 8 files. The data is split randomly across the 8 memory partitions. A memory partition writes no file for a given disk partition if it holds no rows for that country.
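To check the 8-file cap, the sketch below rebuilds a dataset with the same country counts (80, 15, 5), writes it with the same repartition call, and counts the files per folder. The generated rows, the `person_name` column, and the temporary directory are assumptions made for illustration. Because the split is random, the exact counts vary between runs, so only the upper bound is checked.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("max-files-per-partition")
  .getOrCreate()
import spark.implicits._

// Hypothetical rows: 80 from China, 15 from France, 5 from Cuba.
val people = (Seq.fill(80)("China") ++ Seq.fill(15)("France") ++ Seq.fill(5)("Cuba"))
  .zipWithIndex
  .map { case (country, i) => (s"person_$i", country) }
  .toDF("person_name", "person_country")

val lakePath = Files.createTempDirectory("partitioned_lake").resolve("lake").toString

people
  .repartition(8, col("person_country"), rand()) // 8 memory partitions, rows scattered randomly
  .write
  .partitionBy("person_country")
  .csv(lakePath)

// Count the part files inside each person_country=... folder.
val filesPerCountry: Map[String, Int] = new File(lakePath)
  .listFiles()
  .filter(_.isDirectory)
  .map(dir => dir.getName -> dir.listFiles().count(_.getName.startsWith("part-")))
  .toMap

filesPerCountry.foreach { case (folder, n) => println(s"$folder: $n file(s)") }
```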

This is better, but still not ideal. We have four files for Cuba and seven files for France, so too many small files are being created.

Let’s review the contents of our memory partition from earlier:


partitionBy will split up this particular memory partition into three files: one China file with 7 rows of data, one France file with one row of data, and one Cuba file with one row of data.

Partitioning dataset with max rows per file

Let’s write some code that’ll create partitions with ten rows of data per file. We’d like our data to be stored in 8 files for China, one file for Cuba, and two files for France.

We can use the maxRecordsPerFile option to output files with 10 rows.

val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df.repartition(col("person_country")).write.option("maxRecordsPerFile", 10).partitionBy("person_country").csv(outputPath)
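The expected file counts follow from a ceiling division of each country's row count by maxRecordsPerFile, since repartitioning on person_country puts all rows for a country in one memory partition. A plain-Scala sketch of that arithmetic follows; the helper name is hypothetical.

```scala
// Files needed for one disk partition when all of its rows sit in a single
// memory partition and each file holds at most maxRecordsPerFile rows.
def filesNeeded(rowCount: Long, maxRecordsPerFile: Long): Long =
  (rowCount + maxRecordsPerFile - 1) / maxRecordsPerFile // ceiling division

// Row counts from the example dataset.
val rowsPerCountry = Map("China" -> 80L, "France" -> 15L, "Cuba" -> 5L)

val expectedFiles = rowsPerCountry.map { case (country, rows) =>
  country -> filesNeeded(rows, 10)
}

expectedFiles.foreach { case (country, n) => println(s"$country: $n file(s)") }
```

The last file in a partition can hold fewer than 10 rows, which is why France needs two files for 15 rows.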

This technique is particularly important for partition keys that are highly skewed. The number of inhabitants by country is a good example of a partition key with high skew. For example, Jamaica has 3 million people and China has 1.4 billion people, so we’ll want ~467 times more files in the China partition than the Jamaica partition.

Partitioning dataset with max rows per file pre Spark 2.2

The maxRecordsPerFile option was added in Spark 2.2, so you’ll need to write your own custom solution if you’re using an earlier version of Spark.

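import org.apache.spark.sql.functions.{col, rand}
import org.apache.spark.sql.types.IntegerType
val countDF = df.groupBy("person_country").count()
val desiredRowsPerPartition = 10
// Assign each row a random bucket; a country gets roughly count / desiredRowsPerPartition buckets.
val joinedDF = df.join(countDF, Seq("person_country")).withColumn("my_secret_partition_key", (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType))
val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
// Repartition on country plus bucket, then drop the helper columns before writing.
joinedDF.repartition(col("person_country"), col("my_secret_partition_key")).drop("count", "my_secret_partition_key").write.partitionBy("person_country").csv(outputPath)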

We calculate the total number of records per partition key and then create a my_secret_partition_key column rather than relying on a fixed number of partitions.

You should choose the desiredRowsPerPartition based on what will give you ~1 GB files. If you have a 500 GB dataset with 750 million rows, set desiredRowsPerPartition to 1,500,000.
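The 1,500,000 figure is the average number of rows per gigabyte multiplied by the target file size. A plain-Scala sketch of that calculation follows; the helper name is hypothetical, and it assumes rows are roughly uniform in size.

```scala
// Rows that fit in one file of the target size, assuming uniform row size.
def rowsPerTargetFile(totalRows: Long, datasetSizeGb: Double, targetFileSizeGb: Double): Long =
  math.round(totalRows / datasetSizeGb * targetFileSizeGb)

// 750 million rows in 500 GB, aiming for ~1 GB files.
val desiredRowsPerPartition = rowsPerTargetFile(750000000L, 500.0, 1.0)

println(s"desiredRowsPerPartition = $desiredRowsPerPartition")
```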

Small file problem

Partitioned data lakes can quickly develop a small file problem when they’re updated incrementally. It’s hard to compact partitioned data lakes. As we’ve seen, it’s even hard to make a partitioned data lake!

Use the tactics outlined in this blog post to build your partitioned data lakes and start them off without the small file problem!


Partitioned data lakes can be much faster to query (when filtering on the partition keys) because they allow for massive data skipping.
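A minimal sketch of such a query follows: it writes a small partitioned lake, then reads it back with a filter on the partition column, which lets Spark restrict the scan to the matching folder. The generated rows, the `person_name` column, the header option, and the temporary directory are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("partition-filter")
  .getOrCreate()
import spark.implicits._

// Hypothetical rows: 80 from China, 15 from France, 5 from Cuba.
val people = (Seq.fill(80)("China") ++ Seq.fill(15)("France") ++ Seq.fill(5)("Cuba"))
  .zipWithIndex
  .map { case (country, i) => (s"person_$i", country) }
  .toDF("person_name", "person_country")

val lakePath = Files.createTempDirectory("partitioned_lake").resolve("lake").toString

people
  .repartition(col("person_country"))
  .write
  .option("header", "true")
  .partitionBy("person_country")
  .csv(lakePath)

// person_country is recovered from the folder names when the lake is read back.
// Filtering on it means only the person_country=Cuba folder needs to be scanned.
val cubans = spark.read
  .option("header", "true")
  .csv(lakePath)
  .filter(col("person_country") === "Cuba")

val cubanCount = cubans.count()
println(s"rows read for Cuba: $cubanCount")
```

Calling `cubans.explain()` prints the physical plan, which is one way to inspect how the filter is applied to the file scan.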

Creating and maintaining partitioned data lakes is challenging, but the performance gains make them a worthwhile effort.

