1 Overview

features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。

2 分析

看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features
├── BasicDriverFeatureStep.scala
├── BasicExecutorFeatureStep.scala
├── DriverCommandFeatureStep.scala
├── DriverKubernetesCredentialsFeatureStep.scala
├── DriverServiceFeatureStep.scala
├── EnvSecretsFeatureStep.scala
├── ExecutorKubernetesCredentialsFeatureStep.scala
├── HadoopConfDriverFeatureStep.scala
├── KerberosConfDriverFeatureStep.scala
├── KubernetesFeatureConfigStep.scala
├── LocalDirsFeatureStep.scala
├── MountSecretsFeatureStep.scala
├── MountVolumesFeatureStep.scala
└── PodTemplateConfigMapStep.scala

还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到的,在 KubernetesDriverBuilder 中,有一个 features 这个变量,这里需要 new 很多配置,也就是具体的用来配置 Pod 的一些步骤。

val features = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))

下面我们按照顺序来分析一下。

2.1 BasicDriverFeatureStep

类名就告诉我们,他是干嘛用的了,就是 Driver Feature 相对 Basic 的部分 feature,那么 Baisc 的 feature 包括什么呢?

driverPodName // Driver Pod 的名字
driverContainerImage // Driver Container
driverCpuCores // Driver 需要的 Cpu Cores
driverCoresRequest // Driver 的 Request Cpu Cores(K8S相关)
driverLimitCores // Driver 的 Limit Cpu Cores(K8S相关)
driverMemoryMiB // Driver 的内存 MiB
overheadFactor // 这个稍后会讲到
memoryOverheadMiB // 这个稍后会讲到
driverMemoryWithOverheadMiB // 这个稍后会讲到

以上的参数,生成后,主要是用于配置 Pod 和 Container 的参数。这一块内容又长又臭,可以看看我写在里面的注释。

# 一堆的 Builder
val driverContainer = new ContainerBuilder(pod.container)# Container Name.withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))# Image Name.withImage(driverContainerImage)# Image 拉取的策略.withImagePullPolicy(conf.imagePullPolicy)# Driver 的端口.addNewPort().withName(DRIVER_PORT_NAME).withContainerPort(driverPort).withProtocol("TCP").endPort().addNewPort()# Block Manager 的 Port 相关配置.withName(BLOCK_MANAGER_PORT_NAME).withContainerPort(driverBlockManagerPort).withProtocol("TCP").endPort().addNewPort()# Spark UI 的端口配置.withName(UI_PORT_NAME).withContainerPort(driverUIPort).withProtocol("TCP").endPort().addNewEnv()# 一些环境变量.withName(ENV_SPARK_USER).withValue(Utils.getCurrentUserName()).endEnv().addAllToEnv(driverCustomEnvs.asJava).addNewEnv().withName(ENV_DRIVER_BIND_ADDRESS).withValueFrom(new EnvVarSourceBuilder().withNewFieldRef("v1", "status.podIP").build()).endEnv().editOrNewResources()# cpu 相关配置.addToRequests("cpu", driverCpuQuantity).addToLimits(maybeCpuLimitQuantity.toMap.asJava).addToRequests("memory", driverMemoryQuantity).addToLimits("memory", driverMemoryQuantity).addToLimits(driverResourceQuantities.asJava).endResources()# 终于 build 完.build()val driverPod = new PodBuilder(pod.pod)# 如果 Pod 是存在的,表示要么修改,否则就是新增.editOrNewMetadata()# Pod 的名字.withName(driverPodName)# Pod 的 Label.addToLabels(conf.labels.asJava).addToAnnotations(conf.annotations.asJava).endMetadata().editOrNewSpec()# Pod 的重启策略.withRestartPolicy("Never")# Pod 的 NodeSelector 特性.addToNodeSelector(conf.nodeSelector.asJava)# 拉取镜像的 Repository 密码(ru.addToImagePullSecrets(conf.imagePullSecrets: _*).endSpec().build()

此外 getAdditionalPodSystemProperties() 还需要这个方法是拉取其他的配置,比如说 spark.app.id 等等,不赘述了。

2.2 DriverKubernetesCredentialsFeatureStep

这个 Step 是用于配置 Driver 的安全认证相关的配置,一般认为就是 K8S 那一套安全认证的机制了。

maybeMountedOAuthTokenFile // OAuthToken 文件
maybeMountedClientKeyFile // Client Key 文件
maybeMountedClientCertFile // Cient Cert 文件
maybeMountedCaCertFile // Ca Cert 文件
driverServiceAccount // Driver 的 Service Account
oauthTokenBase64 // OauthToken Base64 编码
caCertDataBase64 // CaCert 里面的数据 Base64 编码
clientKeyDataBase64 // Client Key 数据的 Base64 编码
clientCertDataBase64 // Client Cert 数据的 Base 64 编码
shouldMountSecret // 是否需要挂载 Secret
driverCredentialsSecretName // Driver 的认证 Secret 名

这里有很多关于访问 ApiServer 的安全认证的细节,如果不熟悉 K8S 的同学,需要补补课。下面是这个 Step 的关键方法,也就是把这些安全相关的文件通过 secret 保存下来。

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {// 如果 conf 存在以上提及的一些认证文件,则会进行挂载 Secretif (shouldMountSecret) {Seq(createCredentialsSecret())} else {Seq.empty}
}

2.3 DriverServiceFeatureStep

这个就是配置 Driver Service 的 Step,因为 Pod 在 K8S 集群里,创建 Executor 需要不同的 Executor Pod 访问到 Driver Pod,才能注册上,也包括 Block Manager 以及 Spark UI 的端口和服务负载配置。

preferredServiceName // Service Name
resolvedServiceName // 上面的 Service Name 超过63个字符的话需要重新配置
driverPort // Driver 的端口
driverBlockManagerPort // Block Manager 的端口
driverUIPort // Spark UI 的端口

上面的 Service Name 超过63个字符的话需要重新配置。

private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {preferredServiceName
} else {// 超过63个字符,就是需要系统内部重置这个名字了val randomServiceId = KubernetesUtils.uniqueID(clock = clock)val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +s"$shorterServiceName as the driver service's name.")shorterServiceName
}

2.4 MountSecretsFeatureStep

2.5 EnvSecretsFeatureStep

2.6 LocalDirsFeatureStep

resolvedLocalDirs // 本地目录
useLocalDirTmpFs // 如果 conf 配置为 true,则表示本地目录会用其他的存储系统,例如内存,具体请看 spark.kubernetes.local.dirs.tmpfs

2.7 MountVolumesFeatureStep

2.8 DriverCommandFeatureStep

这是关于 Driver 命令行的一些配置,具体看看注释是怎么解释的。

/*** Creates the driver command for running the user app, and propagates needed configuration so* executors can also find the app code.*/

2.9 HadoopConfDriverFeatureStep

这是用于挂载 Hadoop 配置文件的 Step,例如访问 HDFS 的时候,需要 core-site.x ml,hdfs-site.xml 等等。

confDir // Hadoop 相关的环境变量 HADOOP_CONF_DIR
existingConfMap // spark.kubernetes.hadoop.configMapName 提交任务的 configMap 名字,这些可以提前生成,直接挂载
confFiles // 配置文件

然后具体看看 Hadoop 的配置文件是如何通过 configMap 挂载到 Driver Pod 上的。

override def configurePod(original: SparkPod): SparkPod = {original.transform { case pod if hasHadoopConf =>// 如果有环境变量,就从环境变量指定的路径获取val confVolume = if (confDir.isDefined) {val keyPaths = confFiles.map { file =>new KeyToPathBuilder().withKey(file.getName()).withPath(file.getName()).build()}new VolumeBuilder().withName(HADOOP_CONF_VOLUME).withNewConfigMap().withName(newConfigMapName).withItems(keyPaths.asJava).endConfigMap().build()} else {// 没有环境变量的话,就直接用存在的 configMapnew VolumeBuilder().withName(HADOOP_CONF_VOLUME).withNewConfigMap().withName(existingConfMap.get).endConfigMap().build()}// 修改 Pod,通过 editSpec 方法val podWithConf = new PodBuilder(pod.pod).editSpec().addNewVolumeLike(confVolume).endVolume().endSpec().build()// Container Mount 需要的 Volumeval containerWithMount = new ContainerBuilder(pod.container).addNewVolumeMount().withName(HADOOP_CONF_VOLUME).withMountPath(HADOOP_CONF_DIR_PATH).endVolumeMount().addNewEnv().withName(ENV_HADOOP_CONF_DIR).withValue(HADOOP_CONF_DIR_PATH).endEnv().build()SparkPod(podWithConf, containerWithMount)}}

2.10 KerberosConfDriverFeatureStep

这是关于 Kerberos 配置的 Step。

/*** Provide kerberos / service credentials to the Spark driver.** There are three use cases, in order of precedence:* Kerberos 的服务,有三种场景** - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will*   manage the kerberos login and the creation of delegation tokens.* - existing tokens: if a secret containing delegation tokens is provided, it will be mounted*   on the driver pod, and the driver will handle distribution of those tokens to executors.* - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation*   tokens which will be provided to the driver. The driver will handle distribution of the*   tokens to executors.*/
principal // 指的是 KDC 中账号的 Principal
keytab // 指的是 Kerberos 生成的 Keytab
existingSecretName  // 存在的 secret name
existingSecretItemKey // secret 中的 item key
krb5File // Kerberos 服务的配置文件
krb5CMap // krb5 的 configMap
hadoopConf // 多余?
delegationTokens // Hadoop 体系中的轻量级认证 DT

生成 token 的关键代码如下。

private lazy val delegationTokens: Array[Byte] = {// 如果 keytab 和 secret 都是空的,就去生成 DTif (keytab.isEmpty && existingSecretName.isEmpty) {val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)val creds = UserGroupInformation.getCurrentUser().getCredentials()tokenManager.obtainDelegationTokens(creds)// If no tokens and no secrets are stored in the credentials, make sure nothing is returned,// to avoid creating an unnecessary secret.if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {SparkHadoopUtil.get.serialize(creds)} else {null}} else {null}
}

2.11 PodTemplateConfigMapStep

可以指定 Executor 的 Pod 的模板 spark.kubernetes.executor.podTemplateFile。所以这个 Step 主要就是来解析这个 Pod Template 的。

3 Summary

可以看到 Driver 的构建是通过多个 feature 的配置来组装起来的,最终都会通过 K8S 的 Java 客户端来跟 ApiServer 交互来在 K8S 集群中生成 Driver。

k8s源码分析 pdf_Spark Kubernetes 的源码分析系列 - features相关推荐

  1. Kubernetes StatefulSet源码分析

    2019独角兽企业重金招聘Python工程师标准>>> Author: xidianwangtao@gmail.com,Based on Kubernetes 1.9 摘要:Kube ...

  2. K8s基础知识学习笔记及部分源码剖析

    K8s基础知识学习笔记及部分源码剖析 在学习b站黑马k8s视频资料的基础上,查阅了配套基础知识笔记和源码剖析,仅作个人学习和回顾使用. 参考资料: 概念 | Kubernetes 四层.七层负载均衡的 ...

  3. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )

    Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...

  4. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 )

    Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...

  5. 【OkHttp】OkHttp 源码分析 ( OkHttpClient.Builder 构造器源码分析 )

    OkHttp 系列文章目录 [OkHttp]OkHttp 简介 ( OkHttp 框架特性 | Http 版本简介 ) [OkHttp]Android 项目导入 OkHttp ( 配置依赖 | 配置 ...

  6. HBase源码分析之HRegion上compact流程分析(三)

    在<HBase源码分析之HRegion上compact流程分析(二)>一文中,我们没有讲解真正执行合并的CompactionContext的compact()方法.现在我们来分析下它的具体 ...

  7. Flume 1.7 源码分析(一)源码编译

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 1 说明 Flume是Cloudera提供的一个高可用的,高可靠的, ...

  8. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

  9. 深入java并发包源码(三)AQS独占方法源码分析

    深入java并发包源码(一)简介 深入java并发包源码(二)AQS的介绍与使用 深入java并发包源码(三)AQS独占方法源码分析 AQS 的实现原理 学完用 AQS 自定义一个锁以后,我们可以来看 ...

最新文章

  1. 文本框可编辑查看页面
  2. 独家 | 用XGBoost入门可解释机器学习
  3. 第五届全国大学生智能汽车竞赛获奖名单
  4. mysql命令行执行时不输出列名(字段名)
  5. 原生JS实现图片滚动
  6. javascript --- vue2.x中原型的使用(拦截数组方法) 响应式原理(部分)
  7. 面试准备勿重“难”轻“易”
  8. superset mysql数据源配置_superset 性能优化1-已经使用中的superset更改默认数据源sqlite到mysql...
  9. 计算机视觉中的Transformer的最新进展!
  10. weblogic管理脚本
  11. java数据库的连接方式_java与各个数据库的连接方式
  12. 【知识蒸馏】Distilling the Knowledge in a Neural Network
  13. goc 介绍与源代码分析
  14. linux sed 多个条件,sed多条件匹配
  15. WES学习2:外显子测序分析——小男孩的硬骨化病基因突变
  16. 2022/7/2 Jenkins详细教程
  17. 高分毕业论文答辩自述稿(附注意事项及模板)
  18. 小学生计算能力测试系统设计
  19. 【Shell编程】几个小案例
  20. The world与Mathon2的比较

热门文章

  1. 2006 飞行员配对(二分图最大匹配)
  2. OAuth2.0认证服务器之客户端相关配置
  3. linux命令 renice,Linux命令之nice和renice
  4. html表头跟随滚动,JS实现table表格固定表头且表头随横向滚动而滚动_心善_前端开发者...
  5. iplatui---弹窗
  6. PHP使用weui,微信小程序WeUI引入
  7. javamailsender注入失败_关于SpringBoot使用Redis空指针的问题(不能成功注入的问题)...
  8. cmake 安装_linux下安装cmake
  9. 苹果7支持快充吗_iPhone12的磁吸无线充到底怎么回事?是否支持苹果20W快充?
  10. 关闭eureka注册中心