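[Flink] Flink 1.12.2 SlotManager
1. Overview
Reposted from: Flink 1.12.2 Source Code Walkthrough: SlotManager
The SlotManager lives inside the ResourceManager.
The SlotManager maintains a view of all registered task manager slots, their allocations, and all pending slot requests.
Whenever a new slot is registered or an allocated slot is freed, it tries to fulfill another pending slot request. Whenever there are not enough slots available, the slot manager notifies the resource manager via {@link ResourceActions#allocateResource(WorkerResourceSpec)}.
To free resources and avoid resource leaks, idle task managers (task managers whose slots are currently unused) and pending slot requests time out, which triggers their release and failure, respectively.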
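The behaviour described above can be illustrated with a toy model. This is a minimal sketch and not Flink code: `ToySlotManager`, the string slot and request ids, and the `requestedWorkers` counter (a stand-in for calling `ResourceActions#allocateResource`) are all hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model (not Flink code): free slots, pending requests, and a counter for resource requests. */
class ToySlotManager {
    private final Deque<String> freeSlots = new ArrayDeque<>();
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    /** Records "requestId->slotId" assignments in the order they happen. */
    final List<String> assignments = new ArrayList<>();
    /** Hypothetical stand-in for calls to ResourceActions#allocateResource. */
    int requestedWorkers = 0;

    /** A request takes a free slot if one exists; otherwise it pends and more resources are requested. */
    void requestSlot(String requestId) {
        String slot = freeSlots.poll();
        if (slot != null) {
            assignments.add(requestId + "->" + slot);
        } else {
            pendingRequests.add(requestId);
            requestedWorkers++;
        }
    }

    /** Called when a slot is registered or freed: the oldest pending request is served first. */
    void offerSlot(String slotId) {
        String request = pendingRequests.poll();
        if (request != null) {
            assignments.add(request + "->" + slotId);
        } else {
            freeSlots.add(slotId);
        }
    }

    int numFreeSlots() {
        return freeSlots.size();
    }

    int numPendingRequests() {
        return pendingRequests.size();
    }

    public static void main(String[] args) {
        ToySlotManager manager = new ToySlotManager();
        manager.requestSlot("r1"); // no slot yet: the request pends and a worker is requested
        manager.offerSlot("s1");   // the new slot immediately serves r1
        manager.offerSlot("s2");   // nothing is pending, so s2 becomes a free slot
        System.out.println(manager.assignments + " free=" + manager.numFreeSlots());
    }
}
```

The real SlotManager additionally matches resource profiles and applies timeouts; the sketch only shows the "serve pending requests whenever a slot appears" loop.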
2. SlotManager
2.1. Introduction
SlotManager is an interface that defines the slot management operations used by the ResourceManager.
2.2. Interface methods
Name | Description |
---|---|
start | Starts the slot manager |
suspend | Suspends the slot manager |
int getNumberRegisteredSlots() | Returns the number of registered slots |
int getNumberRegisteredSlotsOf(InstanceID instanceId) | Returns the number of registered slots of the given TaskManager instance |
int getNumberFreeSlots() | Returns the number of free slots |
int getNumberFreeSlotsOf(InstanceID instanceId) | Returns the number of free slots of the given TaskManager instance |
Map<WorkerResourceSpec, Integer> getRequiredResources() | Returns the number of workers the SlotManager has requested from {@link ResourceActions} that are not yet fulfilled |
ResourceProfile getRegisteredResource() | Returns the total registered resource |
ResourceProfile getRegisteredResourceOf(InstanceID instanceID) | Returns the registered resource of the given TaskManager instance |
ResourceProfile getFreeResource() | Returns the total free resource |
ResourceProfile getFreeResourceOf(InstanceID instanceID) | Returns the free resource of the given TaskManager instance |
int getNumberPendingSlotRequests() | Returns the number of pending slot requests |
void processResourceRequirements(ResourceRequirements resourceRequirements) | Notifies the slot manager about the resources a job requires |
boolean registerSlotRequest(SlotRequest slotRequest) | Registers a slot request |
boolean unregisterSlotRequest(AllocationID allocationId) | Cancels a pending slot request |
registerTaskManager | Registers a TaskManager |
unregisterTaskManager | Unregisters a TaskManager |
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) | Reports the slot status of a TaskManager |
void freeSlot(SlotID slotId, AllocationID allocationId) | Frees a slot |
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) | Sets whether unfulfillable slot requests fail immediately |
3. SlotManagerImpl
SlotManagerImpl is the implementation of the SlotManager interface.
3.1. Fields

    /** Map for all registered slots. */
    private final HashMap<SlotID, TaskManagerSlot> slots;

    /** Index of all currently free slots. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

    /** All currently registered task managers. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

    /** Map of fulfilled and active allocations for request deduplication purposes. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

    /** Map of pending/unfulfilled slot allocation requests. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

    /** Scheduled executor for timeouts. */
    private final ScheduledExecutor scheduledExecutor;

    /** Timeout for slot requests to the task manager. */
    private final Time taskManagerRequestTimeout;

    /** Timeout after which an allocation is discarded. */
    private final Time slotRequestTimeout;

    /** Timeout after which an unused TaskManager is released. */
    private final Time taskManagerTimeout;

    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

    private final SlotMatchingStrategy slotMatchingStrategy;

    /** ResourceManager's id. */
    private ResourceManagerId resourceManagerId;

    /** Executor for future callbacks which have to be "synchronized". */
    private Executor mainThreadExecutor;

    /** Callbacks for resource (de-)allocations. */
    private ResourceActions resourceActions;

    private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;

    private ScheduledFuture<?> slotRequestTimeoutCheck;

    /** True iff the component has been started. */
    private boolean started;

    /** Release task executor only when each produced result partition is either consumed or failed. */
    private final boolean waitResultConsumedBeforeRelease;

    /** Defines the max limitation of the total number of slots. */
    private final int maxSlotNum;

    /** Defines the number of redundant taskmanagers. */
    private final int redundantTaskManagerNum;

    /**
     * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request
     * to pend. A slot request is considered unfulfillable if it cannot be fulfilled by neither a
     * slot that is already registered (including allocated ones) nor a pending slot that the {@link
     * ResourceActions} can allocate.
     */
    private boolean failUnfulfillableRequest = true;

    /** The default resource spec of workers to request. */
    private final WorkerResourceSpec defaultWorkerResourceSpec;

    private final int numSlotsPerWorker;

    private final ResourceProfile defaultSlotResourceProfile;

    private final SlotManagerMetricGroup slotManagerMetricGroup;

3.2. Constructor
The constructor only assigns and initializes the fields.

    public SlotManagerImpl(
            ScheduledExecutor scheduledExecutor,
            SlotManagerConfiguration slotManagerConfiguration,
            SlotManagerMetricGroup slotManagerMetricGroup) {

        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);

        Preconditions.checkNotNull(slotManagerConfiguration);
        this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
        this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
        this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();
        this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
        this.waitResultConsumedBeforeRelease =
                slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
        this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();
        this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();
        this.defaultSlotResourceProfile =
                generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
        this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
        this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
        this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();

        slots = new HashMap<>(16);
        freeSlots = new LinkedHashMap<>(16);
        taskManagerRegistrations = new HashMap<>(4);
        fulfilledSlotRequests = new HashMap<>(16);
        pendingSlotRequests = new HashMap<>(16);
        pendingSlots = new HashMap<>(16);

        resourceManagerId = null;
        resourceActions = null;
        mainThreadExecutor = null;
        taskManagerTimeoutsAndRedundancyCheck = null;
        slotRequestTimeoutCheck = null;

        started = false;
    }

3.3. Methods
3.3.1. start
Starts the slot manager and schedules the two periodic checks: one for TaskManager timeouts and redundancy, one for slot request timeouts.

    /**
     * Starts the slot manager with the given leader id and resource manager actions.
     *
     * @param newResourceManagerId to use for communication with the task managers
     * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
     * @param newResourceActions to use for resource (de-)allocations
     */
    @Override
    public void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions) {
        LOG.info("Starting the SlotManager.");

        this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);

        started = true;

        taskManagerTimeoutsAndRedundancyCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> mainThreadExecutor.execute(() -> checkTaskManagerTimeoutsAndRedundancy()),
                        0L,
                        taskManagerTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);

        slotRequestTimeoutCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
                        0L,
                        slotRequestTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);

        registerSlotManagerMetrics();
    }

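In start, the scheduled executor only acts as a timer; the check itself is handed to the main thread executor, so SlotManager state is touched from a single thread. The following is a minimal sketch of that hand-off using only `java.util.concurrent`. The class `PeriodicCheckSketch`, the thread name "main-thread", and the 10 ms period are assumptions made for illustration, not Flink code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicCheckSketch {
    /** Runs check on mainThread every periodMillis; timer is used only as the trigger. */
    static ScheduledFuture<?> schedule(
            ScheduledExecutorService timer,
            ExecutorService mainThread,
            Runnable check,
            long periodMillis) {
        return timer.scheduleWithFixedDelay(
                () -> mainThread.execute(check), 0L, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if the check ran at least `runs` times on the thread named "main-thread". */
    static boolean demo(int runs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        CountDownLatch latch = new CountDownLatch(runs);
        Runnable check = () -> {
            // Only count runs that really execute on the single "main" thread.
            if (Thread.currentThread().getName().equals("main-thread")) {
                latch.countDown();
            }
        };
        ScheduledFuture<?> future = schedule(timer, mainThread, check, 10L);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Mirrors suspend(): cancel the periodic check, then stop the executors.
            future.cancel(false);
            timer.shutdownNow();
            mainThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("check ran on main thread: " + demo(3));
    }
}
```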
3.3.2. suspend
Suspends the slot manager: cancels the periodic checks, cancels all pending slot requests, and unregisters all TaskManagers.

    /** Suspends the component. This clears the internal state of the slot manager. */
    @Override
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        // stop the timeout checks for the TaskManagers and the SlotRequests
        if (taskManagerTimeoutsAndRedundancyCheck != null) {
            taskManagerTimeoutsAndRedundancyCheck.cancel(false);
            taskManagerTimeoutsAndRedundancyCheck = null;
        }

        if (slotRequestTimeoutCheck != null) {
            slotRequestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(pendingSlotRequest);
        }

        pendingSlotRequests.clear();

        ArrayList<InstanceID> registeredTaskManagers =
                new ArrayList<>(taskManagerRegistrations.keySet());

        for (InstanceID registeredTaskManager : registeredTaskManagers) {
            unregisterTaskManager(
                    registeredTaskManager,
                    new SlotManagerException("The slot manager is being suspended."));
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

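suspend copies the key set of taskManagerRegistrations into an ArrayList before looping. Presumably this is because unregisterTaskManager removes entries from the same map, and removing from a HashMap while iterating over its live key set can throw ConcurrentModificationException. A minimal sketch of the pattern, with hypothetical names (`SnapshotIterationSketch`, string ids, integer values):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapshotIterationSketch {
    /** Unregisters every entry, iterating over a snapshot of the keys rather than the live view. */
    static List<String> unregisterAll(Map<String, Integer> registrations) {
        List<String> removed = new ArrayList<>();
        // Copy the keys first: unregister() mutates the map, and iterating the live
        // keySet() while removing through the map can throw ConcurrentModificationException.
        for (String id : new ArrayList<>(registrations.keySet())) {
            if (unregister(registrations, id)) {
                removed.add(id);
            }
        }
        return removed;
    }

    /** Returns true if an entry with the given id was present and has been removed. */
    static boolean unregister(Map<String, Integer> registrations, String id) {
        return registrations.remove(id) != null;
    }

    public static void main(String[] args) {
        Map<String, Integer> registrations = new HashMap<>();
        registrations.put("tm-1", 2);
        registrations.put("tm-2", 4);
        System.out.println("removed " + unregisterAll(registrations).size()
                + ", remaining " + registrations.size());
    }
}
```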
3.3.3. int getNumberRegisteredSlots()
Returns the number of registered slots.

    @Override
    public int getNumberRegisteredSlots() {
        return slots.size();
    }

3.3.4. int getNumberRegisteredSlotsOf(InstanceID instanceId)
Returns the number of registered slots of the given TaskManager instance, or 0 if the instance is unknown.

    @Override
    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (taskManagerRegistration != null) {
            return taskManagerRegistration.getNumberRegisteredSlots();
        } else {
            return 0;
        }
    }

3.3.5. int getNumberFreeSlots()
Returns the number of free slots.

    @Override
    public int getNumberFreeSlots() {
        return freeSlots.size();
    }

3.3.6. int getNumberFreeSlotsOf(InstanceID instanceId)
Returns the number of free slots of the given TaskManager instance, or 0 if the instance is unknown.

    @Override
    public int getNumberFreeSlotsOf(InstanceID instanceId) {
        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (taskManagerRegistration != null) {
            return taskManagerRegistration.getNumberFreeSlots();
        } else {
            return 0;
        }
    }

3.3.7. Map<WorkerResourceSpec, Integer> getRequiredResources()
Returns the number of workers the SlotManager has requested from {@link ResourceActions} that are not yet fulfilled. The count is derived from the number of pending slots, rounded up to whole workers.

    @Override
    public Map<WorkerResourceSpec, Integer> getRequiredResources() {
        final int pendingWorkerNum =
                MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);
        return pendingWorkerNum > 0
                ? Collections.singletonMap(defaultWorkerResourceSpec, pendingWorkerNum)
                : Collections.emptyMap();
    }

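The arithmetic is a ceiling division: with 5 pending slots and 2 slots per worker, 3 workers are still required. The sketch below reproduces the calculation with a local helper. `PendingWorkerMath` and the string worker spec are hypothetical; the helper is a plain re-implementation of rounding-up division, assumed to match what `MathUtils.divideRoundUp` does for non-negative inputs.

```java
import java.util.Collections;
import java.util.Map;

class PendingWorkerMath {
    /** Ceiling division for a non-negative dividend and a positive divisor. */
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0 || divisor <= 0) {
            throw new IllegalArgumentException("dividend must be >= 0 and divisor must be > 0");
        }
        return (dividend + divisor - 1) / divisor;
    }

    /** Maps the single worker spec to the number of workers still needed, or returns an empty map. */
    static Map<String, Integer> requiredResources(
            String workerSpec, int pendingSlots, int slotsPerWorker) {
        int pendingWorkers = divideRoundUp(pendingSlots, slotsPerWorker);
        return pendingWorkers > 0
                ? Collections.singletonMap(workerSpec, pendingWorkers)
                : Collections.emptyMap();
    }

    public static void main(String[] args) {
        System.out.println(requiredResources("default", 5, 2)); // prints {default=3}
        System.out.println(requiredResources("default", 0, 2)); // prints {}
    }
}
```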
3.3.8. ResourceProfile getRegisteredResource()
Returns the total registered resource, computed from the number of registered slots.

    @Override
    public ResourceProfile getRegisteredResource() {
        return getResourceFromNumSlots(getNumberRegisteredSlots());
    }

3.3.9. ResourceProfile getRegisteredResourceOf(InstanceID instanceID)
Returns the registered resource of the given TaskManager instance.

    @Override
    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
        return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
    }

3.3.10. ResourceProfile getFreeResource()
Returns the total free resource, computed from the number of free slots.

    @Override
    public ResourceProfile getFreeResource() {
        return getResourceFromNumSlots(getNumberFreeSlots());
    }

3.3.11. ResourceProfile getFreeResourceOf(InstanceID instanceID)
Returns the free resource of the given TaskManager instance.

    @Override
    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
        return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
    }

3.3.12. int getNumberPendingSlotRequests()
Returns the number of pending slot requests.

    @Override
    public int getNumberPendingSlotRequests() {
        return pendingSlotRequests.size();
    }

3.3.13. void processResourceRequirements(ResourceRequirements resourceRequirements)
Notifies the slot manager about the resources a job requires. In SlotManagerImpl this is a no-op.

    @Override
    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
        // no-op; don't throw an UnsupportedOperationException here because there are code paths
        // where the resource manager calls this method regardless of whether declarative
        // resource management is used or not
    }

3.3.14. boolean registerSlotRequest(SlotRequest slotRequest)
Registers a slot request. Duplicate requests are ignored; if the request cannot be fulfilled, it is removed from the pending map again and an exception is thrown.

    /**
     * Requests a slot with the respective resource profile.
     *
     * @param slotRequest specifying the requested slot specs
     * @return true if the slot request was registered; false if the request is a duplicate
     * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
     */
    @Override
    public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
            LOG.debug(
                    "Ignoring a duplicate slot request with allocation id {}.",
                    slotRequest.getAllocationId());

            return false;
        } else {
            // build the pending request
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                // try to fulfill the request
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new ResourceManagerException(
                        "Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }

    /**
     * Tries to allocate a slot for the given slot request. If there is no slot available, the
     * resource manager is informed to allocate more resources and a timeout for the request is
     * registered.
     *
     * @param pendingSlotRequest to allocate a slot for
     * @throws ResourceManagerException if the slot request failed or is unfulfillable
     */
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
            throws ResourceManagerException {
        // the resource profile requested by this slot request
        final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

        OptionalConsumer.of(findMatchingSlot(resourceProfile))
                .ifPresent(
                        taskManagerSlot -> {
                            // a matching slot exists: allocate it
                            allocateSlot(taskManagerSlot, pendingSlotRequest);
                        })
                .ifNotPresent(
                        () -> {
                            // no matching slot: fall back to a pending TaskManager slot,
                            // which may lead to a new TaskManager being started
                            fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest);
                        });
    }

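The control flow of registerSlotRequest (reject duplicates, record the request, roll back if fulfilling it fails) can be sketched in isolation. Everything below is hypothetical: `RequestRegistrySketch`, the string ids, and the `canServe` predicate standing in for internalRequestSlot succeeding. The duplicate check against both the pending and the fulfilled map is an assumption about what checkDuplicateRequest does, based on the field comments in section 3.1.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class RequestRegistrySketch {
    private final Map<String, String> pending = new HashMap<>();   // allocationId -> profile
    private final Map<String, String> fulfilled = new HashMap<>(); // allocationId -> slotId
    /** Hypothetical stand-in for "internalRequestSlot can serve this profile". */
    private final Predicate<String> canServe;

    RequestRegistrySketch(Predicate<String> canServe) {
        this.canServe = canServe;
    }

    /** Returns false for duplicates, true if registered; throws if the request cannot be served. */
    boolean register(String allocationId, String profile) {
        if (pending.containsKey(allocationId) || fulfilled.containsKey(allocationId)) {
            return false; // duplicate request: ignore it
        }
        pending.put(allocationId, profile);
        try {
            requestSlot(profile);
        } catch (IllegalStateException e) {
            // Roll back so a failed request does not linger in the pending map.
            pending.remove(allocationId);
            throw new IllegalStateException(
                    "Could not fulfill slot request " + allocationId + '.', e);
        }
        return true;
    }

    /** Moves a request from pending to fulfilled once a slot has been assigned. */
    void markFulfilled(String allocationId, String slotId) {
        if (pending.remove(allocationId) != null) {
            fulfilled.put(allocationId, slotId);
        }
    }

    int numPending() {
        return pending.size();
    }

    private void requestSlot(String profile) {
        if (!canServe.test(profile)) {
            throw new IllegalStateException("No resources for profile " + profile);
        }
    }

    public static void main(String[] args) {
        RequestRegistrySketch registry = new RequestRegistrySketch(p -> !p.equals("huge"));
        System.out.println(registry.register("a1", "small")); // first registration
        System.out.println(registry.register("a1", "small")); // duplicate of a1
    }
}
```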
3.3.15. boolean unregisterSlotRequest(AllocationID allocationId)
Cancels a pending slot request.

    /**
     * Cancels and removes a pending slot request with the given allocation id. If there is no such
     * pending request, then nothing is done.
     *
     * @param allocationId identifying the pending slot request
     * @return True if a pending slot request was found; otherwise false
     */
    @Override
    public boolean unregisterSlotRequest(AllocationID allocationId) {
        checkInit();

        PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);

        if (null != pendingSlotRequest) {
            LOG.debug("Cancel slot request {}.", allocationId);

            cancelPendingSlotRequest(pendingSlotRequest);

            return true;
        } else {
            LOG.debug(
                    "No pending slot request with allocation id {} found. Ignoring unregistration request.",
                    allocationId);

            return false;
        }
    }

3.3.16. registerTaskManager
Registers a TaskManager and its slots. If the TaskManager is already known, only its slot status is updated. If registering it would exceed the maximum number of slots, it is released instead.

    /**
     * Registers a new task manager at the slot manager. This will make the task managers slots
     * known and, thus, available for allocation.
     *
     * @param taskExecutorConnection for the new task manager
     * @param initialSlotReport for the new task manager
     * @return True if the task manager has not been registered before and is registered
     *     successfully; otherwise false
     */
    @Override
    public boolean registerTaskManager(
            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
        // fails if the slot manager has not been started
        checkInit();

        LOG.debug(
                "Registering TaskManager {} under {} at the SlotManager.",
                taskExecutorConnection.getResourceID().getStringWithMetadata(),
                taskExecutorConnection.getInstanceID());

        // we identify task managers by their instance id
        if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
            // already registered: just process the reported slot status
            reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
            return false;
        } else {
            // check whether the maximum number of slots would be exceeded
            if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
                LOG.info(
                        "The total number of slots exceeds the max limitation {}, release the excess resource.",
                        maxSlotNum);
                resourceActions.releaseResource(
                        taskExecutorConnection.getInstanceID(),
                        new FlinkException(
                                "The total number of slots exceeds the max limitation."));
                return false;
            }

            // first register the TaskManager
            ArrayList<SlotID> reportedSlots = new ArrayList<>();

            for (SlotStatus slotStatus : initialSlotReport) {
                reportedSlots.add(slotStatus.getSlotID());
            }

            TaskManagerRegistration taskManagerRegistration =
                    new TaskManagerRegistration(taskExecutorConnection, reportedSlots);

            taskManagerRegistrations.put(
                    taskExecutorConnection.getInstanceID(), taskManagerRegistration);

            // next register the new slots
            for (SlotStatus slotStatus : initialSlotReport) {
                registerSlot(
                        slotStatus.getSlotID(),
                        slotStatus.getAllocationID(),
                        slotStatus.getJobID(),
                        slotStatus.getResourceProfile(),
                        taskExecutorConnection);
            }

            return true;
        }
    }

3.3.17. unregisterTaskManager
Unregisters a TaskManager.

    @Override
    public boolean unregisterTaskManager(InstanceID instanceId, Exception cause) {
        checkInit();

        LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);

        TaskManagerRegistration taskManagerRegistration =
                taskManagerRegistrations.remove(instanceId);

        if (null != taskManagerRegistration) {
            internalUnregisterTaskManager(taskManagerRegistration, cause);

            return true;
        } else {
            LOG.debug(
                    "There is no task manager registered with instance ID {}. Ignoring this message.",
                    instanceId);

            return false;
        }
    }

3.3.18. boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport)
Reports the slot status of a TaskManager.

    /**
     * Reports the current slot allocations for a task manager identified by the given instance id.
     *
     * @param instanceId identifying the task manager for which to report the slot status
     * @param slotReport containing the status for all of its slots
     * @return true if the slot status has been updated successfully, otherwise false
     */
    @Override
    public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
        checkInit();

        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (null != taskManagerRegistration) {
            // Sample log output:
            // Received slot report from instance 5ad0a12f8bb03f9d016f8d1e18380563 :
            // SlotReport{
            //   SlotStatus{
            //     slotID=container_1615446205104_0025_01_000002_0,
            //     allocationID=3755cb8f9962a9a7738db04f2a02084c,
            //     jobID=694474d11da6100e82744c9e47e2f511,
            //     resourceProfile=ResourceProfile{
            //       cpuCores=1.0000000000000000,
            //       taskHeapMemory=384.000mb (402653174 bytes),
            //       taskOffHeapMemory=0 bytes,
            //       managedMemory=512.000mb (536870920 bytes),
            //       networkMemory=128.000mb (134217730 bytes)
            //     }
            //   }
            // }.
            LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport);

            for (SlotStatus slotStatus : slotReport) {
                updateSlot(
                        slotStatus.getSlotID(),
                        slotStatus.getAllocationID(),
                        slotStatus.getJobID());
            }

            return true;
        } else {
            LOG.debug(
                    "Received slot report for unknown task manager with instance id {}. Ignoring this report.",
                    instanceId);

            return false;
        }
    }

3.3.19. void freeSlot(SlotID slotId, AllocationID allocationId)
Frees a slot. The implementation is listed in section 3.4.
3.3.20. void setFailUnfulfillableRequest(boolean failUnfulfillableRequest)
Sets whether unfulfillable slot requests fail immediately. When the flag switches from false to true, all pending requests that can be fulfilled neither by registered nor by pending slots are failed right away.

    @Override
    public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
        if (!this.failUnfulfillableRequest && failUnfulfillableRequest) {
            // fail unfulfillable pending requests
            Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator =
                    pendingSlotRequests.entrySet().iterator();
            while (slotRequestIterator.hasNext()) {
                PendingSlotRequest pendingSlotRequest = slotRequestIterator.next().getValue();
                if (pendingSlotRequest.getAssignedPendingTaskManagerSlot() != null) {
                    continue;
                }
                if (!isFulfillableByRegisteredOrPendingSlots(
                        pendingSlotRequest.getResourceProfile())) {
                    slotRequestIterator.remove();
                    resourceActions.notifyAllocationFailure(
                            pendingSlotRequest.getJobId(),
                            pendingSlotRequest.getAllocationId(),
                            new UnfulfillableSlotRequestException(
                                    pendingSlotRequest.getAllocationId(),
                                    pendingSlotRequest.getResourceProfile()));
                }
            }
        }
        this.failUnfulfillableRequest = failUnfulfillableRequest;
    }

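Two details of this method are easy to miss: the sweep only runs on the false-to-true transition, and entries are removed through the iterator so the map can be modified while it is traversed. A minimal sketch under simplifying assumptions: `FailFastToggleSketch` is hypothetical, a request is modelled as a required core count, and "unfulfillable" simply means it needs more cores than `maxCoresPerSlot`. Unlike SlotManagerImpl, the sketch starts with the flag set to false so that the transition can be observed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FailFastToggleSketch {
    private boolean failUnfulfillable = false; // the real field defaults to true
    private final int maxCoresPerSlot;
    final Map<String, Integer> pending = new LinkedHashMap<>(); // requestId -> required cores
    final List<String> failed = new ArrayList<>();

    FailFastToggleSketch(int maxCoresPerSlot) {
        this.maxCoresPerSlot = maxCoresPerSlot;
    }

    void setFailUnfulfillable(boolean newValue) {
        // Sweep only when switching from "let requests pend" to "fail fast".
        if (!failUnfulfillable && newValue) {
            Iterator<Map.Entry<String, Integer>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> entry = it.next();
                if (entry.getValue() > maxCoresPerSlot) {
                    String requestId = entry.getKey(); // read the key before removing the entry
                    it.remove();                       // safe removal during iteration
                    failed.add(requestId);
                }
            }
        }
        failUnfulfillable = newValue;
    }

    public static void main(String[] args) {
        FailFastToggleSketch sketch = new FailFastToggleSketch(4);
        sketch.pending.put("r1", 2);
        sketch.pending.put("r2", 8); // needs more cores than any slot can offer
        sketch.setFailUnfulfillable(true);
        System.out.println("failed=" + sketch.failed + " pending=" + sketch.pending.keySet());
    }
}
```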
3.4. freeSlot
Frees the given slot, but only if the slot is registered, currently allocated, and held by the given allocation id. In all other cases the request is logged and ignored.

    /**
     * Free the given slot from the given allocation. If the slot is still allocated by the given
     * allocation id, then the slot will be marked as free and will be subject to new slot requests.
     *
     * @param slotId identifying the slot to free
     * @param allocationId with which the slot is presumably allocated
     */
    @Override
    public void freeSlot(SlotID slotId, AllocationID allocationId) {
        checkInit();

        TaskManagerSlot slot = slots.get(slotId);

        if (null != slot) {
            if (slot.getState() == SlotState.ALLOCATED) {
                if (Objects.equals(allocationId, slot.getAllocationId())) {

                    TaskManagerRegistration taskManagerRegistration =
                            taskManagerRegistrations.get(slot.getInstanceId());

                    if (taskManagerRegistration == null) {
                        throw new IllegalStateException(
                                "Trying to free a slot from a TaskManager "
                                        + slot.getInstanceId()
                                        + " which has not been registered.");
                    }

                    updateSlotState(slot, taskManagerRegistration, null, null);
                } else {
                    LOG.debug(
                            "Received request to free slot {} with expected allocation id {}, "
                                    + "but actual allocation id {} differs. Ignoring the request.",
                            slotId,
                            allocationId,
                            slot.getAllocationId());
                }
            } else {
                LOG.debug("Slot {} has not been allocated.", allocationId);
            }
        } else {
            LOG.debug(
                    "Trying to free a slot {} which has not been registered. Ignoring this message.",
                    slotId);
        }
    }

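The nested checks in freeSlot amount to a small decision table. The sketch below makes the four outcomes explicit. `FreeSlotSketch`, its `Outcome` enum, and the string ids are hypothetical; the real method returns void and only logs the ignored cases.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class FreeSlotSketch {
    enum Outcome { FREED, NOT_REGISTERED, NOT_ALLOCATED, ALLOCATION_MISMATCH }

    /** slotId -> allocationId currently holding the slot; a null value means the slot is free. */
    private final Map<String, String> slots = new HashMap<>();

    void register(String slotId) {
        slots.put(slotId, null);
    }

    void allocate(String slotId, String allocationId) {
        slots.put(slotId, allocationId);
    }

    /** Frees the slot only if it is registered, allocated, and held by the given allocation id. */
    Outcome free(String slotId, String allocationId) {
        if (!slots.containsKey(slotId)) {
            return Outcome.NOT_REGISTERED;
        }
        String current = slots.get(slotId);
        if (current == null) {
            return Outcome.NOT_ALLOCATED;
        }
        if (!Objects.equals(current, allocationId)) {
            return Outcome.ALLOCATION_MISMATCH; // stale or foreign request: ignore it
        }
        slots.put(slotId, null);
        return Outcome.FREED;
    }

    public static void main(String[] args) {
        FreeSlotSketch sketch = new FreeSlotSketch();
        sketch.register("slot-0");
        sketch.allocate("slot-0", "alloc-A");
        System.out.println(sketch.free("slot-0", "alloc-B")); // wrong allocation id
        System.out.println(sketch.free("slot-0", "alloc-A")); // matching allocation id
    }
}
```

Checking the allocation id guards against a late free request releasing a slot that has meanwhile been handed to a different allocation.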
3.5. updateSlotState
Updates a slot according to its current state (PENDING, ALLOCATED or FREE) and to whether the TaskManager reported an allocation for it.

    private void updateSlotState(
            TaskManagerSlot slot,
            TaskManagerRegistration taskManagerRegistration,
            @Nullable AllocationID allocationId,
            @Nullable JobID jobId) {
        if (null != allocationId) {
            switch (slot.getState()) {
                case PENDING:
                    // we have a pending slot request --> check whether we have to reject it
                    PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();

                    if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
                        // we can cancel the slot request because it has been fulfilled
                        cancelPendingSlotRequest(pendingSlotRequest);

                        // remove the pending slot request, since it has been completed
                        pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());

                        slot.completeAllocation(allocationId, jobId);
                    } else {
                        // we first have to free the slot in order to set a new allocationId
                        slot.clearPendingSlotRequest();
                        // set the allocation id such that the slot won't be considered for the
                        // pending slot request
                        slot.updateAllocation(allocationId, jobId);

                        // remove the pending request if any as it has been assigned
                        final PendingSlotRequest actualPendingSlotRequest =
                                pendingSlotRequests.remove(allocationId);

                        if (actualPendingSlotRequest != null) {
                            cancelPendingSlotRequest(actualPendingSlotRequest);
                        }

                        // this will try to find a new slot for the request
                        rejectPendingSlotRequest(
                                pendingSlotRequest,
                                new Exception(
                                        "Task manager reported slot "
                                                + slot.getSlotId()
                                                + " being already allocated."));
                    }

                    taskManagerRegistration.occupySlot();
                    break;
                case ALLOCATED:
                    // already allocated: only update if the allocation id changed
                    if (!Objects.equals(allocationId, slot.getAllocationId())) {
                        slot.freeSlot();
                        slot.updateAllocation(allocationId, jobId);
                    }
                    break;
                case FREE:
                    // the slot is currently free --> it is stored in freeSlots
                    freeSlots.remove(slot.getSlotId());
                    slot.updateAllocation(allocationId, jobId);
                    taskManagerRegistration.occupySlot();
                    break;
            }

            fulfilledSlotRequests.put(allocationId, slot.getSlotId());
        } else {
            // no allocation reported
            switch (slot.getState()) {
                case FREE:
                    // offer the free slot to pending requests
                    handleFreeSlot(slot);
                    break;
                case PENDING:
                    // don't do anything because we still have a pending slot request
                    break;
                case ALLOCATED:
                    // the allocation is gone: free the slot and offer it again
                    AllocationID oldAllocation = slot.getAllocationId();
                    slot.freeSlot();
                    fulfilledSlotRequests.remove(oldAllocation);
                    taskManagerRegistration.freeSlot();

                    handleFreeSlot(slot);
                    break;
            }
        }
    }

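Read as a state machine, updateSlotState has six cases: three slot states times "allocation reported or not". The sketch below condenses the resulting state and the change in the TaskManager's occupied-slot count, as read from the listing above. `SlotStateSketch` is hypothetical, and it describes the slot state before handleFreeSlot possibly hands the freed slot to another pending request.

```java
class SlotStateSketch {
    enum State { FREE, PENDING, ALLOCATED }

    /** State after the update: a reported allocation always wins; otherwise only ALLOCATED changes. */
    static State next(State current, boolean allocationReported) {
        if (allocationReported) {
            return State.ALLOCATED;
        }
        return current == State.ALLOCATED ? State.FREE : current;
    }

    /** Change in the occupied-slot count (occupySlot() is +1, freeSlot() is -1). */
    static int occupiedDelta(State current, boolean allocationReported) {
        if (allocationReported) {
            return current == State.ALLOCATED ? 0 : 1;
        }
        return current == State.ALLOCATED ? -1 : 0;
    }

    public static void main(String[] args) {
        for (State state : State.values()) {
            for (boolean reported : new boolean[] {true, false}) {
                System.out.println(
                        state + ", allocationReported=" + reported
                                + " -> " + next(state, reported)
                                + ", occupied " + occupiedDelta(state, reported));
            }
        }
    }
}
```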