【Flink】Flink 1.12.2 TaskSlot
1.概述
转载:Flink 1.12.2 源码浅析 : TaskSlot
属于同一slot的多个{@link TaskSlotPayload tasks}
的容器。
TaskSlot 可以处于以下状态之一:
- 空闲[Free]-slot为空,未分配给作业
- 释放中[Releasing]-slot变空后即将释放。
- 已分配[Allocated]-已为作业分配slot。
- 活动[Active]-slot正由job manager使用, job manager 是分配job 的负责人
只有在task slot处于空闲状态时才能分配它。
分配的task slot可以转换为活动状态。
活动slot允许从相应作业添加具有正确分配id的任务。
活动slot可以标记为非活动,从而将状态设置回已分配状态。
分配的或活动的slot只有在为空时才能释放。
如果它不是空的,那么它的状态可以设置为releasing,表示一旦它变空就可以被释放。
二 . 属性
/*** task slot的下标索引* Index of the task slot.* */private final int index;/*** 此插槽的资源特征。* Resource characteristics for this slot.* */private final ResourceProfile resourceProfile;/*** 在这个slot中运行的task* Tasks running in this slot.* */private final Map<ExecutionAttemptID, T> tasks;/*** 内存管理*/private final MemoryManager memoryManager;/*** slot的状态:* 1. ACTIVE, // Slot is in active use by a job manager responsible for a job* 2. ALLOCATED, // Slot has been allocated for a job but not yet given to a job manager* 3. RELEASING // Slot is not empty but tasks are failed. Upon removal of all tasks, it will be released** State of this slot. */private TaskSlotState state;/*** 已分配插槽的 job id。* Job id to which the slot has been allocated.* */private final JobID jobId;/*** 此插槽的Allocation id。* Allocation id of this slot.* */private final AllocationID allocationId;/*** 当插槽被释放和关闭时,关闭操作完成。* The closing future is completed when the slot is freed and closed.* */private final CompletableFuture<Void> closingFuture;/** * {@link Executor}用于后台操作,例如验证所有已释放的托管内存* {@link Executor} for background actions, e.g. verify all managed memory released. * */private final Executor asyncExecutor;
Slot的状态
ACTIVE : Slot 已经被 job manager 使用
ALLOCATED : Slot 已经被分配,但是尚未分配Job manager使用.
RELEASING : Slot 不为空,但task失败. 在移除所有任务后,它将被释放 .
三. 方法
3.1. 任务相关
3.1.1. 获取标识
/*** Generate the slot offer from this TaskSlot.** @return The sot offer which this task slot can provide*/public SlotOffer generateSlotOffer() {Preconditions.checkState(TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state,"The task slot is not in state active or allocated.");Preconditions.checkState(allocationId != null, "The task slot are not allocated");return new SlotOffer(allocationId, index, resourceProfile);}
3.1.2. 添加任务
将给定任务添加到任务slot。
仅当还没有另一个具有相同执行尝试ID的任务添加到任务slot时,才有可能。
在这种情况下,该方法返回true。
否则,任务slot将保持不变,并返回false。
如果任务slot状态未激活,则会抛出{@link IllegalStateException}。
如果任务的作业ID和分配ID与为其分配了任务slot的作业ID和分配ID不匹配,则会抛出{@link IllegalArgumentException}。
/*** 将给定任务添加到任务slot。* 仅当还没有另一个具有相同执行尝试ID的任务添加到任务slot时,才有可能。* 在这种情况下,该方法返回true。* 否则,任务slot将保持不变,并返回false。** 如果任务slot状态未激活,则会抛出{@link IllegalStateException}。** 如果任务的作业ID和分配ID与为其分配了任务slot的作业ID和分配ID不匹配,则会抛出{@link IllegalArgumentException}。** Add the given task to the task slot. This is only possible if there is not already another* task with the same execution attempt id added to the task slot. In this case, the method* returns true. Otherwise the task slot is left unchanged and false is returned.** <p>In case that the task slot state is not active an {@link IllegalStateException} is thrown.* In case that the task's job id and allocation id don't match with the job id and allocation* id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown.** @param task to be added to the task slot* @throws IllegalStateException if the task slot is not in state active* @return true if the task was added to the task slot; otherwise false*/public boolean add(T task) {// Check that this slot has been assigned to the job sending this taskPreconditions.checkArgument(task.getJobID().equals(jobId),"The task's job id does not match the "+ "job id for which the slot has been allocated.");Preconditions.checkArgument(task.getAllocationId().equals(allocationId),"The task's allocation "+ "id does not match the allocation id for which the slot has been allocated.");Preconditions.checkState(TaskSlotState.ACTIVE == state, "The task slot is not in state active.");T oldTask = tasks.put(task.getExecutionId(), task);if (oldTask != null) {tasks.put(task.getExecutionId(), oldTask);return false;} else {return true;}}
3.1.3. 获取任务
/*** Get all tasks running in this task slot.** @return Iterator to all currently contained tasks in this task slot.*/public Iterator<T> getTasks() {return tasks.values().iterator();}
3.1.4. 移除任务
删除由给定的execution attempt id标识的任务。
/*** Remove the task identified by the given execution attempt id.** @param executionAttemptId identifying the task to be removed* @return The removed task if there was any; otherwise null.*/public T remove(ExecutionAttemptID executionAttemptId) {return tasks.remove(executionAttemptId);}
3.1.5. 清理所有task
/** Removes all tasks from this task slot. */public void clear() {tasks.clear();}
3.2. 状态相关
名称 | 描述 |
---|---|
isActive | 状态是否为ACTIVE |
isAllocated | 状态为ACTIVE或者ALLOCATED |
isReleasing | 状态是否为RELEASING |
markActive | 标记状态为ACTIVE |
markInactive | 标记状态为ALLOCATED |
closeAsync | 关闭,标记状态为RELEASING |
3.3. get/set相关
index: int
resourceProfile: ResourceProfile
memoryManager: MemoryManager
state: TaskSlotState
jobId: JobID
allocationId: AllocationID
empty: boolean
releasing: boolean
tasks: Iterator<T>
【Flink】Flink 1.12.2 TaskSlot相关推荐
- 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动
1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [四] 上一篇: [flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 Jo ...
- 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint
1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [三] 上一章:[flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 yar ...
- 【Flink】FLink 1.12 版本的 Row 类型 中的 RowKind 是干嘛的
1.概述 在[Flink]Flink 从 1.9.1 版本 升级到 1.12.4 版本的 注意事项 以及 过程 升级的时候,偶然看到row类型多了一个属性. @PublicEvolving publi ...
- 【FLink】Flink 1.9 升级到 1.12.4 无配置页面 无日志
文章目录 1.概述 2.场景再现2 2.1 概述 2.2 日志的配置 2.3 加载2个 2.4 缺少文件 2.7 扩展 2.5.1 Flink 默认日志框架 2.5.2 slf4j 基础概念 2.5. ...
- 【Flink】Flink 1.9 升级 1.12.4 本地可以运行 打包后 集群运行就找不到类 ClassNotFoundException
1.场景1 1.1 概述 升级过程请参考: [Flink]Flink 从 1.9.1 版本 升级到 1.12.4 版本的 注意事项 以及 过程 然后本地运行好好的,但是而且,是在主类中类. 打开jar ...
- 【2】flink数据流转换算子
[README] 本文记录了flink对数据的转换操作,包括 基本转换,map,flatMap,filter: 滚动聚合(min minBy max maxBy sum): 规约聚合-reduce: ...
- 【flink】flink 报错 key group from 44 to 45 does not contain 4
文章目录 1.概述 本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 1.概述 此问题和一个问题很相似:[Flink]Flink KeyGroupRang ...
- 【flink】Flink常见Checkpoint超时问题排查思路
1.概述 转载:Flink常见Checkpoint超时问题排查思路 这里仅仅是自己学习. 在日常flink应用中,相信大家经常会遇到checkpoint超时失败这类的问题,遇到这种情况的时候仅仅只会在 ...
- 【Flink】flink 升级 the given -yarn-cluster does not contain a valid port
文章目录 1.场景1 1.1 概述 1.场景1 1.1 概述 在本次场景中,我是从flink 1.9升级到1.12.4 升级请参考:[Flink]Flink 从 1.9.1 版本 升级到 1.12.4 ...
- 【3】flink sink
[README] 本文记录了flink sink操作,输出目的存储器(中间件)包括 kafka: es: db: 等等有很多: 本文只给出了 sink2kafka的代码: 本文使用的flink为 1. ...
最新文章
- 学而思的python课怎么样_有在用学而思网校的同学觉得孙墨漪老师怎么样?报她的课值得吗?...
- Serverless 工程实践 | 细数 Serverless 的配套服务
- restful规范和APIView
- java哈希_Java如何采用哈希码实现分类(以员工分配为例)
- 【设计模式】—— 访问者模式Visitor
- php判断post是否xss,PHP实现表单提交数据的验证处理功能【防SQL注入和XSS攻击等】...
- Python学习 Day4-1 Python3 条件控制、循环语句
- java给文件添加水印_Java在PDF中添加水印(文本/图片水印)
- python高段编程_25个有用的 Python 代码段
- 2018年内蒙古开出4.93亿环保罚单
- navicat超时未激活如何处理?
- 基于java的化妆品购物商城微信小程序的设计与实现 毕业设计毕设参考
- 微星GP76 AX1675x ubuntu 18.04安装有线/无线网卡驱动
- Apple HomeKit
- 『机器学习』入门教程汇总
- 网站使用国外服务器越来越卡、越来越慢的原因
- 父页面调用子页面方法, 子页面加载父页面传送的数据
- PyTorch学习记录——PyTorch生态
- 帆软报表设计器常用代码知识
- OCP-V13-700