Table of Contents

  • 1. Background
  • 2. Test code
  • 3. The generated DAG
    • 1. job0
    • 2. job1
  • 4. Source-code analysis of when job0 is created
    • 1. Calling DataFrameReader.load and DataFrameReader.loadV1Source
    • 2. Calling DataSource.resolveRelation
    • 3. Calling DataSource.getOrInferFileFormatSchema()
    • 4. InMemoryFileIndex initialization
    • 5. Calling InMemoryFileIndex.bulkListLeafFiles
      • 1. paths.size decides whether a job is generated
      • 2. job0: the file-listing job
        • 1. Setting the job description
        • 2. Creating and running the job
  • 5. Call-chain summary

1. Background

  While testing a Spark job, I noticed that reading multiple files under a directory produces one more job in the Spark DAG than reading a single file. This article analyzes why from the source code. Since the analysis is fairly long, it is split into two parts: this first part walks through how job0 is produced, and the second part covers job1.

2. Test code

public class UserProfileTest {
    static String filePath = "hdfs:///user/daily/20200828/*.parquet";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("user_profile_test")
                .set(ConfigurationOptions.ES_NODES, "")
                .set(ConfigurationOptions.ES_PORT, "")
                .set(ConfigurationOptions.ES_MAPPING_ID, "uid");
        // The question under investigation: why does the read below produce more jobs than expected?
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);
        userProfileSource.count();
        userProfileSource.write().parquet("hdfs:///user/daily/result2020082808/");
    }
}

3. The generated DAG

In the DAG we can see that the line

Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);

produces two jobs; these two jobs are the only ones we care about here.

Zooming in on the relevant part of the Spark UI screenshot:

1. job0

job0's description:

Listing leaf files and directories for 100 paths:
hdfs://hadoop-01:9000/user/daily/20200828/part-00000-0e0dc5b5-5061-41ca-9fa6-9fb7b3e09e98-c000.snappy.parquet, ...
parquet at UserProfileTest.java:26

job0 has 100 partitions.

2. job1

job1's description:

parquet at UserProfileTest.java:26
parquet at UserProfileTest.java:26

We want to know when each of these two jobs is created and why there is this difference.

4. Source-code analysis of when job0 is created

1. Calling DataFrameReader.load and DataFrameReader.loadV1Source

sparkSession.read().parquet(filePath) ends up in DataFrameReader.load. When the conditionals there are evaluated, execution falls into the last else branch and calls loadV1Source:

/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    val options = new DataSourceOptions((extraOptions ++
      DataSourceV2Utils.extractSessionConfigs(
        ds = ds.asInstanceOf[DataSourceV2],
        conf = sparkSession.sessionState.conf)).asJava)

    // Streaming also uses the data source V2 API. So it may be that the data source implements
    // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
    // the dataframe as a v1 source.
    val reader = (ds, userSpecifiedSchema) match {
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)

      case (ds: ReadSupport, None) =>
        ds.createReader(options)

      case (ds: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $ds.")

      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader

      case _ => null // fall back to v1
    }

    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
  } else {
    // Execution reaches this branch in our case
    loadV1Source(paths: _*)
  }
}

load then calls:

private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}

loadV1Source constructs a DataSource. The DataSource.apply call works because DataSource is a case class: the compiler generates a companion object with apply and unapply methods, so DataSource.apply(...) is equivalent to new DataSource(...). A minimal sketch of this language feature follows below.
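
As a hedged, minimal sketch of the case-class companion mechanism only (this is not Spark code; DataSourceLike is a made-up class for illustration):

// Minimal illustration of case-class companion apply/unapply (not Spark code).
case class DataSourceLike(className: String, paths: Seq[String])

object CaseClassApplyDemo {
  def main(args: Array[String]): Unit = {
    // The compiler-generated companion object lets us omit `new`:
    val ds = DataSourceLike.apply("parquet", Seq("hdfs:///some/path"))
    val same = DataSourceLike("parquet", Seq("hdfs:///some/path")) // sugar for .apply

    // unapply enables pattern matching:
    ds match {
      case DataSourceLike(className, paths) =>
        println(s"className=$className, paths=$paths")
    }
    println(ds == same) // structural equality, also compiler-generated
  }
}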

loadV1Source then calls resolveRelation() on that DataSource object.

2. Calling DataSource.resolveRelation

/**
 * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
 * [[DataSource]]
 *
 * @param checkFilesExist Whether to confirm that the files exist when generating the
 *                        non-streaming file based datasource. StructuredStreaming jobs already
 *                        list file existence, and when generating incremental jobs, the batch
 *                        is considered as a non-streaming file based data source. Since we know
 *                        that files already exist, we don't need to check them again.
 */
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
    // TODO: Throw when too much is given.
    case (dataSource: SchemaRelationProvider, Some(schema)) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
    case (dataSource: RelationProvider, None) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
    case (_: SchemaRelationProvider, None) =>
      throw new AnalysisException(s"A schema needs to be specified when using $className.")
    case (dataSource: RelationProvider, Some(schema)) =>
      val baseRelation =
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
      baseRelation

    // We are reading from the results of a streaming query. Load files from the metadata log
    // instead of listing them using HDFS APIs.
    case (format: FileFormat, _)
        if FileStreamSink.hasMetadata(
          caseInsensitiveOptions.get("path").toSeq ++ paths,
          sparkSession.sessionState.newHadoopConf()) =>
      val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
      val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
      val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
        val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
        new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
      } else {
        tempFileCatalog
      }
      val dataSchema = userSpecifiedSchema.orElse {
        format.inferSchema(
          sparkSession,
          caseInsensitiveOptions,
          fileCatalog.allFiles())
      }.getOrElse {
        throw new AnalysisException(
          s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
            "It must be specified manually")
      }

      HadoopFsRelation(
        fileCatalog,
        partitionSchema = fileCatalog.partitionSchema,
        dataSchema = dataSchema,
        bucketSpec = None,
        format,
        caseInsensitiveOptions)(sparkSession)

    // This is a non-streaming file based datasource.
    // This is the case that matches in our scenario
    case (format: FileFormat, _) =>
      val allPaths = caseInsensitiveOptions.get("path") ++ paths
      val hadoopConf = sparkSession.sessionState.newHadoopConf()
      val globbedPaths = allPaths.flatMap(
        DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray

      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
      // This is the call we are interested in
      val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

      val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
          catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
        val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
        new CatalogFileIndex(
          sparkSession,
          catalogTable.get,
          catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
      } else {
        new InMemoryFileIndex(
          sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
      }

      HadoopFsRelation(
        fileCatalog,
        partitionSchema = partitionSchema,
        dataSchema = dataSchema.asNullable,
        bucketSpec = bucketSpec,
        format,
        caseInsensitiveOptions)(sparkSession)

    case _ =>
      throw new AnalysisException(
        s"$className is not a valid Spark SQL Data Source.")
  }

  relation match {
    case hs: HadoopFsRelation =>
      SchemaUtils.checkColumnNameDuplication(
        hs.dataSchema.map(_.name),
        "in the data schema",
        equality)
      SchemaUtils.checkColumnNameDuplication(
        hs.partitionSchema.map(_.name),
        "in the partition schema",
        equality)
    case _ =>
      SchemaUtils.checkColumnNameDuplication(
        relation.schema.map(_.name),
        "in the data schema",
        equality)
  }

  relation
}

In the method above, the line

val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

calls getOrInferFileFormatSchema:

3. Calling DataSource.getOrInferFileFormatSchema()

private def getOrInferFileFormatSchema(
    format: FileFormat,
    fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
  // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
  // in streaming mode, we have already inferred and registered partition columns, we will
  // never have to materialize the lazy val below
  // NOTE: this is a lazy val, so it is only initialized when it is actually used
  lazy val tempFileIndex = {
    val allPaths = caseInsensitiveOptions.get("path") ++ paths
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val globbedPaths = allPaths.toSeq.flatMap { path =>
      val hdfsPath = new Path(path)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
    }.toArray
    // The InMemoryFileIndex is constructed here, which is what produces the first job (job0)
    new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
  }

  val partitionSchema = if (partitionColumns.isEmpty) {
    // Try to infer partitioning, because no DataSource in the read path provides the partitioning
    // columns properly unless it is a Hive DataSource
    // This is the first real use of the lazy tempFileIndex, which triggers the
    // initialization of InMemoryFileIndex
    combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
  } else {
    // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
    // partitioning
    if (userSpecifiedSchema.isEmpty) {
      val inferredPartitions = tempFileIndex.partitionSchema
      inferredPartitions
    } else {
      val partitionFields = partitionColumns.map { partitionColumn =>
        userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
          val inferredPartitions = tempFileIndex.partitionSchema
          val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
          if (inferredOpt.isDefined) {
            logDebug(
              s"""Type of partition column: $partitionColumn not found in specified schema
                 |for $format.
                 |User Specified Schema
                 |=====================
                 |${userSpecifiedSchema.orNull}
                 |
                 |Falling back to inferred dataType if it exists.
               """.stripMargin)
          }
          inferredOpt
        }.getOrElse {
          throw new AnalysisException(s"Failed to resolve the schema for $format for " +
            s"the partition column: $partitionColumn. It must be specified manually.")
        }
      }
      StructType(partitionFields)
    }
  }

  val dataSchema = userSpecifiedSchema.map { schema =>
    StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
  }.orElse {
    format.inferSchema(
      sparkSession,
      caseInsensitiveOptions,
      tempFileIndex.allFiles())
  }.getOrElse {
    throw new AnalysisException(
      s"Unable to infer schema for $format. It must be specified manually.")
  }

  // We just print a waring message if the data schema and partition schema have the duplicate
  // columns. This is because we allow users to do so in the previous Spark releases and
  // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
  // See SPARK-18108 and SPARK-21144 for related discussions.
  try {
    SchemaUtils.checkColumnNameDuplication(
      (dataSchema ++ partitionSchema).map(_.name),
      "in the data schema and the partition schema",
      equality)
  } catch {
    case e: AnalysisException => logWarning(e.getMessage)
  }

  (dataSchema, partitionSchema)
}

Inside getOrInferFileFormatSchema, the call we care about is

      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
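
Note that tempFileIndex is a lazy val, so the InMemoryFileIndex above (and with it job0) is not created when getOrInferFileFormatSchema is entered, but only when tempFileIndex is first accessed (in combineInferredAndUserSpecifiedPartitionSchema). A minimal, hedged sketch of this Scala semantics (illustrative only, not Spark code):

object LazyValDemo {
  def main(args: Array[String]): Unit = {
    // The right-hand side does not run yet; it runs on first access only.
    lazy val expensiveIndex: Seq[String] = {
      println("building index now (this is where job0 would be triggered)")
      Seq("part-00000.parquet", "part-00001.parquet")
    }

    println("before first access")
    println(s"first access -> ${expensiveIndex.size} files")  // initialization happens here
    println(s"second access -> ${expensiveIndex.size} files") // cached, no re-initialization
  }
}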

4. InMemoryFileIndex initialization

Next, let's look at the InMemoryFileIndex class.


class InMemoryFileIndex(
    sparkSession: SparkSession,
    rootPathsSpecified: Seq[Path],
    parameters: Map[String, String],
    partitionSchema: Option[StructType],
    fileStatusCache: FileStatusCache = NoopCache)
  extends PartitioningAwareFileIndex(sparkSession, parameters, partitionSchema, fileStatusCache) {

  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
  // is the output of a streaming query.
  override val rootPaths =
    rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))

  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
  @volatile private var cachedPartitionSpec: PartitionSpec = _

  // refresh0() is executed as part of the constructor
  refresh0()
  ...

When the class is constructed, it runs the refresh0 method:

private def refresh0(): Unit = {
  // listLeafFiles is called here
  val files = listLeafFiles(rootPaths)
  cachedLeafFiles =
    new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
  cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
  cachedPartitionSpec = null
}

refresh0 in turn calls listLeafFiles(rootPaths):

/**
 * List leaf files of given paths. This method will submit a Spark job to do parallel
 * listing whenever there is a path having more files than the parallel partition discovery
 * discovery threshold.
 *
 * This is publicly visible for testing.
 */
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
  val output = mutable.LinkedHashSet[FileStatus]()
  val pathsToFetch = mutable.ArrayBuffer[Path]()
  for (path <- paths) {
    fileStatusCache.getLeafFiles(path) match {
      case Some(files) =>
        HiveCatalogMetrics.incrementFileCacheHits(files.length)
        output ++= files
      case None =>
        pathsToFetch += path
    }
    Unit // for some reasons scalac 2.12 needs this; return type doesn't matter
  }
  val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
  // bulkListLeafFiles is called here
  val discovered = InMemoryFileIndex.bulkListLeafFiles(
    pathsToFetch, hadoopConf, filter, sparkSession)
  discovered.foreach { case (path, leafFiles) =>
    HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
    fileStatusCache.putLeafFiles(path, leafFiles.toArray)
    output ++= leafFiles
  }
  output
}
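
Before following that call, note the fileStatusCache lookup at the top of listLeafFiles: paths already in the cache are served directly, and only the misses are handed to bulkListLeafFiles. A rough, hedged sketch of that cache-then-list pattern (illustrative only; the real FileStatusCache works with Path and FileStatus objects and has size limits):

import scala.collection.concurrent.TrieMap

// Minimal sketch of the cache-then-list pattern used by listLeafFiles (not Spark code).
object FileStatusCacheSketch {
  private val cache = TrieMap.empty[String, Seq[String]]

  def listWithCache(paths: Seq[String], listPath: String => Seq[String]): Seq[String] = {
    val (hits, misses) = paths.partition(cache.contains)
    val cached = hits.flatMap(cache(_))                 // served from the cache
    val discovered = misses.map(p => p -> listPath(p))  // only uncached paths are actually listed
    discovered.foreach { case (p, files) => cache.put(p, files) }
    cached ++ discovered.flatMap(_._2)
  }

  def main(args: Array[String]): Unit = {
    def fakeList(p: String): Seq[String] = { println(s"listing $p"); Seq(s"$p/part-00000.parquet") }
    println(listWithCache(Seq("/a", "/b"), fakeList)) // lists both paths
    println(listWithCache(Seq("/a", "/c"), fakeList)) // lists only /c; /a comes from the cache
  }
}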

Execution then proceeds into InMemoryFileIndex.bulkListLeafFiles.

5. Calling InMemoryFileIndex.bulkListLeafFiles

/**
 * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
 * on the number of paths to list.
 *
 * This may only be called on the driver.
 *
 * @return for each input path, the set of discovered files for the path
 */
private def bulkListLeafFiles(
    paths: Seq[Path],
    hadoopConf: Configuration,
    filter: PathFilter,
    sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

  // If the number of paths is at most 32 (the default of parallelPartitionDiscoveryThreshold),
  // the method returns right here. If it is larger, a dedicated job is launched to list the
  // files, so that listing a huge number of paths does not take too long on the driver.
  // Short-circuits parallel listing when serial listing is likely to be faster.
  if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
    return paths.map { path =>
      (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
    }
  }

  logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
  HiveCatalogMetrics.incrementParallelListingJobCount(1)

  val sparkContext = sparkSession.sparkContext
  val serializableConfiguration = new SerializableConfiguration(hadoopConf)
  val serializedPaths = paths.map(_.toString)
  val parallelPartitionDiscoveryParallelism =
    sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

  // Set the number of parallelism to prevent following file listing from generating many tasks
  // in case of large #defaultParallelism.
  val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

  val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
  val statusMap = try {
    // This is where the description "Listing leaf files and directories for 100 paths:" comes from
    val description = paths.size match {
      case 0 =>
        s"Listing leaf files and directories 0 paths"
      case 1 =>
        s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
      case s =>
        s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
    }
    // The job description is set here
    sparkContext.setJobDescription(description)
    sparkContext
      .parallelize(serializedPaths, numParallelism)
      .mapPartitions { pathStrings =>
        val hadoopConf = serializableConfiguration.value
        pathStrings.map(new Path(_)).toSeq.map { path =>
          (path, listLeafFiles(path, hadoopConf, filter, None))
        }.iterator
      }.map { case (path, statuses) =>
        val serializableStatuses = statuses.map { status =>
          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
          val blockLocations = status match {
            case f: LocatedFileStatus =>
              f.getBlockLocations.map { loc =>
                SerializableBlockLocation(
                  loc.getNames,
                  loc.getHosts,
                  loc.getOffset,
                  loc.getLength)
              }
            case _ =>
              Array.empty[SerializableBlockLocation]
          }
          SerializableFileStatus(
            status.getPath.toString,
            status.getLen,
            status.isDirectory,
            status.getReplication,
            status.getBlockSize,
            status.getModificationTime,
            status.getAccessTime,
            blockLocations)
        }
        (path.toString, serializableStatuses)
      // collect() is an action, so it triggers the creation of a job
      }.collect()
  } finally {
    sparkContext.setJobDescription(previousJobDescription)
  }

  // turn SerializableFileStatus back to Status
  statusMap.map { case (path, serializableStatuses) =>
    val statuses = serializableStatuses.map { f =>
      val blockLocations = f.blockLocations.map { loc =>
        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
      }
      new LocatedFileStatus(
        new FileStatus(
          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
          new Path(f.path)),
        blockLocations)
    }
    (new Path(path), statuses)
  }
}

The snippets below are excerpts from the InMemoryFileIndex.bulkListLeafFiles method above, analyzed piece by piece.

1. paths.size decides whether a job is generated

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
  return paths.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
  }
}

This block checks how many top-level paths were passed in — in our case, how many paths match hdfs:///user/daily/20200828/*.parquet. At this point Spark does not assume a matched path is a file; each one is treated as a directory, because Spark supports nested directories. Listing many directories entirely on the driver could take a long time, so when the number of paths exceeds 32, Spark launches a job and lets multiple executors in the YARN cluster do the listing in parallel.

The value of sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold comes from the following configuration:

def parallelPartitionDiscoveryThreshold: Int =
  getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
  buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
    .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
      "of detected paths exceeds this value during partition discovery, it tries to list the " +
      "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
      "LibSVM data sources.")
    .intConf
    .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
      "files at driver side must not be negative")
    .createWithDefault(32)

Because hdfs:///user/daily/20200828/*.parquet matches 100 files, the if condition above is false, so execution continues and job0 is generated to list the files. (If the extra listing job is undesirable, the threshold can be raised above the number of matched paths, as sketched below.)
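
A hedged sketch only: the config key is the one defined in the SQLConf snippet above, the value 200 is just an illustrative choice, and the trade-off is that all listing then happens serially on the driver.

import org.apache.spark.sql.SparkSession

object ListingThresholdDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("listing_threshold_demo")
      // Default is 32; with 100 matched paths a higher value keeps listing on the driver.
      .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "200")
      .getOrCreate()

    // With the threshold above paths.size, bulkListLeafFiles short-circuits and
    // lists the 100 paths serially on the driver instead of submitting job0.
    val df = spark.read.parquet("hdfs:///user/daily/20200828/*.parquet")
    println(df.count())
    spark.stop()
  }
}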

Note that when the path count is at most 32, the method invoked is listLeafFiles(path, hadoopConf, filter, Some(sparkSession)), which is not the listLeafFiles(paths: Seq[Path]) shown earlier:

/**
 * Lists a single filesystem path recursively. If a SparkSession object is specified, this
 * function may launch Spark jobs to parallelize listing.
 *
 * If sessionOpt is None, this may be called on executors.
 *
 * @return all children of path that match the specified filter.
 */
private def listLeafFiles(
    path: Path,
    hadoopConf: Configuration,
    filter: PathFilter,
    sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
  ...
}

The method body is omitted here. As the Scaladoc says, it "Lists a single filesystem path recursively", i.e. it recursively searches for files under a single path. In other words, when the number of top-level paths is at most 32, the driver itself recursively lists each path. Note that this method also belongs to InMemoryFileIndex, but it is a different method from

def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
  ...
}

shown above.

2. job0: the file-listing job

Since the if branch above is not taken, execution continues into the code that generates job0. There is quite a bit going on, so we break it apart below; the inline comments also explain the details.

val sparkContext = sparkSession.sparkContext
val parallelPartitionDiscoveryParallelism =
  sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
  // This is where the description "Listing leaf files and directories for 100 paths:" comes from
  val description = paths.size match {
    case 0 =>
      s"Listing leaf files and directories 0 paths"
    case 1 =>
      s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
    case s =>
      s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
  }
  // The job description is set here
  sparkContext.setJobDescription(description)
  sparkContext
    .parallelize(serializedPaths, numParallelism)
    .mapPartitions { pathStrings =>
      val hadoopConf = serializableConfiguration.value
      pathStrings.map(new Path(_)).toSeq.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, None))
      }.iterator
    }.map { case (path, statuses) =>
      val serializableStatuses = statuses.map { status =>
        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
        val blockLocations = status match {
          case f: LocatedFileStatus =>
            f.getBlockLocations.map { loc =>
              SerializableBlockLocation(
                loc.getNames,
                loc.getHosts,
                loc.getOffset,
                loc.getLength)
            }
          case _ =>
            Array.empty[SerializableBlockLocation]
        }
        SerializableFileStatus(
          status.getPath.toString,
          status.getLen,
          status.isDirectory,
          status.getReplication,
          status.getBlockSize,
          status.getModificationTime,
          status.getAccessTime,
          blockLocations)
      }
      (path.toString, serializableStatuses)
    // collect() is an action, so it triggers the creation of a job
    }.collect()
1. Setting the job description

In bulkListLeafFiles(), the job description is set as follows:


val description = paths.size match {
  case 0 =>
    s"Listing leaf files and directories 0 paths"
  case 1 =>
    s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
  case s =>
    s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sparkContext.setJobDescription(description)
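
This is exactly why job0 appears in the Spark UI with the description "Listing leaf files and directories for 100 paths:". The same mechanism is available to user code via SparkContext.setJobDescription; a minimal, hedged sketch (values are illustrative only):

import org.apache.spark.sql.SparkSession

object JobDescriptionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("job_description_demo").getOrCreate()
    val sc = spark.sparkContext

    // Whatever description is set here shows up for the job triggered by the next action.
    sc.setJobDescription("Listing-style demo: counting 100 synthetic paths")
    val count = sc.parallelize(1 to 100, 10).count() // count() is an action -> one job
    println(count)

    // Clearing the description afterwards mirrors what bulkListLeafFiles does in its finally block.
    sc.setJobDescription(null)
    spark.stop()
  }
}
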
2. Creating and running the job
val parallelPartitionDiscoveryParallelism =
  sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

sparkContext.setJobDescription(description)
sparkContext
  .parallelize(serializedPaths, numParallelism)
  .mapPartitions { pathStrings =>
    val hadoopConf = serializableConfiguration.value
    pathStrings.map(new Path(_)).toSeq.map { path =>
      (path, listLeafFiles(path, hadoopConf, filter, None))
    }.iterator
  }.map { case (path, statuses) =>
    val serializableStatuses = statuses.map { status =>
      // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
      val blockLocations = status match {
        case f: LocatedFileStatus =>
          f.getBlockLocations.map { loc =>
            SerializableBlockLocation(
              loc.getNames,
              loc.getHosts,
              loc.getOffset,
              loc.getLength)
          }
        case _ =>
          Array.empty[SerializableBlockLocation]
      }
      SerializableFileStatus(
        status.getPath.toString,
        status.getLen,
        status.isDirectory,
        status.getReplication,
        status.getBlockSize,
        status.getModificationTime,
        status.getAccessTime,
        blockLocations)
    }
    (path.toString, serializableStatuses)
  // collect() is an action, so it triggers the creation of a job
  }.collect()

Here we can see that the parallelism is set to Math.min(paths.size, parallelPartitionDiscoveryParallelism). Debugging shows that sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism defaults to 10000, so numParallelism = paths.size = 100 (the directory contains 100 parquet files). Ultimately this parallel job recursively collects the block information for every file, as this snippet shows:

.mapPartitions { pathStrings =>
  val hadoopConf = serializableConfiguration.value
  pathStrings.map(new Path(_)).toSeq.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, None))
  }.iterator
}

where listLeafFiles(path, hadoopConf, filter, None) is defined as recursively finding all files under a single path; a rough sketch of what such recursive listing looks like with the plain Hadoop FileSystem API follows.
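
A hedged sketch of the idea only (the real listLeafFiles additionally handles path filters, hidden files, block locations and optional parallelization):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object LeafFileListingSketch {
  // Recursively collect the leaf files (non-directories) under `path`.
  def listLeafFiles(fs: FileSystem, path: Path): Seq[FileStatus] = {
    fs.listStatus(path).flatMap { status =>
      if (status.isDirectory) listLeafFiles(fs, status.getPath) // recurse into subdirectories
      else Seq(status)                                          // keep leaf files
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val root = new Path("hdfs:///user/daily/20200828")
    val fs = root.getFileSystem(conf)
    listLeafFiles(fs, root).foreach(f => println(s"${f.getPath} (${f.getLen} bytes)"))
  }
}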

5. Call-chain summary

DataFrameReader.load()
DataFrameReader.loadV1Source()
DataSource.resolveRelation()
DataSource.getOrInferFileFormatSchema()
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()

