上一篇我们使用keyby后发现数据严重倾斜

https://datamining.blog.csdn.net/article/details/105316728

大概看下问题所在,大量数据在一个subtask中运行

这里我们使用两阶段keyby 解决该问题

之前的问题如下图所示

我们期望的是

但我们的需要根据key进行聚合统计,那么把相同的key放在不同的subtask如何统计?

我们看下图(只画了主要部分)

1.首先将key打散,我们加入将key转化为 key-随机数 ,保证数据散列

2.对打散后的数据进行聚合统计,这时我们会得到数据比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10)

3.将散列key还原成我们之前传入的key,这时我们的到数据是聚合统计后的结果,不是最初的原数据

4.二次keyby进行结果统计,输出到addSink

直接看实现代码

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject ProcessFunctionScalaV2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] = env.socketTextStream("localhost", 9999)val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong))val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print("第一次keyby输出")val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print("第二次keyby输出")env.execute()}case class DataJast(key :String,count:Long)//计算keyby后,每个Window中的数据总和class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] {override def createAccumulator(): DataJast = {println("初始化")DataJast(null,0)}override def add(value: (String, Long), accumulator: DataJast): DataJast = {if(accumulator.key==null){printf("第一次加载,key:%s,value:%d\n",value._1,value._2)DataJast(value._1,value._2)}else{printf("数据累加,key:%s,value:%d\n",value._1,accumulator.count+value._2)DataJast(value._1,accumulator.count + value._2)}}override def getResult(accumulator: DataJast): DataJast = {println("返回结果:"+accumulator)accumulator}override def merge(a: DataJast, b: DataJast): DataJast = {DataJast(a.key,a.count+b.count)}}/*** 实现:*    根据key分类,统计每个key进来的数据量,定期统计数量*/class MyProcessFunction extends  KeyedProcessFunction[String,DataJast,DataJast]{val delayTime : Long = 1000L * 30lazy val valueState:ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount",classOf[Long]))override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = {if(valueState.value()==0){valueState.update(value.count)printf("运行task:%s,第一次初始化数量:%s\n",getRuntimeContext.getIndexOfThisSubtask,value.count)val currentTime: Long = ctx.timerService().currentProcessingTime()//注册定时器ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}else{valueState.update(valueState.value()+value.count)printf("运行task:%s,更新统计结果:%s\n" ,getRuntimeContext.getIndexOfThisSubtask,valueState.value())}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = {//定时器执行,可加入业务操作printf("运行task:%s,触发定时器,30秒内数据一共,key:%s,value:%s\n",getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value())//定时统计完成,初始化统计数据valueState.update(0)//注册定时器val currentTime: Long = ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}}}

对key进行散列

 val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))

设置窗口滚动时间,每隔十秒统计一次每隔key下的数据总量

 val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate())keyByAgg.print("第一次keyby输出")

还原key,并进行二次keyby,对数据总量进行累加

  val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())

我们看下优化后的状态

先看下第一map,直接从端口拿数据,这不涉及keyby,所以这个没影响

再看下第一次keyby后的结果,因为我们散列后,flink根据哈希进行分配,所以数据不是百分之百平均,但是很明显基本上已经均衡了,不会出现这里1一条,那里1条这种状况

再看下第二次keyby,这里会发现我们ID的2的subtask有820条数据,其他的没有数据;这里是正常现象,因为我们是对第一次聚合后的数据进行keyby统计,所以这里的数据大小会非常小,比如我们原始数据一条数据有1M大小,1000条数据就1个G,业务往往还有其他操作,我们再第一次keyby 散列时处理其他逻辑(比如ETL等等操作),最终将统计结果输出给第二次keyby,很可能1个G的数据,最终只有1kb,这比我们将1个G的数据放在一个subtask中处理好很多。

上面我们自定义了MyProcessFunction方法,设置每30秒执行一次,实际业务场景,我们可能会设置一小时执行一次。

至此我们既保证了数据定时统计,也保证了数据不倾斜问题。

Flink keyby 数据倾斜问题处理相关推荐

  1. Flink实战(九十三):数据倾斜(二)keyby 窗口数据倾斜的优化

    在大数据处理领域,数据倾斜是一个非常常见的问题,今天我们就简单讲讲在flink中如何处理流式数据倾斜问题. 我们先来看一个可能产生数据倾斜的sql. select TUMBLE_END(proc_ti ...

  2. Spark - 数据倾斜实战之 skewness 偏度与 kurtosis 峰度 By ChatGPT4

    目录 一.引言 二.峰度 Skewness 简介 三.峰度 kurtosis 简介 四.Skewness 偏度与 kurtosis 峰度实现 1.Spark 实现 2.自定义实现 五.偏度.峰度绘图 ...

  3. Spark 调优之数据倾斜

    什么是数据倾斜? Spark 的计算抽象如下 数据倾斜指的是:并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度 ...

  4. sqoop数据倾斜_北京卓越讯通大数据岗位面试题分享

    北京卓越讯通面试题 学长1 1)笔试 (1)JAVA支持的数据类型有哪些?什么是自动拆装箱? (2)AtomicInteger和Volatile等线程安全操作的关键字的理解个使用 (3)创建线程有几种 ...

  5. 大表与大表join数据倾斜_技术分享|大数据技术初探之Spark数据倾斜调优

    侯亚南 数据技术处 支宸啸 数据技术处 在大数据计算中,我们可能会遇到一个很棘手的问题--数据倾斜,此时spark任务的性能会比预期要差很多:绝大多数task都很快执行完成,但个别task执行极慢或者 ...

  6. 大数据常见问题:数据倾斜

    offer收割系列介绍: 1.分享桥哥本人或小伙伴在面试大厂时遇到的真题,并给出参考答案!!如果能帮到大家,点赞.收藏.评论是对我最大的支持!! 2.涉及岗位:主要为大数据开发.数据仓库(桥哥干过的) ...

  7. 【Flink】flink keyby 在 subtask 中分配不均的研究

    1.概述 转载:flink keyby 在 subtask 中分配不均的研究 最近在做大数据量的实时数据迁移, 频繁使用到了keyby hash去均衡数据, 但是却发现subtask执行的数据量不是很 ...

  8. 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例

    大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...

  9. Spark性能优化--如何解决数据倾斜

    1 Data Skew 数据倾斜 1.1 数据倾斜概念 对Hive.Spark.Flink等大数据计算框架来讲,数据量大并不可怕,可怕的是数据倾斜. 数据倾斜是指并行处理的数据集中某一部分的数据显著多 ...

最新文章

  1. PyTorch | (3)Tensor及其基本操作
  2. HDU_1075 What Are You Talking About(Trie 树)
  3. Ubuntu_Win10双系统互换注意事项以及蓝屏解决方案
  4. LYNC2013部署系列PART10:后端高可用部署
  5. cad线性标注命令_CAD线性标注如何使用的
  6. mysql 人名用什么类型_如何选择合适的MySQL数据类型
  7. 80-10-010-原理-Java NIO-简介
  8. linux配置命令route,linux路由配置命令route学习
  9. Dell R410 BIOS 升级方法
  10. python播放音频文件——playsound
  11. c语言以顺序结构存储的二叉树的非递归遍历,一种二叉树非递归遍历算法的C语言实现...
  12. c语言单位换算转换程序,c语言时间换算(c语言时间换算过n秒)
  13. python人脸识别毕业设计-毕业论文:基于树莓派的人脸识别门禁系统本科毕业设计文章...
  14. linux系统文件名颜色含义
  15. 得知大熊哥最后一天在岗位工作今天离开有感而发
  16. win10如何使用低版本的IE浏览器?
  17. HashMap的四种同步方式
  18. 【年度总结】继往开来:回首不靠谱的2021,希冀靠谱的2022
  19. 交叉引用跳转不到后面_参考文献如何正确标注引用而不会变红?
  20. WOL远程开机,实际落地成功。

热门文章

  1. 遍历对象属性_细说JS遍历对象属性的N种方法
  2. php new static 效率,PHP中new static()与new self()的比较
  3. gdal java api_Java使用GDAL库
  4. python爬虫登录网站_python爬虫19 | 遇到需要的登录的网站怎么办?用这3招轻松搞定!...
  5. python redis模块connectionerror_ConnectionError:Error 2连接到Python/Django Redis中的unix套接字...
  6. 【学习记录】macOS的Redis安装及基本使用
  7. java事件绑定,Java编程GUI中的事件绑定代码示例
  8. 苹果双系统运行oracle失败,oracle 11gR2 RAC for linux x86_64 grid运行root.sh 失败问题处理...
  9. 车站信号计算机联锁系统英语,车站信号计算机联锁-复习题
  10. 2017电大c语言考试时间,2017年电大 《c语言程序设计》a课程考核说明.doc