文章目录

  • 一. 问题引入
    • 1. 场景描述
    • 2. 日志简析
  • 二. 初级问题分析与解决
    • 1. 问题分析
      • 1.1. yarn的调度器设置
      • 1.2. 程序设置
    • 2. 问题解决
  • 三. (性能)新的问题
    • 1. 问题描述
    • 2. 理想化的最优方案
    • 3. "PlanB"的解决方案
  • 四. 反思与迭代

一. 问题引入

1. 场景描述

使用flink引擎,处理hdfs到hive的任务,hdfs的文件数有4000个,这里设置并行度为20,提交任务运行。

2. 日志简析

任务提交之后发现报错,我们简单分析下yarn的日志:

1. 申请的资源超过了yarn最大的container资源限制,也就是说一个taskexecutor所需的资源过大Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.2. 接着开始请求一个新的worker,这里的worker应该也是container ,那此时正在pend的数量为1.Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.3. 但同样的这个container超过了yarn的资源,这时直接放弃分配资源Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.4. 如此在反复这样执行,似乎陷入到了死循环。最后任务部署超时而报错。。。

上面的日志说的比较明白,就是:申请的资源超过了yarn最大的container资源限制。
 
 
再放出点堆栈信息,供日后参考分析:

2022-11-18 <b>10:12:12</b>,938 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: <b>Could not compute the container Resource from the given</b> TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.。。。at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]
2022-11-18 <b>10:12:12,93</b>9 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.
2022-11-18 10:12:12,940 WARN  org.apache.flink.yarn.TaskExecutorProcessSpecContainerResourcePriorityAdapter [] - Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.
2022-11-18 10:12:12,940 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.。。。at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]
2022-11-18 10:12:12,941 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 1.
2022-11-18 10:12:12,941 WARN  org.apache.flink.yarn.TaskExecutorProcessSpecContainerResourcePriorityAdapter [] - Requested container resource (<memory:12288, vCores:10>) exceeds the max limitation of the Yarn cluster (<memory:12288, vCores:4>). Will not allocate resource.
2022-11-18 10:12:12,941 WARN  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Failed requesting worker with resource spec WorkerResourceSpec {cpuCores=10.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}, current pending count: 0
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not compute the container Resource from the given TaskExecutorProcessSpec TaskExecutorProcessSpec {cpuCores=10.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemorySize=4.300gb (4617089912 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=1024.000mb (1073741824 bytes)}. This usually indicates the requested resource is larger than Yarn's max container resource limit.at org.apache.flink.yarn.YarnResourceManagerDriver.requestResource(YarnResourceManagerDriver.java:254) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:249) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestWorkerIfRequired(ActiveResourceManager.java:310) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$0(ActiveResourceManager.java:261) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834) ~[?:1.8.0_152]at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155) ~[?:1.8.0_152]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.requestNewWorker(ActiveResourceManager.java:251) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.startNewWorker(ActiveResourceManager.java:160) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.ResourceManager$ResourceActionsImpl.allocateResource(ResourceManager.java:1382) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.allocateResource(SlotManagerImpl.java:1058) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.fulfillPendingSlotRequestWithPendingTaskManagerSlot(SlotManagerImpl.java:954) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.lambda$internalRequestSlot$9(SlotManagerImpl.java:943) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.util.OptionalConsumer.ifNotPresent(OptionalConsumer.java:51) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalRequestSlot(SlotManagerImpl.java:941) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.registerSlotRequest(SlotManagerImpl.java:410) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:529) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_152]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_152]at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.7.jar:1.12.7]at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.7.jar:1.12.7]at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.7.jar:1.12.7]at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.7.jar:1.12.7]at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.7.jar:1.12.7]at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.7.jar:1.12.7]at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.7.jar:1.12.7]at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.7.jar:1.12.7]
<br/>

二. 初级问题分析与解决

1. 问题分析

报错很明确,也就是我们向yarn多申请了资源。所以我们关注两点:yarn的调度器设置(规定了队列,每个任务申请的限制)、程序中是如何设置的并行度的。

1.1. yarn的调度器设置

打开yarn 看到Maximum Allocation限制为<memory:12288, vCores:4> ,具体的说,就是我们申请一个container最大内存能申请12G、最大核心数为4

1.2. 程序设置

看下运行的shell的伪代码

tmp_value=`echo $parallelism 10 | awk '{if($1 > $2) print 1; else print 0;}'`
if [ $tmp_value -eq 1 ] ;thentm=12288vcores=10numberOfTaskSlots=10
fi
..."yarn.containers.vcores":${vcores}
"taskmanager.numberOfTaskSlots":${numberOfTaskSlots}
...

这里可以看到冲突:当我们的并行度设置超过10时,vcores设置为10,但yarn最大让设置为4,所以会报错。。。

2. 问题解决

解决问题的方式也比较简单,让vcores最大保持为4,然后再运行。

这里我们设置整个job的并行度还是为20,然后申请一个container中vcores=4,那么按照一个并行度分配一个线程的逻辑

即一个taskmanager(container)中有4核对应有4个slot,那将会有20/4=5个taskmanager

三. (性能)新的问题

1. 问题描述

任务是跑起来了,但是出现了一个性能的问题:

先看下job的运行情况:

job的消费速度:

  • 20并行度下,每个taskmanager的内存为12G,每秒消费11.28万条数据,那每分钟处理的速度是676.89万条/min,每小时4.06亿条/小时

  • 2032.766839793899649 GB 每小时平均

  • 5793.385493412613869 GB 累计2.85小时

最终的结果

  • 53.14亿条
  • 数据量有:3.03TB
  • 消费每条数据的平均大小是:626byte。
  • 1G=1,073,741,824 bytes

最终整个job运行完,花了11个小时:

客户的要求是1小时内完成,但速度太慢了。。。

2. 理想化的最优方案

客户集群的资源是够的,所以我们不考虑资源问题,那既然这样,因为hdfs的文件总共有4000个,再有yarn最大资源分配是4(core/container),所以我们部署一个4000并行度的任务,它将运行的最快!!!

所以我将并行度设置为4000时将会有1000个taskmanager启动。等一下。1000个???我心疼jobmanager一秒钟先。

果然,任务还没调度完,就失败了。。。

所以理想有些简单,也没有银弹。

3. "PlanB"的解决方案

既然一个jobmanager不能管理这么多的taskmanager,那就降低taskmanager的数量。
。。。经过多次尝试之后,这里最终给出了方案B的设置:

并行度设置为500,taskmanager启动了125个。

任务最终处理时间缩短到:23分钟

*********************************************
nErrors              |  0
nullErrors           |  0
duplicateErrors      |  0
conversionErrors     |  0
otherErrors          |  0
numWrite             |  3164002700
byteWrite            |  17123302298224
numRead              |  3164002700
writeDuration        |  545313434
byteRead             |  1983716362408
readDuration         |  539805057
snapshotWrite        |  6328005400
*********************************************2022-11-21 11:48:56 Start to stop flink job(3734d90205a13eff7cfa61b5c0c9686b)
2022-11-21 11:50:11 Success to stop flink job(3734d90205a13eff7cfa61b5c0c9686b)
2022-11-21 11:50:11 Flink process exit code is :0

四. 反思与迭代

针对上述的处理过程,发现几点值得思考和迭代:

  1. 我们程序和yarn进行定时通讯,及时获取yarn中调度器的设置,然后动态设置最大核心数,充分利用资源的同时,也保证了程序的稳定;
  1. 我们在设置并行度时,需要考虑jobmanager能够协调的taskmanager的数量,不是靠尝试。
     
    本文学到的经验是,设置125个taskmanager时,jobmanager是可以顶住压力的,但接下来可以分析分析flink的通讯相关的源码,以便能极大的发挥集群资源。
     
    其次对于jobmanager管理这么多节点是否可以设置flink的高可用,增加job运行的稳定度

还有一个小细节

  1. taskmanager的内存使用有些浪费,如上图,并未充分利用内存资源,这点我们也需思考要如何优化。

仔细点我们也可以发现

  1. 当container设置的核心数最大为4时,numberOfTaskSlots的设置其实无效了。

【性能|优化】TB级flink任务报错分析:Could not compute the container Resource相关推荐

  1. java电商秒杀深度优化_【B0796】Java性能优化亿级流量秒杀方案及电商项目秒杀实操2020视频教程...

    Java视频教程名称:Java性能优化亿级流量秒杀方案及电商项目秒杀实操2020视频教程    java自学网[javazx.com]  性能视频教程   it教程 Java自学网收集整理 java论 ...

  2. mysql 1126报错 User 'root' has exceeded the 'max_questions' resource (current value: 10000)

    mysql 1126报错 User 'root' has exceeded the 'max_questions' resource (current value: 10000) 最近在项目中,从de ...

  3. docker运行报错:Error response from daemon:Container {containerId} is not running

    问题产生: 已启动docker服务systemctl start docker.service,由于本地8080端口之前已经在使用忘记关闭了,在执行docker run -dit --privileg ...

  4. outlook响应服务器450,outlook报错分析

    原标题:outlook报错分析 1.0x800CCC79,报错内容一般如下: 由于服务器拒绝收件人之一,无法发送邮件.被拒绝的电子邮件地址是"xxx@163.com". 主题 '饿 ...

  5. 报错:NestedIOException: Failed to parse mapping resource

    报错:NestedIOException: Failed to parse mapping resource 详细信息如下: BeanInstantiationException: Failed to ...

  6. 思科2960接入华为S5130报错分析

    思科2960接入华为S5130报错分析 拓扑: 拓扑介绍: 如图所示,原先架构为华为5130接入到核心5130,有一台思科2960管理口access接5130核心,后新增一台思科2960接入至5130 ...

  7. sparksql Error in query: resolved attribute(s)报错分析

    sparksql Error in query: resolved attribute(s)报错分析 项目场景: sparksql Error in query: resolved attribute ...

  8. ROS基础(安装、报错分析)

    ROS 1 ROS基础1 1.1 ROS的安装 首先需要搭建开发环境.下载与安装参考博客的做法. ros_graph rosnode list rosnode info /turtlesim rost ...

  9. echarts报错分析汇总

    echarts报错分析汇总 echart 节点重复报错---echarts2.js:60988 Graph nodes have duplicate name or id 桑基图-拖拽时出现样式错乱, ...

最新文章

  1. axios get请求_Axios使用指南
  2. loadrunner 的脚本语言
  3. Linux下安装Solr
  4. python2与python3性能对比_对Python2与Python3中__bool__方法的差异详解
  5. 安防监控应用成LED企业新盈利点
  6. 多命令顺序执行,单引号,双引号,反引号,转义符
  7. 【java设计模式之Command(菜单命令) 】
  8. twisted系列教程七–小插曲,延迟对象
  9. 基础才是重中之重~对象的生与死
  10. python逢7跳过_python学习笔记(七)break 和continue
  11. AT89C51单片机万年历仿真图+代码
  12. 2019年总结-做时间的朋友
  13. npm报错 443(error : getaddrinfo enotfound registry.npmjs.org registry.npmjs.org:443)
  14. 20核服务器项目,详细解答E5-2680v2,20核40线程服务器的具体用途怎么体现出来
  15. java double 赋值语句_Java中float、double、long类型变量赋值添加f、d、L尾缀问题
  16. 深度盘点:Python 使用和高性能技巧总结
  17. 开原框架RxJava
  18. MATLAB绘图设置
  19. ABBYY OCR技术教电脑阅读缅甸语(上)
  20. layui select onchange 实现

热门文章

  1. java mp3 播放_JAVA播放MP3
  2. 计算机主板 也叫系统版,计算机主板的型号是什么?
  3. #775 Div.1 C. Tyler and Strings 组合数学
  4. 闪讯在 OS X 10.10.4 上的使用姿势
  5. vivado实现SDI接口
  6. 电子邮件加密和数字签名服务解决方案
  7. Unity帧率、屏幕刷新率
  8. 常规批复试录取浙大MBA的幸运儿经验分享
  9. 使用ThinkPHP进行图片批量裁剪
  10. 计算机程序工作日志,计算机程序员实习日记大全