Flink 从入门到精通 系列文章

前言

虽然笔者之前写过基于Prometheus PushGateway搭建Flink监控的过程,但是在我们的生产环境中,使用的是InfluxDB。InfluxDB是一个由Go语言写成的、由InfluxData部分开源的时序数据库,能够非常好地处理监控指标的存储和查询,配合Grafana即可简单地实现Flink作业metrics的收集与展示。本文简述配置过程及一些小问题。

硬件参数

新版InfluxDB的集群版是收费的,但是单点也足够我们存储较长时间的监控数据了。

  • CPU:Intel E5 v4 12C/24T

  • 内存:96GB

  • 硬盘:500GB SSD * 2

  • 网络:10Gbps

  • 操作系统:CentOS 7.5 64-bit

  • InfluxDB 1.8

  • Grafana 6.7.4

安装与配置InfluxDB

先下载RPM包,再用yum localinstall安装,可以自动解决依赖关系。

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.0.x86_64.rpmyum -y localinstall influxdb-1.8.0.x86_64.rpm

安装完毕后,配置文件位于/etc/influxdb/influxdb.conf。具体配置项可参见官方文档,有一些需要注意的,列举如下。

  • 元数据存储目录

[meta]dir = "/data1/influxdb/meta"
  • 时序数据和write-ahead log存储目录InfluxDB采用LSM Tree改良而来的TSM存储引擎,所以WAL、compaction等机制它都有。建议两种数据分盘存储,提高读写效率。

[data]dir = "/data2/influxdb/data"wal-dir = "/data1/influxdb/wal"
  • 并发及慢查询设置

    写入超时write-timeout默认是10s,当数据量很大时可能比较紧张,可以改大点。

[coordinator]write-timeout = "20s"max-concurrent-queries = 0query-timeout = "60s"log-queries-after = "30s"
  • 保留策略设置

[retention]enabled = truecheck-interval = "60m"
  • HTTP设置HTTP日志没有太大必要,可以关掉。

[http]enabled = truebind-address = ":8086"auth-enabled = falselog-enabled = false

启动InfluxDB并建库

根据官方文档的说明,如果Linux使用的init系统是systemd,并且以服务方式启动InfluxDB(即service influxdb start),那么所有日志会固定打进/var/log/messages里,使用journalctl可以查看。但是这样不太方便,所以我们后台启动InfluxDB,并将日志做重定向,即:

nohup influxd -config /etc/influxdb/influxdb.conf > /var/log/influxdb/influxd.log 2>&1 &

还可以对上述日志文件用logrotate做切割,不再赘述。

然后进入InfluxDB的Shell。默认没有用户名和密码,HTTP端口为8086。

~ influxConnected to http://localhost:8086 version 1.8.0InfluxDB shell version: 1.8.0>

创建Flink监控指标的数据库。

> CREATE DATABASE flink_metrics;> SHOW DATABASES;name: databasesname----_internalflink_metrics

InfluxDB自动生成的保留策略(retention policy)是保留所有历史数据。我们可以创建新的保留策略,使监控数据自动过期,防止硬盘爆掉。以下就在flink_metrics库上创建了一周的保留策略,并自动设为默认。

> CREATE RETENTION POLICY "one_week" ON "flink_metrics" DURATION 168h REPLICATION 1 DEFAULT;>> SHOW RETENTION POLICIES ON "flink_metrics";name duration shardGroupDuration replicaN default---- -------- ------------------ -------- -------autogen 0s 168h0m0s 1 falseone_week 168h0m0s 24h0m0s 1 true

配置Flink Metrics Reporter

将$FLINK_HOME/opt下的flink-metrics-influxdb-.jar拷贝到$FLINK_HOME/lib目录,并且在flink-conf.yaml中添加如下配置。

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host: bd-flink-mon-001metrics.reporter.influxdb.port: 8086metrics.reporter.influxdb.db: flink_metrics

启动Flink on YARN作业,稍等片刻,就可以看到该库下产生了许多measurement——即等同于数据库中的表。InfluxDB没有显式建表的语句,执行INSERT语句时会自动建表。

> USE flink_metrics;Using database flink_metrics> SHOW MEASUREMENTS;name: measurementsname----jobmanager_Status_JVM_CPU_Loadjobmanager_Status_JVM_CPU_Timejobmanager_Status_JVM_ClassLoader_ClassesLoadedjobmanager_Status_JVM_ClassLoader_ClassesUnloadedjobmanager_Status_JVM_GarbageCollector_ConcurrentMarkSweep_Countjobmanager_Status_JVM_GarbageCollector_ConcurrentMarkSweep_Timejobmanager_Status_JVM_GarbageCollector_ParNew_Countjobmanager_Status_JVM_GarbageCollector_ParNew_Timejobmanager_Status_JVM_Memory_Direct_Countjobmanager_Status_JVM_Memory_Direct_MemoryUsedjobmanager_Status_JVM_Memory_Direct_TotalCapacityjobmanager_Status_JVM_Memory_Heap_Committedjobmanager_Status_JVM_Memory_Heap_Maxjobmanager_Status_JVM_Memory_Heap_Usedjobmanager_Status_JVM_Memory_Mapped_Countjobmanager_Status_JVM_Memory_Mapped_MemoryUsedjobmanager_Status_JVM_Memory_Mapped_TotalCapacityjobmanager_Status_JVM_Memory_NonHeap_Committedjobmanager_Status_JVM_Memory_NonHeap_Maxjobmanager_Status_JVM_Memory_NonHeap_Usedjobmanager_Status_JVM_Threads_Countjobmanager_job_downtimejobmanager_job_fullRestarts......

查询一下试试。注意InfluxDB中的一行数据称为一个point,point又包含time(时间戳)、tag(有索引字段)、field(无索引的值)。

> SELECT * FROM "taskmanager_job_task_operator_heartbeat-rate" LIMIT 1;name: taskmanager_job_task_operator_heartbeat-ratetime host job_id job_name operator_id operator_name subtask_index task_attempt_id task_attempt_num task_id task_name tm_id value---- ---- ------ -------- ----------- ------------- ------------- --------------- ---------------- ------- --------- ----- -----1592324240887000000 ths-bigdata-flink-worker043 b23bec2afe87a3b4fa7e930824a8dff4 com.sht.bigdata.clickstream.job.AnalyticsAndOrderLogExtractor bff97a3c8e9f03115fa1e7908e04df21 Source: source_kafka_ms_order_done 6 52c07162c4344d43898dfd3be6d77ac3 0 bff97a3c8e9f03115fa1e7908e04df21 Source: source_kafka_ms_order_done -> order_flatMap_log_record container_e08_1589127619440_0062_01_000002 0

time字段默认是以Unix时间戳显示的,如果想要可读的时间字符串,执行PRECISION rfc3339语句即可。

另外有一个小问题需要注意:

如果Flink的版本<=1.9,Flink报告的监控指标中有NaN和正负无穷,InfluxDB无法handle这些,就会在TaskManager日志中打印出大量报警信息,非常吵闹,详情可见FLINK-12579。解决方法也简单,就是找到Flink源码中flink-metrics-influxdb项目的POM文件,手动将influxdb-java依赖项的版本改高(如改成2.17),重新打包并替换掉$FLINK_HOME/lib目录下的同名文件。

安装启动Grafana

wget https://dl.grafana.com/oss/release/grafana-6.7.4-1.x86_64.rpmyum -y localinstall grafana-6.7.4-1.x86_64.rpmservice grafana-server start

浏览器访问3000端口就行了。

添加InfluxDB数据源

点击Configuration -> Data Sources -> Add data source添加InfluxDB数据源,截图如下。

Flink Metrics Dashboard示例

点击Create -> Dashboard -> Settings -> Variables,先添加两个变量:一是作业名称,二是TaskManager的ID,这两个字段经常用来分组。

说个小tip,如果不想让不同时期启动的相同作业监控数据发生混淆,可以在指定Flink作业的名称时,加上一些其他的东西(如该作业的Maven profile名称以及启动时间)进行区分。

public static String getJobName(Class<?> clazz, Properties props) {return StringUtils.join(Arrays.asList(clazz.getCanonicalName,new LocalDateTime.toString("yyyyMMddHHmmss"),props.getProperty("profile.id")), '_');}

再举个栗子,以折线图按Source分组展示端到端延迟:

注意,端到端延迟的tag只有murmur hash过的算子ID(用uid()方法设定的),并没有算子名称,并且官方暂时不打算解决这个问题(见FLINK-8592),所以我们只能曲线救国,要么用最大值来表示,要么将作业中Sink算子的ID统一化。

基于 Apache Flink 的实时监控告警系统

关于数据中台的深度思考与总结(干干货)

日志收集Agent,阴暗潮湿的地底世界

2020 继续踏踏实实的做好自己

你点的每个赞,我都当成了喜欢

influxdb tsm文件_利用InfluxDB+Grafana搭建Flink on YARN作业监控大屏相关推荐

  1. Flink从入门到精通100篇(三)-如何利用InfluxDB+Grafana搭建Flink on YARN作业监控大屏环境

    前言 虽然博主之前写过基于Prometheus PushGateway搭建Flink监控的过程,但是在我们的生产环境中,使用的是InfluxDB.InfluxDB是一个由Go语言写成的.由Influx ...

  2. 【云驻共创】华为云之手把手教你搭建IoT物联网应用充电桩实时监控大屏

    文章目录 前言 1.什么是充电桩 2.什么是IOT 3.什么是端.边.云.应用协同 4.什么是Astro轻应用 一.玩转lOT动态实时大屏(线下实际操作) 1.Astro轻应用说明 1.1 场景说明 ...

  3. 数据可视化:利用Python和Echarts制作“用户消费行为分析”可视化大屏

    数据可视化:利用Python和Echarts制作"用户消费行为分析"可视化大屏 前言 实验目的: 准备工作: 一.创建项目: 二.建立数据库连接获取数据: 三.页面布局: 四.下载 ...

  4. Serverless 开发,基于 IoT物联网 + 表格存储DB + DataV 搭建实时环境监控大屏

    今天给大家带来基于阿里云 IoT 物联网平台 + Tablestore 表格存储数据库 + DataV大屏 三大云产品组合搭建实时环境监控大屏的开发实战. 少啰嗦,先看效果. 部署后效果    1.技 ...

  5. influxdb tsm文件_Influxdb中的Compaction操作

    Influxdb中的Compaction操作 Compaction概述 Influxdb的存储引擎使用了TSM文件结构,这其实也是在LSM-Tree基础针对时序特点作了改进,因此其与LSM-Tree类 ...

  6. influxdb tsm文件_Influxdb中TSM文件结构解析之读写TSM

    TSM文件组成概述每个TSM文件由4部分组成,源码里给出了文件结构,我们在这里搬过来 Header, Blocks, Index, Footer ┌────────┬───────────────── ...

  7. python 私有云_利用Docker+NextCloud搭建私有云盘

    利用Docker+NextCloud搭建私有云盘 NextCloud 是一款开源网络硬盘系统,最新版本是15,NextCloud 源代码完全开放,你可以在开源许可协议的约束下免费使用,对于需要专业支持 ...

  8. github项目怎么运行_利用 GitHub 从零开始搭建一个博客

    "NightTeam",一个值得加星标的公众号. 趁着周末,搭建了一下 NightTeam 的官方博客和官方主页,耗时数个小时,两个站点终于完工了. 由于 NightTeam 的域 ...

  9. 钉钉一行代码_利用Python快速搭建钉钉和邮件数据推送系统

    前面的文章我们写到了利用Python实现钉钉和邮件的数据推送,在数据处理这一块实现了对mysql和odps的数据获取和处理,可以满足常规业务大部分数据场景需求,在一家初创公司数据基础建设还不完善的时候 ...

最新文章

  1. python免费自学网站-python免费么
  2. WEB接口测试之Jmeter接口测试自动化 (四)(持续构建)
  3. (王道408考研操作系统)第三章内存管理-第二节4:页面分配策略
  4. 【LeetCode】【HOT】394. 字符串解码(栈)
  5. Redis学习总结(17)——Redis 持久化和过期机制复习
  6. maven常用的中央仓库
  7. Python入门--二重循环中的continue和break
  8. 使用 ftrace 调试 Linux 内核,第 1 部分【转】
  9. Map集合常见面试题
  10. 1338: 不及格率
  11. html5 3d动画效果代码,精选9款迷人的HTML5 3D动画效果及源码
  12. [VB.NET]雪花飘的屏保
  13. 安庆集团-冲刺日志(第一天)
  14. 关于kindle使用的文章
  15. nginx禁止某些指定的浏览器标识来爬我们网站
  16. JavaWeb: Tomcat优化
  17. 股票分析及利用tushare查看股票部分信息
  18. 关于雷达隐身的一些事
  19. voc数据集格式转换为coco数据集格式+修改xml格式文件
  20. 基于Servlet WebSocket MySQL实现的网络在线考试系统

热门文章

  1. TCP/IP记一次关于IP地址和MAC物理地址的思考
  2. shell-变量的数值计算
  3. 新版本来袭:Apache Spark 1.5新特性介绍
  4. Python命令行参数学习
  5. NodeJS API Process全局对象
  6. Oracle通过SSL方式连接AD服务器
  7. Activity全屏问题
  8. wayos利用easyradius实现WEB认证页面的记住密码及到期提醒功能
  9. 简单的Windows资源管理器——Java版本
  10. 鹅厂2020暑期实习第二次一面