1.概述

转载:Flink 1.12.2 源码浅析 :SlotManager

SlotManager 位于ResourceManager中.

SlotManager 负责维护所有已注册的任务管理器slot、它们的分配和所有挂起的slot请求的视图。

无论何时注册新slot或释放分配的slot,它都会尝试执行另一个挂起的slot请求。 每当没有足够的可用slot时,slot管理器将通过{@link ResourceActions#allocateResource(WorkerResourceSpec)}通知资源管理器。

为了释放资源并避免资源泄漏,空闲任务管理器(其slot当前未使用的任务管理器)和挂起的slot请求分别超时,从而触发其释放和失败。

二 .SlotManager

2.1. 介绍

SlotManager 是一个接口类,定义了ResourceManager中管理Slot的相关操作.

2.2. 接口清单

名称 描述
start 启动
suspend 挂起
int getNumberRegisteredSlots() 获取注册slot数量
int getNumberRegisteredSlotsOf(InstanceID instanceId) 根据InstanceID获取slot数量
int getNumberFreeSlots() 获取空闲slot数量
int getNumberFreeSlotsOf(InstanceID instanceId) 根据InstanceID获取空闲slot数量
Map<WorkerResourceSpec, Integer> getRequiredResources(); 获取从{@link ResourceActions}请求的尚未完成的workers SlotManager的数量。
ResourceProfile getRegisteredResource(); 获取注册的Resource
ResourceProfile getRegisteredResourceOf(InstanceID instanceID); 根据InstanceId 获取注册的Resource
ResourceProfile getFreeResource(); 获取空闲Resource
ResourceProfile getFreeResourceOf(InstanceID instanceID); 根据InstanceID获取空闲的Resource
int getNumberPendingSlotRequests(); 挂起的SlotRequests数量
void processResourceRequirements(ResourceRequirements resourceRequirements); 通知slot manager关于 job需要的资源信息
boolean registerSlotRequest(SlotRequest slotRequest) 发送slot请求.
boolean unregisterSlotRequest(AllocationID allocationId) 取消发送slot请求.
registerTaskManager 注册TaskManager
unregisterTaskManager 取消注册TaskManager
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport); 报告 SlotStatus
void freeSlot(SlotID slotId, AllocationID allocationId); 释放Slot
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest); 设置失败未实现的请求.

三 .SlotManagerImpl

SlotManagerImpl是SlotManager接口的实现类.

3.1. 属性清单

/** * 所有当前可用slot的索引。* Map for all registered slots. */private final HashMap<SlotID, TaskManagerSlot> slots;/** Index of all currently free slots. */private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;/*** 所有当前注册的任务管理器。* All currently registered task managers. */private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;/*** 用于请求重复数据消除的已完成和活动分配的映射。* Map of fulfilled and active allocations for request deduplication purposes.* */private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;/*** 挂起/未完成的slot分配请求的映射* Map of pending/unfulfilled slot allocation requests. */private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;/** Scheduled executor for timeouts. */private final ScheduledExecutor scheduledExecutor;/** Timeout for slot requests to the task manager. */private final Time taskManagerRequestTimeout;/** Timeout after which an allocation is discarded. */private final Time slotRequestTimeout;/** Timeout after which an unused TaskManager is released. */private final Time taskManagerTimeout;private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;private final SlotMatchingStrategy slotMatchingStrategy;/** ResourceManager's id. */private ResourceManagerId resourceManagerId;/** Executor for future callbacks which have to be "synchronized". */private Executor mainThreadExecutor;/** Callbacks for resource (de-)allocations. */private ResourceActions resourceActions;private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;private ScheduledFuture<?> slotRequestTimeoutCheck;/** True iff the component has been started. */private boolean started;/*** Release task executor only when each produced result partition is either consumed or failed.*/private final boolean waitResultConsumedBeforeRelease;/** Defines the max limitation of the total number of slots. */private final int maxSlotNum;/** Defines the number of redundant taskmanagers. */private final int redundantTaskManagerNum;/*** If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request* to pend. A slot request is considered unfulfillable if it cannot be fulfilled by neither a* slot that is already registered (including allocated ones) nor a pending slot that the {@link* ResourceActions} can allocate.*/private boolean failUnfulfillableRequest = true;/** The default resource spec of workers to request. */private final WorkerResourceSpec defaultWorkerResourceSpec;private final int numSlotsPerWorker;private final ResourceProfile defaultSlotResourceProfile;private final SlotManagerMetricGroup slotManagerMetricGroup;

3.2. 构造方法

构造方法就是普通的赋值&初始化的过程…

public SlotManagerImpl(ScheduledExecutor scheduledExecutor,SlotManagerConfiguration slotManagerConfiguration,SlotManagerMetricGroup slotManagerMetricGroup) {this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);Preconditions.checkNotNull(slotManagerConfiguration);this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();this.waitResultConsumedBeforeRelease =slotManagerConfiguration.isWaitResultConsumedBeforeRelease();this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();this.defaultSlotResourceProfile =generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();slots = new HashMap<>(16);freeSlots = new LinkedHashMap<>(16);taskManagerRegistrations = new HashMap<>(4);fulfilledSlotRequests = new HashMap<>(16);pendingSlotRequests = new HashMap<>(16);pendingSlots = new HashMap<>(16);resourceManagerId = null;resourceActions = null;mainThreadExecutor = null;taskManagerTimeoutsAndRedundancyCheck = null;slotRequestTimeoutCheck = null;started = false;}

3.3. 方法详单

3.3.1. start

启动

/*** Starts the slot manager with the given leader id and resource manager actions.** @param newResourceManagerId to use for communication with the task managers* @param newMainThreadExecutor to use to run code in the ResourceManager's main thread* @param newResourceActions to use for resource (de-)allocations*/@Overridepublic void start(ResourceManagerId newResourceManagerId,Executor newMainThreadExecutor,ResourceActions newResourceActions) {LOG.info("Starting the SlotManager.");this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);resourceActions = Preconditions.checkNotNull(newResourceActions);started = true;taskManagerTimeoutsAndRedundancyCheck =scheduledExecutor.scheduleWithFixedDelay(() ->mainThreadExecutor.execute(() -> checkTaskManagerTimeoutsAndRedundancy()),0L,taskManagerTimeout.toMilliseconds(),TimeUnit.MILLISECONDS);slotRequestTimeoutCheck =scheduledExecutor.scheduleWithFixedDelay(() -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),0L,slotRequestTimeout.toMilliseconds(),TimeUnit.MILLISECONDS);registerSlotManagerMetrics();}

3.3.2. suspend

挂起

/** Suspends the component. This clears the internal state of the slot manager. */@Overridepublic void suspend() {LOG.info("Suspending the SlotManager.");// stop the timeout checks for the TaskManagers and the SlotRequestsif (taskManagerTimeoutsAndRedundancyCheck != null) {taskManagerTimeoutsAndRedundancyCheck.cancel(false);taskManagerTimeoutsAndRedundancyCheck = null;}if (slotRequestTimeoutCheck != null) {slotRequestTimeoutCheck.cancel(false);slotRequestTimeoutCheck = null;}for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {cancelPendingSlotRequest(pendingSlotRequest);}pendingSlotRequests.clear();ArrayList<InstanceID> registeredTaskManagers =new ArrayList<>(taskManagerRegistrations.keySet());for (InstanceID registeredTaskManager : registeredTaskManagers) {unregisterTaskManager(registeredTaskManager,new SlotManagerException("The slot manager is being suspended."));}resourceManagerId = null;resourceActions = null;started = false;}

3.3.3. int getNumberRegisteredSlots()

获取注册slot数量

    @Overridepublic int getNumberRegisteredSlots() {return slots.size();}

3.3.4. int getNumberRegisteredSlotsOf(InstanceID instanceId)

根据InstanceID获取slot数量

    @Overridepublic int getNumberRegisteredSlotsOf(InstanceID instanceId) {TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);if (taskManagerRegistration != null) {return taskManagerRegistration.getNumberRegisteredSlots();} else {return 0;}}

3.3.5. int getNumberFreeSlots()

获取空闲slot数量

    @Overridepublic int getNumberFreeSlots() {return freeSlots.size();}

3.3.6. int getNumberFreeSlotsOf(InstanceID instanceId)

根据InstanceID获取空闲slot数量

    @Overridepublic int getNumberFreeSlotsOf(InstanceID instanceId) {TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);if (taskManagerRegistration != null) {return taskManagerRegistration.getNumberFreeSlots();} else {return 0;}}

3.3.7. Map<WorkerResourceSpec, Integer> getRequiredResources();

获取从{@link ResourceActions}请求的尚未完成的workers SlotManager的数量。

@Overridepublic Map<WorkerResourceSpec, Integer> getRequiredResources() {final int pendingWorkerNum =MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);return pendingWorkerNum > 0? Collections.singletonMap(defaultWorkerResourceSpec, pendingWorkerNum): Collections.emptyMap();}

3.3.8. ResourceProfile getRegisteredResource();

获取注册的Resource

@Overridepublic ResourceProfile getRegisteredResource() {return getResourceFromNumSlots(getNumberRegisteredSlots());}

3.3.9. ResourceProfile getRegisteredResourceOf(InstanceID instanceID);

根据InstanceId 获取注册的Resource

    @Overridepublic ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));}

3.3.10. ResourceProfile getFreeResource();

获取空闲Resource

    @Overridepublic ResourceProfile getFreeResource() {return getResourceFromNumSlots(getNumberFreeSlots());}

3.3.11. ResourceProfile getFreeResourceOf(InstanceID instanceID);

根据InstanceID获取空闲的Resource

    @Overridepublic ResourceProfile getFreeResourceOf(InstanceID instanceID) {return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));}

3.3.12. int getNumberPendingSlotRequests();

挂起的SlotRequests数量

    @Overridepublic ResourceProfile getFreeResource() {return getResourceFromNumSlots(getNumberFreeSlots());}

3.3.13. void processResourceRequirements(ResourceRequirements resourceRequirements);

通知slot manager关于 job需要的资源信息

    @Overridepublic void processResourceRequirements(ResourceRequirements resourceRequirements) {// no-op; don't throw an UnsupportedOperationException here because there are code paths// where the resource// manager calls this method regardless of whether declarative resource management is used// or not}

3.3.14. boolean registerSlotRequest(SlotRequest slotRequest)

发送slot请求.

/*** Requests a slot with the respective resource profile.** @param slotRequest specifying the requested slot specs* @return true if the slot request was registered; false if the request is a duplicate* @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)*/@Overridepublic boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {checkInit();if (checkDuplicateRequest(slotRequest.getAllocationId())) {LOG.debug("Ignoring a duplicate slot request with allocation id {}.",slotRequest.getAllocationId());return false;} else {//构建请求PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);try {// 请求操作internalRequestSlot(pendingSlotRequest);} catch (ResourceManagerException e) {// requesting the slot failed --> remove pending slot requestpendingSlotRequests.remove(slotRequest.getAllocationId());throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);}return true;}}/*** Tries to allocate a slot for the given slot request. If there is no slot available, the* resource manager is informed to allocate more resources and a timeout for the request is* registered.** @param pendingSlotRequest to allocate a slot for* @throws ResourceManagerException if the slot request failed or is unfulfillable*/private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)throws ResourceManagerException {// 获取配置...final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();OptionalConsumer.of(findMatchingSlot(resourceProfile)).ifPresent(taskManagerSlot -> {// taskManagerSlot 存在操作allocateSlot(taskManagerSlot, pendingSlotRequest);}).ifNotPresent(() -> {// taskManagerSlot 不存在操作 ==>  启动 TaskManagerfulfillPendingSlotRequestWithPendingTaskManagerSlot( pendingSlotRequest);});}

3.3.15. boolean unregisterSlotRequest(AllocationID allocationId)

取消发送slot请求.

/*** Cancels and removes a pending slot request with the given allocation id. If there is no such* pending request, then nothing is done.** @param allocationId identifying the pending slot request* @return True if a pending slot request was found; otherwise false*/@Overridepublic boolean unregisterSlotRequest(AllocationID allocationId) {checkInit();PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);if (null != pendingSlotRequest) {LOG.debug("Cancel slot request {}.", allocationId);cancelPendingSlotRequest(pendingSlotRequest);return true;} else {LOG.debug("No pending slot request with allocation id {} found. Ignoring unregistration request.",allocationId);return false;}}

3.3.16. registerTaskManager

注册TaskManager

/**** 在 slot manager中注册一个新的task manager** 从而是 task managers slots 可以被感知/调度** Registers a new task manager at the slot manager.* This will make the task managers slots known and, thus, available for allocation.** @param taskExecutorConnection for the new task manager* @param initialSlotReport for the new task manager* @return True if the task manager has not been registered before and is registered*     successfully; otherwise false*/@Overridepublic boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {// 初始化检查// The slot manager has not been started.checkInit();LOG.debug("Registering TaskManager {} under {} at the SlotManager.",taskExecutorConnection.getResourceID().getStringWithMetadata(),taskExecutorConnection.getInstanceID());// 我们通过任务管理器的实例id来识别它们// we identify task managers by their instance idif (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {// 之间已经连接过, 直接报搞slot的状态.reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);return false;} else {if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {// 是否查过最大的 slot 数量...LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.",maxSlotNum);resourceActions.releaseResource(taskExecutorConnection.getInstanceID(),new FlinkException("The total number of slots exceeds the max limitation."));return false;}// 第一次注册TaskManager// first register the TaskManagerArrayList<SlotID> reportedSlots = new ArrayList<>();for (SlotStatus slotStatus : initialSlotReport) {reportedSlots.add(slotStatus.getSlotID());}TaskManagerRegistration taskManagerRegistration =new TaskManagerRegistration(taskExecutorConnection, reportedSlots);taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);// next register the new slotsfor (SlotStatus slotStatus : initialSlotReport) {// 开始注册slotsregisterSlot(slotStatus.getSlotID(),slotStatus.getAllocationID(),slotStatus.getJobID(),slotStatus.getResourceProfile(),taskExecutorConnection);}return true;}}

3.3.17. unregisterTaskManager

取消注册TaskManager

 @Overridepublic boolean unregisterTaskManager(InstanceID instanceId, Exception cause) {checkInit();LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);TaskManagerRegistration taskManagerRegistration =taskManagerRegistrations.remove(instanceId);if (null != taskManagerRegistration) {internalUnregisterTaskManager(taskManagerRegistration, cause);return true;} else {LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.",instanceId);return false;}}

3.3.18. boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport);

报告 SlotStatus

3.3.19. void freeSlot(SlotID slotId, AllocationID allocationId);

释放Slot

/*** Reports the current slot allocations for a task manager identified by the given instance id.** @param instanceId identifying the task manager for which to report the slot status* @param slotReport containing the status for all of its slots* @return true if the slot status has been updated successfully, otherwise false*/@Overridepublic boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {checkInit();TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);if (null != taskManagerRegistration) {// Received slot report from instance//      5ad0a12f8bb03f9d016f8d1e18380563 ://      SlotReport{//          SlotStatus{//              slotID=container_1615446205104_0025_01_000002_0,//              allocationID=3755cb8f9962a9a7738db04f2a02084c,//              jobID=694474d11da6100e82744c9e47e2f511,//              resourceProfile=ResourceProfile{//                  cpuCores=1.0000000000000000,//                  taskHeapMemory=384.000mb (402653174 bytes),//                  taskOffHeapMemory=0 bytes,//                  managedMemory=512.000mb (536870920 bytes),//                  networkMemory=128.000mb (134217730 bytes)//              }//          }//    }.LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport);for (SlotStatus slotStatus : slotReport) {updateSlot(slotStatus.getSlotID(),slotStatus.getAllocationID(),slotStatus.getJobID());}return true;} else {LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.",instanceId);return false;}}

3.3.20. void setFailUnfulfillableRequest(boolean failUnfulfillableRequest);

设置失败未实现的请求.

@Overridepublic void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {if (!this.failUnfulfillableRequest && failUnfulfillableRequest) {// fail unfulfillable pending requestsIterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator =pendingSlotRequests.entrySet().iterator();while (slotRequestIterator.hasNext()) {PendingSlotRequest pendingSlotRequest = slotRequestIterator.next().getValue();if (pendingSlotRequest.getAssignedPendingTaskManagerSlot() != null) {continue;}if (!isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) {slotRequestIterator.remove();resourceActions.notifyAllocationFailure(pendingSlotRequest.getJobId(),pendingSlotRequest.getAllocationId(),new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(),pendingSlotRequest.getResourceProfile()));}}}this.failUnfulfillableRequest = failUnfulfillableRequest;}

3.4. freeSlot

/*** Free the given slot from the given allocation. If the slot is still allocated by the given* allocation id, then the slot will be marked as free and will be subject to new slot requests.** @param slotId identifying the slot to free* @param allocationId with which the slot is presumably allocated*/@Overridepublic void freeSlot(SlotID slotId, AllocationID allocationId) {checkInit();TaskManagerSlot slot = slots.get(slotId);if (null != slot) {if (slot.getState() == SlotState.ALLOCATED) {if (Objects.equals(allocationId, slot.getAllocationId())) {TaskManagerRegistration taskManagerRegistration =taskManagerRegistrations.get(slot.getInstanceId());if (taskManagerRegistration == null) {throw new IllegalStateException("Trying to free a slot from a TaskManager "+ slot.getInstanceId()+ " which has not been registered.");}updateSlotState(slot, taskManagerRegistration, null, null);} else {LOG.debug("Received request to free slot {} with expected allocation id {}, "+ "but actual allocation id {} differs. Ignoring the request.",slotId,allocationId,slot.getAllocationId());}} else {LOG.debug("Slot {} has not been allocated.", allocationId);}} else {LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.",slotId);}}

3.5. updateSlotState

// 根据slot的状态进行不同的操作...private void updateSlotState(TaskManagerSlot slot,TaskManagerRegistration taskManagerRegistration,@Nullable AllocationID allocationId,@Nullable JobID jobId) {if (null != allocationId) {switch (slot.getState()) {// 挂起...case PENDING:// we have a pending slot request --> check whether we have to reject itPendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {// we can cancel the slot request because it has been fulfilledcancelPendingSlotRequest(pendingSlotRequest);// remove the pending slot request, since it has been completedpendingSlotRequests.remove(pendingSlotRequest.getAllocationId());slot.completeAllocation(allocationId, jobId);} else {// we first have to free the slot in order to set a new allocationIdslot.clearPendingSlotRequest();// set the allocation id such that the slot won't be considered for the// pending slot requestslot.updateAllocation(allocationId, jobId);// remove the pending request if any as it has been assignedfinal PendingSlotRequest actualPendingSlotRequest =pendingSlotRequests.remove(allocationId);if (actualPendingSlotRequest != null) {cancelPendingSlotRequest(actualPendingSlotRequest);}// this will try to find a new slot for the requestrejectPendingSlotRequest(pendingSlotRequest,new Exception("Task manager reported slot "+ slot.getSlotId()+ " being already allocated."));}taskManagerRegistration.occupySlot();break;case ALLOCATED:// 分配if (!Objects.equals(allocationId, slot.getAllocationId())) {slot.freeSlot();slot.updateAllocation(allocationId, jobId);}break;case FREE:// 释放..// the slot is currently free --> it is stored in freeSlotsfreeSlots.remove(slot.getSlotId());slot.updateAllocation(allocationId, jobId);taskManagerRegistration.occupySlot();break;}fulfilledSlotRequests.put(allocationId, slot.getSlotId());} else {// no allocation reportedswitch (slot.getState()) {case FREE:// 处理空闲handleFreeSlot(slot);break;case PENDING:// 不要做任何事情,因为我们还有一个挂起的slot请求// don't do anything because we still have a pending slot requestbreak;case ALLOCATED:// 分配操作...AllocationID oldAllocation = slot.getAllocationId();slot.freeSlot();fulfilledSlotRequests.remove(oldAllocation);taskManagerRegistration.freeSlot();handleFreeSlot(slot);break;}}}

【Flink】Flink 1.12.2 SlotManager相关推荐

  1. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [四] 上一篇: [flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 Jo ...

  2. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [三] 上一章:[flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 yar ...

  3. 【Flink】FLink 1.12 版本的 Row 类型 中的 RowKind 是干嘛的

    1.概述 在[Flink]Flink 从 1.9.1 版本 升级到 1.12.4 版本的 注意事项 以及 过程 升级的时候,偶然看到row类型多了一个属性. @PublicEvolving publi ...

  4. 【FLink】Flink 1.9 升级到 1.12.4 无配置页面 无日志

    文章目录 1.概述 2.场景再现2 2.1 概述 2.2 日志的配置 2.3 加载2个 2.4 缺少文件 2.7 扩展 2.5.1 Flink 默认日志框架 2.5.2 slf4j 基础概念 2.5. ...

  5. 【Flink】Flink 1.9 升级 1.12.4 本地可以运行 打包后 集群运行就找不到类 ClassNotFoundException

    1.场景1 1.1 概述 升级过程请参考: [Flink]Flink 从 1.9.1 版本 升级到 1.12.4 版本的 注意事项 以及 过程 然后本地运行好好的,但是而且,是在主类中类. 打开jar ...

  6. 【2】flink数据流转换算子

    [README] 本文记录了flink对数据的转换操作,包括 基本转换,map,flatMap,filter: 滚动聚合(min minBy max maxBy sum): 规约聚合-reduce: ...

  7. 【flink】flink 报错 key group from 44 to 45 does not contain 4

    文章目录 1.概述 本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 1.概述 此问题和一个问题很相似:[Flink]Flink KeyGroupRang ...

  8. 【flink】Flink常见Checkpoint超时问题排查思路

    1.概述 转载:Flink常见Checkpoint超时问题排查思路 这里仅仅是自己学习. 在日常flink应用中,相信大家经常会遇到checkpoint超时失败这类的问题,遇到这种情况的时候仅仅只会在 ...

  9. 【Flink】flink 升级 the given -yarn-cluster does not contain a valid port

    文章目录 1.场景1 1.1 概述 1.场景1 1.1 概述 在本次场景中,我是从flink 1.9升级到1.12.4 升级请参考:[Flink]Flink 从 1.9.1 版本 升级到 1.12.4 ...

  10. 【3】flink sink

    [README] 本文记录了flink sink操作,输出目的存储器(中间件)包括 kafka: es: db: 等等有很多: 本文只给出了 sink2kafka的代码: 本文使用的flink为 1. ...

最新文章

  1. Java最全文件操作实例汇总
  2. https ddos攻击——由于有了认证和加解密 后果更严重 看绿盟的产品目前对于https的ddos cc攻击需要基于内容做检测...
  3. 静态工厂配置bean
  4. POJ_1862 Stripies 【贪心】
  5. python的数据结构
  6. mysql8.0. linux二进制_linux下安装mysql8.0(二进制方式)
  7. redis的info
  8. linux日志.pdf,一种用于LINUX的AUDIT日志分析方法.pdf
  9. R语言作图之ggplot2初识(1)
  10. QT分析之网络编程(七)
  11. 一筐鸡蛋筐拿鸡蛋的问题
  12. python爬取全国真实地址_python爬虫学习之爬取全国各省市县级城市邮政编码
  13. PHP icbc工商银行开放平台聚合支付,二维码扫码支付API云收呗对接步骤,稳步发展
  14. Android系统优化实操总结
  15. Unity 之 2D水插件推荐和模拟水效果制作分享
  16. 我的发明20220723
  17. 互联网广告人--联合御寒--品牌,代理,平台,达人 多方携手御寒
  18. java七行情书_七行情书
  19. 3DMAX如何打开mat文件
  20. 设置电脑右下角显示秒钟

热门文章

  1. 曾比海底捞还牛,如今关店1200家!肯德基的猪队友,快被中国人抛弃了?
  2. 抖音发布2020数据报告:日均视频搜索量破4亿,70后最爱发表情包
  3. 游族内部信:年终奖如期发放 继续招聘全球化游戏人才
  4. 三星Galaxy Note20系列价格曝光:大小杯差距明显
  5. 《俄罗斯方块》正版授权手游开启预约:支持QQ、微信双平台
  6. 哔哩哔哩2019年Q4及全年财报:全年营收67.8亿元,同比增长64%
  7. 雷军:小米CC9 Pro人像镜头简直太奢华了
  8. 墨迹天气回应IPO失败:不会因一次失利而止步
  9. 传统金融为什么要做AI?平安保险CEO解读行业痛点
  10. DxOMark排名更新榜首易主 华为被拉下马:我还会回来的!