Flink keyby 数据倾斜问题处理
上一篇我们使用keyby后发现数据严重倾斜
https://datamining.blog.csdn.net/article/details/105316728
大概看下问题所在,大量数据在一个subtask中运行
这里我们使用两阶段keyby 解决该问题
之前的问题如下图所示
我们期望的是
但我们的需要根据key进行聚合统计,那么把相同的key放在不同的subtask如何统计?
我们看下图(只画了主要部分)
1.首先将key打散,我们加入将key转化为 key-随机数 ,保证数据散列
2.对打散后的数据进行聚合统计,这时我们会得到数据比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10)
3.将散列key还原成我们之前传入的key,这时我们的到数据是聚合统计后的结果,不是最初的原数据
4.二次keyby进行结果统计,输出到addSink
直接看实现代码
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject ProcessFunctionScalaV2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] = env.socketTextStream("localhost", 9999)val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong))val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print("第一次keyby输出")val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print("第二次keyby输出")env.execute()}case class DataJast(key :String,count:Long)//计算keyby后,每个Window中的数据总和class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] {override def createAccumulator(): DataJast = {println("初始化")DataJast(null,0)}override def add(value: (String, Long), accumulator: DataJast): DataJast = {if(accumulator.key==null){printf("第一次加载,key:%s,value:%d\n",value._1,value._2)DataJast(value._1,value._2)}else{printf("数据累加,key:%s,value:%d\n",value._1,accumulator.count+value._2)DataJast(value._1,accumulator.count + value._2)}}override def getResult(accumulator: DataJast): DataJast = {println("返回结果:"+accumulator)accumulator}override def merge(a: DataJast, b: DataJast): DataJast = {DataJast(a.key,a.count+b.count)}}/*** 实现:* 根据key分类,统计每个key进来的数据量,定期统计数量*/class MyProcessFunction extends KeyedProcessFunction[String,DataJast,DataJast]{val delayTime : Long = 1000L * 30lazy val valueState:ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount",classOf[Long]))override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = {if(valueState.value()==0){valueState.update(value.count)printf("运行task:%s,第一次初始化数量:%s\n",getRuntimeContext.getIndexOfThisSubtask,value.count)val currentTime: Long = ctx.timerService().currentProcessingTime()//注册定时器ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}else{valueState.update(valueState.value()+value.count)printf("运行task:%s,更新统计结果:%s\n" ,getRuntimeContext.getIndexOfThisSubtask,valueState.value())}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = {//定时器执行,可加入业务操作printf("运行task:%s,触发定时器,30秒内数据一共,key:%s,value:%s\n",getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value())//定时统计完成,初始化统计数据valueState.update(0)//注册定时器val currentTime: Long = ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}}}
对key进行散列
val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))
设置窗口滚动时间,每隔十秒统计一次每隔key下的数据总量
val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate())keyByAgg.print("第一次keyby输出")
还原key,并进行二次keyby,对数据总量进行累加
val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())
我们看下优化后的状态
先看下第一map,直接从端口拿数据,这不涉及keyby,所以这个没影响
再看下第一次keyby后的结果,因为我们散列后,flink根据哈希进行分配,所以数据不是百分之百平均,但是很明显基本上已经均衡了,不会出现这里1一条,那里1条这种状况
再看下第二次keyby,这里会发现我们ID的2的subtask有820条数据,其他的没有数据;这里是正常现象,因为我们是对第一次聚合后的数据进行keyby统计,所以这里的数据大小会非常小,比如我们原始数据一条数据有1M大小,1000条数据就1个G,业务往往还有其他操作,我们再第一次keyby 散列时处理其他逻辑(比如ETL等等操作),最终将统计结果输出给第二次keyby,很可能1个G的数据,最终只有1kb,这比我们将1个G的数据放在一个subtask中处理好很多。
上面我们自定义了MyProcessFunction方法,设置每30秒执行一次,实际业务场景,我们可能会设置一小时执行一次。
至此我们既保证了数据定时统计,也保证了数据不倾斜问题。
Flink keyby 数据倾斜问题处理相关推荐
- Flink实战(九十三):数据倾斜(二)keyby 窗口数据倾斜的优化
在大数据处理领域,数据倾斜是一个非常常见的问题,今天我们就简单讲讲在flink中如何处理流式数据倾斜问题. 我们先来看一个可能产生数据倾斜的sql. select TUMBLE_END(proc_ti ...
- Spark - 数据倾斜实战之 skewness 偏度与 kurtosis 峰度 By ChatGPT4
目录 一.引言 二.峰度 Skewness 简介 三.峰度 kurtosis 简介 四.Skewness 偏度与 kurtosis 峰度实现 1.Spark 实现 2.自定义实现 五.偏度.峰度绘图 ...
- Spark 调优之数据倾斜
什么是数据倾斜? Spark 的计算抽象如下 数据倾斜指的是:并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度 ...
- sqoop数据倾斜_北京卓越讯通大数据岗位面试题分享
北京卓越讯通面试题 学长1 1)笔试 (1)JAVA支持的数据类型有哪些?什么是自动拆装箱? (2)AtomicInteger和Volatile等线程安全操作的关键字的理解个使用 (3)创建线程有几种 ...
- 大表与大表join数据倾斜_技术分享|大数据技术初探之Spark数据倾斜调优
侯亚南 数据技术处 支宸啸 数据技术处 在大数据计算中,我们可能会遇到一个很棘手的问题--数据倾斜,此时spark任务的性能会比预期要差很多:绝大多数task都很快执行完成,但个别task执行极慢或者 ...
- 大数据常见问题:数据倾斜
offer收割系列介绍: 1.分享桥哥本人或小伙伴在面试大厂时遇到的真题,并给出参考答案!!如果能帮到大家,点赞.收藏.评论是对我最大的支持!! 2.涉及岗位:主要为大数据开发.数据仓库(桥哥干过的) ...
- 【Flink】flink keyby 在 subtask 中分配不均的研究
1.概述 转载:flink keyby 在 subtask 中分配不均的研究 最近在做大数据量的实时数据迁移, 频繁使用到了keyby hash去均衡数据, 但是却发现subtask执行的数据量不是很 ...
- 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...
- Spark性能优化--如何解决数据倾斜
1 Data Skew 数据倾斜 1.1 数据倾斜概念 对Hive.Spark.Flink等大数据计算框架来讲,数据量大并不可怕,可怕的是数据倾斜. 数据倾斜是指并行处理的数据集中某一部分的数据显著多 ...
最新文章
- PyTorch | (3)Tensor及其基本操作
- HDU_1075 What Are You Talking About(Trie 树)
- Ubuntu_Win10双系统互换注意事项以及蓝屏解决方案
- LYNC2013部署系列PART10:后端高可用部署
- cad线性标注命令_CAD线性标注如何使用的
- mysql 人名用什么类型_如何选择合适的MySQL数据类型
- 80-10-010-原理-Java NIO-简介
- linux配置命令route,linux路由配置命令route学习
- Dell R410 BIOS 升级方法
- python播放音频文件——playsound
- c语言以顺序结构存储的二叉树的非递归遍历,一种二叉树非递归遍历算法的C语言实现...
- c语言单位换算转换程序,c语言时间换算(c语言时间换算过n秒)
- python人脸识别毕业设计-毕业论文:基于树莓派的人脸识别门禁系统本科毕业设计文章...
- linux系统文件名颜色含义
- 得知大熊哥最后一天在岗位工作今天离开有感而发
- win10如何使用低版本的IE浏览器?
- HashMap的四种同步方式
- 【年度总结】继往开来:回首不靠谱的2021,希冀靠谱的2022
- 交叉引用跳转不到后面_参考文献如何正确标注引用而不会变红?
- WOL远程开机,实际落地成功。
热门文章
- 遍历对象属性_细说JS遍历对象属性的N种方法
- php new static 效率,PHP中new static()与new self()的比较
- gdal java api_Java使用GDAL库
- python爬虫登录网站_python爬虫19 | 遇到需要的登录的网站怎么办?用这3招轻松搞定!...
- python redis模块connectionerror_ConnectionError:Error 2连接到Python/Django Redis中的unix套接字...
- 【学习记录】macOS的Redis安装及基本使用
- java事件绑定,Java编程GUI中的事件绑定代码示例
- 苹果双系统运行oracle失败,oracle 11gR2 RAC for linux x86_64 grid运行root.sh 失败问题处理...
- 车站信号计算机联锁系统英语,车站信号计算机联锁-复习题
- 2017电大c语言考试时间,2017年电大 《c语言程序设计》a课程考核说明.doc