
转载:Flink 1.12.2 源码浅析 : TaskSlot

属于同一slot的多个{@link TaskSlotPayload tasks}的容器。

TaskSlot 可以处于以下状态之一:

  1. 空闲[Free]-slot为空,未分配给作业
  2. 释放中[Releasing]-slot变空后即将释放。
  3. 已分配[Allocated]-已为作业分配slot。
  4. 活动[Active]-slot正由job manager使用, job manager 是分配job 的负责人
  • 只有在task slot处于空闲状态时才能分配它。

  • 分配的task slot可以转换为活动状态。

  • 活动slot允许从相应作业添加具有正确分配id的任务。

  • 活动slot可以标记为非活动,从而将状态设置回已分配状态。

  • 分配的或活动的slot只有在为空时才能释放。

  • 如果它不是空的,那么它的状态可以设置为releasing,表示一旦它变空就可以被释放。

二 . 属性

/***  task slot的下标索引* Index of the task slot.* */private final int index;/*** 此插槽的资源特征。* Resource characteristics for this slot.* */private final ResourceProfile resourceProfile;/*** 在这个slot中运行的task* Tasks running in this slot.* */private final Map<ExecutionAttemptID, T> tasks;/*** 内存管理*/private final MemoryManager memoryManager;/*** slot的状态:* 1. ACTIVE, // Slot is in active use by a job manager responsible for a job* 2. ALLOCATED, // Slot has been allocated for a job but not yet given to a job manager* 3. RELEASING // Slot is not empty but tasks are failed. Upon removal of all tasks, it will be released** State of this slot. */private TaskSlotState state;/*** 已分配插槽的 job id。* Job id to which the slot has been allocated.* */private final JobID jobId;/*** 此插槽的Allocation id。* Allocation id of this slot.* */private final AllocationID allocationId;/*** 当插槽被释放和关闭时,关闭操作完成。* The closing future is completed when the slot is freed and closed.* */private final CompletableFuture<Void> closingFuture;/** * {@link Executor}用于后台操作,例如验证所有已释放的托管内存* {@link Executor} for background actions, e.g. verify all managed memory released. * */private final Executor asyncExecutor;


ACTIVE : Slot 已经被 job manager 使用
ALLOCATED : Slot 已经被分配,但是尚未分配Job manager使用.
RELEASING : Slot 不为空,但task失败. 在移除所有任务后,它将被释放 .

三. 方法

3.1. 任务相关

3.1.1. 获取标识

    /*** Generate the slot offer from this TaskSlot.** @return The sot offer which this task slot can provide*/public SlotOffer generateSlotOffer() {Preconditions.checkState(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state,"The task slot is not in state active or allocated.");Preconditions.checkState(allocationId != null, "The task slot are not allocated");return new SlotOffer(allocationId, index, resourceProfile);}

3.1.2. 添加任务

如果任务slot状态未激活,则会抛出{@link IllegalStateException}。
如果任务的作业ID和分配ID与为其分配了任务slot的作业ID和分配ID不匹配,则会抛出{@link IllegalArgumentException}。

/*** 将给定任务添加到任务slot。* 仅当还没有另一个具有相同执行尝试ID的任务添加到任务slot时,才有可能。* 在这种情况下,该方法返回true。* 否则,任务slot将保持不变,并返回false。** 如果任务slot状态未激活,则会抛出{@link IllegalStateException}。** 如果任务的作业ID和分配ID与为其分配了任务slot的作业ID和分配ID不匹配,则会抛出{@link IllegalArgumentException}。** Add the given task to the task slot. This is only possible if there is not already another* task with the same execution attempt id added to the task slot. In this case, the method* returns true. Otherwise the task slot is left unchanged and false is returned.** <p>In case that the task slot state is not active an {@link IllegalStateException} is thrown.* In case that the task's job id and allocation id don't match with the job id and allocation* id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown.** @param task to be added to the task slot* @throws IllegalStateException if the task slot is not in state active* @return true if the task was added to the task slot; otherwise false*/public boolean add(T task) {// Check that this slot has been assigned to the job sending this taskPreconditions.checkArgument(task.getJobID().equals(jobId),"The task's job id does not match the "+ "job id for which the slot has been allocated.");Preconditions.checkArgument(task.getAllocationId().equals(allocationId),"The task's allocation "+ "id does not match the allocation id for which the slot has been allocated.");Preconditions.checkState(TaskSlotState.ACTIVE == state, "The task slot is not in state active.");T oldTask = tasks.put(task.getExecutionId(), task);if (oldTask != null) {tasks.put(task.getExecutionId(), oldTask);return false;} else {return true;}}

3.1.3. 获取任务

    /*** Get all tasks running in this task slot.** @return Iterator to all currently contained tasks in this task slot.*/public Iterator<T> getTasks() {return tasks.values().iterator();}

3.1.4. 移除任务

删除由给定的execution attempt id标识的任务。

    /*** Remove the task identified by the given execution attempt id.** @param executionAttemptId identifying the task to be removed* @return The removed task if there was any; otherwise null.*/public T remove(ExecutionAttemptID executionAttemptId) {return tasks.remove(executionAttemptId);}

3.1.5. 清理所有task

    /** Removes all tasks from this task slot. */public void clear() {tasks.clear();}

3.2. 状态相关

名称 描述
isActive 状态是否为ACTIVE
isAllocated 状态为ACTIVE或者ALLOCATED
isReleasing 状态是否为RELEASING
markActive 标记状态为ACTIVE
markInactive 标记状态为ALLOCATED
closeAsync 关闭,标记状态为RELEASING

3.3. get/set相关

index: int
resourceProfile: ResourceProfile
memoryManager: MemoryManager
state: TaskSlotState
jobId: JobID
allocationId: AllocationID
empty: boolean
releasing: boolean
tasks: Iterator<T>

