Flink 相关的组件和作业的稳定性通常是比较关键的,所以得需要对它们进行监控,如果有异常,则需要及时告警通知。本章先会教会教会大家如何利用现有 Flink UI 上面的信息去发现和排查问题,会指明一些比较重要和我们非常关心的指标,通过这些指标我们能够立马定位到问题的根本原因。接着笔者会教大家如何去利用现有的 Metrics Reporter 去构建一个 Flink 的监控系统,它可以收集到所有作业的监控指标,并会存储这些监控指标数据,最后还会有一个监控大盘做数据可视化,通过这个大盘可以方便排查问题。

实时监控 Flink 及其作业

当将 Flink JobManager、TaskManager 都运行起来了,并且也部署了不少 Flink Job,那么它到底是否还在运行、运行的状态如何、资源 TaskManager 和 Slot 的个数是否足够、Job 内部是否出现异常、计算速度是否跟得上数据生产的速度 等这些问题其实对我们来说是比较关注的,所以就很迫切的需要一个监控系统帮我们把整个 Flink 集群的运行状态给展示出来。通过监控系统我们能够很好的知道 Flink 内部的整个运行状态,然后才能够根据项目生产环境遇到的问题 ‘对症下药’。下面分别来讲下 JobManager、TaskManager、Flink Job 的监控以及最关心的一些监控指标。

监控 JobManager

我们知道 JobManager 是 Flink 集群的中控节点,类似于 Apache Storm 的 Nimbus 以及 Apache Spark 的 Driver 的角色。它负责作业的调度、作业 Jar 包的管理、Checkpoint 的协调和发起、与 TaskManager 之间的心跳检查等工作。如果 JobManager 出现问题的话,就会导致作业 UI 信息查看不了,TaskManager 和所有运行的作业都会受到一定的影响,所以这也是为啥在 7.1 节中强调 JobManager 的高可用问题。

在 Flink 自带的 UI 上 JobManager 那个 Tab 展示的其实并没有显示其对应的 Metrics,那么对于 JobManager 来说常见比较关心的监控指标有哪些呢?

基础指标

因为 Flink JobManager 其实也是一个 Java 的应用程序,那么它自然也会有 Java 应用程序的指标,比如内存、CPU、GC、类加载、线程信息等。

•内存:内存又分堆内存和非堆内存,在 Flink 中还有 Direct 内存,每种内存又有初始值、使用值、最大值等指标,因为在 JobManager 中的工作其实相当于 TaskManager 来说比较少,也不存储事件数据,所以通常 JobManager 占用的内存不会很多,在 Flink JobManager 中自带的内存 Metrics 指标有:

jobmanager_Status_JVM_Memory_Direct_Countjobmanager_Status_JVM_Memory_Direct_MemoryUsedjobmanager_Status_JVM_Memory_Direct_TotalCapacityjobmanager_Status_JVM_Memory_Heap_Committedjobmanager_Status_JVM_Memory_Heap_Maxjobmanager_Status_JVM_Memory_Heap_Usedjobmanager_Status_JVM_Memory_Mapped_Countjobmanager_Status_JVM_Memory_Mapped_MemoryUsedjobmanager_Status_JVM_Memory_Mapped_TotalCapacityjobmanager_Status_JVM_Memory_NonHeap_Committedjobmanager_Status_JVM_Memory_NonHeap_Maxjobmanager_Status_JVM_Memory_NonHeap_Used

•CPU:JobManager 分配的 CPU 使用情况,如果使用类似 K8S 等资源调度系统,则需要对每个容器进行设置资源,比如 CPU 限制不能超过多少,在 Flink JobManager 中自带的 CPU 指标有:

jobmanager_Status_JVM_CPU_Loadjobmanager_Status_JVM_CPU_Time

•GC:GC 信息对于 Java 应用来说是避免不了的,每种 GC 都有时间和次数的指标可以供参考,提供的指标有:

jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Countjobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Timejobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Countjobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time

Checkpoint 指标

因为 JobManager 负责了作业的 Checkpoint 的协调和发起功能,所以 Checkpoint 相关的指标就有表示 Checkpoint 执行的时间、Checkpoint 的时间长短、完成的 Checkpoint 的次数、Checkpoint 失败的次数、Checkpoint 正在执行 Checkpoint 的个数等,其对应的指标如下:

jobmanager_job_lastCheckpointAlignmentBufferedjobmanager_job_lastCheckpointDurationjobmanager_job_lastCheckpointExternalPathjobmanager_job_lastCheckpointRestoreTimestampjobmanager_job_lastCheckpointSizejobmanager_job_numberOfCompletedCheckpointsjobmanager_job_numberOfFailedCheckpointsjobmanager_job_numberOfInProgressCheckpointsjobmanager_job_totalNumberOfCheckpoints

重要的指标

另外还有比较重要的指标就是 Flink UI 上也提供的,类似于 Slot 总共个数、Slot 可使用的个数、TaskManager 的个数(通过查看该值可以知道是否有 TaskManager 发生异常重启)、正在运行的作业数量、作业运行的时间和完成的时间、作业的重启次数,对应的指标如下:

jobmanager_job_uptimejobmanager_numRegisteredTaskManagersjobmanager_numRunningJobsjobmanager_taskSlotsAvailablejobmanager_taskSlotsTotaljobmanager_job_downtimejobmanager_job_fullRestartsjobmanager_job_restartingTime

监控 TaskManager

TaskManager 在 Flink 集群中也是一个个的进程实例,它的数量代表着能够运行作业个数的能力,所有的 Flink 作业最终其实是会在 TaskManager 上运行的,TaskManager 管理着运行在它上面的所有作业的 Task 的整个生命周期,包括了 Task 的启动销毁、内存管理、磁盘 IO、网络传输管理等。

因为所有的 Task 都是运行运行在 TaskManager 上的,有的 Task 可能会做比较复杂的操作或者会存储很多数据在内存中,那么就会消耗很大的资源,所以通常来说 TaskManager 要比 JobManager 消耗的资源要多,但是这个资源具体多少其实也不好预估,所以可能会出现由于分配资源的不合理,导致 TaskManager 出现 OOM 等问题。一旦 TaskManager 因为各种问题导致崩溃重启的话,运行在它上面的 Task 也都会失败,JobManager 与它的通信也会丢失。因为作业出现 failover,所以在重启这段时间它是不会去消费数据的,所以必然就会出现数据消费延迟的问题。对于这种情况那么必然就很需要 TaskManager 的监控信息,这样才能够对整个集群的 TaskManager 做一个提前预警。

那么在 Flink 中自带的 TaskManager Metrics 有哪些呢?主要也是 CPU、类加载、GC、内存、网络等。其实这些信息在 Flink UI 上也是有,如下图所示,不知道读者有没有细心观察过。


在这个 TaskManager 的 Metrics 监控页面通常比较关心的指标有内存相关的,还有就是 GC 的指标,通常一个 TaskManager 出现 OOM 之前会不断的进行 GC,在这个 Metrics 页面它展示了年轻代和老年代的 GC 信息(时间和次数),如下图所示,大家可以细心观察下是否 TaskManager OOM 前老年代和新生代的 GC 次数比较、时间比较长。


在 Flink Reporter 中提供的 TaskManager Metrics 指标如下:

taskmanager_Status_JVM_CPU_Loadtaskmanager_Status_JVM_CPU_Timetaskmanager_Status_JVM_ClassLoader_ClassesLoadedtaskmanager_Status_JVM_ClassLoader_ClassesUnloadedtaskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Counttaskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Timetaskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Counttaskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Timetaskmanager_Status_JVM_Memory_Direct_Counttaskmanager_Status_JVM_Memory_Direct_MemoryUsedtaskmanager_Status_JVM_Memory_Direct_TotalCapacitytaskmanager_Status_JVM_Memory_Heap_Committedtaskmanager_Status_JVM_Memory_Heap_Maxtaskmanager_Status_JVM_Memory_Heap_Usedtaskmanager_Status_JVM_Memory_Mapped_Counttaskmanager_Status_JVM_Memory_Mapped_MemoryUsedtaskmanager_Status_JVM_Memory_Mapped_TotalCapacitytaskmanager_Status_JVM_Memory_NonHeap_Committedtaskmanager_Status_JVM_Memory_NonHeap_Maxtaskmanager_Status_JVM_Memory_NonHeap_Usedtaskmanager_Status_JVM_Threads_Counttaskmanager_Status_Network_AvailableMemorySegmentstaskmanager_Status_Network_TotalMemorySegmentstaskmanager_Status_Shuffle_Netty_AvailableMemorySegmentstaskmanager_Status_Shuffle_Netty_TotalMemorySegments

监控 Flink 作业

对于运行的作业来说,其实我们会更关心其运行状态,如果没有其对应的一些监控信息,那么对于我们来说这个 Job 就是一个黑盒,完全不知道是否在运行,Job 运行状态是什么、Task 运行状态是什么、是否在消费数据、消费数据是咋样(细分到每个 Task)、消费速度能否跟上生产数据的速度、处理数据的过程中是否有遇到什么错误日志、处理数据是否有出现反压问题等等。

上面列举的这些问题通常来说是比较关心的,那么在 Flink UI 上也是有提供的查看对应的信息的,点开对应的作业就可以查看到作业的执行图,每个 Task 的信息都是会展示出来的,包含了状态、Bytes Received(接收到记录的容量大小)、Records Received(接收到记录的条数)、Bytes Sent(发出去的记录的容量大小)、Records Sent(发出去记录的条数)、异常信息、timeline(作业运行状态的时间线)、Checkpoint 信息,如下图所示。


这些指标也可以通过 Flink 的 Reporter 进行上报存储到第三方的时序数据库,然后通过类似 Grafana 展示出来,如下图所示。通过这些信息大概就可以清楚的知道一个 Job 的整个运行状态,然后根据这些运行状态去分析作业是否有问题。


在流作业中最关键的指标无非是作业的实时性,那么延迟就是衡量作业的是否实时的一个基本参数,但是对于现有的这些信息其实还不知道作业的消费是否有延迟,通常来说可以结合 Kafka 的监控去查看对应消费的 Topic 的 Group 的 Lag 信息,如果 Lag 很大就表明有数据堆积了,另外还有一个办法就是需要自己在作业中自定义 Metrics 做埋点,将算子在处理数据的系统时间与数据自身的 Event Time 做一个差值,求得值就可以知道算子消费的数据是什么时候的了。比如在 1571457964000(2019-10-19 12:06:04)Map 算子消费的数据的事件时间是 1571457604000(2019-10-19 12:00:04),相差了 6 分钟,那么就表明消费延迟了 6 分钟,然后通过 Metrics Reporter 将埋点的 Metrics 信息上传,这样最终就可以获取到作业在每个算子处的消费延迟的时间。

上面的是针对于作业延迟的判断方法,另外像类似于作业反压的情况,在 Flink 的 UI 也会有展示,具体怎么去分析和处理这种问题在 9.1 节中有详细讲解。

根据这些监控信息不仅可以做到提前预警,做好资源的扩容(比如增加容器的数量/内存/CPU/并行度/Slot 个数),也还可以找出作业配置的资源是否有浪费。通常来说一个作业的上线可能是会经过资源的预估,然后才会去申请这个作业要配置多少资源,比如算子要使用多少并行度,最后上线后可以通过完整的运行监控信息查看该作业配置的并行度是否有过多或者配置的内存比较大。比如出现下面这些情况的时候可能就是资源出现浪费了:

•作业消费从未发生过延迟,即使在数据流量高峰的时候,也未发生过消费延迟•作业运行所在的 TaskManager 堆内存使用率异常的低•作业运行所在的 TaskManager 的 GC 时间和次数非常规律,没有出现异常的现象,如下图所示。


在 Flink Metrics Reporter 上传的指标中大概有下面这些:

taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsagetaskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLengthtaskmanager_job_task_Shuffle_Netty_Output_Buffers_inPoolUsagetaskmanager_job_task_Shuffle_Netty_Output_Buffers_inputExclusiveBuffersUsagetaskmanager_job_task_Shuffle_Netty_Output_Buffers_inputFloatingBuffersUsagetaskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLengthtaskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocaltaskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecondtaskmanager_job_task_Shuffle_Netty_Output_numBuffersInRemotetaskmanager_job_task_Shuffle_Netty_Output_numBuffersInRemotePerSecondtaskmanager_job_task_Shuffle_Netty_Output_numBytesInLocaltaskmanager_job_task_Shuffle_Netty_Output_numBytesInLocalPerSecondtaskmanager_job_task_Shuffle_Netty_Output_numBytesInRemotetaskmanager_job_task_Shuffle_Netty_Output_numBytesInRemotePerSecondtaskmanager_job_task_buffers_inPoolUsagetaskmanager_job_task_buffers_inputExclusiveBuffersUsagetaskmanager_job_task_buffers_inputFloatingBuffersUsagetaskmanager_job_task_buffers_inputQueueLengthtaskmanager_job_task_buffers_outPoolUsagetaskmanager_job_task_buffers_outputQueueLengthtaskmanager_job_task_checkpointAlignmentTimetaskmanager_job_task_currentInputWatermarktaskmanager_job_task_numBuffersInLocaltaskmanager_job_task_numBuffersInLocalPerSecondtaskmanager_job_task_numBuffersInRemotetaskmanager_job_task_numBuffersInRemotePerSecondtaskmanager_job_task_numBuffersOuttaskmanager_job_task_numBuffersOutPerSecondtaskmanager_job_task_numBytesIntaskmanager_job_task_numBytesInLocaltaskmanager_job_task_numBytesInLocalPerSecondtaskmanager_job_task_numBytesInPerSecondtaskmanager_job_task_numBytesInRemotetaskmanager_job_task_numBytesInRemotePerSecondtaskmanager_job_task_numBytesOuttaskmanager_job_task_numBytesOutPerSecondtaskmanager_job_task_numRecordsIntaskmanager_job_task_numRecordsInPerSecondtaskmanager_job_task_numRecordsOuttaskmanager_job_task_numRecordsOutPerSecondtaskmanager_job_task_operator_currentInputWatermarktaskmanager_job_task_operator_currentOutputWatermarktaskmanager_job_task_operator_numLateRecordsDroppedtaskmanager_job_task_operator_numRecordsIntaskmanager_job_task_operator_numRecordsInPerSecondtaskmanager_job_task_operator_numRecordsOuttaskmanager_job_task_operator_numRecordsOutPerSecond

最关心的性能指标

上面已经提及到 Flink 的 JobManager、TaskManager 和运行的 Flink Job 的监控以及常用的监控信息,这些指标有的是可以直接在 Flink 的 UI 上观察到的,另外 Flink 提供了 Metrics Reporter 进行上报存储到监控系统中去,然后通过可视化的图表进行展示,在 8.2 节中将教大家如何构建一个完整的监控系统。那么有了这么多监控指标,其实哪些是比较重要的呢,比如说这些指标出现异常的时候可以发出告警及时进行通知,这样可以做到预警作用,另外还可以根据这些信息进行作业资源的评估。下面列举一些笔者觉得比较重要的指标:

JobManager

在 JobManager 中有着该集群中所有的 TaskManager 的个数、Slot 的总个数、Slot 的可用个数、运行的时间、作业的 Checkpoint 情况,笔者觉得这几个指标可以重点关注。

•TaskManager 个数:如果出现 TaskManager 突然减少,可能是因为有 TaskManager 挂掉重启,一旦该 TaskManager 之前运行了很多作业,那么重启带来的影响必然是巨大的。•Slot 个数:取决于 TaskManager 的个数,决定了能运行作业的最大并行度,如果资源不够,及时扩容。•作业运行时间:根据作业的运行时间来判断作业是否存活,中途是否掉线过。•Checkpoint 情况:Checkpoint 是 JobManager 发起的,并且关乎到作业的状态是否可以完整的保存。

TaskManager

因为所有的作业最终都是运行在 TaskManager 上,所以 TaskManager 的监控指标也是异常的监控,并且作业的复杂度也会影响 TaskManager 的资源使用情况,所以 TaskManager 的基础监控指标比如内存、GC 如果出现异常或者超出设置的阈值则需要立马进行告警通知,防止后面导致大批量的作业出现故障重启。

•内存使用率:部分作业的算子会将所有的 State 数据存储在内存中,这样就会导致 TaskManager 的内存使用率会上升,还有就是可以根据该指标看作业的利用率,从而最后来重新划分资源的配置。•GC 情况:分时间和次数,一旦 TaskManager 的内存率很高的时候,必定伴随着频繁的 GC,如果在 GC 的时候没有得到及时的预警,那么将面临 OOM 风险。

Flink Job

作业的稳定性和及时性其实就是大家最关心的,常见的指标有:作业的状态、Task 的状态、作业算子的消费速度、作业出现的异常日志。

•作业的状态:在 UI 上是可以看到作业的状态信息,常见的状态变更信息如下图所示。


•Task 的状态:其实导致作业的状态发生变化的原因通常是由于 Task 的运行状态出现导致,所以也需要对 Task 的运行状态进行监控,Task 的运行状态如下图所示。


•作业异常日志:导致 Task 出现状态异常的根因通常是作业中的代码出现各种各样的异常日志,最后可能还会导致作业无限重启,所以作业的异常日志也是需要及时关注。•作业重启次数:当 Task 状态和作业的状态发生变化的时候,如果作业中配置了重启策略或者开启了 Checkpoint 则会进行作业重启的,重启作业的带来的影响也会很多,并且会伴随着一些不确定的因素,最终导致作业一直重启,这样既不能解决问题,还一直在占用着资源的消耗。•算子的消费速度:代表了作业的消费能力,还可以知道作业是否发生延迟,可以包含算子接收的数据量和发出去数据量,从而可以知道在算子处是否有发生数据的丢失。

小结与反思

本节讲了 Flink 中常见的监控对象,比如 JobManager、TaskManager 和 Flink Job,对于这几个分别介绍了其内部大概有的监控指标,以及在真实生产环境关心的指标,你是否还有其他的监控指标需要补充呢?

本节涉及的监控指标对应的含义可以参考官网链接:metrics

本节涉及的监控指标列表地址:https://github.com/zhisheng17/flink-learning/blob/master/flink-learning-monitor/flink_monitor_measurements.md

猜你喜欢

1、Elasticsearch用得好,下班下得早

2、如果你也想做实时数仓…

3、Kylin on Kubernetes 在 eBay 的实践

4、Spark SQL 自适应执行优化引擎

过往记忆大数据微信群,请添加微信:fangzhen0219,备注【进群】

自带的jvm监控不准_如何实时监控 Flink 集群和作业?相关推荐

  1. CentOS安装Elasticsearch_IK分词器拼音分词器_部署kibana_部署es集群

    CentOS安装Elasticsearch_IK分词器_部署kibana_部署es集群 一.部署单点es ①:创建网络 因为我们还需要部署kibana容器,因此需要让es和kibana容器互联.这里先 ...

  2. mongodb 监控权限_运维监控产品分析篇

    开源运维监控系统篇 1.zabbix 用户群:85%以上的泛互联网企业. 优点:支持多平台的企业级分布式开源监控软件 安装部署简单,多种数据采集插件灵活集成 功能强大,可实现复杂多条件告警, 自带画图 ...

  3. 服务器定期监控数据_基础设施硬件监控探索与实践

    本文选自 <交易技术前沿>总第三十六期文章(2019年9月) 陈靖宇 深圳证券交易所 系统运行部 Email: jingyuchen@szse.cn 摘要:为了应对基础设施规模不断上升,数 ...

  4. 闪烁指示灯监控方案_机房温湿度监控检测方案【斯必得智慧机房】

    点击上方"蓝字"关注我们 01 机房温湿度监控检测方案 摘要:本系统的目的是为了保障中心机房系统的正常运行,实时监测机房环境的各项指标,遇到机房停电.电源故障.环境温度过高.非法闯 ...

  5. 12c集群日志位置_关于Oracle 12c的集群监控(CHM)

    正常情况下,我们有很多工具来监控oracle集群,但是Oracle更建议使用OEM来监视Oracle集群的日常运维工作,使用Cluster Health Monitor (CHM)来监控完整的技术架构 ...

  6. k8s和harbor的集成_爱威尔-基于kubernetes集群的项目持续集成(gitlab+harbor+Jenkins)安装...

    这个算是基于kubernetes集群的项目持续集成的前导篇,先把这用环境搭建好我们后面就可以专注做基于k8s的docker化项目持续集成了. gitlab安装 https://about.gitlab ...

  7. redis集群扩容和缩容_深入理解Redis Cluster集群

    一.背景 前面的文章<深入理解Redis哨兵机制>一文中介绍了Redis哨兵集群的工作原理,哨兵集群虽然满足了高可用的特性,但是依然存在这样的问题:即数据只能往一个主节点上进行写入. 只能 ...

  8. cdh sqoop 配置_相比于手动搭建集群,使用Ambari或者CDH的必要性

    1. Ambari简介 Apache Ambari是一种基于Web的工具,支持Apache Hadoop集群的供应.管理和监控.Ambari已支持大多数Hadoop组件,包括HDFS.MapReduc ...

  9. python实时监控文件大小_python实现实时监控文件的方法

    在业务稳定性要求比较高的情况下,运维为能及时发现问题,有时需要对应用程序的日志进行实时分析,当符合某个条件时就立刻报警,而不是被动等待出问题后去解决,比如要监控nginx的$request_time和 ...

最新文章

  1. 计算机学院杨洋,美国莱特州立大学吴志强教授访问计算机科学与技术学院
  2. 来自前苹果高管Heidi Roizen的经验之谈
  3. python十大实例_Python练习实例100例(从简入难)96-100
  4. 学习Altas 笔记[js调用重载的方法出错,如何处理]
  5. Android.mk调用bin/shell
  6. 牛客网Java刷题知识点之调用线程类的start()方法和run()方法的区别
  7. 《统计学习方法》课后习题参考答案
  8. 华山论剑之iOS的淫思巧计(持续更新中..)
  9. 清华学霸尹成Python教程
  10. 摩尔定律、安迪-比尔定律、反摩尔定律
  11. 50欧姆系统dBm与电压的转换关系
  12. Mastering Go 英文原版翻译项目
  13. 使用ardunio制作神秘礼物(环境光传感器、舵机、LED、蜂鸣器)
  14. 干货!Flask 动态展示 Pyecharts 图表数据的几种方法!
  15. 算法--分治法归并排序 python
  16. 2022年MySQL最新面试题
  17. 泛型是什么,为什么要用泛型
  18. 模糊综合评判法实现学生互评
  19. 半年10倍的股票秘诀
  20. 【Python小案例】打工人必备:有了这款倒计时神器,再也不用担心自己的隐私被偷窥啦~(附源码)

热门文章

  1. 继腾讯、阿里、字节、快手等后,京东、有赞又爆大裁员,个别部门比例或达40%!...
  2. 【白皮书分享】2021国有企业数字化转型指数与方法路径白皮书.pdf(附下载链接)...
  3. 通俗易懂!视觉slam第十部分——贝叶斯估计
  4. Could not install packages due to an EnvironmentError: [Errno 13] Permission denied解决办法
  5. Java学习——继承和多态
  6. 【岗位详情】腾讯广告后台开发工程师(北京)
  7. 服务器系统文档分类,服务器操作系统及分类
  8. mysql skip remarks_mysql DatabaseMetaData 获取table remarks为空的解决办法
  9. html图片多tab切换代码,CSS实现Tab页切换实例代码
  10. 吴恩达机器学习(十三)异常检测(高斯分布)