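1. Overview

Reposted with additions from "一文讲透hdfs的delegation token" (a thorough explanation of the HDFS delegation token). I have been looking into this topic recently, so this also serves as my study notes.

1.1 Motivation

I have recently been working on Kerberos authentication for Flink. The authentication settings in the Flink configuration file are correct, and my sinks are all RichSinkFunction implementations. Inside the RichSinkFunction I build my own client with new KafkaProducer. Do I still need to pass the authentication settings to the client I created myself, or is that unnecessary?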


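I then searched and found an article that said the following:


So I want to understand what a token is and how it makes "authenticate once" possible. Why does that not work in my case? The Kafka producer I create in the open method still has to authenticate on its own.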
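As far as I understand, the delegation tokens discussed in this article are issued by the HDFS NameNode and distributed by YARN, so they do not authenticate a hand-built Kafka client. A KafkaProducer created in open() still needs its own security settings. The sketch below only builds those settings with the JDK; the broker address, the SASL_PLAINTEXT protocol and the kafka service name are assumptions for illustration. It also assumes that Flink's `security.kerberos.login.contexts` includes `KafkaClient`, so that the JAAS login entry is supplied by Flink rather than by the producer properties.

```java
import java.util.Properties;

final class KafkaKerberosProps {
    /** Security-related producer settings; all values are illustrative assumptions. */
    static Properties securityProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Kerberos over SASL; use SASL_SSL instead if the brokers require TLS.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        // Must match the service part of the brokers' Kerberos principal.
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }

    public static void main(String[] args) {
        securityProps("broker1:9092")
            .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be merged with the serializer settings and passed to the KafkaProducer constructor inside open().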

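1.2 Background

Some time ago I wrote up token authentication in Hadoop and the tokens involved while a YARN job runs, and both write-ups mentioned the delegation token. Recently I ran into a related problem: a Flink job that had been running for more than seven days failed because of a fault on its host, which triggered retries, but every retry failed as well. The job's logs were not aggregated either, so the cause of the failure could not be analyzed. The root cause turned out to be the delegation token, and this article summarizes the principles involved.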

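2. [Principles]

2.1. What is a delegation token

First, briefly, why delegation tokens are needed. Once Kerberos is enabled, services must authenticate with the KDC and obtain a ticket before they interact. A single YARN job may spawn many task containers, and each of them may access HDFS. If every container had to obtain a ticket before accessing HDFS, the KDC would easily become a performance bottleneck. The delegation token exists to cut out this unnecessary authentication work.

2.2. How delegation tokens are used during job submission and execution

The delegation-token-related flow during job submission and execution consists of the following steps: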


1) First, after the RM starts, it creates an internal service thread dedicated to renewing tokens.

// ResourceManager.java
protected void serviceInit(Configuration configuration) throws Exception {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        delegationTokenRenewer = createDelegationTokenRenewer();
        rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
    }
    ....
}

protected DelegationTokenRenewer createDelegationTokenRenewer() {
    return new DelegationTokenRenewer();
}

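2) The client requests a delegation token

Before submitting a job, the client usually uploads resource files to HDFS first (including the jars needed at run time). During this step it requests a delegation token from the NN and places it in the job's launch context, then sends the job submission request (which carries the launch context) to the RM.

Below is a code snippet from job submission in Flink on YARN: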

// flink YarnClusterDescriptor.java
private ApplicationReport startAppMaster(...) {
    // When Kerberos is enabled, obtain the tokens
    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        List<Path> yarnAccessList =
            ConfigUtils.decodeListFromConfig(
                configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
        Utils.setTokensFor(
            amContainer,
            ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
            yarnConfiguration);
    }
}

public static void setTokensFor(
        ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
        throws IOException {
    Credentials credentials = new Credentials();
    // for HDFS
    TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
    // for HBase
    obtainTokenForHBase(credentials, conf);
    // for user
    UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

    // Put the obtained tokens into the launch context
    Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
    for (Token<? extends TokenIdentifier> token : usrTok) {
        final Text id = new Text(token.getIdentifier());
        LOG.info("Adding user token " + id + " with " + token);
        credentials.addToken(id, token);
    }
    try (DataOutputBuffer dob = new DataOutputBuffer()) {
        credentials.writeTokenStorageToStream(dob);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
        }
        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        amContainer.setTokens(securityTokens);
    }
}

// TokenCache.java
// Calls the Hadoop API to request tokens from the NN
public static void obtainTokensForNamenodes(
        Credentials credentials, Path[] ps, Configuration conf) throws IOException {
    if (!UserGroupInformation.isSecurityEnabled()) {
        return;
    }
    obtainTokensForNamenodesInternal(credentials, ps, conf);
}

static void obtainTokensForNamenodesInternal(
        Credentials credentials, Path[] ps, Configuration conf) throws IOException {
    Set<FileSystem> fsSet = new HashSet<FileSystem>();
    for (Path p : ps) {
        fsSet.add(p.getFileSystem(conf));
    }
    String masterPrincipal = Master.getMasterPrincipal(conf);
    for (FileSystem fs : fsSet) {
        obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
    }
}

static void obtainTokensForNamenodesInternal(
        FileSystem fs, Credentials credentials, Configuration conf, String renewer)
        throws IOException {
    ...
    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, credentials);
    ...
}

// FileSystem.java
public Token<?>[] addDelegationTokens(final String renewer, Credentials credentials)
        throws IOException {
    if (credentials == null) {
        credentials = new Credentials();
    }
    final List<Token<?>> tokens = new ArrayList<>();
    collectDelegationTokens(renewer, credentials, tokens);
    return tokens.toArray(new Token<?>[tokens.size()]);
}

private void collectDelegationTokens(
        final String renewer, final Credentials credentials, final List<Token<?>> tokens)
        throws IOException {
    final String serviceName = getCanonicalServiceName();
    // Collect token of the this filesystem and then of its embedded children
    if (serviceName != null) { // fs has token, grab it
        final Text service = new Text(serviceName);
        Token<?> token = credentials.getToken(service);
        if (token == null) {
            // Request a delegation token from the NN
            token = getDelegationToken(renewer);
            if (token != null) {
                tokens.add(token);
                credentials.addToken(service, token);
            }
        }
    }
    ...
}
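The key idea in collectDelegationTokens is that the credentials act as a per-service cache: the NN is asked for a token only when none is stored for that service yet. A minimal sketch of that idea using only the JDK follows; TokenCollector, the string "tokens" and the service names are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TokenCollector {
    /** Simplified stand-in for Hadoop's Credentials: service name -> token. */
    private final Map<String, String> credentials = new LinkedHashMap<>();
    private int remoteCalls = 0;

    /** Returns only the tokens that actually had to be fetched. */
    List<String> addDelegationTokens(List<String> services, String renewer) {
        List<String> fetched = new ArrayList<>();
        for (String service : services) {
            if (!credentials.containsKey(service)) {
                // No token cached for this service yet: ask the (simulated) NN.
                String token = requestFromNameNode(service, renewer);
                credentials.put(service, token);
                fetched.add(token);
            }
        }
        return fetched;
    }

    /** Simulated RPC; a real client would call the NameNode here. */
    private String requestFromNameNode(String service, String renewer) {
        remoteCalls++;
        return "token#" + remoteCalls + "@" + service + "(renewer=" + renewer + ")";
    }

    int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        TokenCollector c = new TokenCollector();
        List<String> services = new ArrayList<>();
        services.add("ha-hdfs:clusterA");
        services.add("ha-hdfs:clusterA");
        services.add("ha-hdfs:clusterB");
        System.out.println(c.addDelegationTokens(services, "yarn"));
        System.out.println("remote calls: " + c.remoteCalls());
    }
}
```

Paths on the same file system therefore share one token, which is why the Flink code can pass every uploaded path without multiplying requests to the NN.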

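3) The RM adds the token to the delegation token renewal service

When the RM handles the client's submission request, it checks whether Kerberos authentication is enabled. If so, it parses the delegation token out of the job's launch context and adds it to the delegation token renewal service, which starts a timer to renew the token periodically. The RM then goes on to send the container launch request to the NM, and the delegation token travels to the NM with the launch context.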

// RMAppManager.java
protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime, String user)
        throws YarnException {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(
            applicationId,
            BuilderUtils.parseCredentials(submissionContext),
            submissionContext.getCancelTokensWhenComplete(),
            application.getUser(),
            BuilderUtils.parseTokensConf(submissionContext));
    }
    ...
}

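4) The NM uses the delegation token

When the NM receives the container launch request, it parses the delegation token out of the request (the launch context), builds an instance object for the container, and stores the delegation token in that object. It then localizes resources for the container, that is, downloads the required resource files from HDFS, and this is where the delegation token is used. Likewise, if log aggregation is needed when the job ends, the same delegation token is used to upload the job's logs to the configured HDFS path.

In addition, the delegation token is written to a persistent file, both so that the NM can recover after a failure and so that the token can be handed to the task container process.

3. Delegation token renewal and lifecycle

1) The token's maximum lifetime is already fixed when the token is requested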

// FSNamesystem.java
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
    ...
    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, renewer, realUser);
    token = new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
    ...
    return token;
}

// Token.java
public Token(T id, SecretManager<T> mgr) {
    password = mgr.createPassword(id);
    identifier = id.getBytes();
    kind = id.getKind();
    service = new Text();
}

// AbstractDelegationTokenSecretManager
protected synchronized byte[] createPassword(TokenIdent identifier) {
    long now = Time.now();
    identifier.setMaxDate(now + tokenMaxLifetime);
    ...
}
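Because the maximum date is stored inside the identifier and the password is derived from that identifier, a client cannot extend a token's lifetime by editing it. The sketch below illustrates this with the JDK only. It assumes the password is an HMAC (HmacSHA1 here) over the identifier bytes keyed with a NameNode master key; the string identifier layout, the key and the class name are made up for illustration and do not match Hadoop's wire format.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class TokenPasswordSketch {
    static final long MAX_LIFETIME_MS = 7L * 24 * 60 * 60 * 1000; // 7 days

    /** Illustrative identifier: owner|renewer|issueDate|maxDate. */
    static String identifier(String owner, String renewer, long issueDateMs) {
        long maxDate = issueDateMs + MAX_LIFETIME_MS; // fixed at issue time
        return owner + "|" + renewer + "|" + issueDateMs + "|" + maxDate;
    }

    /** password = HMAC(masterKey, identifier). */
    static byte[] createPassword(String identifier, byte[] masterKey) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(masterKey, "HmacSHA1"));
            return mac.doFinal(identifier.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC not available", e);
        }
    }

    /** Server-side check: recompute the password and compare in constant time. */
    static boolean verify(String identifier, byte[] password, byte[] masterKey) {
        return MessageDigest.isEqual(createPassword(identifier, masterKey), password);
    }

    public static void main(String[] args) {
        byte[] key = "demo-master-key".getBytes(StandardCharsets.UTF_8);
        String id = identifier("hncscwc", "hadoop", 1656925873393L);
        byte[] password = createPassword(id, key);
        System.out.println(verify(id, password, key));
        // Tampering with the max date invalidates the password.
        System.out.println(verify(id.replace("1657530673393", "9999999999999"), password, key));
    }
}
```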

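2) After receiving the submission request, the RM first renews the token once to learn its next expiration time, then sets a timer based on that expiration time to trigger the next renewal.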

public void addApplicationSync(
        ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd, String user)
        throws IOException, InterruptedException {
    handleAppSubmitEvent(
        new DelegationTokenRenewerAppSubmitEvent(
            applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
}

private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
        throws IOException, InterruptedException {
    ...
    Credentials ts = evt.getCredentials();
    Collection<Token<?>> tokens = ts.getAllTokens();
    for (Token<?> token : tokens) {
        DelegationTokenToRenew dttr = allTokens.get(token);
        if (dttr == null) {
            dttr = new DelegationTokenToRenew(
                Arrays.asList(applicationId), token, tokenConf, now,
                shouldCancelAtEnd, evt.getUser());
            try {
                // Renew once up front
                renewToken(dttr);
            } catch (IOException ioe) {
                ...
            }
        }
        tokenList.add(dttr);
    }

    if (!tokenList.isEmpty()) {
        for (DelegationTokenToRenew dtr : tokenList) {
            DelegationTokenToRenew currentDtr = allTokens.putIfAbsent(dtr.token, dtr);
            if (currentDtr != null) {
                // another job beat us
                currentDtr.referringAppIds.add(applicationId);
                appTokens.get(applicationId).add(currentDtr);
            } else {
                appTokens.get(applicationId).add(dtr);
                setTimerForTokenRenewal(dtr);
            }
        }
    }
}

protected void renewToken(final DelegationTokenToRenew dttr) throws IOException {
    // need to use doAs so that http can find the kerberos tgt
    // NOTE: token renewers should be responsible for the correct UGI!
    try {
        // Renew the delegation token and get the next expiration time
        dttr.expirationDate =
            UserGroupInformation.getLoginUser().doAs(
                new PrivilegedExceptionAction<Long>() {
                    @Override
                    public Long run() throws Exception {
                        return dttr.token.renew(dttr.conf);
                    }
                });
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
    LOG.info("Renewed delegation-token= [" + dttr + "]");
}

protected void setTimerForTokenRenewal(DelegationTokenToRenew token) throws IOException {
    // calculate timer time
    long expiresIn = token.expirationDate - System.currentTimeMillis();
    if (expiresIn <= 0) {
        LOG.info("Will not renew token " + token);
        return;
    }
    long renewIn = token.expirationDate - expiresIn / 10; // little bit before the expiration
    // need to create new task every time
    RenewalTimerTask tTask = new RenewalTimerTask(token);
    token.setTimerTask(tTask); // keep reference to the timer
    renewalTimer.schedule(token.timerTask, new Date(renewIn));
    LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = " + token.referringAppIds);
}
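The timer arithmetic in setTimerForTokenRenewal is easy to check with plain numbers: the next renewal fires when one tenth of the remaining validity is left. A minimal sketch, assuming a token that has just been renewed for one day; the class and method names are hypothetical, only the formula is taken from the code above.

```java
final class RenewalTimerMath {
    /**
     * Mirrors the arithmetic of setTimerForTokenRenewal.
     * Returns the absolute time of the next renewal, or -1 if the token
     * has already expired and no timer would be scheduled.
     */
    static long nextRenewalTime(long nowMs, long expirationDateMs) {
        long expiresIn = expirationDateMs - nowMs;
        if (expiresIn <= 0) {
            return -1L;
        }
        return expirationDateMs - expiresIn / 10; // a little before the expiration
    }

    public static void main(String[] args) {
        long oneDayMs = 24L * 60 * 60 * 1000;
        long expiration = 1657012273422L;   // illustrative expiration timestamp
        long now = expiration - oneDayMs;   // token was just renewed for one day
        long next = nextRenewalTime(now, expiration);
        // With one day of validity the timer fires 2.4 hours before expiry.
        System.out.println((expiration - next) / 60000 + " minutes before expiry"); // prints "144 minutes before expiry"
    }
}
```

With a one-day renew interval this gives a renewal roughly every 21.6 hours, as long as the expiration time keeps moving forward.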

Now let's look at how the renewal request is sent and handled:

// The client sends the renewal request
public long renew(Token<?> token, Configuration conf) throws IOException {
    Token<DelegationTokenIdentifier> delToken = (Token<DelegationTokenIdentifier>) token;
    ClientProtocol nn = getNNProxy(delToken, conf);
    try {
        return nn.renewDelegationToken(delToken);
    } catch (RemoteException re) {
        throw re.unwrapRemoteException(InvalidToken.class, AccessControlException.class);
    }
}

// The server handles the request
long renewDelegationToken(Token<DelegationTokenIdentifier> token)
        throws InvalidToken, IOException {
    try {
        ...
        expiryTime = dtSecretManager.renewToken(token, renewer);
    } catch (AccessControlException ace) {
        ...
    }
    return expiryTime;
}

public synchronized long renewToken(Token<TokenIdent> token, String renewer)
        throws InvalidToken, IOException {
    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
    DataInputStream in = new DataInputStream(buf);
    TokenIdent id = createIdentifier();
    id.readFields(in);
    LOG.info("Token renewal for identifier: " + formatTokenId(id) +
        "; total currentTokens " + currentTokens.size());

    long now = Time.now();
    if (id.getMaxDate() < now) {
        throw new InvalidToken(renewer + " tried to renew an expired token " +
            formatTokenId(id) + " max expiration date: " +
            Time.formatTime(id.getMaxDate()) +
            " currentTime: " + Time.formatTime(now));
    }
    if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) {
        throw new AccessControlException(renewer +
            " tried to renew a token " + formatTokenId(id) +
            " without a renewer");
    }
    if (!id.getRenewer().toString().equals(renewer)) {
        throw new AccessControlException(renewer +
            " tries to renew a token " + formatTokenId(id) +
            " with non-matching renewer " + id.getRenewer());
    }
    DelegationKey key = getDelegationKey(id.getMasterKeyId());
    if (key == null) {
        throw new InvalidToken("Unable to find master key for keyId=" +
            id.getMasterKeyId() +
            " from cache. Failed to renew an unexpired token " +
            formatTokenId(id) + " with sequenceNumber=" +
            id.getSequenceNumber());
    }
    byte[] password = createPassword(token.getIdentifier(), key.getKey());
    if (!MessageDigest.isEqual(password, token.getPassword())) {
        throw new AccessControlException(renewer +
            " is trying to renew a token " +
            formatTokenId(id) + " with wrong password");
    }
    long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
    String trackingId = getTrackingIdIfEnabled(id);
    DelegationTokenInformation info =
        new DelegationTokenInformation(renewTime, password, trackingId);

    if (getTokenInfo(id) == null) {
        throw new InvalidToken("Renewal request for unknown token " + formatTokenId(id));
    }
    updateToken(id, info);
    return renewTime;
}
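Two of the checks in renewToken decide the token's fate over time: a token past its max date is rejected outright, and otherwise the new expiration is capped at the max date. A reduced sketch of just these rules follows; it leaves out the master key and password checks, and the class name, exception types and one-day interval are assumptions for illustration.

```java
final class RenewCheckSketch {
    static final long RENEW_INTERVAL_MS = 24L * 60 * 60 * 1000; // assumed 1-day renew interval

    /** Returns the new expiration time, or throws if the renewal must be refused. */
    static long renew(long nowMs, long maxDateMs, String tokenRenewer, String caller) {
        if (maxDateMs < nowMs) {
            // Past the maximum lifetime: renewal is impossible.
            throw new IllegalStateException(caller + " tried to renew an expired token");
        }
        if (tokenRenewer == null || tokenRenewer.isEmpty()) {
            throw new SecurityException(caller + " tried to renew a token without a renewer");
        }
        if (!tokenRenewer.equals(caller)) {
            throw new SecurityException(caller + " is not the designated renewer " + tokenRenewer);
        }
        // The expiration can never move past the max date fixed at issue time.
        return Math.min(maxDateMs, nowMs + RENEW_INTERVAL_MS);
    }

    public static void main(String[] args) {
        long maxDate = 1657530673393L;
        long twoDaysBefore = maxDate - 2 * RENEW_INTERVAL_MS;
        long oneHourBefore = maxDate - 60L * 60 * 1000;
        System.out.println(renew(twoDaysBefore, maxDate, "hadoop", "hadoop") == twoDaysBefore + RENEW_INTERVAL_MS);
        System.out.println(renew(oneHourBefore, maxDate, "hadoop", "hadoop") == maxDate);
    }
}
```

Once the expiration has been capped, every later renewal returns the same value, so the renewer's timer can only approach the max date.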

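3) Handling a token that reaches its maximum lifetime

Inside the timer task, any exception thrown by the renewal is caught and the failed token is removed directly.

Note, however, that before each renewal the RM requests a new delegation token if needed (covered in detail later).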

public void run() {
    if (cancelled.get()) {
        return;
    }
    Token<?> token = dttr.token;
    try {
        // First check whether a new token needs to be requested
        requestNewHdfsDelegationTokenIfNeeded(dttr);
        // if the token is not replaced by a new token, renew the token
        if (!dttr.isTimerCancelled()) {
            renewToken(dttr);
            setTimerForTokenRenewal(dttr); // set the next one
        } else {
            LOG.info("The token was removed already. Token = [" + dttr + "]");
        }
    } catch (Exception e) {
        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
        removeFailedDelegationToken(dttr);
    }
}

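4. [Problem analysis]

Let's go back to the logs from the failure described earlier and analyze them.

First, the NM log shows that on retry the job could not be started because its resources could not be downloaded (localized), and the download failed because of an invalid token.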

2022-07-18 13:44:18,665 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
2022-07-18 13:44:18,669 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService: { hdfs://hdfsHACluster/user/hncscwc/.flink/application_1637733238080_3800/application_1637733238080_38002636034628721129021.tmp, 1656925873322, FILE, null } failed: token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) can't be found in cache
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486)
    at org.apache.hadoop.ipc.Client.call(Client.java:1432)
    at org.apache.hadoop.ipc.Client.call(Client.java:1342)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:796)
    at sun.reflect.GeneratedMethodAccessor172.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348)
    at com.sun.proxy.$Proxy16.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1649)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1440)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1452)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Why was the token invalid? The RM log explains it.

2022-07-04 17:11:13,400 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler: Application 'application_1637733238080_3800' is submitted without priority hence considering default queue/cluster priority: 0
2022-07-04 17:11:13,424 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657012273422; apps=[application_1637733238080_3800]]
2022-07-05 14:47:13,462 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657090033446; apps=[application_1637733238080_3800]]
2022-07-06 12:23:13,467 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657167793465; apps=[application_1637733238080_3800]]
2022-07-07 09:59:13,487 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657245553484; apps=[application_1637733238080_3800]]
2022-07-08 07:35:13,532 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657323313511; apps=[application_1637733238080_3800]]
2022-07-09 05:11:13,551 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657401073532; apps=[application_1637733238080_3800]]
2022-07-10 02:47:13,564 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657478833547; apps=[application_1637733238080_3800]]
2022-07-11 00:23:13,591 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:07,361 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 6032 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:12,793 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 600 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:13,337 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 56 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renewed delegation-token= [Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800]]
2022-07-11 17:11:13,391 INFO org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Renew Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc);exp=1657530673393; apps=[application_1637733238080_3800] in 2 ms, appId = [application_1637733238080_3800]
2022-07-11 17:11:13,398 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: Exception renewing tokenKind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdfsHACluster, Ident: (HDFS_DELEGATION_TOKEN token 4361 for hncscwc). Not rescheduled
org.apache.hadoop.security.token.SecretManager$InvalidToken: hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800
    at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
    at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:761)
    at org.apache.hadoop.security.token.Token.renew(Token.java:458)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:601)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$1.run(DelegationTokenRenewer.java:598)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.renewToken(DelegationTokenRenewer.java:597)
    at org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer$RenewalTimerTask.run(DelegationTokenRenewer.java:531)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): hadoop tried to renew an expired token (HDFS_DELEGATION_TOKEN token 4361 for hncscwc) max expiration date: 2022-07-11 17:11:13,393+0800 currentTime: 2022-07-11 17:11:13,394+0800
    at org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:499)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:5952)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:675)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1035)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1922)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1486)
    at org.apache.hadoop.ipc.Client.call(Client.java:1432)
    at org.apache.hadoop.ipc.Client.call(Client.java:1342)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy94.renewDelegationToken(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewDelegationToken(ClientNamenodeProtocolTranslatorPB.java:964)
    at sun.reflect.GeneratedMethodAccessor277.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:411)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:348)
    at com.sun.proxy.$Proxy95.renewDelegationToken(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:759)
    ... 10 more
2022-07-11 17:11:13,399 ERROR org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer: removing failed delegation token for appid=[application_1637733238080_3800];t=ha-hdfs:hdfsHACluster

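The log above shows that after submission the delegation token was renewed about once a day, but once the job reached its seventh day the renewal failed and the token became invalid. The NN removes an invalid token internally, so if the job then fails and needs a retry, or finishes and needs log aggregation, it keeps using the invalid token to access HDFS. The NN cannot find the token, throws an exception, and the operation fails.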
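The shrinking "Renew ... in 6032 ms / 600 ms / 56 ms / 2 ms" intervals in the RM log follow from combining the two rules seen earlier: the NN caps the expiration at the max date, and the RM always schedules the next renewal when one tenth of the remaining time is left. The simulation below reproduces that behavior in a simplified model. The class name, the fixed per-renewal latency and the 1-day/7-day settings are assumptions, and the exact intervals will differ from the real log.

```java
final class TokenLifecycleSimulation {
    static final long RENEW_INTERVAL_MS = 24L * 60 * 60 * 1000;     // 1 day
    static final long MAX_LIFETIME_MS = 7L * 24 * 60 * 60 * 1000;   // 7 days

    /**
     * Simulates the RM renewal loop for one token.
     * Returns {number of successful renewals, time at which renewing stopped}.
     */
    static long[] simulate(long issueMs, long latencyMs) {
        long maxDate = issueMs + MAX_LIFETIME_MS;
        long now = issueMs;
        long renewals = 0;
        while (true) {
            if (maxDate < now) {
                // NN side: "tried to renew an expired token"; the RM removes the token.
                return new long[] {renewals, now};
            }
            long expiration = Math.min(maxDate, now + RENEW_INTERVAL_MS);
            renewals++;
            long expiresIn = expiration - now;
            if (expiresIn <= 0) {
                // RM side: "Will not renew token"; no further timer is set.
                return new long[] {renewals, now};
            }
            // The timer fires a little before the expiration, plus some latency.
            now = expiration - expiresIn / 10 + latencyMs;
        }
    }

    public static void main(String[] args) {
        long issue = 1656925873393L; // illustrative issue time
        long[] result = simulate(issue, 1L);
        System.out.println("successful renewals: " + result[0]);
        System.out.println("stopped " + (result[1] - issue - MAX_LIFETIME_MS)
            + " ms after the max date");
    }
}
```

However often the RM renews, the token cannot outlive its max date; the renewals only get denser as that date approaches.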

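5. [Solution]

The simplest and most direct fix is to increase the delegation token's maximum lifetime.

At first this seemed a rather crude fix, though, especially for long-running Flink streaming jobs: their run time cannot be known in advance, so there is no way to pick a suitable maximum lifetime.

So I went through the source again and found that for a delegation token that is about to expire (reach its maximum lifetime), the RM requests a new token if needed. This is the requestNewHdfsDelegationTokenIfNeeded method that the timer thread calls before renewing the token.

Here is the implementation: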

private void requestNewHdfsDelegationTokenIfNeeded(final DelegationTokenToRenew dttr)
        throws IOException, InterruptedException {
    // Has proxy-user privileges, AND the token is an HDFS delegation token,
    // AND the token is close to its maximum lifetime
    if (hasProxyUserPrivileges &&
        dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining &&
        dttr.token.getKind().equals(HDFS_DELEGATION_KIND)) {

        final Collection<ApplicationId> applicationIds;
        synchronized (dttr.referringAppIds) {
            applicationIds = new HashSet<>(dttr.referringAppIds);
            dttr.referringAppIds.clear();
        }
        // remove all old expiring hdfs tokens for this application.
        for (ApplicationId appId : applicationIds) {
            Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
            if (tokenSet == null || tokenSet.isEmpty()) {
                continue;
            }
            Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
            synchronized (tokenSet) {
                while (iter.hasNext()) {
                    DelegationTokenToRenew t = iter.next();
                    if (t.token.getKind().equals(HDFS_DELEGATION_KIND)) {
                        iter.remove();
                        allTokens.remove(t.token);
                        t.cancelTimer();
                        LOG.info("Removed expiring token " + t);
                    }
                }
            }
        }
        LOG.info("Token= (" + dttr + ") is expiring, request new token.");
        requestNewHdfsDelegationTokenAsProxyUser(
            applicationIds, dttr.user, dttr.shouldCancelAtEnd);
    }
}
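The guard at the top of this method can be exercised on its own. The sketch below copies only that condition; the class and method names are hypothetical, and the 3-hour threshold is the default value of yarn.resourcemanager.system-credentials.valid-time-remaining. With a one-day renew interval, my reading is that the condition only becomes true once the expiration has been capped at the max date, since before that the gap is at least a day.

```java
final class NewTokenCheck {
    static final String HDFS_DELEGATION_KIND = "HDFS_DELEGATION_TOKEN";

    /** Same three-part condition as requestNewHdfsDelegationTokenIfNeeded. */
    static boolean shouldRequestNewToken(
            boolean hasProxyUserPrivileges,
            long maxDateMs,
            long expirationDateMs,
            long validTimeRemainingMs,
            String tokenKind) {
        return hasProxyUserPrivileges
            && maxDateMs - expirationDateMs < validTimeRemainingMs
            && HDFS_DELEGATION_KIND.equals(tokenKind);
    }

    public static void main(String[] args) {
        long threeHoursMs = 10_800_000L;
        long oneDayMs = 86_400_000L;
        long maxDate = 1657530673393L;
        // Expiration still a day short of the max date: keep renewing the old token.
        System.out.println(shouldRequestNewToken(
            true, maxDate, maxDate - oneDayMs, threeHoursMs, HDFS_DELEGATION_KIND));
        // Expiration capped at the max date: request a replacement.
        System.out.println(shouldRequestNewToken(
            true, maxDate, maxDate, threeHoursMs, HDFS_DELEGATION_KIND));
        // Without proxy-user privileges nothing is requested, which matches the failure above.
        System.out.println(shouldRequestNewToken(
            false, maxDate, maxDate, threeHoursMs, HDFS_DELEGATION_KIND));
    }
}
```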

Once the new token is obtained, it is updated inside the RM and then synchronized to the NMs through the NM heartbeat response.

private void requestNewHdfsDelegationTokenAsProxyUser(...) {
    // Get new hdfs tokens for this user
    Credentials credentials = new Credentials();
    Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);

    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    for (ApplicationId applicationId : referringAppIds) {
        // Update the app's delegation token;
        // it is synchronized to the NMs on their next heartbeat
        rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
    }
}

public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
        throws YarnException, IOException {
    ...
    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
        rmContext.getSystemCredentialsForApps();
    if (!systemCredentials.isEmpty()) {
        nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
    }
    ...
}
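The hand-off above is essentially a shared map on the RM that is copied into every heartbeat response. A toy model of that flow follows; the two model classes and the string tokens are hypothetical, and real YARN serializes Credentials objects rather than plain strings.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class HeartbeatSyncSketch {
    /** RM side: keeps the latest system credentials per application. */
    static final class ResourceManagerModel {
        final ConcurrentMap<String, ByteBuffer> systemCredentials = new ConcurrentHashMap<>();

        void publishNewToken(String appId, String token) {
            systemCredentials.put(
                appId, ByteBuffer.wrap(token.getBytes(StandardCharsets.UTF_8)));
        }

        /** Every heartbeat response carries a snapshot of the map. */
        Map<String, ByteBuffer> heartbeatResponse() {
            return new HashMap<>(systemCredentials);
        }
    }

    /** NM side: stores whatever the RM sent, for later localization or log aggregation. */
    static final class NodeManagerModel {
        final Map<String, String> tokensByApp = new HashMap<>();

        void onHeartbeatResponse(Map<String, ByteBuffer> credentials) {
            credentials.forEach((appId, buffer) ->
                // duplicate() leaves the shared buffer's position untouched
                tokensByApp.put(
                    appId, StandardCharsets.UTF_8.decode(buffer.duplicate()).toString()));
        }
    }

    public static void main(String[] args) {
        ResourceManagerModel rm = new ResourceManagerModel();
        NodeManagerModel nm = new NodeManagerModel();
        rm.publishNewToken("application_1637733238080_3800", "new-hdfs-token");
        nm.onHeartbeatResponse(rm.heartbeatResponse());
        System.out.println(nm.tokensByApp);
    }
}
```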

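The NM parses the token out of the heartbeat response and keeps it updated in memory. It is used later when a retried job starts resource localization and when a finished job triggers log aggregation.

Note: only resource localization and log aggregation are mentioned here as users of the new token. Does a task that is already running pick up the new token?

The answer is no (at least not in 2.x). The token was written to a persistent file, and the task reads that file at startup to obtain the token it uses. The new delegation token is not written to that file, and even if it were, some mechanism would be needed to tell the task process to reread it. A running task that keeps accessing HDFS after its token expires will therefore still hit an exception.

In addition, the latest 3.x releases contain related code changes that appear to notify running containers, but I have not studied the details yet and will look into them when time allows.

6. [Related configuration]

The configuration items related to delegation tokens are: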

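Configuration item                                            Default   Description
dfs.namenode.delegation.key.update-interval                   1 day     Interval at which the master key used for tokens is updated
dfs.namenode.delegation.token.renew-interval                  1 day     Token renewal interval
dfs.namenode.delegation.token.max-lifetime                    7 days    Maximum lifetime of a token
yarn.resourcemanager.delegation-token.always-cancel           false     Whether the RM always cancels the token when the application finishes
yarn.resourcemanager.proxy-user-privileges.enabled            false     Whether the RM has proxy-user privileges to request a new token when a delegation token is about to expire
yarn.resourcemanager.system-credentials.valid-time-remaining  10800000  How long before the maximum lifetime a new token is requested, in milliseconds
yarn.resourcemanager.delegation-token-renewer.thread-count    50        Number of delegation token renewer threads in the RM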
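As a rough illustration of the second approach from section 5, the fragments below enable the RM's proxy-user privileges and allow the RM user to impersonate job owners on the NameNode side. The proxy user name `yarn`, the host names and the wildcard group are placeholders; the actual user is whatever short name the RM's Kerberos principal maps to in a given cluster, so treat this as a sketch to adapt rather than a verified setup.

```xml
<!-- yarn-site.xml on the ResourceManager -->
<property>
  <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
  <value>true</value>
</property>

<!-- core-site.xml on the NameNode: let the RM user act on behalf of job owners -->
<property>
  <name>hadoop.proxyuser.yarn.hosts</name>
  <value>rm-host-1,rm-host-2</value>
</property>
<property>
  <name>hadoop.proxyuser.yarn.groups</name>
  <value>*</value>
</property>
```

The simpler alternative from section 5 is to raise dfs.namenode.delegation.token.max-lifetime in hdfs-site.xml, which only postpones the expiry.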
