1.概述

转载并且补充:http://www.qishunwang.net/news_show_82511.aspx

2.SlotPool

2.1. 介绍

SlotPool 是JobMaster用于管理slot的pool . 是一个接口类, 定义了相关slot的管理操作…


主要有如下方法

2.1.1. 生命周期相关接口

接口 含义
start 启动
suspend 挂起
close 关闭

2.1.2 resource manager 连接相关

接口 含义
connectToResourceManager 与ResourceManager建立连接
disconnectResourceManager 关闭ResourceManager连接
registerTaskManager 通过给定的ResourceId 注册一个TaskExecutor
releaseTaskManager 释放TaskExecutor

2.1.3 Slot操作相关

接口 含义
offerSlots 释放slot
failAllocation 根据给定的allocation id 标识slot为失败
getAvailableSlotsInformation 获取当前可用的slots 信息.
getAllocatedSlotsInformation 获取所有的slot信息
allocateAvailableSlot 在给定的 request id 下使用给定的 allocation id 分配可用的slot。
如果没有具有给定分配id的插槽可用,则此方法返回{@code null}。
requestNewAllocatedSlot 从resource manager 请求分配新slot。
此方法不会从池中已经可用的slot返回slot,而是将向该池添加一个新slot,该slot将立即分配并返回。
requestNewAllocatedBatchSlot 从 resource manager 请求分配新的批处理slot
与普通slot不同,批处理slot只有在slot池不包含合适的slot时才会超时。
此外,它不会对来自资源管理器的故障信号做出反应。
disableBatchSlotRequestTimeoutCheck 禁用批处理slot请求超时检查。
当其他人要接管超时检查职责时调用。
createAllocatedSlotReport 创建有关属于指定 task manager 的已分配slot的报告。

3.SlotPoolImpl 实现类

SlotPoolImpl 是SlotPool接口的实现类.

slot pool为{@link ExecutionGraph}发出的slot请求提供服务。

当它无法提供slot请求时,它将尝试从ResourceManager获取新的slot。

如果当前没有可用的ResourceManager,或者ResourceManager拒绝了它,或者请求超时,那么它将使slot请求失败。

slot pool还保存提供给它并被接受的所有slot,因此即使ResourceManager关闭,也可以提供注册的空闲slot。

slot只有在无用时才会释放,例如,当作业完全运行时,但我们仍有一些可用slot。

所有的分配或槽提供都将由自己生成的AllocationID标识,我们将使用它来消除歧义。

3.1. 属性

/** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level.* SlotPool在调试级别上写其槽分布的时间间隔(毫秒)。* */private static final long STATUS_LOG_INTERVAL_MS = 60_000;private final JobID jobId;/** All registered TaskManagers, slots will be accepted and used only if the resource is registered.* 仅当资源已注册时,才会接受和使用所有已注册的TaskManager、slot。* */private final HashSet<ResourceID> registeredTaskManagers;/** The book-keeping of all allocated slots.* //所有分配给当前 JobManager 的 slots* */private final AllocatedSlots allocatedSlots;/** The book-keeping of all available slots.* 所有可用的 slots(已经分配给该 JobManager,但还没有装载 payload)* */private final AvailableSlots availableSlots;/** All pending requests waiting for slots.* 所有处于等待状态的slot request(已经发送请求给 ResourceManager) 等待slot的所有挂起请求。* */private final DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;/** The requests that are waiting for the resource manager to be connected.* 处于等待状态的 slot request (还没有发送请求给 ResourceManager,此时没有和 ResourceManager 建立连接)* 等待连接 resource manager 的请求。* */private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor).* 外部请求调用超时(例如,到ResourceManager或TaskExecutor)。* */private final Time rpcTimeout;/** Timeout for releasing idle slots.* 释放空闲的slots超时时间* */private final Time idleSlotTimeout;/** Timeout for batch slot requests.* 批处理slot请求超时* */private final Time batchSlotTimeout;private final Clock clock;/** the fencing token of the job manager. */private JobMasterId jobMasterId;/** The gateway to communicate with resource manager. */private ResourceManagerGateway resourceManagerGateway;private String jobManagerAddress;// 组件主线程执行器private ComponentMainThreadExecutor componentMainThreadExecutor;

3.2 生命周期相关接口

接口 含义
start 启动
suspend 挂起
close 关闭

3.2.1 start方法

/*** Start the slot pool to accept RPC calls.** 启动slot池以接受RPC调用。** @param jobMasterId The necessary leader id for running the job.* @param newJobManagerAddress for the slot requests which are sent to the resource manager* @param componentMainThreadExecutor The main thread executor for the job master's main thread.*/public void start(@Nonnull JobMasterId jobMasterId,@Nonnull String newJobManagerAddress,@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {this.jobMasterId = jobMasterId;this.jobManagerAddress = newJobManagerAddress;this.componentMainThreadExecutor = componentMainThreadExecutor;// 超时相关操作scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);if (log.isDebugEnabled()) {scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);}}

3.2.2 suspend

/*** Suspends this pool, meaning it has lost its authority to accept and distribute slots.** 挂起此池,意味着它已失去接受和分发slot的权限。*/@Overridepublic void suspend() {componentMainThreadExecutor.assertRunningInMainThread();log.info("Suspending SlotPool.");// cancel all pending allocations --> we can request these slots// again after we regained the leadershipSet<AllocationID> allocationIds = pendingRequests.keySetB();for (AllocationID allocationId : allocationIds) {// resourceManagerGateway 取消 SlotRequest操作resourceManagerGateway.cancelSlotRequest(allocationId);}// do not accept any requestsjobMasterId = null;resourceManagerGateway = null;// Clear (but not release!) the available slots. The TaskManagers should re-register them// at the new leader JobManager/SlotPoolclear();}

3.2.3 close

@Overridepublic void close() {log.info("Stopping SlotPool.");// cancel all pending allocations// 取消挂起的SlotRequestsSet<AllocationID> allocationIds = pendingRequests.keySetB();for (AllocationID allocationId : allocationIds) {resourceManagerGateway.cancelSlotRequest(allocationId);}// 释放资源 通过释放相应的TaskExecutor来释放所有注册的插槽// release all registered slots by releasing the corresponding TaskExecutorsfor (ResourceID taskManagerResourceId : registeredTaskManagers) {final FlinkException cause = new FlinkException("Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");releaseTaskManagerInternal(taskManagerResourceId, cause);}clear();}

3.3 resource manager 连接相关

接口 含义
connectToResourceManager 与ResourceManager建立连接
disconnectResourceManager 关闭ResourceManager连接
registerTaskManager 通过给定的ResourceId 注册一个TaskExecutor
releaseTaskManager 释放TaskExecutor

3.3.1 connectToResourceManager

/*** 与ResourceManager建立连接, 处理阻塞/挂起的请求…* @param resourceManagerGateway  The RPC gateway for the resource manager.*/@Overridepublic void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {this.resourceManagerGateway = checkNotNull(resourceManagerGateway);// 处理挂起的PendingRequest 请求.// work on all slots waiting for this connectionfor (PendingRequest pendingRequest : waitingForResourceManager.values()) {// 请求 RM / 获取资源requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);}// all sent offwaitingForResourceManager.clear();}

3.3.2 disconnectResourceManager

关闭ResourceManager 连接.

@Overridepublic void disconnectResourceManager() {this.resourceManagerGateway = null;}

3.3.3 registerTaskManager

/*** Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid.* Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.** @param resourceID The id of the TaskManager***                      将TaskManager注册到此 pool ,只有来自已注册TaskManager的slot才被视为有效。* 它还为我们提供了一种方法,使“dead”或“abnormal”任务管理者远离这个池*/@Overridepublic boolean registerTaskManager(final ResourceID resourceID) {componentMainThreadExecutor.assertRunningInMainThread();log.debug("Register new TaskExecutor {}.", resourceID);return registeredTaskManagers.add(resourceID);}

3.3.4 releaseTaskManager

/*** Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called* when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.* 从该池中注销TaskManager,将释放所有相关slot并取消任务。* 当我们发现某个TaskManager变得“dead”或“abnormal”,并且我们决定不再使用其中的slot时调用。* * @param resourceId The id of the TaskManager* @param cause for the releasing of the TaskManager*/@Overridepublic boolean releaseTaskManager(final ResourceID resourceId, final Exception cause) {componentMainThreadExecutor.assertRunningInMainThread();if (registeredTaskManagers.remove(resourceId)) {releaseTaskManagerInternal(resourceId, cause);return true;} else {return false;}}

3.4 Slot操作相关

接口 含义
offerSlots 消费slot
failAllocation 根据给定的allocation id 标识slot为失败
getAvailableSlotsInformation 获取当前可用的slots 信息.
getAllocatedSlotsInformation 获取所有的slot信息
allocateAvailableSlot 在给定的 request id 下使用给定的 allocation id 分配可用的slot。
如果没有具有给定分配id的插槽可用,则此方法返回{@code null}。
requestNewAllocatedSlot 从resource manager 请求分配新slot。
此方法不会从池中已经可用的slot返回slot,而是将向该池添加一个新slot,该slot将立即分配并返回。
requestNewAllocatedBatchSlot 从 resource manager 请求分配新的批处理slot
与普通slot不同,批处理slot只有在slot池不包含合适的slot时才会超时。
此外,它不会对来自资源管理器的故障信号做出反应。
disableBatchSlotRequestTimeoutCheck 禁用批处理slot请求超时检查。
当其他人要接管超时检查职责时调用。
createAllocatedSlotReport 创建有关属于指定 task manager 的已分配slot的报告。

3.4.1 offerSlots

/*** 根据AllocationID , TaskExecutor 提供Slot** AllocationID最初由该 pool 生成,并通过ResourceManager传输到TaskManager** 我们用它来区分我们发行的不同分配。** 如果我们发现某个Slot不匹配或实际上没有等待此Slot的挂起请求(可能由其他返回的Slot完成),则Slot提供可能会被拒绝。** Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and* transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation* we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending* request waiting for this slot (maybe fulfilled by some other returned slot).** @param taskManagerLocation location from where the offer comes from* @param taskManagerGateway TaskManager gateway* @param slotOffer the offered slot* @return True if we accept the offering*/boolean offerSlot(final TaskManagerLocation taskManagerLocation,final TaskManagerGateway taskManagerGateway,final SlotOffer slotOffer) {componentMainThreadExecutor.assertRunningInMainThread();// 检测 TaskManager是否有效// check if this TaskManager is validfinal ResourceID resourceID = taskManagerLocation.getResourceID();final AllocationID allocationID = slotOffer.getAllocationId();// 必须是已注册的TaskManagers 中的slotOfferif (!registeredTaskManagers.contains(resourceID)) {log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",slotOffer.getAllocationId(), taskManagerLocation);return false;}// 如果当前 slot 关联的 AllocationID 已经在 SlotPool 中出现   检查是否已使用此slot// check whether we have already using this slotAllocatedSlot existingSlot;if ((existingSlot = allocatedSlots.get(allocationID)) != null ||(existingSlot = availableSlots.get(allocationID)) != null) {// we need to figure out if this is a repeated offer for the exact same slot,// or another offer that comes from a different TaskManager after the ResourceManager// re-tried the request//  我们需要弄清楚这是对完全相同的slot的重复offer,//  还是在ResourceManager重新尝试请求后来自不同TaskManager的另一个offer// 我们用比较SlotID的方式来写这个,因为SlotIDD是 TaskManager上实际slot的标识符// we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of// the actual slots on the TaskManagers// Note: The slotOffer should have the SlotID// 获取已存在的SlotIDfinal SlotID existingSlotId = existingSlot.getSlotId();// 获取新的SlotIDfinal SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());//这个 slot 在之前已经被 SlotPool 接受了,相当于 TaskExecutor 发送了一个重复的 offerif (existingSlotId.equals(newSlotId)) {log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);// return true here so that the sender will get a positive acknowledgement to the retry// and mark the offering as a successreturn true;} else {//已经有一个其他的 AllocatedSlot 和 这个 AllocationID 关联了,因此不能接受当前的这个 slot// the allocation has been fulfilled by another slot, reject the offer so the task executor// will offer the slot to the resource managerreturn false;}}// 到这里代表这个slot还没有人用过.//这个 slot 关联的 AllocationID 此前没有出现过//新建一个 AllocatedSlot 对象,表示新分配的 slotfinal AllocatedSlot allocatedSlot = new AllocatedSlot(allocationID,taskManagerLocation,slotOffer.getSlotIndex(),slotOffer.getResourceProfile(),taskManagerGateway);// 检查是否有一个 request 和 这个 AllocationID 关联// check whether we have request waiting for this slotPendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);if (pendingRequest != null) {// we were waiting for this!//有一个pending request 正在等待这个 slotallocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);//尝试去完成那个等待的请求if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {// we could not complete the pending slot future --> try to fulfill another pending request//失败了allocatedSlots.remove(pendingRequest.getSlotRequestId());//尝试去满足其他在等待的请求,使用 slot 以请求的顺序完成挂起的请求tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);} else {log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);}}else {//没有请求在等待这个slot,可能请求已经被满足了// we were actually not waiting for this://   - could be that this request had been fulfilled//   - we are receiving the slots from TaskManagers after becoming leaders//尝试去满足其他在等待的请求tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);}// we accepted the request in any case. slot will be released after it idled for// too long and timed out// 无论如何我么都接受了这个请求.// slot在空闲时间过长和超时后将被释放return true;}

tryFulfillSlotRequestOrMakeAvailable

/*** Tries to fulfill with the given allocated slot a pending slot request or add the* allocated slot to the set of available slots if no matching request is available.** 尝试使用给定的已分配slot完成挂起的slot请求,* 或者如果没有匹配的请求,则将已分配的slot归还到可用slot集。** @param allocatedSlot which shall be returned*/private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");//查找和当前 AllocatedSlot 的计算资源相匹配的还在等待的请求final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);if (pendingRequest != null) {//如果有匹配的请求,那么将 AllocatedSlot 分配给等待的请求log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());// 将当前分配的slot加入到已分配的allocatedSlots集合中, 标识已被使用.allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);// 回调请求,返回allocatedSlot 信息.  标识slot分配已经完成...pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);} else {//如果没有,那么这个 AllocatedSlot 变成 available 的// 没有可用的PendingRequest , 归还allocatedSlot .log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());availableSlots.add(allocatedSlot, clock.relativeTimeMillis());}}

3.4.2 failAllocation

@Overridepublic Optional<ResourceID> failAllocation(final AllocationID allocationID, final Exception cause) {componentMainThreadExecutor.assertRunningInMainThread();// 获取PendingRequestfinal PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);if (pendingRequest != null) {if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {// pending batch requests don't react to this signal --> put it backpendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);} else {// request was still pendingfailPendingRequest(pendingRequest, cause);}return Optional.empty();}else {return tryFailingAllocatedSlot(allocationID, cause);}// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase}

tryFailingAllocatedSlot

private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {// 获取分配失败的AllocatedSlotAllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);if (allocatedSlot == null) {allocatedSlot = allocatedSlots.remove(allocationID);}if (allocatedSlot != null) {log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());// notify TaskExecutor about the failure// 通知TaskExecutor 分配失败了..allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);// release the slot.// since it is not in 'allocatedSlots' any more, it will be dropped o return'// 释放slot,并且将这个slot丢弃allocatedSlot.releasePayload(cause);final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {return Optional.of(taskManagerId);}}return Optional.empty();}

3.4.3 getAvailableSlotsInformation

获取可用的slot信息

/*** 列出当前可用的 slot* @return*/@Override@Nonnullpublic Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsSlotsByTaskManager = allocatedSlots.getSlotsByTaskManager();return availableSlotsByTaskManager.entrySet().stream().flatMap(entry -> {final int numberAllocatedSlots = allocatedSlotsSlotsByTaskManager.getOrDefault(entry.getKey(), Collections.emptySet()).size();final int numberAvailableSlots = entry.getValue().size();final double taskExecutorUtilization = (double) numberAllocatedSlots / (numberAllocatedSlots + numberAvailableSlots);return entry.getValue().stream().map(slot -> SlotInfoWithUtilization.from(slot, taskExecutorUtilization));}).collect(Collectors.toList());}

3.4.4 getAllocatedSlotsInformation

获取所有已分配的solt信息

 private Collection<SlotInfo> getAllocatedSlotsInformation() {return allocatedSlots.listSlotInfo();}

3.4.5 allocateAvailableSlot

获取所有已有效的solt信息

 /*** 将 allocationID 关联的 slot 分配给 slotRequestId 对应的请求* @param slotRequestId identifying the requested slot* @param allocationID the allocation id of the requested available slot* @return*/@Overridepublic Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId,@Nonnull AllocationID allocationID) {componentMainThreadExecutor.assertRunningInMainThread();//从 availableSlots 中移除AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);if (allocatedSlot != null) {//加入已分配的映射关系中allocatedSlots.add(slotRequestId, allocatedSlot);return Optional.of(allocatedSlot);} else {return Optional.empty();}}

3.4.6 requestNewAllocatedSlot

从resource manager 请求分配新slot。 此方法不会从池中已经可用的slot返回slot,而是将向该池添加一个新slot,该slot将立即分配并返回。

/*** 向RM申请新的 slot* * 从resource manager 请求分配新slot。 此方法不会从池中已经可用的slot返回slot,* 而是将向该池添加一个新slot,该slot将立即分配并返回。** @param slotRequestId identifying the requested slot* @param resourceProfile resource profile that specifies the resource requirements for the requested slot* @param timeout timeout for the allocation procedure* @return*/@Nonnull@Overridepublic CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId,@Nonnull ResourceProfile resourceProfile,Time timeout) {componentMainThreadExecutor.assertRunningInMainThread();final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);// register request timeoutFutureUtils.orTimeout(pendingRequest.getAllocatedSlotFuture(),timeout.toMilliseconds(),TimeUnit.MILLISECONDS,componentMainThreadExecutor).whenComplete((AllocatedSlot ignored, Throwable throwable) -> {if (throwable instanceof TimeoutException) {timeoutPendingSlotRequest(slotRequestId);}});return requestNewAllocatedSlotInternal(pendingRequest).thenApply((Function.identity()));}

requestNewAllocatedSlotInternal

/**** 从RM中请求一个新的slot** Requests a new slot from the ResourceManager. If there is currently not ResourceManager* connected, then the request is stashed and send once a new ResourceManager is connected.** @param pendingRequest pending slot request* @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool}*/@Nonnullprivate CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {if (resourceManagerGateway == null) {stashRequestWaitingForResourceManager(pendingRequest);} else {// 从RM中请求一个新的slotrequestSlotFromResourceManager(resourceManagerGateway, pendingRequest);}return pendingRequest.getAllocatedSlotFuture();}

3.4.7 requestNewAllocatedBatchSlot

从 resource manager 请求分配新的批处理slot 与普通slot不同,批处理slot只有在slot池不包含合适的slot时才会超时。 此外,它不会对来自资源管理器的故障信号做出反应。

@Nonnull@Overridepublic CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId,@Nonnull ResourceProfile resourceProfile) {componentMainThreadExecutor.assertRunningInMainThread();final PendingRequest pendingRequest = PendingRequest.createBatchRequest(slotRequestId, resourceProfile);return requestNewAllocatedSlotInternal(pendingRequest).thenApply(Function.identity());}

3.4.8 disableBatchSlotRequestTimeoutCheck

禁用批处理slot请求超时检查。当其他人要接管超时检查职责时调用。

  @Overridepublic void disableBatchSlotRequestTimeoutCheck() {batchSlotRequestTimeoutCheckEnabled = false;}

3.4.9 createAllocatedSlotReport

创建有关属于指定 task manager 的已分配slot的报告。

/*** 创建有关属于指定 task manager 的已分配slot的报告。* @param taskManagerId identifies the task manager* @return*/@Overridepublic AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {final Set<AllocatedSlot> availableSlotsForTaskManager = availableSlots.getSlotsForTaskManager(taskManagerId);final Set<AllocatedSlot> allocatedSlotsForTaskManager = allocatedSlots.getSlotsForTaskManager(taskManagerId);List<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<>(availableSlotsForTaskManager.size() + allocatedSlotsForTaskManager.size());for (AllocatedSlot allocatedSlot : Iterables.concat(availableSlotsForTaskManager, allocatedSlotsForTaskManager)) {allocatedSlotInfos.add(new AllocatedSlotInfo(allocatedSlot.getPhysicalSlotNumber(), allocatedSlot.getAllocationId()));}return new AllocatedSlotReport(jobId, allocatedSlotInfos);}

【Flink】Flink 资源相关 Slot SlotPool相关推荐

  1. flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍

    前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: 1Caused by: akka.pattern.AskTimeoutException: 2Ask timed out on [Ac ...

  2. 【Flink】Flink Flink 1.14 新特性预览

    1.概述 转载:Flink 1.14 新特性预览 简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日 ...

  3. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  4. Loadrunner-web资源相关图表

    web资源相关图表可以提供web服务器性能的相关信息. 1.每秒点击数(Hits per Second)图表可以通过该图标查看性能测试过程中每一秒内虚拟用户向web服务器发送的HTTP请求数.通常情况 ...

  5. 凌波微步Flink——Flink的技术逻辑与编程步骤剖析

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  6. 福州印发四部数据资源相关管理办法,包括全国首部地级市数据开放管理办法...

    ▼金猿奖·2019年度征集评选▼ 大数据产业创新服务媒体 --聚焦数据 · 改变商业 近日,<福州市政务数据资源管理办法><福州市政务数据汇聚共享管理暂行办法><福州市政 ...

  7. 凌波微步Flink——Flink API中的一些基础概念

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95355619 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  8. 【Flink】 Flink 应用资源分配问题排查思路

    1.概述 转载:https://blog.csdn.net/weixin_43970890/article/details/102483720 如果 Flink 应用不能正常启动达到 RUNNING ...

  9. 大数据之flink共享资源槽

    算子链: 为方便执行,Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行.这是一个非常有用的优化方式,它减小了进程间数据交换和缓存的开销,而且在减少延迟同时增加 ...

最新文章

  1. Ubuntu16.04系统下汉字显示为方框解决办法(图文详解)
  2. 【iOS学习笔记】IOS开发中设置applicationIconBadgeNumber和消息推送
  3. [工具]Tomcat CVE-2017-12615 远程代码执行
  4. python序列类型-python序列类型有哪些
  5. 利用blink+MQ实现流计算中的超时统计问题
  6. LeetCode--27. 移除元素(双指针)
  7. ZABBIX 3.2.7 (源码包)安装部署
  8. 京东商品知识图谱,约10万商品品牌,约65万品牌销售关系
  9. Spark-生产案例
  10. 【现代版】为人处世三十六计详解,真的很受益!
  11. LabVIEW参考资料汇总
  12. Android 11 系统字体加载流程
  13. CAD线型设置:CAD软件中如何加粗曲线?
  14. wps中vbe6ext.olb不能被加载问题(附WPS2019宏下载)
  15. Ubuntu下将dmg文件转换成dcr和ISO文件
  16. springboot项目如何查看MP运行日志
  17. c++中fail函数
  18. php转行当保安,一个保安转行做头条号年入50万,他是怎么做到的?
  19. 【软考】--软考总结
  20. 01-空投Lynda

热门文章

  1. 天猫双11:1日到11日0点45分 382个品牌成交额超过1亿元
  2. 一加9RT外观和部分参数揭晓:搭载骁龙888+E4直屏
  3. 杨笠代言电脑遭投诉抵制,网友吵翻!英特尔回应了...
  4. 荣耀V40渲染图曝光 将搭载双曲面瀑布屏
  5. 粉笔网CEO怒斥湖南卫视扶贫节目作秀:自己赞助1000万太傻
  6. 支付宝又要改版了:首页顶栏新增了这个模块
  7. 金字火腿推出“人造牛排” 股价连日上涨接近历史高点
  8. 拼多多回应驰伟插座起诉:积极应诉 希望其莫充当电商“二选一”插头
  9. 哈啰出行回应单车违规投放:将尽快缴纳罚金 积极整改
  10. 流浪地球票房43亿元 今起电影最低票价降10元