简介

Flink发布监控全流程

入门

使用架构图

特点

能够监控进程内部的信息

规范化的数据模型

所有采集的监控数据均以指标(metric)的形式保存在内置的时间序列数据库当中(TSDB)。所有的样本除了基本的指标名称以外,还包含一组用于描述该样本特征的标签。如下所示:

http_request_status{code='200',content_path='/api/path',environment='produment'} => [value1@timestamp1,value2@timestamp2...] http_request_status{code='200',content_path='/api/path2',environment='produment'} => [value1@timestamp1,value2@timestamp2...]

每一条时间序列由指标名称(Metrics Name)以及一组标签(Labels)唯一标识。每条时间序列按照时间的先后顺序存储一系列的样本值。

  • http_request_status:指标名称(Metrics Name)
  • {code='200',content_path='/api/path',environment='produment'}:表示维度的标签,基于这些Labels我们可以方便地对监控数据进行聚合,过滤,裁剪。
  • [value1@timestamp1,value2@timestamp2...]:按照时间的先后顺序 存储的样本值。

查询语言PromQL

Prometheus内置了一个强大的数据查询语言PromQL。 通过PromQL可以实现对监控数据的查询、聚合。同时PromQL也被应用于数据可视化(如Grafana)以及告警当中。

通过PromQL可以轻松回答类似于以下问题:

  • 在过去一段时间中95%应用延迟时间的分布范围?
  • CPU占用率前5位的服务有哪些?

Prometheus的架构

官网

Prometheus - Monitoring system & time series database

下载地址

Download | Prometheus

安装包

链接:https://pan.baidu.com/s/1pvbFCCLv6XekPk8h6o1nkA 
提取码:yyds 
--来自百度网盘超级会员V4的分享

使用

解压

部署情况

master node1 node2

prometheus

pushgateway

node exporter

node exporter node exporter

修改prometheus.yml

scrape_configs:- job_name: 'prometheus'static_configs:- targets: ['master:9090']# 添加 PushGateway 监控配置- job_name: 'pushgateway'static_configs:- targets: ['master:9091']labels:instance: pushgateway# 添加 Node Exporter 监控配置- job_name: 'node exporter'static_configs:- targets: ['master:9100', 'node1:9100', 'node2:9100']

参数说明

  • job_name:监控作业的名称
  • static_configs:表示静态目标配置,就是固定从某个target拉取数据
  • targets:指定监控的目标,其实就是从哪儿拉取数据。Prometheus会从http://hadoop202:9090/metrics上拉取数据。

Prometheus是可以在运行时自动加载配置的。启动时需要添加:--web.enable-lifecycle

修改配置如图

分发node_exporter

./xsync /home/bigdata/prome/node_exporter-1.2.2.linux-amd64/

启动

时间同步

systemctl stop ntpd
ntpdate 120.24.81.91

启动prometheus

nohup ./prometheus --web.enable-lifecycle --config.file=prometheus.yml > ./prometheus.log 2>&1 &

停止prometheus

curl -X POST http://localhost:9090/-/quit

启动pushgateway

nohup ./pushgateway --web.listen-address=":9091" > ./pushgateway.log 2>&1 &

启动node_exporter(三台机器都启动)

./node_exporter  & 

访问 prometheus的9090端口

点击对应的界面进行查看

使用PromSql

按时间查询

node_arp_entries[5m]

m表示分钟

条件查询

node_arp_entries{device='ens33',instance='node1:9100'}

使用正则表达式

node_arp_entries{device=~'^ens33'}

使用条件

node_arp_entries{device=~'^ens33'}[1m] offset 10m

对于历史数据累加

sum(node_arp_entries{device=~'^ens33'} offset 10m) by(device)

监控Flink

添加配置文件

添加依赖

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-metrics-prometheus_2.12</artifactId><version>1.13.5</version><scope>provided</scope></dependency>

打包插件

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

在resource下面添加配置文件

log4j.properties

monitorInterval=30# This affects logging for both user code and Flink
rootLogger.level = error
rootLogger.appenderRef.file.ref = MainAppender# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategyappender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

flink-conf.yaml

##### 与Prometheus集成配置 #####
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
# PushGateway的主机名与端口号
metrics.reporter.promgateway.host: master
metrics.reporter.promgateway.port: 9091
## Flink metric在前端展示的标签(前缀)与随机后缀
metrics.reporter.promgateway.jobName: flink-metrics-ppg
#如果jobName启动二次,那么第二次的时候会有一个随机的名字
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
#这里表示多久推一次数据
metrics.reporter.promgateway.interval: 15 SECONDS

启动程序的时候修改配置(由于加了<scope>provided</scope>,如果不修改配置那么就会加载不到)

本地测试监控Flink

传入参数

对应的应用程序(本地测试)

public class Demo01App {public static void main(String[] args) throws Exception {//0 调试取本地配置 ,打包部署前要去掉
//         Configuration configuration=new Configuration(); //此行打包部署专用
//        String resPath = Thread.currentThread().getContextClassLoader().getResource("flink-conf.yaml").getPath(); //本地调试专用Configuration configuration = GlobalConfiguration.loadConfiguration("C:\\Users\\zhang\\Desktop");            //本地调试专用//1. 读取初始化环境configuration.setString("metrics.reporter.promgateway.jobName","demo01App");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 2. 指定nc的host和portParameterTool parameterTool = ParameterTool.fromArgs(args);String hostname = parameterTool.get("host");int port = parameterTool.getInt("port");// 3. 接受socket数据源DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port);dataStreamSource.print();//appnameenv.execute("demo01App");}
}

测试程序

查看控制台然后可以看到采集过来的数据

发布集群测试监控Flink

先启动yarn

修改linux里面flink的配置

提交运行

./flink run -m node1:34982 -c com.atguigu.prome.app.Demo01App -p 2 ./flink-prome2022-1.1-SNAPSHOT.jar

使用grafana

安装

解压

tar -zxvf grafana-enterprise-8.1.2.linux-amd64.tar.gz

启动

nohup ./bin/grafana-server web > ./grafana.log 2>&1 &

访问

监控Linux

先添加数据源

如果和前一分钟比,它们的时间不在变化那么这个时候说明Flink挂掉了

flink_jobmanager_job_uptime-flink_jobmanager_job_uptime offset 1m

导入数据

得到的效果为

监控Flink

修改监控Flink配置文件的问题

原因:是pushgateway不会主动的清理数据,监控面板的判断有误,如果我们改成现在和过去一分钟的数据进行减法如果等于零,也就是没有数据更新的时候改成complete

原始值

absent(flink_jobmanager_job_uptime{job_name="$JobName", job=~"$JobManager", job_id=~"$JobId", instance_id="$InstanceId"} > 0)

修改后的值为

absent(flink_jobmanager_job_uptime{job_name="$JobName", job=~"$JobManager", job_id=~"$JobId", instance_id="$InstanceId"} - flink_jobmanager_job_uptime{job_name="$JobName", job=~"$JobManager", job_id=~"$JobId", instance_id="$InstanceId"} offset 1m > 0)

当程序停止以后可以看到

添加心跳图

因为只有图表才能发送报警

配置查询参数

flink_jobmanager_job_uptime - flink_jobmanager_job_uptime offset 1m

效果图

Flink监控埋点

示例程序

public class Demo01App {public static void main(String[] args) throws Exception {//0 调试取本地配置 ,打包部署前要去掉
//         Configuration configuration=new Configuration(); //此行打包部署专用String resPath = Thread.currentThread().getContextClassLoader().getResource("flink-conf.yaml").getPath(); //本地调试专用Configuration configuration = GlobalConfiguration.loadConfiguration("C:\\Users\\zhang\\Desktop");            //本地调试专用//1. 读取初始化环境configuration.setString("metrics.r+eporter.promgateway.jobName","demo01App");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 2. 指定nc的host和portParameterTool parameterTool = ParameterTool.fromArgs(args);String hostname = parameterTool.get("host");int port = parameterTool.getInt("port");// 3. 接受socket数据源DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port);dataStreamSource.keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String s) throws Exception {return s;}}).process(new ProcessFunction<String, String>() {Counter counter=null;@Overridepublic void open(Configuration parameters) throws Exception {//TODO 申明埋点counter = getRuntimeContext().getMetricGroup().addGroup("mycount").counter("mycountTest");}@Overridepublic void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {// TODO 对于埋点的数据进行累加counter.inc();collector.collect(s);}}).print();//appnameenv.execute("demo01App");}
}

使用Prometheus得到指标

http://master:9091/metrics

上图可以看到自定义的指标收集到了

窗口最大值,求缓存命中率

思想就是10分钟一个窗口,求出窗口的最大值,和上一个窗口进行减法然后就是10分钟的增量

自定义得到的数据

使用grafana展示自定义指标

添加图表,把查询Prometheus的查询得到的数据到grafana进行展示

保存以后得到图标

pushGetWay定期清理过期数据

由于pushGetWay在任务挂掉一会不会自动清理掉数据,它是由最新的数据覆盖久数据的形式,如果任务挂了以后,那么就没有新的数据进行覆盖了,这个时候就会有数据的残留,我们得进行处理

总结

pushGetWay不会自动的删除过期的数据,Promethus默认保存15天的数据,自己会对每一次拉去过来的数据加上一个时间戳

Flink(Pometheus监控)相关推荐

  1. Flink教程(27)- Flink Metrics监控

    文章目录 01 引言 02 Metrics概述 2.1 Metrics介绍 2.2 Metrics 类型 03 WebUI监控 3.1 自定义监控指标 04 REST API监控 4.1 http请求 ...

  2. 【Flink】Flink 系统监控指标

    文章目录 1.美图 2.概述 3.大类 2.具体某个节点 1.美图 2.概述 Flink Metrics指任务在flink集群中运行过程中的各项指标,包括机器系统指标:Hostname,CPU,Mem ...

  3. 大数据监控平台-Prometheus监控Hadoop3.1.3

    简介 本篇主要是使用jmx配合Prometheus监控大数据平台 前提 链接:https://pan.baidu.com/s/1c6nsjOKw4-a_Wqr82l0QhQ  提取码:yyds  -- ...

  4. 实时监控:基于流计算 Oceanus ( Flink ) 实现系统和应用级实时监控

    作者:吴云涛,腾讯 CSIG 高级工程师 本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU和内存等资源消耗数据,以短信 ...

  5. 自带的jvm监控不准_如何实时监控 Flink 集群和作业?

    Flink 相关的组件和作业的稳定性通常是比较关键的,所以得需要对它们进行监控,如果有异常,则需要及时告警通知.本章先会教会教会大家如何利用现有 Flink UI 上面的信息去发现和排查问题,会指明一 ...

  6. flink监控prometheus/influxdb + grafana企业实战

    本文档为实时计算相关的监控系统的整体说明,记录监控系统相关的部分细节. 监控大盘如图: 功能点 对Flink集群(服务器)进行监控 对源头数据源与目的数据源进行监控 对指标作业进行监控 告警 方案对比 ...

  7. 基于Apache Flink的爱奇艺实时计算平台建设实践

    导读:随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...

  8. 特来电监控引擎流计算应用实践

    随着云计算的深入落地,大数据技术有了坚实的底层支撑,不断向前发展并日趋成熟,无论是传统企业还是互联网公司,都不再满足于离线批处理计算,而是更倾向于应用实时流计算,要想在残酷的企业竞争中立于不败之地,企 ...

  9. Flink 学习(一)

    摘自Flink官网https://flink.apache.org/ 最近看到公司有Flink平台,正好做过storm和spark streaming上的业务,借着这个机会把flink也学了.正好比较 ...

最新文章

  1. 使用PHP处理POST上传时$_FILES数组为何为空
  2. 64 大小_32位和64位Windows系统差别在哪里
  3. 如何防止XshellPortable、putty、SecureCRT等断网造成Linux命令中断
  4. 【算法习作】荷兰国旗问题
  5. Service Mesh所应对的8项挑战 1
  6. oracle 执行带参数的sql语句_当用EXECUTE IMMEDIATE执行SQL语句中的参数个数也是动态的?用什么方法实现?...
  7. 【渝粤教育】国家开放大学2018年春季 8625-21T老年心理健康 参考试题
  8. Linux 命令操作手册
  9. osg+ActiveQT嵌入ie64位
  10. 20130418代码
  11. 洛谷P3261 [JLOI2015]城池攻占(左偏树)
  12. 《编译与反编译技术实战》——1.2 词法分析生成器LEX
  13. java笔试试题含答案_Java笔试题带答案
  14. Redis客户端工具安装
  15. ALC5640-VB-CGT简介
  16. win10前置耳机插孔没声音_win10录屏没声音解决方法「系统天地」
  17. 华为无线路由器信道怎么测试软件,路由器无线信道是什么怎么设置
  18. 这个网站做数据科学家的FB 但除了社交还做实事
  19. matlab从excel第二行写,matlab从某行读excel
  20. canvas实现简单进度条

热门文章

  1. 所有毕业生的论文都要查重吗?
  2. python多子图坐标轴共享
  3. Mybatis实现分页功能
  4. pandas美国人口分析实例
  5. 关于一款开源远程控制软件(gh0st)的源码分析(一)
  6. VPS云服务器搭建FTP并连接
  7. Qt udp组播Qt udp组播
  8. Cesium中自定义材质material
  9. 《Java SE实战指南》15-04:接口和抽象类的区别
  10. MMGG测评 感动全球的链上自走棋——HeroesEmpires游戏拆解