导言

随着大数据集群的使用,大数据的安全受到越来越多的关注一个安全的大数据集群的使用,运维必普通的集群更为复杂。
集群的安全通常基于kerberos集群完成安全认证。kerberos基本原理可参考:一张图了解Kerberos访问流程

Spark应用(On Yarn模式下)在安全的hadoop集群下的访问,需要访问各种各样的组件/进程,如ResourceManager,NodeManager,NameNode,DataNode,Kafka,Hmaster,HregionServer,MetaStore等等。尤其是在长时运行的应用,如sparkStreaming,StructedStreaming,如何保证用户认证后的长期有效性,其安全/认证更为复杂。

一个Spark应用提交用户要先在kdc中完成用户的认证,及拿到对应service服务的票据之后才能访问对应的服务。由于Spark应用运行时涉及yarnclient,driver,applicationMaster,executor等多个服务,这其中每个进程都应当是同一个用户启动并运行,这就涉及到多个进程中使用同一个用户的票据来对各种服务进行访问,本文基于Spark2.3对此做简要分析。

  • spark应用包含进程
进程 功能 yarn-client模式 yarn-cluster模式
yarnclient Spark应用提交app的模块 yarn-client模式下生命周期与driver一致; yarn-cluster模式下可以设置为app提交后即退出,或者提交后一直监控app运行状态
driver spark应用驱动器,调度应用逻辑,应用的“大脑” yarn-client模式下运行在yarnclient的JVM中; yarn-cluster模式下运行在applicationMaster中
applicationMaster 基于yarn服务抽象出的app管理者 yarn-client模式下仅仅负责启动/监控container,汇报应用状态的功能; yarn-cluster模式下负责启动/监控container,汇报应用状态的功,同时包含driver功能
Executor spark应用的执行器,yarn应用的container实体,业务逻辑的实际执行者

spark应用的提交用户认证之后才能提交应用,所以在yarnclient/driver的逻辑中必然会执行到kerberos认证相关的登录认证。然而其他的进程如applicationMaster,executor等均需要经过认证,应用提交后才由用户启动,这些进程则可以不进行kerberos认证而是利用Hadoop的token机制完成认证,减小kerberos服务压力,同时提高访问效率。

  • Hadoop Token机制

Hadoop的token实现基类为org.apache.hadoop.security.token.Token,

/*** Construct a token from the components.* @param identifier the token identifier* @param password the token's password* @param kind the kind of token* @param service the service for this token*/public Token(byte[] identifier, byte[] password, Text kind, Text service) {this.identifier = identifier;this.password = password;this.kind = kind;this.service = service;}

不同的服务也可hadoop的token来交互,只要使用不同的identifer来区分token即可。 如NMTokenIdentifier, AMRMTokenIdentifier,AuthenticationTokenIdentifier等不同的tokenIdentifier来区分不同的服务类型的token。

Spark应用各进程的安全实现

yarnclient的实现

此处yarnclient指的是向ResourceManager提交yarn应用的客户端。在spark中,向yarn提交应用有两种应用有yarn-client,yarn-cluster模式。在这两种应用模式下提交应用,yarn client逻辑有些许不同。

安全hadoop场景下spark的用户登录认证机制

  • spark提交应用时,通过--principal, --keytab参数传入认证所需文件。
    在sparkSubmit中prepareSubmitEnvironment时,完成认证

     // assure a keytab is available from any place in a JVMif (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {if (args.principal != null) {if (args.keytab != null) {require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")// Add keytab and principal configurations in sysProps to make them available// for later use; e.g. in spark sql, the isolated class loader used to talk// to HiveMetastore will use these settings. They will be set as Java system// properties and then loaded by SparkConfsparkConf.set(KEYTAB, args.keytab)sparkConf.set(PRINCIPAL, args.principal)UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)}}}
    
  • 在yarn-cluster模式下,不会调用业务层代码,即不会初始化SparkContext,其通过YarnClusterApplication的start方法调用client.submitApplication提交应用

  • 在yarn-client模式下,会在yarnclient逻辑中调用业务代码,即会初始化并运行SparkContext,通过YarnClientSchedulerBackend其调度client.submitApplication提交应用。

在client的submitApplication方法中提交app,之后创建amContext,准备本地资源,此时会将本地的文件上传至HDFS,其中就包括keytab文件,同时会生成spark_conf.properties配置文件以供am使用,该配置文件中会包含keytab的配置

 val props = new Properties()sparkConf.getAll.foreach { case (k, v) =>props.setProperty(k, v)}// Override spark.yarn.key to point to the location in distributed cache which will be used// by AM.Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }

其中的amKeytabFileName是在setUpCredentials时设置如下,该值为指定的keytab文件加上随机的字符串后缀,骑在am重点使用,可参考下节的介绍。

val f = new File(keytab)// Generate a file name that can be used for the keytab file, that does not conflict// with any user file.amKeytabFileName = f.getName + "-" + UUID.randomUUID().toStringsparkConf.set(PRINCIPAL.key, principal)

获取相关组件的token,注意:此处的token均非与yarn服务交互相关token,这里只有与HDFS,HBASE,Hive服务交互的token。

 def obtainDelegationTokens(hadoopConf: Configuration,creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {// 各provider的obtainDelegationTokens方法中,会获取对应组件的token,并放入credentials中provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)} else {logDebug(s"Service ${provider.serviceName} does not require a token." +s" Check your configuration to see if security is disabled or not.")None}
}.foldLeft(Long.MaxValue)(math.min)

}

Spark中常访问的服务使用token机制的有hive,hbase,hdfs,对应的tokenProvider如下:

服务 tokenProvider token获取类 token获取方法
HDFS HadoopFSDelegationTokenProvider org.apache.hadoop.hbase.security.token.TokenUtil obtainToken
HIVE HiveDelegationTokenProvider org.apache.hadoop.hive.ql.metadata getDelegationToken
HBASE HBaseDelegationTokenProvider org.apache.hadoop.hdfs.DistributedFileSystem addDelegationTokens

以HbaseDelegationTokenProvider为例,主要是通过反射调用hbase的TokenUtil类的obtainTOken方法,对应的obtainDelegationTokens方法如下:

override def obtainDelegationTokens(hadoopConf: Configuration,sparkConf: SparkConf,creds: Credentials): Option[Long] = {
try {val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)val obtainToken = mirror.classLoader.loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").getMethod("obtainToken", classOf[Configuration])logDebug("Attempting to fetch HBase security token.")val token = obtainToken.invoke(null, hbaseConf(hadoopConf)).asInstanceOf[Token[_ <: TokenIdentifier]]logInfo(s"Get token from HBase: ${token.toString}")creds.addToken(token.getService, token)
} catch {case NonFatal(e) =>logDebug(s"Failed to get token from service $serviceName", e)
}
None
}

PS : HBase的token获取的用户需要具有hbase:meta表的exec权限,否则无法成功获取token

在获取token后,将token设置到amContainer中,并放入appContext中

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
amContainer.setTokens(ByteBuffer.wrap(dob.getData))
}
//
appContext.setAMContainerSpec(containerContext)

driver的token更新

在yarn-client模式下,driver在yarnclient进程中启动,同样需要访问业务层及集群的相关组件如hdfs。driver通过读取am更新在hdfs路径下的credentials文件来保证driver节点的token有效。

// SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
// reads the credentials from HDFS, just like the executors and updates its own credentials
// cache.
if (conf.contains("spark.yarn.credentials.file")) {YarnSparkHadoopUtil.startCredentialUpdater(conf)
}

在yarn-cluster模式下,driver运行在applicationMaster的JVM中,其安全相关由Am同一操作

ApplicationMaster 的安全认证

applicationMaster是Yarn进行应用调度/管理的核心,需要与RM/NM等进行交互以便应用运行。其中相关的交互均通过token完成认证,认证实现由Yarn内部框架完成。查看am日志发现,即是在非安全(非kerberos)的场景下,同样会使用到token。而与hdfs,hbase等服务交互使用的token则需Spark框架来实现。

applicationMaster中与YARN相关的认证

  • AM与RM的认证

在ResourceManager接收到应用提交的ApplicationSubmissionContext后,在其AmLauncher.java的run方法中为am设置生成“YARN_AM_RM_TOKEN,该token用于am于rm通信使用”

 public Token<AMRMTokenIdentifier> createAndGetAMRMToken(ApplicationAttemptId appAttemptId) {
this.writeLock.lock();
try {LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);AMRMTokenIdentifier identifier =new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey().getKeyId());byte[] password = this.createPassword(identifier);appAttemptSet.add(appAttemptId);return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,identifier.getKind(), new Text());
} finally {this.writeLock.unlock();
}
}
  • AM与NM的认证

Am在启动之后,会向ResourceManager申请container,并与对应的NodeManager通信以启动container。然而AM与NM通信的token是如何得到的呢?

查看AMRMClientImpl类可以看到,AM向RM发送分配请求,RM接收到请求后,将container要分配至的NM节点的Token放置response中返回给AM。Am接收到response后,会保存NMToken,并判定是否需要更新YARN_AM_RM_TOKEN

//通过rmClient向RM发送分配请求
allocateResponse = rmClient.allocate(allocateRequest);
//拿到response后,保存NMToken并根据response判定是否需要更新AMRM通信的TOken
if (!allocateResponse.getNMTokens().isEmpty()) {populateNMTokens(allocateResponse.getNMTokens());}if (allocateResponse.getAMRMToken() != null) {updateAMRMToken(allocateResponse.getAMRMToken());}

RM通过ApplicationMasterService响应allocation请求

// 通过调度器为cotnainer分配NodeManager并生成该NodeManager的Token放入allcation中Allocation allocation =this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals);......if (!allocation.getContainers().isEmpty()) {allocateResponse.setNMTokens(allocation.getNMTokens());}

AM在准备启动container时,将当前用户的token都设置进ContainerLaunchContext中

def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext]).asInstanceOf[ContainerLaunchContext]
val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))

ApplicationMaster业务相关的服务的token更新

Am启动的资源情况

查看Am启动命令大致如下,可以发现有指定配置文件,而该配置文件即为yarnclient生成上传至hdfs,在am启动前由NodeManager从hdfs中copy至本地路径,供container使用:

 /usr/jdk64/jdk1.8.0_77//bin/java -server -Xmx512m -Djava.io.tmpdir=/localpath/*/tmp -Dspark.yarn.app.container.log.dir=/localpath/*/ org.apache.spark.deploy.yarn.ExecutorLauncher --arg host:port --properties-file /localpath/*/__spark_conf__/__spark_conf__.properties

查看此配置文件可以看到有如下配置项:

spark.yarn.principal=ocsp-ygcluster@ASIAINFO.COM
spark.yarn.keytab=hbase.headless.keytab-18f29b79-b7a6-4cb2-b79d-4305432a5e9a

下图为am进程使用到的资源文件

am进程资源.jpg

如上可以看出,am虽然运行在集群中,但运行时认证相关的资源已经准备就绪。下面分析其运行中关于安全的逻辑

Am安全认证及token更新逻辑

在applicationMaster中,定期更新token,并写入文件到hdfs的相关目录,并清理旧文件以供各executor使用。
在ApplicationMaster启动后,进行login登录并启动名为am-kerberos-renewer的dameon线程定期登录,保证用户认证的有效性

private val ugi = {
val original = UserGroupInformation.getCurrentUser()

// If a principal and keytab were provided, log in to kerberos, and set up a thread to
// renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
// of the TGT, use a configuration to define how often to check that a relogin is necessary.
// checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
val principal = sparkConf.get(PRINCIPAL).orNull
val keytab = sparkConf.get(KEYTAB).orNull
if (principal != null && keytab != null) {UserGroupInformation.loginUserFromKeytab(principal, keytab)val renewer = new Thread() {override def run(): Unit = Utils.tryLogNonFatalError {while (true) {TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()}}}renewer.setName("am-kerberos-renewer")renewer.setDaemon(true)renewer.start()// Transfer the original user's tokens to the new user, since that's needed to connect to// YARN. It also copies over any delegation tokens that might have been created by the// client, which will then be transferred over when starting executors (until new ones// are created by the periodic task).val newUser = UserGroupInformation.getCurrentUser()SparkHadoopUtil.get.transferCredentials(original, newUser)newUser
} else {SparkHadoopUtil.get.createSparkUser()
}
}

在am中启动AMCredentialRenewerStarter线程,调度认证登录及token renew逻辑

if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {val credentialRenewerThread = new Thread {setName("AMCredentialRenewerStarter")setContextClassLoader(userClassLoader)override def run(): Unit = {val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf,yarnConf,conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))val credentialRenewer =new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)credentialRenewer.scheduleLoginFromKeytab()}}credentialRenewerThread.start()credentialRenewerThread.join()}

在scheduleLoginFromKeytab中,会周期调度登录,token获取更新写入hdfs文件等操作。
其核心逻辑如下

调度周期:

各种组件的token更新周期如hdfs的更新周期dfs.namenode.delegation.token.renew-interval默认为1天,hbase的token更新周期hbase.auth.key.update.interval默认为1天;调度更新的周期为如上各组件最小值的75%,

调度流程:

//将生成的token写入hdfs目录${spark.yarn.credentials.file}-${timeStamp}-${nextSuffix}
writeNewCredentialsToHDFS(principal, keytab)
//删除逻辑为保留五个(${spark.yarn.credentials.file.retention.count})文件,文件更新时间早于五天(${spark.yarn.credentials.file.retention.days})的全部清理
cleanupOldFiles()

Executor的认证机制

executor的认证同样使用的是token机制。executor启动之后,根据driver启动设置的${spark.yarn.credentials.file}启动token更新:

if (driverConf.contains("spark.yarn.credentials.file")) {logInfo("Will periodically update credentials from: " +driverConf.get("spark.yarn.credentials.file"))Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").getMethod("startCredentialUpdater", classOf[SparkConf]).invoke(null, driverConf)}

Executor中的token更新是读取hdfs目录{timeStamp}-${nextSuffix}目录下的文件,读取到缓存中,以便保证读取到的是更新后的token使用。

安全Spark的使用

Spark框架完成的kerberos认证及使用token与其他服务交互的机制使用较为简单,只需要在提交应用时的spark-submit命令行中加入--principal appuserName --keytab /path/to/user.keytab即可

Spark 框架安全认证实现相关推荐

  1. Spark Streaming揭秘 Day9 从Receiver的设计到Spark框架的扩展

    Spark Streaming揭秘 Day9 从Receiver的设计到Spark框架的扩展 Receiver是SparkStreaming的输入数据来源,从对Receiver整个生命周期的设计,我们 ...

  2. Spark框架概括(Spark 是什么、Spark and Hadoop、Spark or Hadoop、Spark核心模块)

    1. Spark概括 1.1 Spark 是什么 1.2 Spark and Hadoop 从时间节点上来看 从功能上来看 - hadoop与Hadoop不同的是,Spark主要侧重于通过内存计算,以 ...

  3. 强者联盟——Python语言结合Spark框架

    引言:Spark由AMPLab实验室开发,其本质是基于内存的高速迭代框架,"迭代"是机器学习最大的特点,因此很适合做机器学习. 得益于在数据科学中强大的表现,Python语言的粉丝 ...

  4. 微服务架构实战第八节 微服务安全框架,认证与授权

    25 服务安全:如何理解微服务访问的安全需求和实现方案? 今天,我们又将进入一个全新的话题,讨论微服务架构中的服务访问安全性相关的需求和实现方案.在设计微服务架构时,安全性是一个重要但又往往被忽略的主 ...

  5. Azure 框架设计师认证考试2020大更改

    最近微软公司将Azure方向最火热的认证:Azure框架设计师 助理级认证和专家级认证的考试做了调整,其中原AZ-103将被AZ-104替代,原AZ-300将被新的AZ-303替代,原AZ-301将被 ...

  6. java url 授权,Spring MVC框架 - 基本认证之URL 授权认证

    [导读]为控制器添加注解是非常简单的,但这往往并不是最可行的方案.有时候,我们会想要完全控制授权功能. 为控制器添加注解是非常简单的,但这往往并不是最可行的方案.有时候,我们会想要完全控制授权功能. ...

  7. 基于Spark框架的大型分布式矩阵求逆运算实现(二)——大型下三角矩阵求逆运算

    基于实际需要,需要对五百万阶的方阵进行求逆运算,但查看Spark(v. 2.2.0)的官方api并没有此方面的信息,就自己尝试着实现了一个: 先说一下原理: 对于一个可逆矩阵A,必然会得到它的唯一LU ...

  8. Spark框架——RDD算子mapPartitions迭代器(基于Scala语言)

    /*mapPartitions 和map算子是一样的,只不过map是针对每一条数据进行转换,mapPartitions 针对一整个分区的数据进行转换 1.map的func的参数是单条数据,mapPar ...

  9. 2021年大数据Spark(一):框架概述

    目录 Spark框架概述 Spark 是什么 分布式内存迭代计算框架 官方定义: Spark框架概述 Spark 是加州大学伯克利分校AMP实验室(Algorithms Machines and Pe ...

最新文章

  1. 瑞友虚拟服务器网页登录,瑞友云端虚拟专网系统
  2. 聊聊《战魂铭人》的游戏设计
  3. Oracle中sys和system的区别
  4. 【程序设计】模块化程序设计
  5. 前端18个月难度翻番?来这里把握大前端技术本质进展丨稀土开发者大会
  6. SharePoint服务器端对象模型 之 访问文件和文件夹(Part 4)
  7. 网页loading效果 可以通过js控制旋转速度
  8. Axure高保真智慧消防远程监管系统数据可视化大屏看板+web端高保真大数据分析平台看板+大数据交换配置管理平台大屏动态可视化看板
  9. centos6.2+heartbeat+mysql5.5+drbd84高可用安装
  10. shell2 for linux,Linux Shell编程(2): for while
  11. TCPDUMP 用法(转)
  12. 系统集成项目管理工程师计算题(案例计算题、挣值分析、EAC、ETC)
  13. tcl机顶盒 tk 8296刷机固件及教程
  14. Superset 实现可视化报表发布
  15. MBA书籍推荐:打造商业思维,看这一本书就够了
  16. 【学习笔记15】JavaScript的函数
  17. python挖掘B站猛男手游公主连结的另类操作!
  18. mysql 规则引擎_为什么要用规则引擎?
  19. K8s9(2-1) k8s中的通信机制, kube-proxy的ipvs模式 ,无头服务,LoadBalancer,ExternalName,外部公有 ip(externalIPs)
  20. 代码质量管理工具:SonarQube常见的问题及正确解决方案

热门文章

  1. 华为交换机S5700开启telnet
  2. 封头名义厚度如何圆整_压力容器封头厚度计算
  3. python24.dll_2_48_python24.dll
  4. 好想学python下载_Python | 从零开始学(1)
  5. c语言考试题及答案 大一,大一C语言期末考试试题
  6. linux彻底卸载multipath,深度分析LINUX环境下如何配置multipath
  7. mysql b 树 b树_MySQL B树和B+树的区别
  8. php第一行空白,网页头部多出一行空白问题的解决方法 (PHP文件头BOM问题)
  9. 1.1 决策树算法原理
  10. 一段树状无限制级代码