1.概述

转载:Flink 1.12.2 源码浅析 : TaskSlotTable

多个{@link TaskSlot}实例的容器。
此外,它还维护多个索引,以便更快地访问任务和分配的slots。
当slots无法分配给job manager时会自动超时…
在使用 TaskSlotTable 之前,必须通过 {@link #start} 方法启动它。

二 .TaskSlotTable 接口

名称 描述
start 根据给定的 slot actions 启动 task slot table
getAllocationIdsPerJob 返回job所有的AllocationID
getActiveTaskSlotAllocationIds 返回TaskSlotTable中 所有active task的 AllocationID
getActiveTaskSlotAllocationIdsPerJob 返回 jobId 中 acitve TaskSlot 中所有的 AllocationID
createSlotReport 返回 SlotReport
allocateSlot 为给定job 和allocation id 分配具有给定索引的slot。
如果给定负指数,则生成一个新的自增指数。
如果可以分配slot,则返回true。否则返回false。
markSlotActive 将给定分配id下的slot标记为活动。
markSlotInactive 将给定分配id下的slot标记为非活动。
freeSlot 试着释放这个slot。
如果slot为空,它将任务slot的状态设置为free并返回其索引
如果slot不是空的,那么它会将任务slot的状态设置为releasing,fail all tasks并返回-1。
isValidTimeout 根据allocation id. 检查ticket 是否超时
isAllocated 根据给定的 index , job和 allocation id .检车slot状态是否为 ALLOCATED
tryMarkSlotActive 根据JobID 和 AllocationID 标记匹配的slot状态为ACTIVE(如果该slot的状态为allocated)
isSlotFree 检测 slot状态是否为free
hasAllocatedSlots 检查作业是否已分配(非活动)slot。
getAllocatedSlots 根据job id 返回 所有的TaskSlot
getOwningJob 根据AllocationID 返回所属的 JobID
addTask 将给定任务添加到由任务的 allocation id 标识的slot中。
removeTask 从task slot中删除具有给定执行attempt id 的任务。
如果拥有的task slot处于释放状态并且在删除任务后为空,则通过slot actions 释放slot。
getTask 根据ExecutionAttemptID / JobID 获取task
getCurrentAllocation 根据index获取改slot所分配的AllocationID
getTaskMemoryManager 根据AllocationID 获取 MemoryManager

三 .TaskSlotTableImpl

TaskSlotTableImpl 是 TaskSlotTable 接口的实现类. 负责TaskExecutor端的slot管理.

# 构造调用顺序:
org.apache.flink.yarn.YarnTaskExecutorRunner#mainorg.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManagerProcessSecurelyorg.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManagerorg.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager  (返回TaskExecutor对象实例)org.apache.flink.runtime.taskexecutor.TaskManagerServices#fromConfiguration

3.1. 属性相关

3.1.1. slot 相关

    /*** 静态slot分配中的slot数。* 如果请求带索引的slot,则所请求的索引必须在[0,numberSlots)范围内。* 生成slot report时,即使该slot不存在,我们也应始终生成索引为[0,numberSlots)的广告位。* * Number of slots in static slot allocation.* If slot is requested with an index, the requested index must within the range of [0, numberSlots).** When generating slot report, we should always generate slots with index in [0, numberSlots) even the slot does not exist.*/private final int numberSlots;/*** 缓存  index -> TaskSlot* The list of all task slots. */private final Map<Integer, TaskSlot<T>> taskSlots;/*** 缓存 AllocationID -> TaskSlot* Mapping from allocation id to task slot.* */private final Map<AllocationID, TaskSlot<T>> allocatedSlots;/*** ExecutionAttemptID -> TaskSlotMapping*  Mapping from execution attempt id to task and task slot.*  */private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;/***** Mapping from job id to allocated slots for a job.* */private final Map<JobID, Set<AllocationID>> slotsPerJob;/** Interface for slot actions, such as freeing them or timing them out. */@Nullable private SlotActions slotActions;/*** 状态相关 :   CREATED,*            RUNNING,*            CLOSING,*            CLOSED* The table state. */private volatile State state;

3.1.2. 其他属性

  /*** 用于静态slot分配的slot资源配置文件。* Slot resource profile for static slot allocation.* */private final ResourceProfile defaultSlotResourceProfile;/** Page size for memory manager. */private final int memoryPageSize;/** Timer service used to time out allocated slots. */private final TimerService<AllocationID> timerService;private final ResourceBudgetManager budgetManager;/** The closing future is completed when all slot are freed and state is closed. */private final CompletableFuture<Void> closingFuture;/** {@link ComponentMainThreadExecutor} to schedule internal calls to the main thread. */private ComponentMainThreadExecutor mainThreadExecutor =new DummyComponentMainThreadExecutor("TaskSlotTableImpl is not initialized with proper main thread executor, "+ "call to TaskSlotTableImpl#start is required");/** {@link Executor} for background actions, e.g. verify all managed memory released. */private final Executor memoryVerificationExecutor;

3.1.3. 构造方法

   public TaskSlotTableImpl(final int numberSlots,final ResourceProfile totalAvailableResourceProfile,final ResourceProfile defaultSlotResourceProfile,final int memoryPageSize,final TimerService<AllocationID> timerService,final Executor memoryVerificationExecutor) {Preconditions.checkArgument(0 < numberSlots, "The number of task slots must be greater than 0.");this.numberSlots = numberSlots;this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);this.memoryPageSize = memoryPageSize;this.taskSlots = new HashMap<>(numberSlots);this.timerService = Preconditions.checkNotNull(timerService);budgetManager =new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile));allocatedSlots = new HashMap<>(numberSlots);taskSlotMappings = new HashMap<>(4 * numberSlots);slotsPerJob = new HashMap<>(4);slotActions = null;state = State.CREATED;closingFuture = new CompletableFuture<>();this.memoryVerificationExecutor = memoryVerificationExecutor;}

3.2. 方法相关

3.2.1. start

    @Overridepublic void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {Preconditions.checkState(state == State.CREATED,"The %s has to be just created before starting",TaskSlotTableImpl.class.getSimpleName());this.slotActions = Preconditions.checkNotNull(initialSlotActions);this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);timerService.start(this);// 修改状态为 RUNNINGstate = State.RUNNING;}

3.2.2. closeAsync

    @Overridepublic CompletableFuture<Void> closeAsync() {if (state == State.CREATED) {state = State.CLOSED;closingFuture.complete(null);} else if (state == State.RUNNING) {state = State.CLOSING;final FlinkException cause = new FlinkException("Closing task slot table");CompletableFuture<Void> cleanupFuture =FutureUtils.waitForAll(// 释放slotnew ArrayList<>(allocatedSlots.values()).stream().map(slot -> freeSlotInternal(slot, cause)).collect(Collectors.toList())).thenRunAsync(() -> {state = State.CLOSED;timerService.stop();},mainThreadExecutor);FutureUtils.forward(cleanupFuture, closingFuture);}return closingFuture;}

3.2.3. createSlotReport

   @Overridepublic SlotReport createSlotReport(ResourceID resourceId) {List<SlotStatus> slotStatuses = new ArrayList<>();// 循环每一个slotfor (int i = 0; i < numberSlots; i++) {// 构建SlotIDSlotID slotId = new SlotID(resourceId, i);SlotStatus slotStatus;if (taskSlots.containsKey(i)) {//该slot已经分配TaskSlot<T> taskSlot = taskSlots.get(i);// 构建 SlotStatusslotStatus =new SlotStatus(slotId,taskSlot.getResourceProfile(),taskSlot.getJobId(),taskSlot.getAllocationId());} else {//该slot尚未分配slotStatus = new SlotStatus(slotId, defaultSlotResourceProfile, null, null);}slotStatuses.add(slotStatus);}// 循环所有的 allocatedSlots 处理 异常的slot ??? for (TaskSlot<T> taskSlot : allocatedSlots.values()) {// 处理 异常的slot ??? if (taskSlot.getIndex() < 0) {SlotID slotID = SlotID.generateDynamicSlotID(resourceId);SlotStatus slotStatus =new SlotStatus(slotID,taskSlot.getResourceProfile(),taskSlot.getJobId(),taskSlot.getAllocationId());slotStatuses.add(slotStatus);}}// 构建SlotReportfinal SlotReport slotReport = new SlotReport(slotStatuses);return slotReport;}

3.2.4. allocateSlot

@Overridepublic boolean allocateSlot(int index,JobID jobId,AllocationID allocationId,ResourceProfile resourceProfile,Time slotTimeout) {checkRunning();Preconditions.checkArgument(index < numberSlots);// 获取TaskSlotTaskSlot<T> taskSlot = allocatedSlots.get(allocationId);if (taskSlot != null) {LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);return false;}// 如果taskSlots 已经包含indexif (taskSlots.containsKey(index)) {TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",index,duplicatedTaskSlot.getResourceProfile(),duplicatedTaskSlot.getJobId(),duplicatedTaskSlot.getAllocationId());return duplicatedTaskSlot.getJobId().equals(jobId)&& duplicatedTaskSlot.getAllocationId().equals(allocationId);} else if (allocatedSlots.containsKey(allocationId)) {return true;}// 获取 resourceProfile//    resourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"//        cpuCores = {CPUResource@6139} "Resource(CPU: 1.0000000000000000)"//        taskHeapMemory = {MemorySize@6140} "100663293 bytes"//        taskOffHeapMemory = {MemorySize@6141} "0 bytes"//        managedMemory = {MemorySize@6142} "134217730 bytes"//        networkMemory = {MemorySize@6143} "32 mb"//        extendedResources = {HashMap@6144}  size = 0resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;// 存储resourceProfileif (!budgetManager.reserve(resourceProfile)) {LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "+ "while the currently remaining available resources are {}, total is {}.",resourceProfile,budgetManager.getAvailableBudget(),budgetManager.getTotalBudget());return false;}// 构建 taskSlot//  taskSlot = {TaskSlot@6191} "TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: a9ce7abc6f1d6f264dbdce5564efcb76, jobId: 05fdf1bc744b274be1525c918c1ad378)"//    index = 0//    resourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"//    tasks = {HashMap@6197}  size = 0//    memoryManager = {MemoryManager@6198}//    state = {TaskSlotState@6199} "ALLOCATED"//    jobId = {JobID@6056} "05fdf1bc744b274be1525c918c1ad378"//    allocationId = {AllocationID@6057} "a9ce7abc6f1d6f264dbdce5564efcb76"//    closingFuture = {CompletableFuture@6200} "java.util.concurrent.CompletableFuture@670d0482[Not completed]"//    asyncExecutor = {ThreadPoolExecutor@6076} "java.util.concurrent.ThreadPoolExecutor@da5c1a9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]"taskSlot =new TaskSlot<>(index,resourceProfile,memoryPageSize,jobId,allocationId,memoryVerificationExecutor);if (index >= 0) {// 加入缓存...taskSlots.put(index, taskSlot);}// 更新 allocatedSlots// update the allocation id to task slot mapallocatedSlots.put(allocationId, taskSlot);// 注册超时时间// register a timeout for this slot since it's in state allocatedtimerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());//  更新 slotsPerJob 的slot 集合// add this slot to the set of job slotsSet<AllocationID> slots = slotsPerJob.get(jobId);if (slots == null) {slots = new HashSet<>(4);slotsPerJob.put(jobId, slots);}slots.add(allocationId);return true;}

3.2.5. freeSlot -> freeSlotInternal

清理掉各种缓存然后调用task的shutdown方法关闭任务.

private CompletableFuture<Void> freeSlotInternal(TaskSlot<T> taskSlot, Throwable cause) {AllocationID allocationId = taskSlot.getAllocationId();if (LOG.isDebugEnabled()) {LOG.debug("Free slot {}.", taskSlot, cause);} else {LOG.info("Free slot {}.", taskSlot);}if (taskSlot.isEmpty()) {// remove the allocation id to task slot mappingallocatedSlots.remove(allocationId);// unregister a potential timeouttimerService.unregisterTimeout(allocationId);JobID jobId = taskSlot.getJobId();Set<AllocationID> slots = slotsPerJob.get(jobId);if (slots == null) {throw new IllegalStateException("There are no more slots allocated for the job "+ jobId+ ". This indicates a programming bug.");}slots.remove(allocationId);if (slots.isEmpty()) {slotsPerJob.remove(jobId);}taskSlots.remove(taskSlot.getIndex());budgetManager.release(taskSlot.getResourceProfile());}return taskSlot.closeAsync(cause);}

3.2.6. 任务相关(add/remove/getTask)

@Overridepublic boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {checkRunning();Preconditions.checkNotNull(task);TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());if (taskSlot != null) {if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {// 根据任务的  task.getExecutionId()   // 加入到slot的任务缓存  Map<ExecutionAttemptID, T> tasks  中.if (taskSlot.add(task)) {taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));return true;} else {return false;}} else {throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());}} else {throw new SlotNotFoundException(task.getAllocationId());}}@Overridepublic T removeTask(ExecutionAttemptID executionAttemptID) {checkStarted();TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.remove(executionAttemptID);if (taskSlotMapping != null) {T task = taskSlotMapping.getTask();TaskSlot<T> taskSlot = taskSlotMapping.getTaskSlot();taskSlot.remove(task.getExecutionId());if (taskSlot.isReleasing() && taskSlot.isEmpty()) {slotActions.freeSlot(taskSlot.getAllocationId());}return task;} else {return null;}}@Overridepublic T getTask(ExecutionAttemptID executionAttemptID) {TaskSlotMapping<T> taskSlotMapping = taskSlotMappings.get(executionAttemptID);if (taskSlotMapping != null) {return taskSlotMapping.getTask();} else {return null;}}@Overridepublic Iterator<T> getTasks(JobID jobId) {return new PayloadIterator(jobId);}@Overridepublic AllocationID getCurrentAllocation(int index) {TaskSlot<T> taskSlot = taskSlots.get(index);if (taskSlot == null) {return null;}return taskSlot.getAllocationId();}@Overridepublic MemoryManager getTaskMemoryManager(AllocationID allocationID)throws SlotNotFoundException {TaskSlot<T> taskSlot = getTaskSlot(allocationID);if (taskSlot != null) {return taskSlot.getMemoryManager();} else {throw new SlotNotFoundException(allocationID);}}

3.2.7. 获取信息相关

@Overridepublic boolean hasAllocatedSlots(JobID jobId) {return getAllocatedSlots(jobId).hasNext();}@Overridepublic Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {return new TaskSlotIterator(jobId, TaskSlotState.ALLOCATED);}@Override@Nullablepublic JobID getOwningJob(AllocationID allocationId) {final TaskSlot<T> taskSlot = getTaskSlot(allocationId);if (taskSlot != null) {return taskSlot.getJobId();} else {return null;}}

3.2.8. TimeoutListener

@Overridepublic void notifyTimeout(AllocationID key, UUID ticket) {checkStarted();if (slotActions != null) {slotActions.timeoutSlot(key, ticket);}}

3.2.11. 状态相关

@Overridepublic boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {TaskSlot<T> taskSlot = taskSlots.get(index);if (taskSlot != null) {return taskSlot.isAllocated(jobId, allocationId);} else if (index < 0) {return allocatedSlots.containsKey(allocationId);} else {return false;}}@Overridepublic boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {TaskSlot<T> taskSlot = getTaskSlot(allocationId);if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {return markExistingSlotActive(taskSlot);} else {return false;}}@Overridepublic boolean isSlotFree(int index) {return !taskSlots.containsKey(index);}@Overridepublic boolean hasAllocatedSlots(JobID jobId) {return getAllocatedSlots(jobId).hasNext();}

3.3. TaskSlotMapping

Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}.

// ---------------------------------------------------------------------// Static utility classes// ---------------------------------------------------------------------/** Mapping class between a {@link TaskSlotPayload} and a {@link TaskSlot}. */private static final class TaskSlotMapping<T extends TaskSlotPayload> {private final T task;private final TaskSlot<T> taskSlot;private TaskSlotMapping(T task, TaskSlot<T> taskSlot) {this.task = Preconditions.checkNotNull(task);this.taskSlot = Preconditions.checkNotNull(taskSlot);}public T getTask() {return task;}public TaskSlot<T> getTaskSlot() {return taskSlot;}}

3.4. 迭代器相关

TaskSlotIterator

/**** 足给定状态条件并属于给定作业的{@link TaskSlot}上的迭代器。** Iterator over {@link TaskSlot} which fulfill a given state condition and belong to the given* job.*/private final class TaskSlotIterator implements Iterator<TaskSlot<T>> {private final Iterator<AllocationID> allSlots;private final TaskSlotState state;private TaskSlot<T> currentSlot;private TaskSlotIterator(TaskSlotState state) {this(slotsPerJob.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).iterator(),state);}private TaskSlotIterator(JobID jobId, TaskSlotState state) {this(slotsPerJob.get(jobId) == null? Collections.emptyIterator(): slotsPerJob.get(jobId).iterator(),state);}private TaskSlotIterator(Iterator<AllocationID> allocationIDIterator, TaskSlotState state) {this.allSlots = Preconditions.checkNotNull(allocationIDIterator);this.state = Preconditions.checkNotNull(state);this.currentSlot = null;}@Overridepublic boolean hasNext() {while (currentSlot == null && allSlots.hasNext()) {AllocationID tempSlot = allSlots.next();TaskSlot<T> taskSlot = getTaskSlot(tempSlot);if (taskSlot != null && taskSlot.getState() == state) {currentSlot = taskSlot;}}return currentSlot != null;}@Overridepublic TaskSlot<T> next() {if (currentSlot != null) {TaskSlot<T> result = currentSlot;currentSlot = null;return result;} else {while (true) {AllocationID tempSlot;try {tempSlot = allSlots.next();} catch (NoSuchElementException e) {throw new NoSuchElementException("No more task slots.");}TaskSlot<T> taskSlot = getTaskSlot(tempSlot);if (taskSlot != null && taskSlot.getState() == state) {return taskSlot;}}}}

PayloadIterator

/** Iterator over all {@link TaskSlotPayload} for a given job. */private final class PayloadIterator implements Iterator<T> {private final Iterator<TaskSlot<T>> taskSlotIterator;private Iterator<T> currentTasks;private PayloadIterator(JobID jobId) {this.taskSlotIterator = new TaskSlotIterator(jobId, TaskSlotState.ACTIVE);this.currentTasks = null;}@Overridepublic boolean hasNext() {while ((currentTasks == null || !currentTasks.hasNext())&& taskSlotIterator.hasNext()) {TaskSlot<T> taskSlot = taskSlotIterator.next();currentTasks = taskSlot.getTasks();}return (currentTasks != null && currentTasks.hasNext());}@Overridepublic T next() {while ((currentTasks == null || !currentTasks.hasNext())) {TaskSlot<T> taskSlot;try {taskSlot = taskSlotIterator.next();} catch (NoSuchElementException e) {throw new NoSuchElementException("No more tasks.");}currentTasks = taskSlot.getTasks();}return currentTasks.next();}@Overridepublic void remove() {throw new UnsupportedOperationException("Cannot remove tasks via this iterator.");}}

3.5. State

    private enum State {CREATED,RUNNING,CLOSING,CLOSED}

【Flink】Flink 1.12.2 TaskSlotTable相关推荐

  1. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [四] 上一篇: [flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 Jo ...

  2. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [三] 上一章:[flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 yar ...

  3. 【Flink】FLink 1.12 版本的 Row 类型 中的 RowKind 是干嘛的

    1.概述 在[Flink]Flink 从 1.9.1 版本 升级到 1.12.4 版本的 注意事项 以及 过程 升级的时候,偶然看到row类型多了一个属性. @PublicEvolving publi ...

  4. 【FLink】Flink 1.9 升级到 1.12.4 无配置页面 无日志

    文章目录 1.概述 2.场景再现2 2.1 概述 2.2 日志的配置 2.3 加载2个 2.4 缺少文件 2.7 扩展 2.5.1 Flink 默认日志框架 2.5.2 slf4j 基础概念 2.5. ...

  5. 【Flink】Flink 1.9 升级 1.12.4 本地可以运行 打包后 集群运行就找不到类 ClassNotFoundException

    1.场景1 1.1 概述 升级过程请参考: [Flink]Flink 从 1.9.1 版本 升级到 1.12.4 版本的 注意事项 以及 过程 然后本地运行好好的,但是而且,是在主类中类. 打开jar ...

  6. 【2】flink数据流转换算子

    [README] 本文记录了flink对数据的转换操作,包括 基本转换,map,flatMap,filter: 滚动聚合(min minBy max maxBy sum): 规约聚合-reduce: ...

  7. 【flink】flink 报错 key group from 44 to 45 does not contain 4

    文章目录 1.概述 本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 1.概述 此问题和一个问题很相似:[Flink]Flink KeyGroupRang ...

  8. 【flink】Flink常见Checkpoint超时问题排查思路

    1.概述 转载:Flink常见Checkpoint超时问题排查思路 这里仅仅是自己学习. 在日常flink应用中,相信大家经常会遇到checkpoint超时失败这类的问题,遇到这种情况的时候仅仅只会在 ...

  9. 【Flink】flink 升级 the given -yarn-cluster does not contain a valid port

    文章目录 1.场景1 1.1 概述 1.场景1 1.1 概述 在本次场景中,我是从flink 1.9升级到1.12.4 升级请参考:[Flink]Flink 从 1.9.1 版本 升级到 1.12.4 ...

  10. 【3】flink sink

    [README] 本文记录了flink sink操作,输出目的存储器(中间件)包括 kafka: es: db: 等等有很多: 本文只给出了 sink2kafka的代码: 本文使用的flink为 1. ...

最新文章

  1. python输入正整数n、求n以内能被17整除的最大正整数_求100之内自然数中最大的能被17整除的数...
  2. jsf登录注册页面_您将在下一个项目中使用JSF吗?
  3. amd cpu排行_【每日热点】长盈精密成功开发纽扣电池壳体组件;AMD处理器全球份额有望达到20%;英伟达承诺将ARM总部留在英国...
  4. 函数指针,指针函数,函数指针数组
  5. 使用Gitolite搭建Gitserver
  6. 学成在线--0.项目概述
  7. 选择软路由的七大理由
  8. 序列每天从0开始_【算法打卡】分割数组为连续子序列
  9. restController
  10. FIS如何成为制霸北美的金融科技航母?丨亿欧解案例
  11. 安装genymotion模拟器
  12. windows使用小技巧 ━━ windows11的WLAN图标突然消失的解决办法,wifi没了可以找回来,一般的驱动突然坏了都可以用这个方法找回
  13. c++的vector初始化
  14. taobao滑动验证码解决方法
  15. 用户画像分析相关整理
  16. 人工智能有没有可能在未来超越人类?
  17. VSCODE使用EmbeddedIDE插件开发51单片机
  18. 按照网络规模来分,服务器分为哪几类?
  19. 有必要说一说即将到来的春招(经历+重要性+如何准备)
  20. Pytorch中KL loss

热门文章

  1. 受半导体短缺及疫情影响,丰田已下调9月10月及当前财年产量预期
  2. 你扔掉的旧衣服撑起了一个千亿市场?
  3. 150家通用经销商决定退出凯迪拉克品牌 因不愿投资于销售电动车
  4. 小米集团:副董事长林斌承诺5年内不出售公司股份 已作安排的除外
  5. B站发布2020年一季度财报:月活用户达1.72亿,日活用户突破5000万
  6. 长虹美菱:公司主要通过抖音短视频等平台进行直播带货
  7. 腾讯人均月薪8万!第一季度财报发布:微信月活数恐怖,游戏平均日赚4亿
  8. 星巴克人造肉产品来了,植物牛肉餐品真香?
  9. 疑似华为Mate X 5G版入网 将支持两种组网方式
  10. 郎朗钢琴课独家上线知乎 手把手带你开启钢琴之路