SlotSharingGroup

Indicates that different tasks may share a slot. This is a soft constraint, i.e. they may also end up in different slots.

By default, the whole StreamGraph uses a single default "default" SlotSharingGroup, i.e. the tasks of all JobVertices may share one slot.
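If you want to pull operators out of the default group, the DataStream API exposes this directly via slotSharingGroup(String) on an operator; downstream operators inherit the group of their input unless they set their own. A minimal sketch (assuming a StreamExecutionEnvironment named env):

    DataStream<String> lines = env.socketTextStream("localhost", 9999);

    lines
        .filter(value -> !value.isEmpty())
        .slotSharingGroup("filters")   // this operator's tasks no longer share slots with the "default" group
        .print();                      // inherits the "filters" group from its input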

/**
 * A slot sharing units defines which different task (from different job vertices) can be
 * deployed together within a slot. This is a soft permission, in contrast to the hard constraint
 * defined by a co-location hint.
 */
public class SlotSharingGroup implements java.io.Serializable {

    private final Set<JobVertexID> ids = new TreeSet<JobVertexID>();

    /** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */
    private transient SlotSharingGroupAssignment taskAssignment;

As we can see, the key parts are:

1. TreeSet<JobVertexID>, which holds the JobVertexIDs belonging to this group

2. SlotSharingGroupAssignment

CoLocationGroup

This, in contrast, is a hard constraint: for the JobVertices in the group, the subtasks with the same index must run in the same slot (and therefore on the same TaskManager).

/**
 * A Co-location group is a group of JobVertices, where the <i>i-th</i> subtask of one vertex
 * has to be executed on the same TaskManager as the <i>i-th</i> subtask of all
 * other JobVertices in the same group.
 * 
 * <p>The co-location group is used for example to make sure that the i-th subtasks for iteration
 * head and iteration tail are scheduled to the same TaskManager.</p>
 */
public class CoLocationGroup implements java.io.Serializable {

    /** The ID that describes the slot co-location-constraint as a group */
    private final AbstractID id = new AbstractID();

    /** The vertices participating in the co-location group */
    private final List<JobVertex> vertices = new ArrayList<JobVertex>();

    /** The constraints, which hold the shared slots for the co-located operators */
    private transient ArrayList<CoLocationConstraint> constraints;
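As the Javadoc says, the iteration head and tail are the canonical users of co-location. A sketch of a job that produces such a CoLocationGroup; the grouping is set up internally when the iteration is translated to a JobGraph, not by user code (assuming a StreamExecutionEnvironment named env and the usual DataStream imports):

    DataStream<Long> input = env.generateSequence(1, 100);

    IterativeStream<Long> iteration = input.iterate();          // iteration head

    DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) {
            return value - 1;
        }
    });

    iteration.closeWith(minusOne.filter(value -> value > 0));   // iteration tail (feedback)
    minusOne.filter(value -> value <= 0).print();                // leave the loop

    // When translated to a JobGraph, head and tail end up in one CoLocationGroup, so the
    // i-th head subtask and the i-th tail subtask run on the same TaskManager.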

CoLocationConstraint can be regarded as a special kind of SharedSlot:

/**
 * A CoLocationConstraint manages the location of a set of tasks
 * (Execution Vertices). In co-location groups, the different subtasks of
 * different JobVertices need to be executed on the same {@link Instance}.
 * This is realized by creating a special shared slot that holds these tasks.
 * 
 * <p>This class tracks the location and the shared slot for this set of tasks.
 */
public class CoLocationConstraint {

    private final CoLocationGroup group;

    private volatile SharedSlot sharedSlot;

    private volatile ResourceID lockedLocation;

The different kinds of slots:

AllocatedSlot represents a slot that has been allocated from a TaskManager.

/**
 * The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager.
 * It represents a slice of allocated resources from the TaskManager.
 * 
 * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The
 * ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the
 * JobManager and notify the JobManager.
 * 
 * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
 * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
 * JobManager. All slots had a default unknown resource profile.
 */
public class AllocatedSlot {

    /** The ID under which the slot is allocated. Uniquely identifies the slot. */
    private final AllocationID slotAllocationId;

    /** The ID of the job this slot is allocated for */
    private final JobID jobID;

    /** The location information of the TaskManager to which this slot belongs */
    private final TaskManagerLocation taskManagerLocation;

    /** The resource profile of the slot provides */
    private final ResourceProfile resourceProfile;

    /** RPC gateway to call the TaskManager that holds this slot */
    private final TaskManagerGateway taskManagerGateway;

    /** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
    private final int slotNumber;

Slot can be seen as a wrapper around an AllocatedSlot.

/**
 * Base class for slots that the Scheduler / ExecutionGraph take from the SlotPool and use to place
 * tasks to execute into. A slot corresponds to an AllocatedSlot (a slice of a TaskManager's resources),
 * plus additional fields to track what is currently executed in that slot, or if the slot is still
 * used or disposed (ExecutionGraph gave it back to the pool).
 *
 * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). In the more complex
 * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. Shared slots may contain
 * other shared slots which in turn can hold simple slots. That way, a shared slot may define a tree
 * of slots that belong to it.
 */
public abstract class Slot {

    /** The allocated slot that this slot represents. */
    private final AllocatedSlot allocatedSlot;

    /** The owner of this slot - the slot was taken from that owner and must be disposed to it */
    private final SlotOwner owner;

    /** The parent of this slot in the hierarchy, or null, if this is the parent */
    @Nullable
    private final SharedSlot parent;

    /** The id of the group that this slot is allocated to. May be null. */
    @Nullable
    private final AbstractID groupID;

    /** The number of the slot on which the task is deployed */
    private final int slotNumber;

SimpleSlot: a slot that holds a single task.

/**
 * A SimpleSlot represents a single slot on a TaskManager instance, or a slot within a shared slot.
 *
 * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot.
 * If not, then the parent attribute is null.
 */
public class SimpleSlot extends Slot {

    /** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
    private volatile Execution executedTask;   // not shared: exactly one task

    /** The locality attached to the slot, defining whether the slot was allocated at the desired location. */
    private volatile Locality locality = Locality.UNCONSTRAINED;

SharedSlot

/**
 * This class represents a shared slot. A shared slot can have multiple
 * {@link SimpleSlot} instances within itself. This allows to
 * schedule multiple tasks simultaneously to the same resource. Sharing a resource with multiple
 * tasks is crucial for simple pipelined / streamed execution, where both the sender and the receiver
 * are typically active at the same time.
 *
 * <p><b>IMPORTANT:</b> This class contains no synchronization. Thus, the caller has to guarantee proper
 * synchronization. In the current implementation, all concurrently modifying operations are
 * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
 * synchronization.
 */
public class SharedSlot extends Slot {

    /** The assignment group os shared slots that manages the availability and release of the slots */
    private final SlotSharingGroupAssignment assignmentGroup;

    /** The set os sub-slots allocated from this shared slot */
    private final Set<Slot> subSlots;

As we can see, SharedSlot extends Slot, and Slot contains exactly one

AllocatedSlot allocatedSlot

So no matter how many sub-slots there are in subSlots, they all share this one allocatedSlot.
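This is also why, with the default sharing group, a job needs as many slots as its maximum operator parallelism rather than one slot per task. A back-of-the-envelope example:

    // Example: source (parallelism 2) -> map (parallelism 4) -> sink (parallelism 1),
    // all in the same (default) slot sharing group.
    int sourceParallelism = 2;
    int mapParallelism    = 4;
    int sinkParallelism   = 1;

    int totalTasks    = sourceParallelism + mapParallelism + sinkParallelism;                     // 7 tasks
    int requiredSlots = Math.max(sourceParallelism, Math.max(mapParallelism, sinkParallelism));   // but only 4 slots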

A SimpleSlot is allocated from the corresponding SharedSlot:

    SimpleSlot allocateSubSlot(AbstractID groupId) {
        if (isAlive()) {
            SimpleSlot slot = new SimpleSlot(
                    getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(),
                    getTaskManagerGateway(), this, groupId);
            subSlots.add(slot);
            return slot;
        }
        else {
            return null;
        }
    }

    public SimpleSlot(
            JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
            TaskManagerGateway taskManagerGateway,
            @Nullable SharedSlot parent, @Nullable AbstractID groupID) {

        super(parent != null ?                    // if there is a parent, i.e. this slot belongs to a SharedSlot,
                parent.getAllocatedSlot() :       // reuse the parent SharedSlot's AllocatedSlot
                new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber,   // otherwise create a new AllocatedSlot
                        ResourceProfile.UNKNOWN, taskManagerGateway),
                owner, slotNumber, parent, groupID);
    }
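To make the consequence of this constructor explicit, here is an illustrative snippet (not from Flink; allocateSubSlot and getAllocatedSlot are not public, so treat it as pseudo-test code): every sub-slot handed out by one SharedSlot refers to the same AllocatedSlot.

    // Illustration only: two sub-slots of the same SharedSlot share one physical AllocatedSlot.
    SimpleSlot first  = sharedSlot.allocateSubSlot(vertexIdA);
    SimpleSlot second = sharedSlot.allocateSubSlot(vertexIdB);

    assert first.getAllocatedSlot() == sharedSlot.getAllocatedSlot();
    assert second.getAllocatedSlot() == sharedSlot.getAllocatedSlot();
    // hence they also report the same TaskManager
    assert first.getTaskManagerID().equals(second.getTaskManagerID());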

SlotSharingGroupAssignment manages a group of SharedSlots.

The diagram in its Javadoc is quite illustrative:

/**
 * The SlotSharingGroupAssignment manages a set of shared slots, which are shared between
 * tasks of a {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup}.
 * 
 * <p>The assignments shares tasks by allowing a shared slot to hold one vertex per
 * JobVertexID. For example, consider a program consisting of job vertices "source", "map",
 * "reduce", and "sink". If the slot sharing group spans all four job vertices, then
 * each shared slot can hold one parallel subtask of the source, the map, the reduce, and the
 * sink vertex. Each shared slot holds the actual subtasks in child slots, which are (at the leaf level),
 * the {@link SimpleSlot}s.</p>
 * 
 * <p>An exception are the co-location-constraints, that define that the i-th subtask of one
 * vertex needs to be scheduled strictly together with the i-th subtasks of the vertices
 * that share the co-location-constraint. To manage that, a co-location-constraint gets its
 * own shared slot inside the shared slots of a sharing group.</p>
 * 
 * <p>Consider a job set up like this:</p>
 * 
 * <pre>{@code
 * +-------------- Slot Sharing Group --------------+
 * |                                                |
 * |            +-- Co Location Group --+           |
 * |            |                       |           |
 * |  (source) ---> (head) ---> (tail) ---> (sink)  |
 * |            |                       |           |
 * |            +-----------------------+           |
 * +------------------------------------------------+
 * }</pre>
 * 
 * <p>The slot hierarchy in the slot sharing group will look like the following</p>
 * 
 * <pre>
 *     Shared(0)(root)
 *        |
 *        +-- Simple(2)(sink)
 *        |
 *        +-- Shared(1)(co-location-group)
 *        |      |
 *        |      +-- Simple(0)(tail)
 *        |      +-- Simple(1)(head)
 *        |
 *        +-- Simple(0)(source)
 * </pre>
 */
public class SlotSharingGroupAssignment {

    /** All slots currently allocated to this sharing group */
    private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();

    /** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */
    private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();

The core data structures:

allSlots holds all SharedSlots of the group; every one of them is sharable and gets handed out to tasks of different JobVertices.

availableSlotsPerJid records which slots are still available, keyed first by JobVertexID (the AbstractID) and then by TaskManager (the ResourceID).
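So availableSlotsPerJid is effectively a two-level multimap: JobVertexID -> TaskManager -> list of SharedSlots still usable for that vertex. The helpers putIntoMultiMap / removeFromMultiMap / pollFromMultiMap used in the code below are not shown in this post; the following is my own rough reconstruction of what they do (assuming the usual java.util imports), not the original source:

    // Rough reconstruction of the multimap helpers, for readability of the code below.
    private static void putIntoMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location, SharedSlot slot) {
        List<SharedSlot> slotsForLocation = map.get(location);
        if (slotsForLocation == null) {
            slotsForLocation = new ArrayList<SharedSlot>();
            map.put(location, slotsForLocation);
        }
        slotsForLocation.add(slot);
    }

    private static SharedSlot removeFromMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location) {
        List<SharedSlot> slotsForLocation = map.get(location);
        if (slotsForLocation == null || slotsForLocation.isEmpty()) {
            return null;
        }
        SharedSlot slot = slotsForLocation.remove(slotsForLocation.size() - 1);
        if (slotsForLocation.isEmpty()) {
            map.remove(location);
        }
        return slot;
    }

    private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) {
        // take a slot from any TaskManager, no locality preference
        Iterator<Map.Entry<ResourceID, List<SharedSlot>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            List<SharedSlot> slots = it.next().getValue();
            if (slots.isEmpty()) {
                it.remove();
            } else {
                SharedSlot slot = slots.remove(slots.size() - 1);
                if (slots.isEmpty()) {
                    it.remove();
                }
                return slot;
            }
        }
        return null;
    }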

The most important function:

getSlotForTask, which allocates a slot for a task.

    /**
     * Gets a slot suitable for the given task vertex. This method will prefer slots that are local
     * (with respect to {@link ExecutionVertex#getPreferredLocationsBasedOnInputs()}), but will return non local
     * slots if no local slot is available. The method returns null, when this sharing group has
     * no slot available for the given JobVertexID.
     *
     * @param vertex The vertex to allocate a slot for.
     *
     * @return A slot to execute the given ExecutionVertex in, or null, if none is available.
     */
    public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
        // by default, the locations of the slots assigned to the inputs are used as the preferred locations
        return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
    }

    SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
        synchronized (lock) {
            // get a SharedSlot; the third argument means a local slot is preferred but not required
            Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);

            if (p != null) {
                SharedSlot ss = p.f0;
                // allocate a SimpleSlot from the SharedSlot
                SimpleSlot slot = ss.allocateSubSlot(vertexID);
                slot.setLocality(p.f1);
                return slot;
            }
            else {
                return null;
            }
        }
    }

getSlotForTaskInternal

    private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
            AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations, boolean localOnly) {

        // check if there is anything at all in this group assignment
        if (allSlots.isEmpty()) {   // no slots at all, give up
            return null;
        }

        // get the available slots for the group
        Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);   // availability map for this JobVertex

        if (slotsForGroup == null) {   // first request for this JobVertex: initialize slotsForGroup
            // we have a new group, so all slots are available
            slotsForGroup = new LinkedHashMap<>();
            availableSlotsPerJid.put(groupId, slotsForGroup);

            for (SharedSlot availableSlot : allSlots) {   // every slot in allSlots is sharable, so all of them are available to this group
                // register availableSlot in slotsForGroup, i.e. append it to the list keyed by its location
                putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot);
            }
        }
        else if (slotsForGroup.isEmpty()) {   // the group exists, but has no available slots
            return null;
        }

        // check whether we can schedule the task to a preferred location
        boolean didNotGetPreferred = false;

        if (preferredLocations != null) {   // if there are preferred locations
            for (TaskManagerLocation location : preferredLocations) {   // try each preferred location in turn
                // set the flag that we failed a preferred location. If one will be found,
                // we return early anyways and skip the flag evaluation.
                // (slightly tricky: if we return below, this assignment does not matter; if we do not return,
                // no preferred location was found, so setting it to true is correct)
                didNotGetPreferred = true;

                // remove the slot from slotsForGroup, because a JobVertex must never run two of its tasks in the same slot
                SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
                if (slot != null && slot.isAlive()) {
                    // LOCAL means the slot is on the same TaskManager as a preferred location
                    return new Tuple2<>(slot, Locality.LOCAL);
                }
            }
        }

        // if we want only local assignments, exit now with a "not found" result
        if (didNotGetPreferred && localOnly) {   // no preferred slot found and only local placements are acceptable
            return null;
        }

        // reaching this point with didNotGetPreferred == false means preferredLocations was null, i.e. UNCONSTRAINED
        Locality locality = didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED;

        // schedule the task to any available location
        SharedSlot slot;
        while ((slot = pollFromMultiMap(slotsForGroup)) != null) {   // no TaskManager location specified, take any available slot
            if (slot.isAlive()) {
                return new Tuple2<>(slot, locality);
            }
        }

        // nothing available after all, all slots were dead
        return null;
    }

SharedSlot.allocateSubSlot

See above.

A natural question arises: where do the slots in allSlots come from?

addSharedSlotAndAllocateSubSlot

    private SimpleSlot addSharedSlotAndAllocateSubSlot(
            SharedSlot sharedSlot, Locality locality, JobVertexID groupId, CoLocationConstraint constraint) {

        final ResourceID location = sharedSlot.getTaskManagerID();

        synchronized (lock) {
            SimpleSlot subSlot;
            AbstractID groupIdForMap;

            // add to the total bookkeeping
            if (!allSlots.add(sharedSlot)) {   // add the new SharedSlot to allSlots
                throw new IllegalArgumentException("Slot was already contained in the assignment group");
            }

            if (constraint == null) {
                // allocate us a sub slot to return
                subSlot = sharedSlot.allocateSubSlot(groupId);   // simply allocate a SimpleSlot
                groupIdForMap = groupId;
            }
            else {
                // the case with a CoLocationConstraint (body elided in this excerpt)
            }

            if (subSlot != null) {
                // preserve the locality information
                subSlot.setLocality(locality);

                // let the other groups know that this slot exists and that they
                // can place a task into this slot.
                boolean entryForNewJidExists = false;

                for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
                    // there is already an entry for this groupID
                    if (entry.getKey().equals(groupIdForMap)) {
                        entryForNewJidExists = true;
                        continue;
                    }

                    Map<ResourceID, List<SharedSlot>> available = entry.getValue();
                    // make the sharedSlot available to every other JobVertex
                    putIntoMultiMap(available, location, sharedSlot);
                }

                // make sure an empty entry exists for this group, if no other entry exists
                if (!entryForNewJidExists) {   // no entry for this groupId yet, so start it with an empty map
                                               // (the new slot already hosts a task of this JobVertex, so it is not offered back to it)
                    availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
                }

                return subSlot;
            }
        }   // end synchronized (lock)
    }

addSharedSlotAndAllocateSubSlot is called from the Scheduler:

    protected SimpleSlot getNewSlotForSharingGroup(
            ExecutionVertex vertex,
            Iterable<TaskManagerLocation> requestedLocations,
            SlotSharingGroupAssignment groupAssignment,
            CoLocationConstraint constraint,
            boolean localOnly) {

        // we need potentially to loop multiple times, because there may be false positives
        // in the set-with-available-instances
        while (true) {
            // find an instance, preferring one local to the requested locations
            Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);

            if (instanceLocalityPair == null) {   // no instance available, give up
                // nothing is available
                return null;
            }

            final Instance instanceToUse = instanceLocalityPair.getLeft();
            final Locality locality = instanceLocalityPair.getRight();

            try {
                JobVertexID groupID = vertex.getJobvertexId();

                // allocate a shared slot from the instance
                SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);

                // if the instance still has spare resources, put it back into instancesWithAvailableResources
                // so it can be used for further allocations
                if (instanceToUse.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
                }

                if (sharedSlot != null) {
                    // register the new SharedSlot with the SlotSharingGroupAssignment of the sharing group
                    // and allocate a sub-slot from it
                    SimpleSlot slot = constraint == null ?
                            groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID) :
                            groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, constraint);

                    if (slot != null) {
                        return slot;
                    }
                    else {
                        // could not add and allocate the sub-slot, so release shared slot
                        sharedSlot.releaseSlot();
                    }
                }
            }
            catch (InstanceDiedException e) {
                // the instance died it has not yet been propagated to this scheduler
                // remove the instance from the set of available instances
                removeInstance(instanceToUse);
            }

            // if we failed to get a slot, fall through the loop
        }
    }

getNewSlotForSharingGroup is called when the SlotSharingGroup has no available slot left; it allocates a new SharedSlot from an Instance.
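Putting the two paths together, the slot-sharing branch of the Scheduler roughly works as follows (a condensed paraphrase with a hypothetical helper name, not the actual Scheduler code): first try to reuse a SharedSlot the group already owns via getSlotForTask; only if that returns null, fall back to getNewSlotForSharingGroup to pull a fresh SharedSlot from an Instance.

    // Condensed paraphrase of the scheduling decision for a vertex in a slot sharing group.
    SimpleSlot scheduleWithSharing(ExecutionVertex vertex, SlotSharingGroupAssignment assignment, boolean localOnly) {
        // 1. try to place the task into a SharedSlot the group already holds
        SimpleSlot slot = assignment.getSlotForTask(vertex);
        if (slot != null) {
            return slot;
        }
        // 2. otherwise ask an Instance (TaskManager) for a brand new SharedSlot and register it with the group
        return getNewSlotForSharingGroup(
                vertex, vertex.getPreferredLocationsBasedOnInputs(), assignment, null, localOnly);
    }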

Reference: http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/
