【Flink】Flink 资源相关 Slot SlotPool
1.概述
转载并且补充:http://www.qishunwang.net/news_show_82511.aspx
2.SlotPool
2.1. 介绍
SlotPool 是JobMaster用于管理slot的pool . 是一个接口类, 定义了相关slot的管理操作…
主要有如下方法
2.1.1. 生命周期相关接口
接口 | 含义 |
---|---|
start | 启动 |
suspend | 挂起 |
close | 关闭 |
2.1.2 resource manager 连接相关
接口 | 含义 |
---|---|
connectToResourceManager | 与ResourceManager建立连接 |
disconnectResourceManager | 关闭ResourceManager连接 |
registerTaskManager | 通过给定的ResourceId 注册一个TaskExecutor |
releaseTaskManager | 释放TaskExecutor |
2.1.3 Slot操作相关
接口 | 含义 |
---|---|
offerSlots | 释放slot |
failAllocation | 根据给定的allocation id 标识slot为失败 |
getAvailableSlotsInformation | 获取当前可用的slots 信息. |
getAllocatedSlotsInformation | 获取所有的slot信息 |
allocateAvailableSlot |
在给定的 request id 下使用给定的 allocation id 分配可用的slot。 如果没有具有给定分配id的插槽可用,则此方法返回{@code null}。 |
requestNewAllocatedSlot |
从resource manager 请求分配新slot。 此方法不会从池中已经可用的slot返回slot,而是将向该池添加一个新slot,该slot将立即分配并返回。 |
requestNewAllocatedBatchSlot |
从 resource manager 请求分配新的批处理slot 与普通slot不同,批处理slot只有在slot池不包含合适的slot时才会超时。 此外,它不会对来自资源管理器的故障信号做出反应。 |
disableBatchSlotRequestTimeoutCheck |
禁用批处理slot请求超时检查。 当其他人要接管超时检查职责时调用。 |
createAllocatedSlotReport | 创建有关属于指定 task manager 的已分配slot的报告。 |
3.SlotPoolImpl 实现类
SlotPoolImpl 是SlotPool接口的实现类.
slot pool为{@link ExecutionGraph}发出的slot请求提供服务。
当它无法提供slot请求时,它将尝试从ResourceManager获取新的slot。
如果当前没有可用的ResourceManager,或者ResourceManager拒绝了它,或者请求超时,那么它将使slot请求失败。
slot pool还保存提供给它并被接受的所有slot,因此即使ResourceManager关闭,也可以提供注册的空闲slot。
slot只有在无用时才会释放,例如,当作业完全运行时,但我们仍有一些可用slot。
所有的分配或槽提供都将由自己生成的AllocationID标识,我们将使用它来消除歧义。
3.1. 属性
/** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level.* SlotPool在调试级别上写其槽分布的时间间隔(毫秒)。* */private static final long STATUS_LOG_INTERVAL_MS = 60_000;private final JobID jobId;/** All registered TaskManagers, slots will be accepted and used only if the resource is registered.* 仅当资源已注册时,才会接受和使用所有已注册的TaskManager、slot。* */private final HashSet<ResourceID> registeredTaskManagers;/** The book-keeping of all allocated slots.* //所有分配给当前 JobManager 的 slots* */private final AllocatedSlots allocatedSlots;/** The book-keeping of all available slots.* 所有可用的 slots(已经分配给该 JobManager,但还没有装载 payload)* */private final AvailableSlots availableSlots;/** All pending requests waiting for slots.* 所有处于等待状态的slot request(已经发送请求给 ResourceManager) 等待slot的所有挂起请求。* */private final DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;/** The requests that are waiting for the resource manager to be connected.* 处于等待状态的 slot request (还没有发送请求给 ResourceManager,此时没有和 ResourceManager 建立连接)* 等待连接 resource manager 的请求。* */private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor).* 外部请求调用超时(例如,到ResourceManager或TaskExecutor)。* */private final Time rpcTimeout;/** Timeout for releasing idle slots.* 释放空闲的slots超时时间* */private final Time idleSlotTimeout;/** Timeout for batch slot requests.* 批处理slot请求超时* */private final Time batchSlotTimeout;private final Clock clock;/** the fencing token of the job manager. */private JobMasterId jobMasterId;/** The gateway to communicate with resource manager. */private ResourceManagerGateway resourceManagerGateway;private String jobManagerAddress;// 组件主线程执行器private ComponentMainThreadExecutor componentMainThreadExecutor;
3.2 生命周期相关接口
接口 | 含义 |
---|---|
start | 启动 |
suspend | 挂起 |
close | 关闭 |
3.2.1 start方法
/*** Start the slot pool to accept RPC calls.** 启动slot池以接受RPC调用。** @param jobMasterId The necessary leader id for running the job.* @param newJobManagerAddress for the slot requests which are sent to the resource manager* @param componentMainThreadExecutor The main thread executor for the job master's main thread.*/public void start(@Nonnull JobMasterId jobMasterId,@Nonnull String newJobManagerAddress,@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {this.jobMasterId = jobMasterId;this.jobManagerAddress = newJobManagerAddress;this.componentMainThreadExecutor = componentMainThreadExecutor;// 超时相关操作scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);if (log.isDebugEnabled()) {scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);}}
3.2.2 suspend
/*** Suspends this pool, meaning it has lost its authority to accept and distribute slots.** 挂起此池,意味着它已失去接受和分发slot的权限。*/@Overridepublic void suspend() {componentMainThreadExecutor.assertRunningInMainThread();log.info("Suspending SlotPool.");// cancel all pending allocations --> we can request these slots// again after we regained the leadershipSet<AllocationID> allocationIds = pendingRequests.keySetB();for (AllocationID allocationId : allocationIds) {// resourceManagerGateway 取消 SlotRequest操作resourceManagerGateway.cancelSlotRequest(allocationId);}// do not accept any requestsjobMasterId = null;resourceManagerGateway = null;// Clear (but not release!) the available slots. The TaskManagers should re-register them// at the new leader JobManager/SlotPoolclear();}
3.2.3 close
@Overridepublic void close() {log.info("Stopping SlotPool.");// cancel all pending allocations// 取消挂起的SlotRequestsSet<AllocationID> allocationIds = pendingRequests.keySetB();for (AllocationID allocationId : allocationIds) {resourceManagerGateway.cancelSlotRequest(allocationId);}// 释放资源 通过释放相应的TaskExecutor来释放所有注册的插槽// release all registered slots by releasing the corresponding TaskExecutorsfor (ResourceID taskManagerResourceId : registeredTaskManagers) {final FlinkException cause = new FlinkException("Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");releaseTaskManagerInternal(taskManagerResourceId, cause);}clear();}
3.3 resource manager 连接相关
接口 | 含义 |
---|---|
connectToResourceManager | 与ResourceManager建立连接 |
disconnectResourceManager | 关闭ResourceManager连接 |
registerTaskManager | 通过给定的ResourceId 注册一个TaskExecutor |
releaseTaskManager | 释放TaskExecutor |
3.3.1 connectToResourceManager
/*** 与ResourceManager建立连接, 处理阻塞/挂起的请求…* @param resourceManagerGateway The RPC gateway for the resource manager.*/@Overridepublic void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {this.resourceManagerGateway = checkNotNull(resourceManagerGateway);// 处理挂起的PendingRequest 请求.// work on all slots waiting for this connectionfor (PendingRequest pendingRequest : waitingForResourceManager.values()) {// 请求 RM / 获取资源requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);}// all sent offwaitingForResourceManager.clear();}
3.3.2 disconnectResourceManager
关闭ResourceManager 连接.
@Overridepublic void disconnectResourceManager() {this.resourceManagerGateway = null;}
3.3.3 registerTaskManager
/*** Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid.* Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.** @param resourceID The id of the TaskManager*** 将TaskManager注册到此 pool ,只有来自已注册TaskManager的slot才被视为有效。* 它还为我们提供了一种方法,使“dead”或“abnormal”任务管理者远离这个池*/@Overridepublic boolean registerTaskManager(final ResourceID resourceID) {componentMainThreadExecutor.assertRunningInMainThread();log.debug("Register new TaskExecutor {}.", resourceID);return registeredTaskManagers.add(resourceID);}
3.3.4 releaseTaskManager
/*** Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called* when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.* 从该池中注销TaskManager,将释放所有相关slot并取消任务。* 当我们发现某个TaskManager变得“dead”或“abnormal”,并且我们决定不再使用其中的slot时调用。* * @param resourceId The id of the TaskManager* @param cause for the releasing of the TaskManager*/@Overridepublic boolean releaseTaskManager(final ResourceID resourceId, final Exception cause) {componentMainThreadExecutor.assertRunningInMainThread();if (registeredTaskManagers.remove(resourceId)) {releaseTaskManagerInternal(resourceId, cause);return true;} else {return false;}}
3.4 Slot操作相关
接口 | 含义 |
---|---|
offerSlots | 消费slot |
failAllocation | 根据给定的allocation id 标识slot为失败 |
getAvailableSlotsInformation | 获取当前可用的slots 信息. |
getAllocatedSlotsInformation | 获取所有的slot信息 |
allocateAvailableSlot |
在给定的 request id 下使用给定的 allocation id 分配可用的slot。 如果没有具有给定分配id的插槽可用,则此方法返回{@code null}。 |
requestNewAllocatedSlot |
从resource manager 请求分配新slot。 此方法不会从池中已经可用的slot返回slot,而是将向该池添加一个新slot,该slot将立即分配并返回。 |
requestNewAllocatedBatchSlot |
从 resource manager 请求分配新的批处理slot 与普通slot不同,批处理slot只有在slot池不包含合适的slot时才会超时。 此外,它不会对来自资源管理器的故障信号做出反应。 |
disableBatchSlotRequestTimeoutCheck |
禁用批处理slot请求超时检查。 当其他人要接管超时检查职责时调用。 |
createAllocatedSlotReport | 创建有关属于指定 task manager 的已分配slot的报告。 |
3.4.1 offerSlots
/*** 根据AllocationID , TaskExecutor 提供Slot** AllocationID最初由该 pool 生成,并通过ResourceManager传输到TaskManager** 我们用它来区分我们发行的不同分配。** 如果我们发现某个Slot不匹配或实际上没有等待此Slot的挂起请求(可能由其他返回的Slot完成),则Slot提供可能会被拒绝。** Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and* transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation* we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending* request waiting for this slot (maybe fulfilled by some other returned slot).** @param taskManagerLocation location from where the offer comes from* @param taskManagerGateway TaskManager gateway* @param slotOffer the offered slot* @return True if we accept the offering*/boolean offerSlot(final TaskManagerLocation taskManagerLocation,final TaskManagerGateway taskManagerGateway,final SlotOffer slotOffer) {componentMainThreadExecutor.assertRunningInMainThread();// 检测 TaskManager是否有效// check if this TaskManager is validfinal ResourceID resourceID = taskManagerLocation.getResourceID();final AllocationID allocationID = slotOffer.getAllocationId();// 必须是已注册的TaskManagers 中的slotOfferif (!registeredTaskManagers.contains(resourceID)) {log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",slotOffer.getAllocationId(), taskManagerLocation);return false;}// 如果当前 slot 关联的 AllocationID 已经在 SlotPool 中出现 检查是否已使用此slot// check whether we have already using this slotAllocatedSlot existingSlot;if ((existingSlot = allocatedSlots.get(allocationID)) != null ||(existingSlot = availableSlots.get(allocationID)) != null) {// we need to figure out if this is a repeated offer for the exact same slot,// or another offer that comes from a different TaskManager after the ResourceManager// re-tried the request// 我们需要弄清楚这是对完全相同的slot的重复offer,// 还是在ResourceManager重新尝试请求后来自不同TaskManager的另一个offer// 我们用比较SlotID的方式来写这个,因为SlotIDD是 TaskManager上实际slot的标识符// we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of// the actual slots on the TaskManagers// Note: The slotOffer should have the SlotID// 获取已存在的SlotIDfinal SlotID existingSlotId = existingSlot.getSlotId();// 获取新的SlotIDfinal SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());//这个 slot 在之前已经被 SlotPool 接受了,相当于 TaskExecutor 发送了一个重复的 offerif (existingSlotId.equals(newSlotId)) {log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);// return true here so that the sender will get a positive acknowledgement to the retry// and mark the offering as a successreturn true;} else {//已经有一个其他的 AllocatedSlot 和 这个 AllocationID 关联了,因此不能接受当前的这个 slot// the allocation has been fulfilled by another slot, reject the offer so the task executor// will offer the slot to the resource managerreturn false;}}// 到这里代表这个slot还没有人用过.//这个 slot 关联的 AllocationID 此前没有出现过//新建一个 AllocatedSlot 对象,表示新分配的 slotfinal AllocatedSlot allocatedSlot = new AllocatedSlot(allocationID,taskManagerLocation,slotOffer.getSlotIndex(),slotOffer.getResourceProfile(),taskManagerGateway);// 检查是否有一个 request 和 这个 AllocationID 关联// check whether we have request waiting for this slotPendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);if (pendingRequest != null) {// we were waiting for this!//有一个pending request 正在等待这个 slotallocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);//尝试去完成那个等待的请求if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {// we could not complete the pending slot future --> try to fulfill another pending request//失败了allocatedSlots.remove(pendingRequest.getSlotRequestId());//尝试去满足其他在等待的请求,使用 slot 以请求的顺序完成挂起的请求tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);} else {log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);}}else {//没有请求在等待这个slot,可能请求已经被满足了// we were actually not waiting for this:// - could be that this request had been fulfilled// - we are receiving the slots from TaskManagers after becoming leaders//尝试去满足其他在等待的请求tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);}// we accepted the request in any case. slot will be released after it idled for// too long and timed out// 无论如何我么都接受了这个请求.// slot在空闲时间过长和超时后将被释放return true;}
tryFulfillSlotRequestOrMakeAvailable
/*** Tries to fulfill with the given allocated slot a pending slot request or add the* allocated slot to the set of available slots if no matching request is available.** 尝试使用给定的已分配slot完成挂起的slot请求,* 或者如果没有匹配的请求,则将已分配的slot归还到可用slot集。** @param allocatedSlot which shall be returned*/private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");//查找和当前 AllocatedSlot 的计算资源相匹配的还在等待的请求final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);if (pendingRequest != null) {//如果有匹配的请求,那么将 AllocatedSlot 分配给等待的请求log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());// 将当前分配的slot加入到已分配的allocatedSlots集合中, 标识已被使用.allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);// 回调请求,返回allocatedSlot 信息. 标识slot分配已经完成...pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);} else {//如果没有,那么这个 AllocatedSlot 变成 available 的// 没有可用的PendingRequest , 归还allocatedSlot .log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());availableSlots.add(allocatedSlot, clock.relativeTimeMillis());}}
3.4.2 failAllocation
@Overridepublic Optional<ResourceID> failAllocation(final AllocationID allocationID, final Exception cause) {componentMainThreadExecutor.assertRunningInMainThread();// 获取PendingRequestfinal PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);if (pendingRequest != null) {if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {// pending batch requests don't react to this signal --> put it backpendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);} else {// request was still pendingfailPendingRequest(pendingRequest, cause);}return Optional.empty();}else {return tryFailingAllocatedSlot(allocationID, cause);}// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase}
tryFailingAllocatedSlot
private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {// 获取分配失败的AllocatedSlotAllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);if (allocatedSlot == null) {allocatedSlot = allocatedSlots.remove(allocationID);}if (allocatedSlot != null) {log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());// notify TaskExecutor about the failure// 通知TaskExecutor 分配失败了..allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);// release the slot.// since it is not in 'allocatedSlots' any more, it will be dropped o return'// 释放slot,并且将这个slot丢弃allocatedSlot.releasePayload(cause);final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {return Optional.of(taskManagerId);}}return Optional.empty();}
3.4.3 getAvailableSlotsInformation
获取可用的slot信息
/*** 列出当前可用的 slot* @return*/@Override@Nonnullpublic Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsSlotsByTaskManager = allocatedSlots.getSlotsByTaskManager();return availableSlotsByTaskManager.entrySet().stream().flatMap(entry -> {final int numberAllocatedSlots = allocatedSlotsSlotsByTaskManager.getOrDefault(entry.getKey(), Collections.emptySet()).size();final int numberAvailableSlots = entry.getValue().size();final double taskExecutorUtilization = (double) numberAllocatedSlots / (numberAllocatedSlots + numberAvailableSlots);return entry.getValue().stream().map(slot -> SlotInfoWithUtilization.from(slot, taskExecutorUtilization));}).collect(Collectors.toList());}
3.4.4 getAllocatedSlotsInformation
获取所有已分配的solt信息
private Collection<SlotInfo> getAllocatedSlotsInformation() {return allocatedSlots.listSlotInfo();}
3.4.5 allocateAvailableSlot
获取所有已有效的solt信息
/*** 将 allocationID 关联的 slot 分配给 slotRequestId 对应的请求* @param slotRequestId identifying the requested slot* @param allocationID the allocation id of the requested available slot* @return*/@Overridepublic Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId,@Nonnull AllocationID allocationID) {componentMainThreadExecutor.assertRunningInMainThread();//从 availableSlots 中移除AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);if (allocatedSlot != null) {//加入已分配的映射关系中allocatedSlots.add(slotRequestId, allocatedSlot);return Optional.of(allocatedSlot);} else {return Optional.empty();}}
3.4.6 requestNewAllocatedSlot
从resource manager 请求分配新slot。 此方法不会从池中已经可用的slot返回slot,而是将向该池添加一个新slot,该slot将立即分配并返回。
/*** 向RM申请新的 slot* * 从resource manager 请求分配新slot。 此方法不会从池中已经可用的slot返回slot,* 而是将向该池添加一个新slot,该slot将立即分配并返回。** @param slotRequestId identifying the requested slot* @param resourceProfile resource profile that specifies the resource requirements for the requested slot* @param timeout timeout for the allocation procedure* @return*/@Nonnull@Overridepublic CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId,@Nonnull ResourceProfile resourceProfile,Time timeout) {componentMainThreadExecutor.assertRunningInMainThread();final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);// register request timeoutFutureUtils.orTimeout(pendingRequest.getAllocatedSlotFuture(),timeout.toMilliseconds(),TimeUnit.MILLISECONDS,componentMainThreadExecutor).whenComplete((AllocatedSlot ignored, Throwable throwable) -> {if (throwable instanceof TimeoutException) {timeoutPendingSlotRequest(slotRequestId);}});return requestNewAllocatedSlotInternal(pendingRequest).thenApply((Function.identity()));}
requestNewAllocatedSlotInternal
/**** 从RM中请求一个新的slot** Requests a new slot from the ResourceManager. If there is currently not ResourceManager* connected, then the request is stashed and send once a new ResourceManager is connected.** @param pendingRequest pending slot request* @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool}*/@Nonnullprivate CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {if (resourceManagerGateway == null) {stashRequestWaitingForResourceManager(pendingRequest);} else {// 从RM中请求一个新的slotrequestSlotFromResourceManager(resourceManagerGateway, pendingRequest);}return pendingRequest.getAllocatedSlotFuture();}
3.4.7 requestNewAllocatedBatchSlot
从 resource manager 请求分配新的批处理slot 与普通slot不同,批处理slot只有在slot池不包含合适的slot时才会超时。 此外,它不会对来自资源管理器的故障信号做出反应。
@Nonnull@Overridepublic CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId,@Nonnull ResourceProfile resourceProfile) {componentMainThreadExecutor.assertRunningInMainThread();final PendingRequest pendingRequest = PendingRequest.createBatchRequest(slotRequestId, resourceProfile);return requestNewAllocatedSlotInternal(pendingRequest).thenApply(Function.identity());}
3.4.8 disableBatchSlotRequestTimeoutCheck
禁用批处理slot请求超时检查。当其他人要接管超时检查职责时调用。
@Overridepublic void disableBatchSlotRequestTimeoutCheck() {batchSlotRequestTimeoutCheckEnabled = false;}
3.4.9 createAllocatedSlotReport
创建有关属于指定 task manager 的已分配slot的报告。
/*** 创建有关属于指定 task manager 的已分配slot的报告。* @param taskManagerId identifies the task manager* @return*/@Overridepublic AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {final Set<AllocatedSlot> availableSlotsForTaskManager = availableSlots.getSlotsForTaskManager(taskManagerId);final Set<AllocatedSlot> allocatedSlotsForTaskManager = allocatedSlots.getSlotsForTaskManager(taskManagerId);List<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<>(availableSlotsForTaskManager.size() + allocatedSlotsForTaskManager.size());for (AllocatedSlot allocatedSlot : Iterables.concat(availableSlotsForTaskManager, allocatedSlotsForTaskManager)) {allocatedSlotInfos.add(new AllocatedSlotInfo(allocatedSlot.getPhysicalSlotNumber(), allocatedSlot.getAllocationId()));}return new AllocatedSlotReport(jobId, allocatedSlotInfos);}
【Flink】Flink 资源相关 Slot SlotPool相关推荐
- flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍
前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: 1Caused by: akka.pattern.AskTimeoutException: 2Ask timed out on [Ac ...
- 【Flink】Flink Flink 1.14 新特性预览
1.概述 转载:Flink 1.14 新特性预览 简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日 ...
- 大数据计算引擎之Flink Flink CEP复杂事件编程
原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...
- Loadrunner-web资源相关图表
web资源相关图表可以提供web服务器性能的相关信息. 1.每秒点击数(Hits per Second)图表可以通过该图标查看性能测试过程中每一秒内虚拟用户向web服务器发送的HTTP请求数.通常情况 ...
- 凌波微步Flink——Flink的技术逻辑与编程步骤剖析
转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...
- 福州印发四部数据资源相关管理办法,包括全国首部地级市数据开放管理办法...
▼金猿奖·2019年度征集评选▼ 大数据产业创新服务媒体 --聚焦数据 · 改变商业 近日,<福州市政务数据资源管理办法><福州市政务数据汇聚共享管理暂行办法><福州市政 ...
- 凌波微步Flink——Flink API中的一些基础概念
转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95355619 本文出自[我是干勾鱼的博客] Ingredients: Java: ...
- 【Flink】 Flink 应用资源分配问题排查思路
1.概述 转载:https://blog.csdn.net/weixin_43970890/article/details/102483720 如果 Flink 应用不能正常启动达到 RUNNING ...
- 大数据之flink共享资源槽
算子链: 为方便执行,Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行.这是一个非常有用的优化方式,它减小了进程间数据交换和缓存的开销,而且在减少延迟同时增加 ...
最新文章
- Ubuntu16.04系统下汉字显示为方框解决办法(图文详解)
- 【iOS学习笔记】IOS开发中设置applicationIconBadgeNumber和消息推送
- [工具]Tomcat CVE-2017-12615 远程代码执行
- python序列类型-python序列类型有哪些
- 利用blink+MQ实现流计算中的超时统计问题
- LeetCode--27. 移除元素(双指针)
- ZABBIX 3.2.7 (源码包)安装部署
- 京东商品知识图谱,约10万商品品牌,约65万品牌销售关系
- Spark-生产案例
- 【现代版】为人处世三十六计详解,真的很受益!
- LabVIEW参考资料汇总
- Android 11 系统字体加载流程
- CAD线型设置:CAD软件中如何加粗曲线?
- wps中vbe6ext.olb不能被加载问题(附WPS2019宏下载)
- Ubuntu下将dmg文件转换成dcr和ISO文件
- springboot项目如何查看MP运行日志
- c++中fail函数
- php转行当保安,一个保安转行做头条号年入50万,他是怎么做到的?
- 【软考】--软考总结
- 01-空投Lynda
热门文章
- 天猫双11:1日到11日0点45分 382个品牌成交额超过1亿元
- 一加9RT外观和部分参数揭晓:搭载骁龙888+E4直屏
- 杨笠代言电脑遭投诉抵制,网友吵翻!英特尔回应了...
- 荣耀V40渲染图曝光 将搭载双曲面瀑布屏
- 粉笔网CEO怒斥湖南卫视扶贫节目作秀:自己赞助1000万太傻
- 支付宝又要改版了:首页顶栏新增了这个模块
- 金字火腿推出“人造牛排” 股价连日上涨接近历史高点
- 拼多多回应驰伟插座起诉:积极应诉 希望其莫充当电商“二选一”插头
- 哈啰出行回应单车违规投放:将尽快缴纳罚金 积极整改
- 流浪地球票房43亿元 今起电影最低票价降10元