1. Introduction

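In Hadoop 3.2.1, each ApplicationMaster asks a NodeManager to launch containers through the RPC method ContainerManagementProtocol#startContainers. Inside the NodeManager, the ContainerManager component (implemented by ContainerManagerImpl) receives and processes these requests.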

2. Protocol

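An application's ApplicationMaster uses ContainerManagementProtocol to ask the NodeManager to perform container operations such as starting a container, killing a container, and querying a container's status. In this protocol the ApplicationMaster is the RPC client and the NodeManager is the RPC server (implemented by its internal ContainerManager component). In other words, the NodeManager and the ApplicationMaster follow a "push model": the ApplicationMaster tells the NodeManager about a container operation as soon as it decides on it. Compared with a "pull model", where the NodeManager would have to poll for pending operations, this greatly reduces latency.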

ContainerManagementProtocol is the protocol between the AM and the NM. Through it, the AM asks the NM to start or stop containers and to report the status of each container.

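| Method | Description |
|---|---|
| startContainers | Start containers |
| stopContainers | Stop containers |
| getContainerStatuses | Get container statuses |
| increaseContainersResource | [Deprecated] Increase container resources |
| updateContainer | Update a container |
| signalToContainer | Send a signal to a container |
| localize | Localize resources needed by a container; currently this API only applies to running containers |
| reInitializeContainer | Re-initialize a container with a new launch context |
| restartContainer | Restart a container |
| rollbackLastReInitialization | Try to roll back the last re-initialization |
| commitLastReInitialization | Try to commit the last re-initialization; once committed, it can no longer be rolled back |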

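The three core RPCs of ContainerManagementProtocol are batch operations, each covering one or more containers:
startContainers: the ApplicationMaster asks the NodeManager to start containers. Its StartContainersRequest argument wraps a list of StartContainerRequest objects, each carrying what one container needs to launch: local resources, environment variables, launch commands, tokens, and so on. It returns a StartContainersResponse that lists the containers started successfully and, for each container that failed, the exception that caused the failure.

stopContainers: the ApplicationMaster asks the NodeManager to stop (kill) containers. Its StopContainersRequest argument specifies the IDs of the containers to kill, and it returns a StopContainersResponse reporting which containers were stopped and which requests failed.

getContainerStatuses: the ApplicationMaster queries the running status of containers. Its GetContainerStatusesRequest argument wraps the target container IDs, and the returned GetContainerStatusesResponse holds the current status of each container.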
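As a rough illustration of the batch contract just described (a list of containers in; succeeded IDs plus per-container failures out), here is an in-memory sketch. `MiniContainerManager`, `BatchResult` and `InMemoryContainerManager` are hypothetical names that use plain `String` IDs instead of real YARN records such as `StartContainersRequest` or `ContainerId`, and no RPC is involved.

```java
import java.util.*;

/** Shaped like StartContainersResponse / StopContainersResponse: succeeded IDs plus failures. */
final class BatchResult {
    final List<String> succeeded = new ArrayList<>();
    final Map<String, String> failed = new LinkedHashMap<>();
}

/** Hypothetical, simplified stand-in for the three core ContainerManagementProtocol RPCs. */
interface MiniContainerManager {
    BatchResult startContainers(List<String> containerIds);
    BatchResult stopContainers(List<String> containerIds);
    Map<String, String> getContainerStatuses(List<String> containerIds);
}

/** Toy NodeManager-side server that keeps container states in memory. */
final class InMemoryContainerManager implements MiniContainerManager {
    private final Map<String, String> states = new HashMap<>();

    @Override
    public BatchResult startContainers(List<String> containerIds) {
        BatchResult result = new BatchResult();
        for (String id : containerIds) {
            if (states.containsKey(id)) {
                // One bad entry is reported; the rest of the batch still proceeds
                result.failed.put(id, "Container " + id + " already is running on this node");
            } else {
                states.put(id, "RUNNING");
                result.succeeded.add(id);
            }
        }
        return result;
    }

    @Override
    public BatchResult stopContainers(List<String> containerIds) {
        BatchResult result = new BatchResult();
        for (String id : containerIds) {
            // Map.replace returns null when the key is absent
            if (states.replace(id, "KILLED") != null) {
                result.succeeded.add(id);
            } else {
                result.failed.put(id, "Container " + id + " is not handled by this NodeManager");
            }
        }
        return result;
    }

    @Override
    public Map<String, String> getContainerStatuses(List<String> containerIds) {
        Map<String, String> statuses = new LinkedHashMap<>();
        for (String id : containerIds) {
            String state = states.get(id);
            if (state != null) {
                statuses.put(id, state);
            }
        }
        return statuses;
    }
}
```

Returning per-container outcomes instead of failing the whole call is what lets an AM launch many containers in one round trip and retry only the ones that failed.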

3. Fields

```java
/**
 * Extra duration to wait for applications to be killed on shutdown.
 */
private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;

private static final Logger LOG =
    LoggerFactory.getLogger(ContainerManagerImpl.class);

public static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
static final String INVALID_CONTAINERTOKEN_MSG =
    "Invalid ContainerToken";

// NodeManager context information
protected final Context context;
// Container monitor
private final ContainersMonitor containersMonitor;
// RPC server
private Server server;
// Resource localization service
private final ResourceLocalizationService rsrcLocalizationSrvc;
// Container launcher
private final AbstractContainersLauncher containersLauncher;
// Auxiliary services
private final AuxServices auxiliaryServices;
// Metrics
private final NodeManagerMetrics metrics;
// Node status updater
protected final NodeStatusUpdater nodeStatusUpdater;
// Local directories handler
protected LocalDirsHandlerService dirsHandler;
// Asynchronous event dispatcher
protected final AsyncDispatcher dispatcher;
// Deletion (cleanup) service
private final DeletionService deletionService;
// Log handler
private LogHandler logHandler;
// Whether the service has been stopped
private boolean serviceStopped = false;
// Read lock
private final ReadLock readLock;
// Write lock
private final WriteLock writeLock;
// AMRM proxy service
private AMRMProxyService amrmProxyService;
// Whether the AMRM proxy service is enabled
protected boolean amrmProxyEnabled = false;
// Container scheduler
private final ContainerScheduler containerScheduler;
// Milliseconds to wait for containers to stop on shutdown
private long waitForContainersOnShutdownMillis;
// NM metrics publisher is set only if the timeline service v.2 is enabled
private NMTimelinePublisher nmMetricsPublisher;
private boolean timelineServiceV2Enabled;
```
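`serviceStopped` is guarded by the `readLock`/`writeLock` pair taken from a single `ReentrantReadWriteLock`: `startContainerInternal` and `updateContainerInternal` check the flag while holding the read lock. The sketch below assumes the stop path sets the flag under the write lock (that code is not shown in this article), so a stop waits for in-flight starts to finish and every later start is rejected. `StopGuard` and its methods are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the serviceStopped guard used by ContainerManagerImpl. */
final class StopGuard {
    private final ReentrantReadWriteLock.ReadLock readLock;
    private final ReentrantReadWriteLock.WriteLock writeLock;
    // Read and written only while holding one of the locks, so no volatile is needed
    private boolean serviceStopped = false;

    StopGuard() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.readLock = lock.readLock();
        this.writeLock = lock.writeLock();
    }

    /** Many RPC handler threads may hold the read lock at the same time. */
    String startContainer(String containerId) {
        readLock.lock();
        try {
            if (serviceStopped) {
                throw new IllegalStateException("Container start failed as the NodeManager is "
                    + "in the process of shutting down");
            }
            return containerId + " started";
        } finally {
            readLock.unlock();
        }
    }

    /** The write lock is granted only after all in-flight starts release the read lock. */
    void stop() {
        writeLock.lock();
        try {
            serviceStopped = true;
        } finally {
            writeLock.unlock();
        }
    }
}
```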

4. Constructor

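The ContainerManagerImpl instance is created by the NodeManager during the NodeManager's own serviceInit. The constructor only wires up sub-services and event handlers; the sub-services it receives or creates are all still in the NOTINITED state at this point.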

```java
public ContainerManagerImpl(Context context, ContainerExecutor exec,
    DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
    NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
  super(ContainerManagerImpl.class.getName());
  this.context = context;
  // LocalDirsHandlerService: still NOTINITED here
  this.dirsHandler = dirsHandler;

  // ContainerManager level dispatcher.
  dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");
  // DeletionService: NOTINITED
  this.deletionService = deletionContext;
  this.metrics = metrics;

  // ResourceLocalizationService: NOTINITED
  rsrcLocalizationSrvc =
      createResourceLocalizationService(exec, deletionContext, context,
          metrics);
  addService(rsrcLocalizationSrvc);

  // containers-launcher: NOTINITED
  containersLauncher = createContainersLauncher(context, exec);
  addService(containersLauncher);

  this.nodeStatusUpdater = nodeStatusUpdater;
  // ContainerScheduler: NOTINITED
  this.containerScheduler = createContainerScheduler(context);
  addService(containerScheduler);

  AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
      new AuxiliaryLocalPathHandlerImpl(dirsHandler);
  // Start configurable services
  auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler,
      this.context, this.deletionService);
  auxiliaryServices.registerServiceListener(this);
  addService(auxiliaryServices);

  // initialize the metrics publisher if the timeline service v.2 is enabled
  // and the system publisher is enabled
  Configuration conf = context.getConf();
  if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
    if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
      LOG.info("YARN system metrics publishing service is enabled");
      nmMetricsPublisher = createNMTimelinePublisher(context);
      context.setNMTimelinePublisher(nmMetricsPublisher);
    }
    this.timelineServiceV2Enabled = true;
  }
  this.containersMonitor = createContainersMonitor(exec);
  addService(this.containersMonitor);

  dispatcher.register(ContainerEventType.class,
      new ContainerEventDispatcher());
  dispatcher.register(ApplicationEventType.class,
      createApplicationEventDispatcher());
  dispatcher.register(LocalizationEventType.class,
      new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc,
          nmMetricsPublisher));
  dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
  dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
  dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
  dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);

  addService(dispatcher);

  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  this.readLock = lock.readLock();
  this.writeLock = lock.writeLock();
}
```
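The constructor routes all sub-components through one AsyncDispatcher by registering one handler per event-type enum (ContainerEventType, ApplicationEventType, and so on). The sketch below reproduces only that routing idea with a hypothetical `MiniDispatcher`: `handle()` merely enqueues, and an explicit `drain()` stands in for the dispatcher thread. The real AsyncDispatcher consumes a `BlockingQueue` on a dedicated thread and does considerably more; the enums `ContainerEvt` and `AppEvt` are made up for the example.

```java
import java.util.*;
import java.util.function.Consumer;

/** Hypothetical event-type enums standing in for ContainerEventType, ApplicationEventType, ... */
enum ContainerEvt { INIT_CONTAINER, KILL_CONTAINER }
enum AppEvt { INIT_APPLICATION, FINISH_APPLICATION }

/** A typed event whose type is an enum constant. */
final class Evt {
    final Enum<?> type;
    final String payload;

    Evt(Enum<?> type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

/** Simplified dispatcher: events are queued, then routed by the enum class of their type. */
final class MiniDispatcher {
    private final Map<Class<?>, Consumer<Evt>> handlers = new HashMap<>();
    private final Deque<Evt> queue = new ArrayDeque<>();

    void register(Class<? extends Enum<?>> eventType, Consumer<Evt> handler) {
        handlers.put(eventType, handler);
    }

    /** Like the dispatcher's event handler: only enqueues, never runs a handler inline. */
    void handle(Evt event) {
        queue.add(event);
    }

    /** Stands in for the dispatcher thread; processes queued events in FIFO order. */
    int drain() {
        int processed = 0;
        Evt e;
        while ((e = queue.poll()) != null) {
            // getDeclaringClass() also works for enum constants that have bodies
            Consumer<Evt> h = handlers.get(e.type.getDeclaringClass());
            if (h == null) {
                throw new IllegalStateException("No handler for " + e.type.getDeclaringClass());
            }
            h.accept(e);
            processed++;
        }
        return processed;
    }
}
```

Because handlers are keyed by enum class, adding a new sub-service only requires one more `register` call, which is exactly the pattern visible at the end of the constructor.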

5. serviceInit

```java
@Override
public void serviceInit(Configuration conf) throws Exception {
  logHandler =
      createLogHandler(conf, this.context, this.deletionService);
  addIfService(logHandler);
  dispatcher.register(LogHandlerEventType.class, logHandler);

  // add the shared cache upload service (it will do nothing if the shared
  // cache is disabled)
  // SharedCacheUploadService: NOTINITED
  SharedCacheUploadService sharedCacheUploader =
      createSharedCacheUploaderService();
  addService(sharedCacheUploader);
  dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);

  createAMRMProxyService(conf);

  // With default settings this is 250 + 5000 + 1000 = 6250 ms
  waitForContainersOnShutdownMillis =
      conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
          YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
      conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
          YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
      SHUTDOWN_CLEANUP_SLOP_MS;

  super.serviceInit(conf);
  recover();
}
```
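`waitForContainersOnShutdownMillis` is the sum of two NodeManager settings plus `SHUTDOWN_CLEANUP_SLOP_MS`. The hypothetical helper below recomputes it from a plain key/value map. The property names `yarn.nodemanager.sleep-delay-before-sigkill.ms` and `yarn.nodemanager.process-kill-wait.ms` and their defaults (250 ms and 5000 ms) are assumed to match `YarnConfiguration`; with those defaults the result is the 6250 ms noted in the code comment.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper recomputing waitForContainersOnShutdownMillis from plain config. */
final class ShutdownWait {
    static final String SLEEP_DELAY_KEY = "yarn.nodemanager.sleep-delay-before-sigkill.ms";
    static final String KILL_WAIT_KEY = "yarn.nodemanager.process-kill-wait.ms";
    static final long DEFAULT_SLEEP_DELAY_MS = 250;   // assumed default
    static final long DEFAULT_KILL_WAIT_MS = 5000;    // assumed default
    static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000; // same constant as in ContainerManagerImpl

    static long waitForContainersOnShutdownMillis(Map<String, String> conf) {
        long sleepDelay = parse(conf, SLEEP_DELAY_KEY, DEFAULT_SLEEP_DELAY_MS);
        long killWait = parse(conf, KILL_WAIT_KEY, DEFAULT_KILL_WAIT_MS);
        return sleepDelay + killWait + SHUTDOWN_CLEANUP_SLOP_MS;
    }

    /** Falls back to the default when the key is absent, like Configuration#getLong. */
    private static long parse(Map<String, String> conf, String key, long dflt) {
        String v = conf.get(key);
        return v == null ? dflt : Long.parseLong(v.trim());
    }
}
```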

6. serviceStart

```java
@Override
protected void serviceStart() throws Exception {

  // Enqueue user dirs in deletion context

  Configuration conf = getConfig();
  // With default settings: 0.0.0.0/0.0.0.0:0
  final InetSocketAddress initialAddress = conf.getSocketAddr(
      YarnConfiguration.NM_BIND_HOST,
      YarnConfiguration.NM_ADDRESS,
      YarnConfiguration.DEFAULT_NM_ADDRESS,
      YarnConfiguration.DEFAULT_NM_PORT);
  boolean usingEphemeralPort = (initialAddress.getPort() == 0);
  if (context.getNMStateStore().canRecover() && usingEphemeralPort) {
    throw new IllegalArgumentException("Cannot support recovery with an "
        + "ephemeral server port. Check the setting of "
        + YarnConfiguration.NM_ADDRESS);
  }
  // If recovering then delay opening the RPC service until the recovery
  // of resources and containers have completed, otherwise requests from
  // clients during recovery can interfere with the recovery process.
  // (false when NM recovery is disabled)
  final boolean delayedRpcServerStart =
      context.getNMStateStore().canRecover();

  Configuration serverConf = new Configuration(conf);

  // always enforce it to be token-based.
  serverConf.set(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      SaslRpcServer.AuthMethod.TOKEN.toString());

  YarnRPC rpc = YarnRPC.create(conf);

  // The RPC engine here is ProtobufRpcEngine
  server =
      rpc.getServer(ContainerManagementProtocol.class, this, initialAddress,
          serverConf, this.context.getNMTokenSecretManager(),
          conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
              YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));

  // Enable service authorization?
  if (conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
      false)) {
    refreshServiceAcls(conf, NMPolicyProvider.getInstance());
  }

  String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
  String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
  String hostOverride = null;
  if (bindHost != null && !bindHost.isEmpty()
      && nmAddress != null && !nmAddress.isEmpty()) {
    //a bind-host case with an address, to support overriding the first
    //hostname found when querying for our hostname with the specified
    //address, combine the specified address with the actual port listened
    //on by the server
    hostOverride = nmAddress.split(":")[0];
  }

  // setup node ID
  InetSocketAddress connectAddress;
  if (delayedRpcServerStart) {
    connectAddress = NetUtils.getConnectAddress(initialAddress);
  } else {
    server.start();
    connectAddress = NetUtils.getConnectAddress(server);
  }
  NodeId nodeId = buildNodeId(connectAddress, hostOverride); // e.g. boyi-pro.lan:54950
  ((NodeManager.NMContext)context).setNodeId(nodeId);
  this.context.getNMTokenSecretManager().setNodeId(nodeId);
  this.context.getContainerTokenSecretManager().setNodeId(nodeId);

  // start remaining services
  super.serviceStart();

  if (delayedRpcServerStart) {
    waitForRecoveredContainers();
    server.start();

    // check that the node ID is as previously advertised
    connectAddress = NetUtils.getConnectAddress(server);
    NodeId serverNode = buildNodeId(connectAddress, hostOverride);
    if (!serverNode.equals(nodeId)) {
      throw new IOException("Node mismatch after server started, expected '"
          + nodeId + "' but found '" + serverNode + "'");
    }
  }

  LOG.info("ContainerManager started at " + connectAddress);
  LOG.info("ContainerManager bound to " + initialAddress);
}
```
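Two decisions in serviceStart are easy to isolate. Recovery is refused when the RPC port is ephemeral (0), presumably because a restarted NodeManager must come back with the same NodeId, which the later "Node mismatch" check enforces. When both `yarn.nodemanager.bind-host` and `yarn.nodemanager.address` are set, the host part of the address overrides the resolved hostname in the NodeId. `StartChecks` is a hypothetical class that works on plain strings and, unlike the real `buildNodeId`, performs no DNS canonicalization.

```java
/** Hypothetical helpers mirroring two decisions made in serviceStart. */
final class StartChecks {
    /** Recovery needs a stable address, so an ephemeral port (0) is rejected when recovery is on. */
    static void checkRecoveryPort(boolean canRecover, int port) {
        if (canRecover && port == 0) {
            throw new IllegalArgumentException("Cannot support recovery with an "
                + "ephemeral server port. Check the setting of yarn.nodemanager.address");
        }
    }

    /** Only when both bind host and NM address are set does the address's host part win. */
    static String hostOverride(String bindHost, String nmAddress) {
        if (bindHost != null && !bindHost.isEmpty()
                && nmAddress != null && !nmAddress.isEmpty()) {
            return nmAddress.split(":")[0];
        }
        return null;
    }

    /** host:port form of the node ID; the override, if any, replaces the resolved host. */
    static String nodeId(String resolvedHost, String hostOverride, int port) {
        String host = hostOverride != null ? hostOverride : resolvedHost;
        return host + ":" + port;
    }
}
```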

7. startContainers

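Starts the containers listed in the request on this NodeManager. The remote user is authorized first; then, for each StartContainerRequest, the container token is validated, the pre-start checks run, and startContainerInternal is called. The outcome of each container is recorded in the response.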

```java
/**
 * Start a list of containers on this NodeManager.
 */
@Override
public StartContainersResponse startContainers(
    StartContainersRequest requests) throws YarnException, IOException {
  UserGroupInformation remoteUgi = getRemoteUgi();
  NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
  authorizeUser(remoteUgi, nmTokenIdentifier);
  // Containers that started successfully
  List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
  // Containers that failed to start, with the cause
  Map<ContainerId, SerializedException> failedContainers =
      new HashMap<ContainerId, SerializedException>();
  // Synchronize with NodeStatusUpdaterImpl#registerWithRM
  // to avoid race condition during NM-RM resync (due to RM restart) while a
  // container is being started, in particular when the container has not yet
  // been added to the containers map in NMContext.
  synchronized (this.context) {
    // Iterate over the individual start requests
    for (StartContainerRequest request : requests
        .getStartContainerRequests()) {
      ContainerId containerId = null;
      try {
        // Validate the container token
        if (request.getContainerToken() == null
            || request.getContainerToken().getIdentifier() == null) {
          throw new IOException(INVALID_CONTAINERTOKEN_MSG);
        }

        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
            .newContainerTokenIdentifier(request.getContainerToken());
        verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
            containerTokenIdentifier);
        containerId = containerTokenIdentifier.getContainerID();

        // Initialize the AMRMProxy service instance only if the container is of
        // type AM and if the AMRMProxy service is enabled
        if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
            .equals(ContainerType.APPLICATION_MASTER)) {
          this.getAMRMProxyService().processApplicationStartRequest(request);
        }
        // Run the pre-start checks
        performContainerPreStartChecks(nmTokenIdentifier, request,
            containerTokenIdentifier);
        // Start the container
        startContainerInternal(containerTokenIdentifier, request);
        // Record the container as started
        succeededContainers.add(containerId);
      } catch (YarnException e) {
        // Record the failure for this container and continue with the next
        failedContainers.put(containerId, SerializedException.newInstance(e));
      } catch (InvalidToken ie) {
        // Record the failure, then abort the whole request
        failedContainers
            .put(containerId, SerializedException.newInstance(ie));
        throw ie;
      } catch (IOException e) {
        throw RPCUtil.getRemoteException(e);
      }
    }
    // Build the response
    return StartContainersResponse
        .newInstance(getAuxServiceMetaData(), succeededContainers,
            failedContainers);
  }
}
```
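The per-request `try/catch` inside the loop is what lets one bad container fail without failing its siblings. The sketch below keeps only that pattern: `StartFailure` stands in for `YarnException`, and `ContainerStarter` stands in for the pre-start checks plus `startContainerInternal`; both, like `BatchStarter`, are hypothetical. Note that the real method does not catch everything: an `InvalidToken` or any other `IOException` aborts the whole batch, and the loop runs inside `synchronized (this.context)`.

```java
import java.util.*;

/** Hypothetical per-container failure standing in for YarnException. */
class StartFailure extends Exception {
    StartFailure(String msg) {
        super(msg);
    }
}

/** Hypothetical callback standing in for the pre-start checks plus startContainerInternal. */
interface ContainerStarter {
    void start(String containerId) throws StartFailure;
}

/** Shaped like StartContainersResponse: succeeded IDs and failures keyed by ID. */
final class StartResult {
    final List<String> succeeded = new ArrayList<>();
    final Map<String, String> failed = new LinkedHashMap<>();
}

final class BatchStarter {
    /** A failing request is recorded and skipped; the rest of the batch still runs. */
    static StartResult startAll(List<String> containerIds, ContainerStarter starter) {
        StartResult result = new StartResult();
        for (String id : containerIds) {
            try {
                starter.start(id);
                result.succeeded.add(id);
            } catch (StartFailure e) {
                result.failed.put(id, e.getMessage());
            }
        }
        return result;
    }
}
```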

8. startContainerInternal

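Validates the launch context, builds the ContainerImpl and registers it in the NodeManager context. For the first container of an application on this node it also creates the ApplicationImpl; finally it persists the container and fires an ApplicationContainerInitEvent.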

```java
@SuppressWarnings("unchecked")
protected void startContainerInternal(
    ContainerTokenIdentifier containerTokenIdentifier,
    StartContainerRequest request) throws YarnException, IOException {

  // Container ID and its string form
  ContainerId containerId = containerTokenIdentifier.getContainerID();
  String containerIdStr = containerId.toString();
  // User who submitted the application
  String user = containerTokenIdentifier.getApplicationSubmitter();

  LOG.info("Start request for " + containerIdStr + " by user " + user);

  ContainerLaunchContext launchContext = request.getContainerLaunchContext();

  // Sanity check for local resources
  for (Map.Entry<String, LocalResource> rsrc : launchContext
      .getLocalResources().entrySet()) {
    if (rsrc.getValue() == null || rsrc.getValue().getResource() == null) {
      throw new YarnException("Null resource URL for local resource "
          + rsrc.getKey() + " : " + rsrc.getValue());
    } else if (rsrc.getValue().getType() == null) {
      throw new YarnException("Null resource type for local resource "
          + rsrc.getKey() + " : " + rsrc.getValue());
    } else if (rsrc.getValue().getVisibility() == null) {
      throw new YarnException("Null resource visibility for local resource "
          + rsrc.getKey() + " : " + rsrc.getValue());
    }
  }

  // Parse the credentials carried by the launch context
  Credentials credentials =
      YarnServerSecurityUtils.parseCredentials(launchContext);

  // Container start time
  long containerStartTime = SystemClock.getInstance().getTime();
  // Build the ContainerImpl
  Container container =
      new ContainerImpl(getConfig(), this.dispatcher,
          launchContext, credentials, metrics, containerTokenIdentifier,
          context, containerStartTime);
  ApplicationId applicationID =
      containerId.getApplicationAttemptId().getApplicationId();
  if (context.getContainers().putIfAbsent(containerId, container) != null) {
    NMAuditLogger.logFailure(user, AuditConstants.START_CONTAINER,
        "ContainerManagerImpl", "Container already running on this node!",
        applicationID, containerId);
    throw RPCUtil.getRemoteException("Container " + containerIdStr
        + " already is running on this node!!");
  }

  this.readLock.lock();
  try {
    if (!isServiceStopped()) {
      if (!context.getApplications().containsKey(applicationID)) {
        // First container of this application on this node:
        // create the application.
        // Populate the flow context from the launch context if the timeline
        // service v.2 is enabled.
        FlowContext flowContext =
            getFlowContext(launchContext, applicationID);

        Application application =
            new ApplicationImpl(dispatcher, user, flowContext,
                applicationID, credentials, context);
        if (context.getApplications().putIfAbsent(applicationID,
            application) == null) {
          LOG.info("Creating a new application reference for app "
              + applicationID);
          LogAggregationContext logAggregationContext =
              containerTokenIdentifier.getLogAggregationContext();
          Map<ApplicationAccessType, String> appAcls =
              container.getLaunchContext().getApplicationACLs();
          // Persist the application in the NM state store
          context.getNMStateStore().storeApplication(applicationID,
              buildAppProto(applicationID, user, credentials, appAcls,
                  logAggregationContext, flowContext));
          // Fire the application init event
          dispatcher.getEventHandler().handle(new ApplicationInitEvent(
              applicationID, appAcls, logAggregationContext));
        }
      } else if (containerTokenIdentifier.getContainerType()
          == ContainerType.APPLICATION_MASTER) {
        // The application already exists on this node:
        // rebuild the FlowContext from the launch context
        FlowContext flowContext =
            getFlowContext(launchContext, applicationID);
        if (flowContext != null) {
          ApplicationImpl application =
              (ApplicationImpl) context.getApplications().get(applicationID);

          // update flowContext reference in ApplicationImpl
          application.setFlowContext(flowContext);

          // Required to update state store for recovery.
          context.getNMStateStore().storeApplication(applicationID,
              buildAppProto(applicationID, user, credentials,
                  container.getLaunchContext().getApplicationACLs(),
                  containerTokenIdentifier.getLogAggregationContext(),
                  flowContext));

          LOG.info(
              "Updated application reference with flowContext " + flowContext
                  + " for app " + applicationID);
        } else {
          LOG.info("TimelineService V2.0 is not enabled. Skipping updating "
              + "flowContext for application " + applicationID);
        }
      }

      // Persist the container in the NM state store
      this.context.getNMStateStore().storeContainer(containerId,
          containerTokenIdentifier.getVersion(), containerStartTime, request);
      // Fire ApplicationContainerInitEvent
      dispatcher.getEventHandler().handle(
          new ApplicationContainerInitEvent(container));

      // Record the successful start with the container token secret manager
      this.context.getContainerTokenSecretManager().startContainerSuccessful(
          containerTokenIdentifier);
      // Audit log
      NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
          "ContainerManageImpl", applicationID, containerId);
      // TODO launchedContainer misplaced -> doesn't necessarily mean a container
      // launch. A finished Application will not launch containers.
      metrics.launchedContainer();
      metrics.allocateContainer(containerTokenIdentifier.getResource());
    } else {
      throw new YarnException(
          "Container start failed as the NodeManager is " +
          "in the process of shutting down");
    }
  } finally {
    this.readLock.unlock();
  }
}
```
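Two ideas in startContainerInternal can be shown in isolation: the three null checks on local resources, and the use of `putIfAbsent` both to reject a duplicate container and to detect the first container of an application. `Rsrc` and `NodeState` are hypothetical, stripped-down stand-ins for `LocalResource` and the NMContext maps; in YARN the "first container" path additionally builds an `ApplicationImpl` and fires an `ApplicationInitEvent`, which this sketch only mentions in a comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Simplified local resource; any field may be null in a malformed request. */
final class Rsrc {
    final String url;
    final String type;
    final String visibility;

    Rsrc(String url, String type, String visibility) {
        this.url = url;
        this.type = type;
        this.visibility = visibility;
    }
}

/** Hypothetical model of NMContext's containers and applications maps. */
final class NodeState {
    final ConcurrentMap<String, String> containers = new ConcurrentHashMap<>();   // containerId -> appId
    final ConcurrentMap<String, String> applications = new ConcurrentHashMap<>(); // appId -> user

    /** Same three null checks, in the same order, as startContainerInternal. */
    static void checkLocalResources(Map<String, Rsrc> resources) {
        for (Map.Entry<String, Rsrc> e : resources.entrySet()) {
            Rsrc r = e.getValue();
            if (r == null || r.url == null) {
                throw new IllegalArgumentException("Null resource URL for local resource " + e.getKey());
            } else if (r.type == null) {
                throw new IllegalArgumentException("Null resource type for local resource " + e.getKey());
            } else if (r.visibility == null) {
                throw new IllegalArgumentException("Null resource visibility for local resource " + e.getKey());
            }
        }
    }

    /** Returns true when this call created the application entry (the app's first container here). */
    boolean register(String containerId, String appId, String user, Map<String, Rsrc> resources) {
        checkLocalResources(resources);
        // putIfAbsent makes the duplicate check and the insert one atomic step
        if (containers.putIfAbsent(containerId, appId) != null) {
            throw new IllegalStateException("Container " + containerId + " already is running on this node!!");
        }
        // Only the first container creates the application (ApplicationImpl + ApplicationInitEvent in YARN)
        return applications.putIfAbsent(appId, user) == null;
    }
}
```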

9. handle

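handle processes four types of ContainerManagerEvent:
FINISH_APPS : finish applications, either because the NodeManager is shutting down or because the ResourceManager killed them
FINISH_CONTAINERS : kill containers at the ResourceManager's request (exit status KILLED_BY_RESOURCEMANAGER)
UPDATE_CONTAINERS : update containers (resources or execution type)
SIGNAL_CONTAINERS : send a signal to containers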

```java
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerManagerEvent event) {
  switch (event.getType()) {
  // Finish applications
  case FINISH_APPS:
    CMgrCompletedAppsEvent appsFinishedEvent =
        (CMgrCompletedAppsEvent) event;
    for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
      Application app = this.context.getApplications().get(appID);
      if (app == null) {
        LOG.info("couldn't find application " + appID + " while processing"
            + " FINISH_APPS event. The ResourceManager allocated resources"
            + " for this application to the NodeManager but no active"
            + " containers were found to process.");
        continue;
      }

      boolean shouldDropEvent = false;
      // Check the containers that belong to this application
      for (Container container : app.getContainers().values()) {
        if (container.isRecovering()) {
          LOG.info("drop FINISH_APPS event to " + appID + " because "
              + "container " + container.getContainerId()
              + " is recovering");
          shouldDropEvent = true;
          break;
        }
      }
      if (shouldDropEvent) {
        continue;
      }

      String diagnostic = "";
      if (appsFinishedEvent.getReason()
          == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
        diagnostic = "Application killed on shutdown";
      } else if (appsFinishedEvent.getReason()
          == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
        diagnostic = "Application killed by ResourceManager";
      }
      // Fire the application finish event
      this.dispatcher.getEventHandler().handle(
          new ApplicationFinishEvent(appID,
              diagnostic));
    }
    break;
  case FINISH_CONTAINERS:
    CMgrCompletedContainersEvent containersFinishedEvent =
        (CMgrCompletedContainersEvent) event;
    // Containers to clean up
    for (ContainerId containerId : containersFinishedEvent
        .getContainersToCleanup()) {
      // Look up the owning application; skip if it is unknown
      ApplicationId appId =
          containerId.getApplicationAttemptId().getApplicationId();
      Application app = this.context.getApplications().get(appId);
      if (app == null) {
        LOG.warn("couldn't find app " + appId + " while processing"
            + " FINISH_CONTAINERS event");
        continue;
      }
      // Look up the container
      Container container = app.getContainers().get(containerId);
      if (container == null) {
        LOG.warn("couldn't find container " + containerId
            + " while processing FINISH_CONTAINERS event");
        continue;
      }
      if (container.isRecovering()) {
        LOG.info("drop FINISH_CONTAINERS event to " + containerId
            + " because container is recovering");
        continue;
      }
      // Fire the container kill event
      this.dispatcher.getEventHandler().handle(
          new ContainerKillEvent(containerId,
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
              "Container Killed by ResourceManager"));
    }
    break;
  case UPDATE_CONTAINERS:
    CMgrUpdateContainersEvent containersDecreasedEvent =
        (CMgrUpdateContainersEvent) event;
    for (org.apache.hadoop.yarn.api.records.Container container
        : containersDecreasedEvent.getContainersToUpdate()) {
      try {
        // Rebuild the token identifier from the container token
        ContainerTokenIdentifier containerTokenIdentifier =
            BuilderUtils.newContainerTokenIdentifier(
                container.getContainerToken());
        // Update the container
        updateContainerInternal(container.getId(),
            containerTokenIdentifier);
      } catch (YarnException e) {
        LOG.error("Unable to decrease container resource", e);
      } catch (IOException e) {
        LOG.error("Unable to update container resource in store", e);
      }
    }
    break;
  case SIGNAL_CONTAINERS:
    // Send signals to containers
    CMgrSignalContainersEvent containersSignalEvent =
        (CMgrSignalContainersEvent) event;
    for (SignalContainerRequest request : containersSignalEvent
        .getContainersToSignal()) {
      internalSignalToContainer(request, "ResourceManager");
    }
    break;
  default:
    throw new YarnRuntimeException(
        "Got an unknown ContainerManagerEvent type: " + event.getType());
  }
}
```
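The FINISH_APPS branch makes two decisions per application: skip it if it is unknown or if any of its containers is still recovering, and otherwise pick a diagnostic from the reason. The sketch below models just those decisions. `FinishReason` mirrors `CMgrCompletedAppsEvent.Reason`, and the map of per-container "recovering" flags is a hypothetical simplification of `app.getContainers()`.

```java
import java.util.*;

/** Hypothetical mirror of CMgrCompletedAppsEvent.Reason. */
enum FinishReason { ON_SHUTDOWN, BY_RESOURCEMANAGER }

final class FinishApps {
    /** Diagnostic chosen per reason, as in the FINISH_APPS branch. */
    static String diagnostic(FinishReason reason) {
        if (reason == FinishReason.ON_SHUTDOWN) {
            return "Application killed on shutdown";
        } else if (reason == FinishReason.BY_RESOURCEMANAGER) {
            return "Application killed by ResourceManager";
        }
        return "";
    }

    /**
     * Returns the apps that would receive an ApplicationFinishEvent.
     * recoveringFlags maps appId to the isRecovering() flag of each of its containers.
     */
    static List<String> appsToFinish(List<String> appsToCleanup,
                                     Map<String, List<Boolean>> recoveringFlags) {
        List<String> result = new ArrayList<>();
        for (String appId : appsToCleanup) {
            List<Boolean> flags = recoveringFlags.get(appId);
            if (flags == null) {
                continue; // "couldn't find application ..."
            }
            if (flags.contains(Boolean.TRUE)) {
                continue; // drop the event while any container is recovering
            }
            result.add(appId);
        }
        return result;
    }
}
```

An application with no containers at all still gets a finish event, since the recovery check only looks for a recovering container.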

10. updateContainerInternal

```java
@SuppressWarnings("unchecked")
private void updateContainerInternal(ContainerId containerId,
    ContainerTokenIdentifier containerTokenIdentifier)
    throws YarnException, IOException {
  Container container = context.getContainers().get(containerId);
  // Check container existence
  if (container == null) {
    if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) {
      throw RPCUtil.getRemoteException("Container " + containerId.toString()
          + " was recently stopped on node manager.");
    } else {
      throw RPCUtil.getRemoteException("Container " + containerId.toString()
          + " is not handled by this NodeManager");
    }
  }
  // Check container version.
  int currentVersion = container.getContainerTokenIdentifier().getVersion();
  if (containerTokenIdentifier.getVersion() <= currentVersion) {
    throw RPCUtil.getRemoteException("Container " + containerId.toString()
        + " has update version [" + currentVersion + "] >= requested version"
        + " [" + containerTokenIdentifier.getVersion() + "]");
  }

  // Check validity of the target resource.
  Resource currentResource = container.getResource();
  ExecutionType currentExecType =
      container.getContainerTokenIdentifier().getExecutionType();
  boolean isResourceChange = false;
  boolean isExecTypeUpdate = false;
  Resource targetResource = containerTokenIdentifier.getResource();
  ExecutionType targetExecType = containerTokenIdentifier.getExecutionType();

  // Is true if either the resources has increased or execution type
  // updated from opportunistic to guaranteed
  boolean isIncrease = false;
  if (!currentResource.equals(targetResource)) {
    isResourceChange = true;
    isIncrease = Resources.fitsIn(currentResource, targetResource)
        && !Resources.fitsIn(targetResource, currentResource);
  } else if (!currentExecType.equals(targetExecType)) {
    isExecTypeUpdate = true;
    isIncrease = currentExecType == ExecutionType.OPPORTUNISTIC &&
        targetExecType == ExecutionType.GUARANTEED;
  }
  if (isIncrease) {
    org.apache.hadoop.yarn.api.records.Container increasedContainer = null;
    if (isResourceChange) {
      increasedContainer =
          org.apache.hadoop.yarn.api.records.Container.newInstance(
              containerId, null, null, targetResource, null, null,
              currentExecType);
      if (context.getIncreasedContainers().putIfAbsent(containerId,
          increasedContainer) != null){
        throw RPCUtil.getRemoteException("Container " + containerId.toString()
            + " resource is being increased -or- " +
            "is undergoing ExecutionType promoted.");
      }
    }
  }
  this.readLock.lock();
  try {
    if (!serviceStopped) {
      // Dispatch message to Container to actually make the change.
      dispatcher.getEventHandler().handle(new UpdateContainerTokenEvent(
          container.getContainerId(), containerTokenIdentifier,
          isResourceChange, isExecTypeUpdate, isIncrease));
    } else {
      throw new YarnException(
          "Unable to change container resource as the NodeManager is "
              + "in the process of shutting down");
    }
  } finally {
    this.readLock.unlock();
  }
}
```
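The version check and the `isIncrease` rule are pure logic, so they can be reproduced with a toy two-dimensional resource. `Res`, `ExecType` and `UpdateCheck` are hypothetical; `Res.fitsIn(a, b)` is assumed to behave like `Resources.fitsIn(smaller, bigger)`, returning true when every dimension of `a` is at most that of `b`.

```java
/** Simplified two-dimensional resource: memory in MB and virtual cores. */
final class Res {
    final long memoryMb;
    final int vcores;

    Res(long memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }

    /** True when every dimension of a is <= the same dimension of b. */
    static boolean fitsIn(Res a, Res b) {
        return a.memoryMb <= b.memoryMb && a.vcores <= b.vcores;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Res)) {
            return false;
        }
        Res r = (Res) o;
        return memoryMb == r.memoryMb && vcores == r.vcores;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(memoryMb) * 31 + vcores;
    }
}

enum ExecType { GUARANTEED, OPPORTUNISTIC }

final class UpdateCheck {
    /** Rejects stale tokens: the requested version must be strictly newer. */
    static void checkVersion(int currentVersion, int requestedVersion) {
        if (requestedVersion <= currentVersion) {
            throw new IllegalArgumentException("has update version [" + currentVersion
                + "] >= requested version [" + requestedVersion + "]");
        }
    }

    /** A resource change takes precedence; otherwise only OPPORTUNISTIC -> GUARANTEED counts. */
    static boolean isIncrease(Res current, Res target, ExecType curType, ExecType tgtType) {
        if (!current.equals(target)) {
            return Res.fitsIn(current, target) && !Res.fitsIn(target, current);
        } else if (curType != tgtType) {
            return curType == ExecType.OPPORTUNISTIC && tgtType == ExecType.GUARANTEED;
        }
        return false;
    }
}
```

A mixed change such as more memory but fewer cores fails both `fitsIn` tests, so it is treated as a non-increase update rather than being reserved in the increased-containers map.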

11. internalSignalToContainer

```java
@SuppressWarnings("unchecked")
private void internalSignalToContainer(SignalContainerRequest request,
    String sentBy) {
  // Look up the container
  ContainerId containerId = request.getContainerId();
  Container container = this.context.getContainers().get(containerId);
  if (container != null) {
    LOG.info(containerId + " signal request " + request.getCommand()
        + " by " + sentBy);
    // Forward the signal to the containers launcher
    this.dispatcher.getEventHandler().handle(
        new SignalContainersLauncherEvent(container,
            request.getCommand()));
  } else {
    LOG.info("Container " + containerId + " no longer exists");
  }
}
```
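internalSignalToContainer does one lookup and then either forwards an event to the containers launcher or only logs that the container is gone. The sketch below keeps that shape with hypothetical names: `SignalCmd` mirrors the values of YARN's `SignalContainerCommand`, and the `launcherEvents` list plus its `"SIGNAL_CONTAINER:..."` strings merely stand in for the dispatcher queue and `SignalContainersLauncherEvent`.

```java
import java.util.*;

/** Hypothetical mirror of SignalContainerCommand values. */
enum SignalCmd { OUTPUT_THREAD_DUMP, GRACEFUL_SHUTDOWN, FORCEFUL_SHUTDOWN }

final class SignalRouter {
    private final Set<String> containers = new HashSet<>(); // known container IDs
    final List<String> launcherEvents = new ArrayList<>();  // stands in for the dispatcher queue
    final List<String> log = new ArrayList<>();             // stands in for LOG.info

    void addContainer(String id) {
        containers.add(id);
    }

    /** Forwards a launcher event if the container is known, otherwise only logs. */
    void signal(String containerId, SignalCmd cmd, String sentBy) {
        if (containers.contains(containerId)) {
            log.add(containerId + " signal request " + cmd + " by " + sentBy);
            launcherEvents.add("SIGNAL_CONTAINER:" + containerId + ":" + cmd);
        } else {
            log.add("Container " + containerId + " no longer exists");
        }
    }
}
```

A signal for a container that already finished is not an error: it is logged and dropped, since the ResourceManager's view can lag behind the NodeManager.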
