Table of Contents

  • 1. Task Description and the One-Line Summary
    • 1. Task description
    • 2. The one-line summary
  • 2. Log Analysis
    • 1. Requesting a TaskManager
    • 2. About 3 minutes later, running on this TM fails: resources not found
  • 3. Source-Code Analysis and Locating the Error Mechanism
    • 1. Key log
    • 2. Locating the error in the source
      • 2.1. Why the error says "has no more allocated slots for job"
      • 2.2. What controls the release of resources
  • 4. Reflections and Further Exploration
    • 1. Is this the optimal solution?
    • 2. The design of Flink's TimerService

1. Task Description and the One-Line Summary

1. Task description

Flink version: 1.12.7
Job: HDFS-to-Hive data cleansing.
Specifics: parallelism 1000; each TM has 10 cores and 15 GB of memory; about 8,000 input files totaling more than 2.3 million records.

2. The one-line summary

Symptom:
In short, after Flink allocated a TaskManager, its slots were not put to use immediately; after a while the idle slots were released, so when the job finally tried to use them it failed with "has no more allocated slots".

The idle-slot release timeout is controlled by akka.ask.timeout.

The fix is simple: observe the interval between a TM being allocated and its first use, then set the timeout larger than that interval.
Here I set akka.ask.timeout = 600s.
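As a sketch of the fix, the setting can go into flink-conf.yaml (assuming Flink 1.12, where the idle-slot timeout falls back to this value; tune the number to your own registration-to-first-use gap):

```yaml
# flink-conf.yaml -- keep idle slots for up to 10 minutes before the
# TaskExecutor frees them (default is "10 s"; the value needs a time unit)
akka.ask.timeout: 600 s
```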

2. Log Analysis

The log trail from requesting a TaskManager to the "has no more allocated slots" error.

1. Requesting a TaskManager

2022-12-26 14:47:47,129 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Registering TaskManager with ResourceID container_e219_1670315060312_628979_01_000078(10.203.70.8:8842) (akka.tcp://flink@server/user/rpc/taskmanager_0) at ResourceManager

Note the TaskManager registration time: 14:47:47.

2. About 3 minutes later, running on this TM fails: resources not found

2022-12-26 14:50:39,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[raw_message]) -> Calc(select=[(raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)b=([^& \"]*)' REGEXP_EXTRACT 2) AS b, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)c=([^& \"]*)' REGEXP_EXTRACT 2) AS c, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)cId=([^& \"]*)' REGEXP_EXTRACT 2) AS cid, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)f=([^& \"]*)' REGEXP_EXTRACT 2) AS f, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)fl=([^& \"]*)' REGEXP_EXTRACT 2) AS fl, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)gid=([^& \"]*)' REGEXP_EXTRACT 2) AS gid, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)guid=([^& \"]*)' REGEXP_EXTRACT 2) AS guid, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)id=([^& \"]*)' REGEXP_EXTRACT 2) AS id, (raw_message REGEXP_EXTRACT _UTF-16LE'((?:(?:25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(?:25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d))))' REGEXP_EXTRACT 2) AS ip, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)mid=([^& \"]*)' REGEXP_EXTRACT 2) AS mid, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)p=([^& \"]*)' REGEXP_EXTRACT 2) AS p, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)pos=([^& \"]*)' REGEXP_EXTRACT 2) AS pos, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)qd=([^& \?\"]*)' REGEXP_EXTRACT 2) AS qd, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)r=([^& \"]*)' REGEXP_EXTRACT 2) AS r, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)sd=([^& \"]*)' REGEXP_EXTRACT 2) AS sd, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)sid=([^& \"]*)' REGEXP_EXTRACT 2) AS sid, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)sr=([^& \"]*)' REGEXP_EXTRACT 2) AS sr, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)steptime=([^& \"]*)' REGEXP_EXTRACT 2) AS steptime, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)t=([^& \"]*)' REGEXP_EXTRACT 2) AS t, (raw_message REGEXP_EXTRACT _UTF-16LE'(\d{2})\/(\w{3})\/(\d{4}):(\d{2}):(\d{2}):(\d{2})' 
REGEXP_EXTRACT 2) AS time_stamp, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)title=([^& \"]*)' REGEXP_EXTRACT 2) AS title, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)u=([^& \"\?]*)' REGEXP_EXTRACT 2) AS u, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)ul=([^& \"]*)' REGEXP_EXTRACT 2) AS ul, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)ce=([^& \"]*)' REGEXP_EXTRACT 2) AS ce, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)uid=([^& \"]*)' REGEXP_EXTRACT 2) AS uid, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)tg=([^& \"]*)' REGEXP_EXTRACT 2) AS tg, (raw_message REGEXP_EXTRACT _UTF-16LE'/qdas/([^& \.\?\"]*)' REGEXP_EXTRACT 2) AS event_type, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)correct=([^& \"]*)' REGEXP_EXTRACT 2) AS correct, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)groupid=([^& \"]*)' REGEXP_EXTRACT 2) AS groupid, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)method=([^& \"]*)' REGEXP_EXTRACT 2) AS method, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)number=([^& \"]*)' REGEXP_EXTRACT 2) AS number, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)option=([^& \"]*)' REGEXP_EXTRACT 2) AS option1, _UTF-16LE'\N' AS country, _UTF-16LE'\N' AS province, _UTF-16LE'\N' AS city, _UTF-16LE'\N' AS operator, (raw_message REGEXP_EXTRACT _UTF-16LE'(\?|&)channel=([^& \"\ ]*)' REGEXP_EXTRACT 2) AS channel, _UTF-16LE'\N' AS logs], where=[(raw_message LIKE _UTF-16LE'%/qdas/%')]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[b, c, cid, f, fl, gid, guid, id, ip, mid, p, pos, qd, r, sd, sid, sr, steptime, t, time_stamp, title, u, ul, ce, uid, tg, event_type, correct, groupid, method, number, option1, country, province, city, operator, channel, logs]) (778/1000) (7fefe2591680c4f3c44256e639484c1b) switched from DEPLOYING to RUNNING.
2022-12-26 14:50:42,479 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation     [] - No hostname could be resolved for the IP address , using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2022-12-26 14:50:50,180 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(...) -> Calc(...) -> Sink: Sink(...) [operator chain identical to the log above, trimmed] (789/1000) (aa79b1c7e588d99c9aef8c66a5fbbd9e) switched from DEPLOYING to FAILED on container_e219_1670315060312_628979_01_000078 @ 10.203.70.8 (dataPort=26135).
org.apache.flink.util.FlinkException: TaskExecutor akka.tcp://flink@server/user/rpc/taskmanager_0 has no more allocated slots for job 8edae0d39803e7ecaab61224b5493b39.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources(TaskExecutor.java:1872) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1853) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1886) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$3000(TaskExecutor.java:176) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:2244) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	... (remaining akka/scala actor dispatch frames omitted)
2022-12-26 14:50:50,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution aa79b1c7e588d99c9aef8c66a5fbbd9e.
2022-12-26 14:50:50,228 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_788.
2022-12-26 14:50:50,229 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_788.
2022-12-26 14:50:50,231 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink RI Job (8edae0d39803e7ecaab61224b5493b39) switched from state RUNNING to RESTARTING.
2022-12-26 14:50:50,234 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(...) -> Calc(...) -> Sink: Sink(...) [operator chain identical to the log above, trimmed] (787/1000) (5363c22bf2a1edb34eeefee4591895d4) switched from DEPLOYING to FAILED on container_e219_1670315060312_628979_01_000078 @ 10.203.70.8 (dataPort=26135).
org.apache.flink.util.FlinkException: TaskExecutor akka.tcp://flink@server/user/rpc/taskmanager_0 has no more allocated slots for job 8edae0d39803e7ecaab61224b5493b39.
	... (stack trace identical to the one above, omitted)
2022-12-26 14:50:50,235 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 5363c22bf2a1edb34eeefee4591895d4.
2022-12-26 14:50:50,235 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_786.
2022-12-26 14:50:50,235 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_786.
2022-12-26 14:50:50,236 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(...) -> Calc(...) -> Sink: Sink(...) [operator chain identical to the log above, trimmed] (788/1000) (ef57df82ced6382e618b7bc1f90bfd90) switched from DEPLOYING to FAILED on container_e219_1670315060312_628979_01_000078 @ 10.203.70.8 (dataPort=26135).
org.apache.flink.util.FlinkException: TaskExecutor akka.tcp://flink@server/user/rpc/taskmanager_0 has no more allocated slots for job 8edae0d39803e7ecaab61224b5493b39.
	... (stack trace identical to the one above, omitted)
2022-12-26 14:50:50,236 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution ef57df82ced6382e618b7bc1f90bfd90.
2022-12-26 14:50:50,236 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_787.
2022-12-26 14:50:50,236 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_787.
2022-12-26 14:50:50,241 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink RI Job (8edae0d39803e7ecaab61224b5493b39) switched from state RESTARTING to FAILING.

Reading the stack trace: the failure goes through org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot, i.e. a slot-timeout check. The timeout value comes from the akka.ask.timeout parameter:

/** Timeout for akka ask calls. */
public static final ConfigOption<String> ASK_TIMEOUT =
        ConfigOptions.key("akka.ask.timeout")
                .stringType()
                .defaultValue("10 s")
                .withDescription(
                        "Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you"
                                + " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The"
                                + " timeout value requires a time-unit specifier (ms/s/min/h/d).");

In short: this timeout applies to all futures and blocking Akka calls, and defaults to 10 s.

3. Source-Code Analysis and Locating the Error Mechanism

1. Key log

The logs show that tasks were deployed to this TaskManager about 3 minutes after it started, but by then akka.ask.timeout (set to 120 s for this job) had already expired: the idle slots had been freed, leaving the TM with no usable resources.

org.apache.flink.util.FlinkException: TaskExecutor akka.tcp://flink@server/user/rpc/taskmanager_0 has no more allocated slots for job 8edae0d39803e7ecaab61224b5493b39.
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources(TaskExecutor.java:1872) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1853) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1886) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$3000(TaskExecutor.java:176) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:2244) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.7.jar:1.12.7]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.7.jar:1.12.7]

Reading the stack confirms the logic described above: once the timeout fires, the slot is freed, and the TaskExecutor reports "has no more allocated slots for job".
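To make the timing concrete, here is a small hypothetical helper (not Flink code) that computes the registration-to-first-use gap from the two log timestamps above and checks it against a configured slot timeout:

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper, not part of Flink: measures how long a TM sat idle
// between registration and first slot use, and whether a timeout would fire.
public class SlotTimeoutCheck {

    // Seconds between two "HH:mm:ss" log timestamps on the same day.
    public static long gapSeconds(String registeredAt, String firstUseAt) {
        return Duration.between(LocalTime.parse(registeredAt), LocalTime.parse(firstUseAt))
                .getSeconds();
    }

    // The idle slot is freed if it stays unused longer than the timeout.
    public static boolean wouldTimeOut(long gapSeconds, long timeoutSeconds) {
        return gapSeconds > timeoutSeconds;
    }

    public static void main(String[] args) {
        // TM registered at 14:47:47; first DEPLOYING->RUNNING at 14:50:39.
        long gap = gapSeconds("14:47:47", "14:50:39"); // 172 s
        System.out.println(gap + " s idle; times out at 120 s: " + wouldTimeOut(gap, 120)
                + "; at 600 s: " + wouldTimeOut(gap, 600));
    }
}
```

With the 120 s timeout configured for this job, the roughly 172 s of idle time is enough for the slot to be freed before the first task arrives, while 600 s leaves ample headroom.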

2. Locating the error in the source

The log analysis above gives a rough location of the error, but we still need to trace the source code to confirm it. The point is not to get lost in the code, but to answer two questions that pin down the exact cause so the right fix can be applied:

  1. Why does it report "has no more allocated slots for job"?
  2. What triggers the error?

2.1. Why the error says "has no more allocated slots for job"

Starting from the stack trace: the method name tells us that when a job has no allocated slots left on this TaskExecutor, its resources are released. So how exactly did the slots disappear?

private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) {
    // check whether we still have allocated slots for the same job
    if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()
            && !partitionTracker.isTrackingPartitionsFor(jobId)) {
        // we can remove the job from the job leader service
        final FlinkException cause =
                new FlinkException(
                        "TaskExecutor "
                                + getAddress()
                                + " has no more allocated slots for job "
                                + jobId
                                + '.');
        releaseJobResources(jobId, cause);
    }
}

The slot itself is freed here:

private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
    checkNotNull(allocationId);
    log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage());
    try {
        final JobID jobId = taskSlotTable.getOwningJob(allocationId);
        // the actual release happens here; follow this call
        final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
        if (slotIndex != -1) {
            if (isConnectedToResourceManager()) {
                // the slot was freed. Tell the RM about it
                ResourceManagerGateway resourceManagerGateway =
                        establishedResourceManagerConnection.getResourceManagerGateway();
                resourceManagerGateway.notifySlotAvailable(
                        establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                        new SlotID(getResourceID(), slotIndex),
                        allocationId);
            }
            if (jobId != null) {
                closeJobManagerConnectionIfNoAllocatedResources(jobId);
            }
        }
    } catch (SlotNotFoundException e) {
        log.debug("Could not free slot for allocation id {}.", allocationId, e);
    }
    localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
}

Continuing up the stack trace:

2.2. Who controls the release of resources


private void timeoutSlot(AllocationID allocationId, UUID ticket) {
    checkNotNull(allocationId);
    checkNotNull(ticket);

    // check whether the slot's timeout is still valid
    if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
        freeSlotInternal(
                allocationId, new Exception("The slot " + allocationId + " has timed out."));
    } else {
        log.debug(
                "Received an invalid timeout for allocation id {} with ticket {}.",
                allocationId,
                ticket);
    }
}

// a TimerService decides whether the timeout has fired (i.e. whether the ticket is still valid)
@Override
public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
    checkStarted();

    return state == State.RUNNING && timerService.isValid(allocationId, ticket);
}

// key code: determines whether the timeout is still valid
/**
 * Check whether the timeout for the given key and ticket is still valid (not yet unregistered
 * and not yet overwritten).
 *
 * @param key for which to check the timeout
 * @param ticket of the timeout
 * @return True if the timeout ticket is still valid; otherwise false
 */
public boolean isValid(K key, UUID ticket) {
    if (timeouts.containsKey(key)) {
        Timeout<K> timeout = timeouts.get(key);

        return timeout.getTicket().equals(ticket);
    } else {
        return false;
    }
}

At this point we roughly know that a Timeout object decides whether a slot has timed out; once it has, the resources are released.

Next, let's see where this timeout comes from.


// TaskManagerServices
private static TaskSlotTable<Task> createTaskSlotTable(
        final int numberOfSlots,
        final TaskExecutorResourceSpec taskExecutorResourceSpec,
        final long timerServiceShutdownTimeout,
        final int pageSize,
        final Executor memoryVerificationExecutor) {
    // the TimerService is created here
    final TimerService<AllocationID> timerService =
            new TimerService<>(new ScheduledThreadPoolExecutor(1), timerServiceShutdownTimeout);
    return new TaskSlotTableImpl<>(
            numberOfSlots,
            TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(
                    taskExecutorResourceSpec),
            TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(
                    taskExecutorResourceSpec, numberOfSlots),
            pageSize,
            timerService,
            memoryVerificationExecutor);
}

public boolean allocateSlot(
        int index,
        JobID jobId,
        AllocationID allocationId,
        ResourceProfile resourceProfile,
        Time slotTimeout) {
    checkRunning();

    // register a timeout for this slot since it's in state allocated
    timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
    ...
}

Registering the timeout:

/**
 * Register a timeout for the given key which shall occur in the given delay.
 *
 * @param key for which to register the timeout
 * @param delay until the timeout
 * @param unit of the timeout delay
 */
public void registerTimeout(final K key, final long delay, final TimeUnit unit) {
    Preconditions.checkState(
            timeoutListener != null,
            "The " + getClass().getSimpleName() + " has not been started.");

    if (timeouts.containsKey(key)) {
        unregisterTimeout(key);
    }

    timeouts.put(key, new Timeout<>(timeoutListener, key, delay, unit, scheduledExecutorService));
}
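The pattern at work here — register a timeout under a fresh UUID ticket, let re-registration invalidate the old ticket, and act on a fired timeout only if its ticket is still current — can be sketched in isolation. The following is a simplified illustration of that mechanism only; the class and method names are invented, not Flink's actual TimerService API:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the ticket-based timeout pattern (hypothetical names,
// not Flink's real TimerService).
class MiniTimerService<K> {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    private final Map<K, UUID> tickets = new ConcurrentHashMap<>();

    /** Register (or re-register) a timeout; any previous ticket for the key becomes invalid. */
    UUID registerTimeout(K key, long delayMillis, Runnable onTimeout) {
        UUID ticket = UUID.randomUUID();
        tickets.put(key, ticket);
        executor.schedule(
                () -> {
                    // fire only if this ticket is still the current one for the key
                    if (isValid(key, ticket)) {
                        onTimeout.run();
                    }
                },
                delayMillis,
                TimeUnit.MILLISECONDS);
        return ticket;
    }

    /** A ticket is valid only if it was neither unregistered nor overwritten. */
    boolean isValid(K key, UUID ticket) {
        return ticket.equals(tickets.get(key));
    }

    void unregisterTimeout(K key) {
        tickets.remove(key);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Roughly speaking, when a slot actually starts being used, Flink marks it active and the pending ticket is unregistered, so a later-firing timeout finds a stale ticket and is ignored; if the slot stays unused past the delay, the ticket is still current and freeSlotInternal runs.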

The timeout value comes from taskManagerConfiguration:

// TaskExecutor: the TaskManager starts allocating the slot
private void allocateSlot(
        SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile)
        throws SlotAllocationException {
    if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
        if (taskSlotTable.allocateSlot(
                slotId.getSlotNumber(),
                jobId,
                allocationId,
                resourceProfile,
                // the timeout comes from here
                taskManagerConfiguration.getTimeout())) {
            log.info("Allocated slot for {}.", allocationId);
        } else {
            log.info("Could not allocate slot for {}.", allocationId);
            throw new SlotAllocationException("Could not allocate slot.");
        }
    } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
        final String message =
                "The slot " + slotId + " has already been allocated for a different job.";
        log.info(message);
        final AllocationID allocationID =
                taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
        throw new SlotOccupiedException(
                message, allocationID, taskSlotTable.getOwningJob(allocationID));
    }
}

Next, trace where taskManagerConfiguration comes from: startTaskManager in TaskManagerRunner.

public static TaskExecutor startTaskManager(
        ...
        throws Exception {
    ...
    TaskManagerConfiguration taskManagerConfiguration =
            TaskManagerConfiguration.fromConfiguration(
                    configuration, taskExecutorResourceSpec, externalAddress);
    ...
}

def getTimeout(config: Configuration): time.Duration = {
  TimeUtils.parseDuration(config.getString(AkkaOptions.ASK_TIMEOUT))
}

public static final ConfigOption<String> ASK_TIMEOUT =
        ConfigOptions.key("akka.ask.timeout")
                .stringType()
                .defaultValue("10 s")
                .withDescription(
                        "Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you"
                                + " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The"
                                + " timeout value requires a time-unit specifier (ms/s/min/h/d).");

Final conclusion: the parameter akka.ask.timeout determines when "has no more allocated slots" occurs!
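To make the fix concrete: in the log above, the gap between TM registration (14:47:47) and first use (14:50:39) is roughly 172 s, well beyond the 10 s default. The arithmetic can be sanity-checked with a toy sketch (the parser below is a deliberately simplified stand-in, not Flink's TimeUtils.parseDuration):

```java
import java.time.Duration;

// Simplified stand-in for the timeout arithmetic (not Flink's TimeUtils):
// would a slot be freed before the job first uses it?
class SlotTimeoutCheck {

    // parse strings like "10 s", "600 s", "30000 ms" into a Duration
    static Duration parse(String value) {
        String[] parts = value.trim().split("\\s+");
        long amount = Long.parseLong(parts[0]);
        switch (parts[1]) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            default:
                throw new IllegalArgumentException("unknown unit: " + parts[1]);
        }
    }

    /** True if the slot's timeout elapses before the slot is first used. */
    static boolean slotWouldBeFreed(String askTimeout, Duration gapUntilFirstUse) {
        return parse(askTimeout).compareTo(gapUntilFirstUse) < 0;
    }
}
```

With the default "10 s", a 172 s idle gap frees the slot; with "600 s" the slot survives, which matches the setting applied for this job.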

IV. Reflections and further exploration

1. Is this the best solution?

It is worth asking: why do resources sit idle for so long while the job is running?

The job uses a parallelism of 1000 to process just over 2.3 million records. At that scale, TM resource scheduling takes quite a while (the logs show roughly 7 minutes from resource request to running), and some resources are not used immediately after being allocated — they just sit there holding slots, which is a waste of resources.
 
The parallelism was set simply from the file count (one parallel instance per file, capped at 1000). That is a bit arbitrary: this job has 8000+ files, each of them tiny, so such a high parallelism is not warranted.

Possible optimizations:

First, could the parallelism be derived by considering both the file count and the file sizes together? This is worth optimizing.
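A possible shape for a heuristic that sizes parallelism from both file count and total data volume — entirely hypothetical, not an existing Flink option; the names and the target-bytes-per-slot threshold are assumptions:

```java
// Hypothetical sizing heuristic (not a Flink API): cap parallelism by both
// file count and total data size, so 8000 tiny files don't force 1000 slots.
class ParallelismHeuristic {

    /**
     * @param fileCount      number of input files
     * @param totalBytes     total input size in bytes
     * @param bytesPerSlot   target data volume per parallel instance
     * @param maxParallelism hard upper bound
     */
    static int suggestParallelism(
            long fileCount, long totalBytes, long bytesPerSlot, int maxParallelism) {
        long bySize = (totalBytes + bytesPerSlot - 1) / bytesPerSlot; // ceil division
        long suggested = Math.min(fileCount, Math.max(bySize, 1));
        return (int) Math.min(suggested, maxParallelism);
    }
}
```

For example, with 8000 files totalling ~2.3 GB and a 128 MB target per slot, this suggests a parallelism of 18 rather than 1000.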

Second, consider the configuration of 10 cores and 15 GB per TM, with one parallel instance per core.
If a single thread runs into trouble, the whole TM's tasks may be affected. Could each TM instead be given just 1 core with multiple threads running inside it? That would still exploit the CPU while saving memory, since this is a data-transfer job rather than a compute-heavy one.

2. The design of Flink's TimerService

Although the source code analysis above did pin down the root cause of the problem:

the parameter controlling how long until resources are released: akka.ask.timeout

the walkthrough of the complete resource-release process was fairly coarse. In particular, the scheduling logic by which the TimerService drives resource release deserves a closer look. The TimerService is used in many places in Flink — the scenario above, window handling, and more.

I'll dig into the TimerService internals some other time.
