25.Flink监控
25.1.什么是Metrics
25.2.Metrics分类
25.2.1.Metric Types
25.2.2.代码
25.2.3.操作
26.Flink性能优化
26.1.复用对象
26.2.数据倾斜
26.3.异步IO
26.4.合理调整并行度
27.Flink内存管理
28.Spark VS Flink
28.1.应用场景
28.2.API
28.3.核心角色/流程原理
28.3.1.spark
28.3.2.Flink
28.4.时间机制
28.5.容错机制
28.6.窗口
28.7.整合Kafka
28.8.其他的
28.9.单独补充:流式计算实现原理
28.10.单独补充:背压/反压
28.10.1.back pressure

25.Flink监控

https://blog.lovedata.net/8156c1e1.html

25.1.什么是Metrics

由于集群运行后很难发现内部的实际状况,跑得慢或快,是否异常等,开发人员无法实时查看所有的Task日志,比如作业很大或者有很多作业的情况下,该如何处理?此时Metrics可以很好的帮助开发人员了解作业的当前状态。

Flink提供的Metrics可以在Flink内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。

25.2.Metrics分类

25.2.1.Metric Types

1.常用的如Counter, 写过mapreduce作业的开发人员就应该很熟悉Counter, 其实含义都是一样的,就是对一个计数器进行累加,即对于多条数据和多兆数据一直往上加的过程。
2.Gauga,Gauge是最简单的Metrics,它反映一个值。比如要看现在Java heap内存用了多少,就可以每次实时的暴露一个Gauge, Gauge当前的值就是heap使用的量。
3.Meter, Meter是指统计吞吐量和单位时间内发生”事件”的次数。它相当于求一种速率,即事件次数除以使用的时间。
4.Histogram,Histogram比较复杂,也并不常用,Histogram用于统计一些数据的分布,比如说Quantile、Mean、StdDev、Max、Min等。

Metric在Flink内部有多层结构,以Group的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name是Metrics的唯一标识。

25.2.2.代码

package day6;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author tuzuoquan* @date 2022/6/22 9:26*/
public class MetricsDemo {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.sourceDataStream<String> lines = env.socketTextStream("node1", 9999);//TODO 2.transformationSingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] arr = value.split(" ");for (String word : arr) {out.collect(word);}}});SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new RichMapFunction<String, Tuple2<String, Integer>>() {//用来记录map处理了多少个单词Counter myCounter;//对Counter进行初始化@Overridepublic void open(Configuration parameters) throws Exception {myCounter = getRuntimeContext().getMetricGroup().addGroup("myGroup").counter("myCounter");}//处理单词,将单词记为(单词,1)@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {myCounter.inc();//计数器+1return Tuple2.of(value, 1);}});SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);//TODO 3.sinkresult.print();//TODO 4.executeenv.execute();}}

运行命令:

// /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
// /export/server/flink/bin/flink run --class cn.xxx.metrics.MetricsDemo /root/metrics.jar
// 查看WebUI

25.2.3.操作

1.打包
2.提交到Yarn上运行
3.查看监控指标

4.也可以通过浏览器f12的找到url发送请求获取监控信息

5.也可以通过代码发送请求获取监控信息

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;public class MetricsTest {public static void main(String[] args) {//String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5/vertices/cbc357ccb763df2852fee8c4fc7d55f2/metrics?get=0.Map.myGroup.myCounter");String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5");System.out.println(result);}public static String sendGet(String url) {String result = "";BufferedReader in = null;try {String urlNameString = url;URL realUrl = new URL(urlNameString);URLConnection connection = realUrl.openConnection();// 设置通用的请求属性connection.setRequestProperty("accept", "*/*");connection.setRequestProperty("connection", "Keep-Alive");connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");// 建立实际的连接connection.connect();in = new BufferedReader(new InputStreamReader(connection.getInputStream()));String line;while ((line = in.readLine()) != null) {result += line;}} catch (Exception e) {System.out.println("发送GET请求出现异常!" + e);e.printStackTrace();}// 使用finally块来关闭输入流finally {try {if (in != null) {in.close();}} catch (Exception e2) {e2.printStackTrace();}}return result;}}

26.Flink性能优化

26.1.复用对象

stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {@Overridepublic void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {long changesCount = ...// A new Tuple instance is created on every executioncollector.collect(new Tuple2<>(userName, changesCount));}}

上面的代码可以优化为下面的代码:

可以避免Tuple2的重复创建

stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {// Create an instance that we will reuse on every callprivate Tuple2<String, Long> result = new Tuple<>();@Overridepublic void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {long changesCount = ...// Set fields on an existing object instead of creating a new oneresult.f0 = userName;// Auto-boxing!! A new Long value may be createdresult.f1 = changesCount;// Reuse the same Tuple2 objectcollector.collect(result);}
}

26.2.数据倾斜

rebalance

自定义分区器

key+随机前后缀

26.3.异步IO

26.4.合理调整并行度

数据过滤之后可以减少并行度

数据合并之后再处理之前可以增加并行度

大量小文件写入到HDFS可以减少并行度
1.ds.writeAsText(“data/output/result1”).setParallelism(1);
2.env.setParallelism(1);
3.提交任务时webUI或命令行参数 flink run -p 10
4.配置文件flink-conf.yaml parallelism.default: 1


更多的优化在后面的项目中结合业务来讲解

27.Flink内存管理

1.减少full gc时间:因为所有常用数据都在Memory Manager里,这部分内存的生命周期是伴随TaskManager管理的而不会被GC回收。其他的常用数据对象都是用户定义的数据对象,这部分会快速的被GC回收。
2.减少OOM:所有的运行的内存应用都从池化的内存中获取,而且运行时的算法可以在内存不足的时候将数据写到堆外内存。
3.节约空间:由于Flink自定序列化/反序列化的方法,所有的对象都以二进制的形式存储,降低消耗。
4.高效的二进制操作和缓存友好:二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对CPU高效缓存更友好,可以从CPU的L1/L2/L3缓存获取性能的提升,也就是Flink的数据存储二进制格式符合CPU缓存的标准,非常方便被CPU的L1/L2/L3各级别缓存利用,比内存还要快!

28.Spark VS Flink

28.1.应用场景

Spark:主要用作离线批处理 , 对延迟要求不高的实时处理(微批) ,DataFrame和DataSetAPI 也支持 “流批一体”
Flink:主要用作实时处理 ,注意Flink1.12开始支持真正的流批一体

28.2.API

Spark : RDD(不推荐) /DSteam(不推荐)/DataFrame和DataSet
Flink : DataSet(1.12软弃用) 和 DataStream /Table&SQL(快速发展中)

28.3.核心角色/流程原理

28.3.1.spark


28.3.2.Flink





28.4.时间机制

Spark: SparkStreaming只支持处理时间 StructuredStreaming开始支持事件时间
Flink: 直接支持事件时间 / 处理时间 /摄入时间

28.5.容错机制

Spark : 缓存/持久化 +Checkpoint(应用级别) StructuredStreaming中的Checkpoint也开始借鉴Flink使用Chandy-Lamport algorithm分布式快照算法

Flink: State + Checkpoint(Operator级别) + 自动重启策略 + Savepoint

28.6.窗口

Spark中的支持基于时间/数量的滑动/滚动 要求windowDuration和slideDuration必须是batchDuration的倍数

Flink中的窗口机制更加灵活/功能更多

支持基于时间/数量的滑动/滚动 和 会话窗口

28.7.整合Kafka

SparkStreaming整合Kafka: 支持offset自动维护/手动维护 , 支持动态分区检测 无需配置

Flink整合Kafka: 支持offset自动维护/手动维护(一般自动由Checkpoint维护即可) , 支持动态分区检测 需要配置

//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
props.setProperty("flink.partition-discovery.interval-millis","5000");

28.8.其他的

源码编程语言

Flink的高级功能 : Flink CEP可以实现 实时风控…

28.9.单独补充:流式计算实现原理

Spark :
SparkStreaming: 微批
StructuredStreaming: 微批(连续处理在实验中)

Flink : 是真真正正的流式处理, 只不过对于低延迟和高吞吐做了平衡
早期就确定了后续的方向:基于事件的流式数据处理框架!

env.setBufferTimeout - 默认100ms
taskmanager.memory.segment-size - 默认32KB

28.10.单独补充:背压/反压

28.10.1.back pressure

Spark: PIDRateEsimator ,PID算法实现一个速率评估器(统计DAG调度时间,任务处理时间,数据条数等, 得出一个消息处理最大速率, 进而调整根据offset从kafka消费消息的速率)。

Flink: 基于credit – based 流控机制,在应用层模拟 TCP 的流控机制(上游发送数据给下游之前会先进行通信,告诉下游要发送的blockSize, 那么下游就可以准备相应的buffer来接收, 如果准备ok则返回一个credit凭证,上游收到凭证就发送数据, 如果没有准备ok,则不返回credit,上游等待下一次通信返回credit)

阻塞占比在 web 上划分了三个等级:
OK: 0 <= Ratio <= 0.10,表示状态良好;
LOW: 0.10 < Ratio <= 0.5,表示有待观察;
HIGH: 0.5 < Ratio <= 1,表示要处理了(增加并行度/subTask/检查是否有数据倾斜/增加内存…)。

例如,0.01,代表着100次中有一次阻塞在内部调用

25.Flink监控\什么是Metrics\Metrics分类\Flink性能优化的方法\合理调整并行度\合理调整并行度\Flink内存管理\Spark VS Flink\时间机制\容错机制等相关推荐

  1. 第一章-Flink介绍-《Fink原理、实战与性能优化》读书笔记

    Flink介绍-<Fink原理.实战与性能优化>读书笔记 1.1 Apache Flink是什么? 在当代数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如 ...

  2. Flink教程(29)- Flink内存管理

    文章目录 01 引言 02 Flink内存管理 2.1 Flink内存划分 2.2 Flink堆外内存 2.3 序列化与反序列化 2.4 操纵二进制数据 2.5 注意 03 文末 01 引言 在前面的 ...

  3. flink分析使用之八内存管理机制

    一.flink内存机制 在前面的槽机制上,提到了内存的共享,这篇文章就分析一下,在Flink中对内存的管理.在Flink中,内存被抽象出来,形成了一套自己的管理机制.Flink本身基本是以Java语言 ...

  4. 上:Spark VS Flink – 下一代大数据计算引擎之争,谁主沉浮?

    作者简介 王海涛,曾经在微软的 SQL Server和大数据平台组工作多年.带领团队建立了微软对内的 Spark 服务,主打 Spark Streaming.去年加入阿里实时计算部门,参与改进阿里基于 ...

  5. 一口气搞懂「Flink Metrics」监控指标和性能优化,全靠这33张图和7千字(建议收藏)

    前言 大家好,我是土哥. 最近在公司做 Flink 推理任务的性能测试,要对 job 的全链路吞吐.全链路时延.吞吐时延指标进行监控和调优,其中要使用 Flink Metrics 对指标进行监控. 接 ...

  6. Java服务器性能监控(一) Metrics

    引言 对于后台服务而言,我们除了需要保证其每个功能正常工作,我们还需要了解服务的运行情况,包括机器的物理性能(线程数,文件句柄数,内存占用大小,GC时间等)以及业务性能(关键流程的通过率.QPS以及响 ...

  7. jp@gc - PerfMon Metrics Collector:服务器性能监测控件

    1.Jmeter插件下载  安装安装下面三个插件 ,就可以像Loadrunner一样监控服务器CPU.内存等性能参数 下载客户端插件JMeterPlugins-Standard 和 JMeterPlu ...

  8. Flink教程(28)- Flink性能优化

    文章目录 01 引言 02 History Server 03 序列化 04 复用对象 05 数据倾斜 06 总结 01 引言 在前面的博客,我们学习了Flink的Metrics监控了,有兴趣的同学可 ...

  9. Flink核心篇,四大基石、容错机制、广播、反压、序列化、内存管理、资源管理...

    Flink基础篇,基本概念.设计理念.架构模型.编程模型.常用算子 大纲: 1.Flink的四大基石包含哪些? 2.讲一下Flink的Time概念? 3.介绍下Flink窗口,以及划分机制? 4.介绍 ...

最新文章

  1. 又一个强大的PHP5.3依赖注入容器
  2. 获取标签的src属性兼容性
  3. 黑马12期 day73-EasyUI笔记(2017年7月11日16:23:45)
  4. 2018明星学术公众号TOP 10重磅发布,PaperWeekly再度入选
  5. MySQL 的日语认证有了,中文呢?
  6. C# 9.0 正式发布了(C# 9.0 on the record)
  7. Red5 修改RTMP监听端口和ip
  8. eclipse编写wordcount提交spark运行
  9. 各种Java加密算法
  10. QML笔记-TextEdit的使用
  11. Web服务器Nginx多方位优化策略
  12. android 序列化传参数,Android序列化之Parcelable和Serializable的使用详解
  13. 高并发模拟( 测试 )
  14. charset参数 sqluldr2_SQLULDR2
  15. 德标螺纹规格对照表_螺栓螺母德标、欧标、国标对照表
  16. 用友t 的服务器找不到系统管理,用友T+找不到账套了怎么办
  17. 基于multisim的fm调制解调_基于SDR的FM调制与解调器的实现
  18. 网页分享至Facebook,Twitter,LinkedIn,WhatsApp,邮箱总结
  19. linux系统中 为mysql还原数据库_linux中mysql还原数据库命令
  20. eregi php 5.2,PHP5.3x不再支持ereg和eregi

热门文章

  1. [网络基本概念] Router / Bridge
  2. JavaScript获取元素
  3. 织梦插件教程新老版本通用织梦插件大全
  4. 一点、两点、三点透视投影的python3实现-计算机图形学
  5. php替换正文中的汉字,php如何实现替换汉字
  6. 来,吃了许嵩这颗毒药
  7. python对投标_有关招标投标签订合同的说法,正确的是(      ) 。
  8. Pytorch 查看模型参数
  9. sql怎么给字段备注
  10. 【论文精读1】基于BN的模型剪枝-Learning Efficient Convolution Networks through Network Slimming