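[Flink] Flink 1.12.2 TaskSlotTable
1. Overview
Reposted from: Flink 1.12.2 source code walkthrough: TaskSlotTable
TaskSlotTable is a container for multiple TaskSlot instances.
In addition, it maintains several indices for faster access to tasks and to allocated slots.
Slots that cannot be assigned to a job manager are timed out automatically…
Before a TaskSlotTable can be used, it must be started via its start method.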
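2. The TaskSlotTable interface
Name | Description |
---|---|
start | Starts the task slot table with the given slot actions. |
getAllocationIdsPerJob | Returns all AllocationIDs of the given job. |
getActiveTaskSlotAllocationIds | Returns the AllocationIDs of all active task slots in the TaskSlotTable. |
getActiveTaskSlotAllocationIdsPerJob | Returns the AllocationIDs of all active task slots that belong to the given jobId. |
createSlotReport | Returns a SlotReport. |
allocateSlot | Allocates the slot with the given index for the given job and allocation id. If a negative index is given, a new auto-incrementing index is generated. Returns true if the slot could be allocated, false otherwise. |
markSlotActive | Marks the slot under the given allocation id as active. |
markSlotInactive | Marks the slot under the given allocation id as inactive. |
freeSlot | Tries to free the slot. If the slot is empty, sets the task slot state to free and returns its index. If the slot is not empty, sets the task slot state to releasing, fails all tasks and returns -1. |
isValidTimeout | Checks whether the timeout ticket is valid for the given allocation id. |
isAllocated | Checks whether the slot with the given index is in state ALLOCATED for the given job and allocation id. |
tryMarkSlotActive | Marks the slot matching the given JobID and AllocationID as ACTIVE, provided the slot is in state allocated. |
isSlotFree | Checks whether the slot with the given index is free. |
hasAllocatedSlots | Checks whether the job has allocated (not active) slots. |
getAllocatedSlots | Returns the TaskSlots of the given job id that are in state ALLOCATED. |
getOwningJob | Returns the JobID that owns the given AllocationID. |
addTask | Adds the given task to the slot identified by the task's allocation id. |
removeTask | Removes the task with the given execution attempt id from its task slot. If the owning task slot is releasing and is empty after the removal, the slot is freed via the slot actions. |
getTask | Returns the task(s) for the given ExecutionAttemptID / JobID. |
getCurrentAllocation | Returns the AllocationID currently assigned to the slot with the given index. |
getTaskMemoryManager | Returns the MemoryManager for the given AllocationID. |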
3. TaskSlotTableImpl
TaskSlotTableImpl is the implementation of the TaskSlotTable interface. It manages slots on the TaskExecutor side.

    # Construction call chain:
    org.apache.flink.yarn.YarnTaskExecutorRunner#main
    org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManagerProcessSecurely
    org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManager
    org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager (returns the TaskExecutor instance)
    org.apache.flink.runtime.taskexecutor.TaskManagerServices#fromConfiguration
3.1. Fields
3.1.1. Slot-related fields

    /**
     * Number of slots in static slot allocation. If slot is requested with an index, the requested
     * index must within the range of [0, numberSlots).
     *
     * <p>When generating slot report, we should always generate slots with index in [0,
     * numberSlots) even the slot does not exist.
     */
    private final int numberSlots;

    /** The list of all task slots, keyed by slot index (index -> TaskSlot). */
    private final Map<Integer, TaskSlot<T>> taskSlots;

    /** Mapping from allocation id to task slot (AllocationID -> TaskSlot). */
    private final Map<AllocationID, TaskSlot<T>> allocatedSlots;

    /** Mapping from execution attempt id to task and task slot (ExecutionAttemptID -> TaskSlotMapping). */
    private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;

    /** Mapping from job id to allocated slots for a job. */
    private final Map<JobID, Set<AllocationID>> slotsPerJob;

    /** Interface for slot actions, such as freeing them or timing them out. */
    @Nullable private SlotActions slotActions;

    /** The table state: CREATED, RUNNING, CLOSING or CLOSED. */
    private volatile State state;
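The fields above amount to several indices over one set of slots, and every mutation has to update all of them together. The following is a minimal sketch of that idea, not Flink code: the class SlotIndexSketch and its methods are hypothetical, plain strings stand in for JobID and AllocationID, and only static slots with a non-negative index are modelled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the slot indices; not Flink code. */
class SlotIndexSketch {
    /** Minimal stand-in for TaskSlot: only the identifying fields. */
    static final class Slot {
        final int index;
        final String jobId;
        final String allocationId;

        Slot(int index, String jobId, String allocationId) {
            this.index = index;
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    private final Map<Integer, Slot> slotsByIndex = new HashMap<>();
    private final Map<String, Slot> slotsByAllocation = new HashMap<>();
    private final Map<String, Set<String>> allocationsPerJob = new HashMap<>();

    /** Registers a slot in every index; refuses a duplicate index or allocation id. */
    boolean allocate(int index, String jobId, String allocationId) {
        if (slotsByIndex.containsKey(index) || slotsByAllocation.containsKey(allocationId)) {
            return false;
        }
        Slot slot = new Slot(index, jobId, allocationId);
        slotsByIndex.put(index, slot);
        slotsByAllocation.put(allocationId, slot);
        allocationsPerJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(allocationId);
        return true;
    }

    /** Removes a slot from every index so that the three maps never disagree. */
    boolean free(String allocationId) {
        Slot slot = slotsByAllocation.remove(allocationId);
        if (slot == null) {
            return false;
        }
        slotsByIndex.remove(slot.index);
        Set<String> allocations = allocationsPerJob.get(slot.jobId);
        allocations.remove(allocationId);
        if (allocations.isEmpty()) {
            allocationsPerJob.remove(slot.jobId);
        }
        return true;
    }

    /** Returns the owning job of an allocation, or null if the allocation is unknown. */
    String ownerOf(String allocationId) {
        Slot slot = slotsByAllocation.get(allocationId);
        return slot == null ? null : slot.jobId;
    }

    Set<String> allocationsOf(String jobId) {
        return allocationsPerJob.getOrDefault(jobId, Collections.emptySet());
    }

    boolean isIndexFree(int index) {
        return !slotsByIndex.containsKey(index);
    }
}
```

Each lookup is a single map access, which is the point of keeping redundant indices; the price is that allocate and free must touch every map.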
3.1.2. Other fields

    /** Slot resource profile for static slot allocation. */
    private final ResourceProfile defaultSlotResourceProfile;

    /** Page size for memory manager. */
    private final int memoryPageSize;

    /** Timer service used to time out allocated slots. */
    private final TimerService<AllocationID> timerService;

    private final ResourceBudgetManager budgetManager;

    /** The closing future is completed when all slot are freed and state is closed. */
    private final CompletableFuture<Void> closingFuture;

    /** {@link ComponentMainThreadExecutor} to schedule internal calls to the main thread. */
    private ComponentMainThreadExecutor mainThreadExecutor =
            new DummyComponentMainThreadExecutor(
                    "TaskSlotTableImpl is not initialized with proper main thread executor, "
                            + "call to TaskSlotTableImpl#start is required");

    /** {@link Executor} for background actions, e.g. verify all managed memory released. */
    private final Executor memoryVerificationExecutor;
3.1.3. Constructor

    public TaskSlotTableImpl(
            final int numberSlots,
            final ResourceProfile totalAvailableResourceProfile,
            final ResourceProfile defaultSlotResourceProfile,
            final int memoryPageSize,
            final TimerService<AllocationID> timerService,
            final Executor memoryVerificationExecutor) {
        Preconditions.checkArgument(
                0 < numberSlots, "The number of task slots must be greater than 0.");

        this.numberSlots = numberSlots;
        this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
        this.memoryPageSize = memoryPageSize;

        this.taskSlots = new HashMap<>(numberSlots);

        this.timerService = Preconditions.checkNotNull(timerService);

        budgetManager =
                new ResourceBudgetManager(
                        Preconditions.checkNotNull(totalAvailableResourceProfile));

        allocatedSlots = new HashMap<>(numberSlots);

        taskSlotMappings = new HashMap<>(4 * numberSlots);

        slotsPerJob = new HashMap<>(4);

        slotActions = null;
        state = State.CREATED;
        closingFuture = new CompletableFuture<>();

        this.memoryVerificationExecutor = memoryVerificationExecutor;
    }
3.2. Methods
3.2.1. start

    @Override
    public void start(
            SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
        Preconditions.checkState(
                state == State.CREATED,
                "The %s has to be just created before starting",
                TaskSlotTableImpl.class.getSimpleName());
        this.slotActions = Preconditions.checkNotNull(initialSlotActions);
        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);

        timerService.start(this);

        // Switch the state to RUNNING.
        state = State.RUNNING;
    }
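The start method is essentially a guarded state transition: it is legal only from CREATED, and other operations later check for RUNNING. A minimal sketch of that guard pattern follows; LifecycleGuardSketch is a hypothetical class written for illustration, and it throws IllegalStateException directly instead of using Flink's Preconditions.

```java
/** Hypothetical lifecycle guard mirroring the CREATED -> RUNNING check; not Flink code. */
class LifecycleGuardSketch {
    enum State {
        CREATED,
        RUNNING,
        CLOSING,
        CLOSED
    }

    // volatile so that a state change is visible to other threads that read it
    private volatile State state = State.CREATED;

    /** Moves CREATED -> RUNNING; any other starting state is a usage error. */
    void start() {
        if (state != State.CREATED) {
            throw new IllegalStateException("Expected CREATED but was " + state);
        }
        state = State.RUNNING;
    }

    /** Guard for operations that are only legal while the component is running. */
    void checkRunning() {
        if (state != State.RUNNING) {
            throw new IllegalStateException("Expected RUNNING but was " + state);
        }
    }

    State getState() {
        return state;
    }

    public static void main(String[] args) {
        LifecycleGuardSketch guard = new LifecycleGuardSketch();
        guard.start();
        guard.checkRunning(); // passes because start() was called first
        System.out.println(guard.getState());
    }
}
```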
3.2.2. closeAsync

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (state == State.CREATED) {
            state = State.CLOSED;
            closingFuture.complete(null);
        } else if (state == State.RUNNING) {
            state = State.CLOSING;
            final FlinkException cause = new FlinkException("Closing task slot table");
            CompletableFuture<Void> cleanupFuture =
                    FutureUtils.waitForAll(
                                    // Free every allocated slot.
                                    new ArrayList<>(allocatedSlots.values())
                                            .stream()
                                                    .map(slot -> freeSlotInternal(slot, cause))
                                                    .collect(Collectors.toList()))
                            .thenRunAsync(
                                    () -> {
                                        state = State.CLOSED;
                                        timerService.stop();
                                    },
                                    mainThreadExecutor);
            FutureUtils.forward(cleanupFuture, closingFuture);
        }
        return closingFuture;
    }
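The closing path waits for every slot's close future and only then marks the table as closed and completes closingFuture. A rough sketch of that pattern using only the JDK is shown here. It is an approximation under stated assumptions: CompletableFuture.allOf stands in for Flink's FutureUtils.waitForAll, the final step runs synchronously rather than on a main-thread executor, and CloseAllSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of "closed once every slot has closed"; not Flink code. */
class CloseAllSketch {
    private final List<CompletableFuture<Void>> slotCloseFutures = new ArrayList<>();
    private final CompletableFuture<Void> closingFuture = new CompletableFuture<>();
    private volatile boolean closed = false;

    /** Registers a slot and returns the future that the slot completes when it has closed. */
    CompletableFuture<Void> addSlot() {
        CompletableFuture<Void> slotClosed = new CompletableFuture<>();
        slotCloseFutures.add(slotClosed);
        return slotClosed;
    }

    /** Completes the returned future after all registered slot futures have completed. */
    CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> allSlotsClosed =
                CompletableFuture.allOf(slotCloseFutures.toArray(new CompletableFuture[0]));
        allSlotsClosed
                .thenRun(
                        () -> {
                            closed = true;
                        })
                // Forward the outcome (success or failure) to the externally visible future.
                .whenComplete(
                        (ignored, error) -> {
                            if (error != null) {
                                closingFuture.completeExceptionally(error);
                            } else {
                                closingFuture.complete(null);
                            }
                        });
        return closingFuture;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Handing out one long-lived closingFuture and forwarding the cleanup result into it lets callers that asked earlier and callers that ask later observe the same completion.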
3.2.3. createSlotReport

    @Override
    public SlotReport createSlotReport(ResourceID resourceId) {
        List<SlotStatus> slotStatuses = new ArrayList<>();

        // Iterate over every static slot index.
        for (int i = 0; i < numberSlots; i++) {
            // Build the SlotID.
            SlotID slotId = new SlotID(resourceId, i);
            SlotStatus slotStatus;
            if (taskSlots.containsKey(i)) {
                // The slot is already allocated.
                TaskSlot<T> taskSlot = taskSlots.get(i);

                // Build the SlotStatus.
                slotStatus =
                        new SlotStatus(
                                slotId,
                                taskSlot.getResourceProfile(),
                                taskSlot.getJobId(),
                                taskSlot.getAllocationId());
            } else {
                // The slot is not allocated yet.
                slotStatus = new SlotStatus(slotId, defaultSlotResourceProfile, null, null);
            }

            slotStatuses.add(slotStatus);
        }

        // Iterate over all allocated slots to pick up those with a negative index.
        for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
            // A negative index marks a dynamic slot; report it under a generated dynamic SlotID.
            if (taskSlot.getIndex() < 0) {
                SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
                SlotStatus slotStatus =
                        new SlotStatus(
                                slotID,
                                taskSlot.getResourceProfile(),
                                taskSlot.getJobId(),
                                taskSlot.getAllocationId());
                slotStatuses.add(slotStatus);
            }
        }

        // Build the SlotReport.
        final SlotReport slotReport = new SlotReport(slotStatuses);

        return slotReport;
    }
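The report therefore has two parts: one entry for every static index in [0, numberSlots), whether or not it is allocated, followed by one entry per allocated slot with a negative index. A simplified sketch of that shape is shown below; SlotReportSketch is hypothetical, and strings replace SlotStatus and AllocationID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the two-part slot report; not Flink code. */
class SlotReportSketch {
    /**
     * Builds one entry per static index in [0, numberSlots), then appends one entry per dynamic
     * allocation (the slots that Flink tracks with a negative index).
     */
    static List<String> createReport(
            int numberSlots,
            Map<Integer, String> allocationByIndex,
            List<String> dynamicAllocations) {
        List<String> report = new ArrayList<>();
        for (int i = 0; i < numberSlots; i++) {
            String allocationId = allocationByIndex.get(i);
            // Unallocated static slots are still reported, just without an allocation.
            report.add("slot-" + i + ":" + (allocationId == null ? "free" : allocationId));
        }
        for (String allocationId : dynamicAllocations) {
            report.add("dynamic:" + allocationId);
        }
        return report;
    }

    public static void main(String[] args) {
        Map<Integer, String> allocated = new HashMap<>();
        allocated.put(1, "alloc-a");
        List<String> dynamic = new ArrayList<>();
        dynamic.add("alloc-d");
        System.out.println(createReport(3, allocated, dynamic));
    }
}
```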
3.2.4. allocateSlot

    @Override
    public boolean allocateSlot(
            int index,
            JobID jobId,
            AllocationID allocationId,
            ResourceProfile resourceProfile,
            Time slotTimeout) {
        checkRunning();

        Preconditions.checkArgument(index < numberSlots);

        // Look up the TaskSlot for this allocation id.
        TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
        if (taskSlot != null) {
            LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
            return false;
        }

        // taskSlots already contains this index.
        if (taskSlots.containsKey(index)) {
            TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
            LOG.info(
                    "Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
                    index,
                    duplicatedTaskSlot.getResourceProfile(),
                    duplicatedTaskSlot.getJobId(),
                    duplicatedTaskSlot.getAllocationId());
            return duplicatedTaskSlot.getJobId().equals(jobId)
                    && duplicatedTaskSlot.getAllocationId().equals(allocationId);
        } else if (allocatedSlots.containsKey(allocationId)) {
            return true;
        }

        // Pick the resource profile: static slots (index >= 0) use the default profile,
        // slots with a negative index keep the requested one.
        // Example value seen in a debugger:
        // resourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
        //   cpuCores = {CPUResource@6139} "Resource(CPU: 1.0000000000000000)"
        //   taskHeapMemory = {MemorySize@6140} "100663293 bytes"
        //   taskOffHeapMemory = {MemorySize@6141} "0 bytes"
        //   managedMemory = {MemorySize@6142} "134217730 bytes"
        //   networkMemory = {MemorySize@6143} "32 mb"
        //   extendedResources = {HashMap@6144} size = 0
        resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;

        // Reserve the resource profile in the budget manager.
        if (!budgetManager.reserve(resourceProfile)) {
            LOG.info(
                    "Cannot allocate the requested resources. Trying to allocate {}, "
                            + "while the currently remaining available resources are {}, total is {}.",
                    resourceProfile,
                    budgetManager.getAvailableBudget(),
                    budgetManager.getTotalBudget());
            return false;
        }

        // Build the TaskSlot.
        // Example value seen in a debugger:
        // taskSlot = {TaskSlot@6191} "TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: a9ce7abc6f1d6f264dbdce5564efcb76, jobId: 05fdf1bc744b274be1525c918c1ad378)"
        //   index = 0
        //   resourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
        //   tasks = {HashMap@6197} size = 0
        //   memoryManager = {MemoryManager@6198}
        //   state = {TaskSlotState@6199} "ALLOCATED"
        //   jobId = {JobID@6056} "05fdf1bc744b274be1525c918c1ad378"
        //   allocationId = {AllocationID@6057} "a9ce7abc6f1d6f264dbdce5564efcb76"
        //   closingFuture = {CompletableFuture@6200} "java.util.concurrent.CompletableFuture@670d0482[Not completed]"
        //   asyncExecutor = {ThreadPoolExecutor@6076} "java.util.concurrent.ThreadPoolExecutor@da5c1a9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]"
        taskSlot =
                new TaskSlot<>(
                        index,
                        resourceProfile,
                        memoryPageSize,
                        jobId,
                        allocationId,
                        memoryVerificationExecutor);

        if (index >= 0) {
            // Add the slot to the index -> TaskSlot map.
            taskSlots.put(index, taskSlot);
        }

        // update the allocation id to task slot map
        allocatedSlots.put(allocationId, taskSlot);

        // register a timeout for this slot since it's in state allocated
        timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());

        // add this slot to the set of job slots
        Set<AllocationID> slots = slotsPerJob.get(jobId);

        if (slots == null) {
            slots = new HashSet<>(4);
            slotsPerJob.put(jobId, slots);
        }

        slots.add(allocationId);

        return true;
    }
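A key step in allocateSlot is the budget check: the allocation fails if the requested profile does not fit into what is still available, and freeing a slot later gives the resources back. The sketch below illustrates reserve/release bookkeeping with a single number. It is an assumption-laden simplification: Flink's ResourceBudgetManager works on ResourceProfile objects, while the hypothetical BudgetSketch collapses a profile into one long value.

```java
/** Hypothetical single-dimension budget with reserve/release bookkeeping; not Flink code. */
class BudgetSketch {
    private final long total;
    private long available;

    BudgetSketch(long total) {
        if (total < 0) {
            throw new IllegalArgumentException("total must not be negative");
        }
        this.total = total;
        this.available = total;
    }

    /** Reserves the amount if it fits into the remaining budget; otherwise changes nothing. */
    boolean reserve(long amount) {
        if (amount < 0 || amount > available) {
            return false;
        }
        available -= amount;
        return true;
    }

    /** Returns a previously reserved amount; rejects a release that would exceed the total. */
    boolean release(long amount) {
        if (amount < 0 || amount > total - available) {
            return false;
        }
        available += amount;
        return true;
    }

    long getAvailable() {
        return available;
    }

    long getTotal() {
        return total;
    }

    public static void main(String[] args) {
        BudgetSketch budget = new BudgetSketch(256);
        System.out.println(budget.reserve(128)); // fits into the budget
        System.out.println(budget.reserve(200)); // does not fit into the remaining 128
        System.out.println(budget.getAvailable());
    }
}
```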
3.2.5. freeSlot -> freeSlotInternal
If the slot is empty, freeSlotInternal removes it from every index (allocatedSlots, the timer registration, slotsPerJob and taskSlots) and releases its resources in the budget manager. In either case it then closes the slot by calling taskSlot.closeAsync(cause).

    private CompletableFuture<Void> freeSlotInternal(TaskSlot<T> taskSlot, Throwable cause) {
        AllocationID allocationId = taskSlot.getAllocationId();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Free slot {}.", taskSlot, cause);
        } else {
            LOG.info("Free slot {}.", taskSlot);
        }

        if (taskSlot.isEmpty()) {
            // remove the allocation id to task slot mapping
            allocatedSlots.remove(allocationId);

            // unregister a potential timeout
            timerService.unregisterTimeout(allocationId);

            JobID jobId = taskSlot.getJobId();
            Set<AllocationID> slots = slotsPerJob.get(jobId);

            if (slots == null) {
                throw new IllegalStateException(
                        "There are no more slots allocated for the job "
                                + jobId
                                + ". This indicates a programming bug.");
            }

            slots.remove(allocationId);

            if (slots.isEmpty()) {
                slotsPerJob.remove(jobId);
            }

            taskSlots.remove(taskSlot.getIndex());
            budgetManager.release(taskSlot.getResourceProfile());
        }
        return taskSlot.closeAsync(cause);
    }
3.2.6. Task-related methods (add/remove/getTask)

    @Override
    public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
        checkRunning();
        Preconditions.checkNotNull(task);

        TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());

        if (taskSlot != null) {
            if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
                // Keyed by task.getExecutionId(), the task is added to the slot's
                // task map (Map<ExecutionAttemptID, T> tasks).
                if (taskSlot.add(task)) {
                    taskSlotMappings.put(
                            task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));

                    return true;
                } else {
                    return false;
                }
            } else {
                throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
            }
        } else {
            throw new SlotNotFoundException(task.getAllocationId());
        }
    }

    @Override
    public T removeTask(ExecutionAttemptID executionAttemptID) {
        checkStarted();

        TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);

        if (taskSlotMapping != null) {
            T task = taskSlotMapping.getTask();
            TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();

            taskSlot.remove(task.getExecutionId());

            if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
                slotActions.freeSlot(taskSlot.getAllocationId());
            }

            return task;
        } else {
            return null;
        }
    }

    @Override
    public T getTask(ExecutionAttemptID executionAttemptID) {
        TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);

        if (taskSlotMapping != null) {
            return taskSlotMapping.getTask();
        } else {
            return null;
        }
    }

    @Override
    public Iterator<T> getTasks(JobID jobId) {
        return new PayloadIterator(jobId);
    }

    @Override
    public AllocationID getCurrentAllocation(int index) {
        TaskSlot<T> taskSlot = taskSlots.get(index);
        if (taskSlot == null) {
            return null;
        }
        return taskSlot.getAllocationId();
    }

    @Override
    public MemoryManager getTaskMemoryManager(AllocationID allocationID)
            throws SlotNotFoundException {
        TaskSlot<T> taskSlot = getTaskSlot(allocationID);

        if (taskSlot != null) {
            return taskSlot.getMemoryManager();
        } else {
            throw new SlotNotFoundException(allocationID);
        }
    }
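removeTask also completes a deferred release: a slot that could not be freed because it still held tasks is freed once its last task is removed. The sketch below models just that interaction. ReleasingSlotSketch is hypothetical, tasks are plain strings, and the slot frees itself directly, whereas the code above goes through slotActions.freeSlot.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical slot that defers its release until the last task is gone; not Flink code. */
class ReleasingSlotSketch {
    enum SlotState {
        ACTIVE,
        RELEASING,
        FREE
    }

    private final Set<String> tasks = new HashSet<>();
    private SlotState state = SlotState.ACTIVE;

    /** Accepts a task only while the slot is active. */
    boolean addTask(String taskId) {
        return state == SlotState.ACTIVE && tasks.add(taskId);
    }

    /** Frees the slot immediately if it is empty; otherwise marks it as RELEASING. */
    boolean tryFree() {
        if (tasks.isEmpty()) {
            state = SlotState.FREE;
            return true;
        }
        state = SlotState.RELEASING;
        return false;
    }

    /** Removes a task and finishes a pending release once the slot has become empty. */
    boolean removeTask(String taskId) {
        boolean removed = tasks.remove(taskId);
        if (removed && state == SlotState.RELEASING && tasks.isEmpty()) {
            state = SlotState.FREE;
        }
        return removed;
    }

    SlotState getState() {
        return state;
    }
}
```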
3.2.7. Query methods

    @Override
    public boolean hasAllocatedSlots(JobID jobId) {
        return getAllocatedSlots(jobId).hasNext();
    }

    @Override
    public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
        return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);
    }

    @Override
    @Nullable
    public JobID getOwningJob(AllocationID allocationId) {
        final TaskSlot<T> taskSlot = getTaskSlot(allocationId);

        if (taskSlot != null) {
            return taskSlot.getJobId();
        } else {
            return null;
        }
    }
3.2.8. TimeoutListener

    @Override
    public void notifyTimeout(AllocationID key, UUID ticket) {
        checkStarted();

        if (slotActions != null) {
            slotActions.timeoutSlot(key, ticket);
        }
    }
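notifyTimeout passes along a ticket together with the key, and isValidTimeout (see the interface table) checks such a ticket against an allocation id. One plausible reading, and it is only an assumption here, is that each registration gets a fresh ticket so that a timeout from an older, replaced registration can be told apart from the current one. The hypothetical TimeoutTicketSketch below illustrates that idea without any real timers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical ticket bookkeeping for timeouts; not Flink's TimerService. */
class TimeoutTicketSketch {
    private final Map<String, UUID> currentTickets = new HashMap<>();

    /** Registers (or re-registers) a timeout for the key and returns its fresh ticket. */
    UUID registerTimeout(String key) {
        UUID ticket = UUID.randomUUID();
        currentTickets.put(key, ticket);
        return ticket;
    }

    /** Cancels the timeout for the key, which invalidates its ticket. */
    void unregisterTimeout(String key) {
        currentTickets.remove(key);
    }

    /** A ticket is valid only if it belongs to the latest registration for the key. */
    boolean isValid(String key, UUID ticket) {
        return ticket != null && ticket.equals(currentTickets.get(key));
    }
}
```

With this scheme a late timeout notification for a slot that has since been re-registered or unregistered can simply be ignored.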
3.2.9. State-related methods

    @Override
    public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
        TaskSlot<T> taskSlot = taskSlots.get(index);
        if (taskSlot != null) {
            return taskSlot.isAllocated(jobId, allocationId);
        } else if (index < 0) {
            return allocatedSlots.containsKey(allocationId);
        } else {
            return false;
        }
    }

    @Override
    public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
        TaskSlot<T> taskSlot = getTaskSlot(allocationId);

        if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
            return markExistingSlotActive(taskSlot);
        } else {
            return false;
        }
    }

    @Override
    public boolean isSlotFree(int index) {
        return !taskSlots.containsKey(index);
    }
3.3. TaskSlotMapping
A mapping class between a TaskSlotPayload and a TaskSlot.

    // ---------------------------------------------------------------------
    // Static utility classes
    // ---------------------------------------------------------------------

    /** Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}. */
    private static final class TaskSlotMapping<T extends TaskSlotPayload> {
        private final T task;
        private final TaskSlot<T> taskSlot;

        private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {
            this.task = Preconditions.checkNotNull(task);
            this.taskSlot = Preconditions.checkNotNull(taskSlot);
        }

        public T getTask() {
            return task;
        }

        public TaskSlot<T> getTaskSlot() {
            return taskSlot;
        }
    }
3.4. Iterators
TaskSlotIterator

    /**
     * Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given
     * job.
     */
    private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {
        private final Iterator<AllocationID> allSlots;
        private final TaskSlotState state;

        private TaskSlot<T> currentSlot;

        private TaskSlotIterator(TaskSlotState state) {
            this(
                    slotsPerJob.values().stream()
                            .flatMap(Collection::stream)
                            .collect(Collectors.toSet())
                            .iterator(),
                    state);
        }

        private TaskSlotIterator(JobID jobId, TaskSlotState state) {
            this(
                    slotsPerJob.get(jobId) == null
                            ? Collections.emptyIterator()
                            : slotsPerJob.get(jobId).iterator(),
                    state);
        }

        private TaskSlotIterator(
                Iterator<AllocationID> allocationIDIterator, TaskSlotState state) {
            this.allSlots = Preconditions.checkNotNull(allocationIDIterator);
            this.state = Preconditions.checkNotNull(state);
            this.currentSlot = null;
        }

        @Override
        public boolean hasNext() {
            while (currentSlot == null && allSlots.hasNext()) {
                AllocationID tempSlot = allSlots.next();

                TaskSlot<T> taskSlot = getTaskSlot(tempSlot);

                if (taskSlot != null && taskSlot.getState() == state) {
                    currentSlot = taskSlot;
                }
            }

            return currentSlot != null;
        }

        @Override
        public TaskSlot<T> next() {
            if (currentSlot != null) {
                TaskSlot<T> result = currentSlot;

                currentSlot = null;

                return result;
            } else {
                while (true) {
                    AllocationID tempSlot;

                    try {
                        tempSlot = allSlots.next();
                    } catch (NoSuchElementException e) {
                        throw new NoSuchElementException("No more task slots.");
                    }

                    TaskSlot<T> taskSlot = getTaskSlot(tempSlot);

                    if (taskSlot != null && taskSlot.getState() == state) {
                        return taskSlot;
                    }
                }
            }
        }
    }
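TaskSlotIterator is a filtering iterator with a one-element look-ahead: hasNext advances the underlying iterator until it finds a match and buffers it, and next hands out the buffered element. A generic sketch of the same technique is shown here; FilteringIteratorSketch is a hypothetical class, and it assumes that the source never yields null because null is used to mean "nothing buffered".

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical look-ahead filtering iterator in the style of TaskSlotIterator; not Flink code. */
class FilteringIteratorSketch<E> implements Iterator<E> {
    private final Iterator<E> source;
    private final Predicate<? super E> condition;

    // Buffered next match; null means nothing is buffered (source elements must be non-null).
    private E lookahead;

    FilteringIteratorSketch(Iterator<E> source, Predicate<? super E> condition) {
        this.source = source;
        this.condition = condition;
    }

    @Override
    public boolean hasNext() {
        // Advance until a matching element is buffered or the source is exhausted.
        while (lookahead == null && source.hasNext()) {
            E candidate = source.next();
            if (candidate != null && condition.test(candidate)) {
                lookahead = candidate;
            }
        }
        return lookahead != null;
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new NoSuchElementException("No more matching elements.");
        }
        E result = lookahead;
        lookahead = null;
        return result;
    }
}
```

Calling hasNext from next keeps the search logic in one place, which is a small simplification compared with the two separate loops in the code above.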
PayloadIterator

    /** Iterator over all {@link TaskSlotPayload} for a given job. */
    private final class PayloadIterator implements Iterator<T> {
        private final Iterator<TaskSlot<T>> taskSlotIterator;

        private Iterator<T> currentTasks;

        private PayloadIterator(JobID jobId) {
            this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);
            this.currentTasks = null;
        }

        @Override
        public boolean hasNext() {
            while ((currentTasks == null || !currentTasks.hasNext())
                    && taskSlotIterator.hasNext()) {
                TaskSlot<T> taskSlot = taskSlotIterator.next();

                currentTasks = taskSlot.getTasks();
            }

            return (currentTasks != null && currentTasks.hasNext());
        }

        @Override
        public T next() {
            while ((currentTasks == null || !currentTasks.hasNext())) {
                TaskSlot<T> taskSlot;

                try {
                    taskSlot = taskSlotIterator.next();
                } catch (NoSuchElementException e) {
                    throw new NoSuchElementException("No more tasks.");
                }

                currentTasks = taskSlot.getTasks();
            }

            return currentTasks.next();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");
        }
    }
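PayloadIterator flattens two levels: it walks the active slots of a job and, within each slot, the tasks of that slot, skipping slots that hold no tasks. The generic sketch below shows this flattening step in isolation; FlatteningIteratorSketch is hypothetical and works on any iterator of Iterable groups.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical flattening iterator in the style of PayloadIterator; not Flink code. */
class FlatteningIteratorSketch<E> implements Iterator<E> {
    private final Iterator<? extends Iterable<E>> groups;

    // Iterator over the group currently being drained; null before the first group is opened.
    private Iterator<E> current;

    FlatteningIteratorSketch(Iterator<? extends Iterable<E>> groups) {
        this.groups = groups;
    }

    @Override
    public boolean hasNext() {
        // Skip exhausted or empty groups until an element is available.
        while ((current == null || !current.hasNext()) && groups.hasNext()) {
            current = groups.next().iterator();
        }
        return current != null && current.hasNext();
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new NoSuchElementException("No more elements.");
        }
        return current.next();
    }
}
```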
3.5. State

    private enum State {
        CREATED,
        RUNNING,
        CLOSING,
        CLOSED
    }