1.概述

通过socketTextStream读取9999端口数据,统计在一定时间内不同类型商品的销售总额度,如果持续销售额度为0,则执行定时器通知老板,是不是卖某种类型商品的员工偷懒了(只做功能演示,根据个人业务来使用,比如统计UV等操作)

2.代码

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject ProcessFuncationScala {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval stream: DataStream[String] = env.socketTextStream("localhost", 9999)val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1))).setParallelism(4)typeAndData.keyBy(0).process(new MyprocessFunction()).print("结果")env.execute()}/*** 实现:*    根据key分类,统计每个key进来的数据量,定期统计数量,如果数量为0则预警*/class MyprocessFunction extends  KeyedProcessFunction[Tuple,(String,String),String]{//统计间隔时间val delayTime : Long = 1000 * 10lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]]))override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {printf("定时器触发,时间为:%d,状态为:%s,key为:%s\n",timestamp,state.value(),ctx.getCurrentKey)if(state.value()._2==0){//该时间段数据为0,进行预警printf("类型为:%s,数据为0,预警\n",state.value()._1)}//定期数据统计完成后,清零state.update(state.value()._1,0)//再次注册定时器执行val currentTime: Long = ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = {printf("状态值:%s,state是否为空:%s\n",state.value(),(state.value()==null))if(state.value() == null){//获取时间val currentTime: Long = ctx.timerService().currentProcessingTime()//注册定时器十秒后触发ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)printf("定时器注册时间:%d\n",currentTime+10000L)state.update(value._1,value._2.toInt)} else{//统计数据val key: String = state.value()._1var count: Long = state.value()._2count += value._2.toInt//更新state值state.update((key,count))}println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value)printf("状态值:%s\n",state.value())//返回处理后结果out.collect("处理后返回数据->"+value)}}}

代码中使用ValueState记录了状态信息,每次来商品都会进行总额度累加;商品第一次进入的时候会注册一个定时器,每隔十秒执行一次,定时器做预警功能,如果十秒内商品销售等于0,我们则进行预警。

3.测试

往端口输入数据,十秒内输入四条数据

帽子,12
帽子,12
鞋,10
鞋,10

通过我们打印我们会发现统计完成,

定时器触发,时间为:1586005420511,状态为:(鞋,20),key为:(鞋)
定时器触发,时间为:1586005421080,状态为:(帽子,24),key为:(帽子)

如果我们十秒内不输入数据,则会提示数据为0,进行预警

定时器触发,时间为:1586005406244,状态为:(帽子,0),key为:(帽子)
类型为:帽子,数据为0,预警
定时器触发,时间为:1586005406244,状态为:(鞋,0),key为:(鞋)
类型为:鞋,数据为0,预警

4.问题

到这里我们已经实现了定期统计功能,但有没有发现,如果帽子分配在task1执行,鞋在task2执行,鞋一天进来1亿条数据,帽子进来1条数据,我们会出现严重的数据倾斜问题。

我们实际看一下具体问题

计算结果我们就先不看了,直接看数据分配问题

三个task阶段 , Socket是单并行的source,我们将并行度改为4

输入数据:1条 帽子,10 ;50条 鞋,10

我们看Map阶段,数据是均衡的,因为这里还没有进行keyby


我们再看keyby后的task

我们发现50条数据都在ID为3的subtask中,出现了严重数据倾斜问题 ,这种问题我们可以进行两阶段keyby解决该问题

【Flink】Flink界面如何查看数据是否倾斜相关推荐

  1. 【Flink】Flink source后全过滤数据导致监控数据为0

    文章目录 1.背景 2.案例 3.小结 4.其他可能的原因 1.背景 是这样的,一个客户的程序,突然运行的时候发现,kafka好像没法消费了. 发现一个问题,flink程序启动了,没报错,然后正常运行 ...

  2. 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink——flink支持SQL,待看

    简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才 ...

  3. Apache Flink Meetup · 上海站,超强数据湖干货等你!

    简介:Apache Flink x Iceberg Meetup 上海站 你是否有过流批技术栈不统一的抓狂? 你是否有过流批数据对不上的烦恼? 你是否有过,海量数据更新时效性跟不上的无奈? Apach ...

  4. Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中

    文章目录 案例:实时处理电商订单信息 需求一:统计商城实时订单实收金额 需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice) Redis Sink 自定义 Red ...

  5. mysql查看数据倾斜_深入理解hadoop数据倾斜

    深入理解hadoop之数据倾斜 1.什么是数据倾斜 我们在用map /reduce程序执行时,有时候会发现reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理 ...

  6. Flink流式处理百万数据量CSV文件

    前言 最近公司让做一个'没有必要'的需求 需求针对的对象 这是同一个csv文件的不同展示形式 Excel展示形式 文本展示形式 这个csv文件中可能有数百万条数据 需求 将所有的异常数据检测出来 什么 ...

  7. Flink OLAP 助力 ByteHTAP 亮相数据库顶会 VLDB

    复杂查询 QPS 破百,字节跳动 Flink OLAP 助力 ByteHTAP 亮相数据库顶会 VLDB. 2022 年 9 月 5 日至 9 月 9 日,VLDB 2022 在澳大利亚悉尼举行.字节 ...

  8. 1.30.Flink SQL案例将Kafka数据写入hive

    1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...

  9. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

最新文章

  1. htmlspecialchars() improvements in PHP 5.4
  2. 定了!5G商用牌照近期发放​​​​,透露两大信息(附:2019年5G行业关键材料及市场研究报告)...
  3. 视图解析jstlView支持便捷的国际化功能
  4. 小工匠聊架构-Redis 缓存一致性设计
  5. C++ Primer 5th笔记(chap 16 模板和泛型编程)定义
  6. Linux下导入,导出mysql数据库的命令
  7. 映射文件中增删改查标签中的parameterType和resultType
  8. JDBC基础篇(MYSQL)——自定义JDBCUtil工具类
  9. 中条码一般为四色怎么转单色黑_条码机适用的耗材判断标准
  10. Skype国际版下载方法
  11. 服务器主动向客户端发送信息机制
  12. ue4 项目模板_卡牌游戏项目模板
  13. 关于vue-cli3的浏览器兼容性
  14. 猫狗二分类实战(PyTorch)
  15. java集合框架笔记
  16. 游戏公司招聘建模师的流程分析指导,零基础学习3D建模教程
  17. 三峡大学计算机学院毕业答辩问题,毕业答辩会问点什么问题?
  18. 如何破解安腾校园网客户端(2)
  19. opencv实现两个视频拼接显示
  20. 欢送离职同事聚餐通知

热门文章

  1. 雷军晒十多年前的手机:支持无线充电、内置8GB存储
  2. 壕!腾讯85亿买了个岛:200万平方米巨无霸“新鹅厂”来了
  3. 这又是什么新玩法?华为Mate 30 Pro真机谍照现身:音量键大变样
  4. 《哪吒之魔童降世》观影人次突破1亿大关 为动画电影之最!
  5. 又一款5G手机获进网许可 开售在即 你会尝鲜吗?
  6. 特斯拉联合苹果发难 要对小鹏汽车“窃密”员工动手了...
  7. 明星分手文案火了!为了营销 你们这些商家也是很努力啊...
  8. 程序员放弃阿里60w年薪,选到手5k的公务员,坚信公务员后期完胜程序员
  9. Linux的shell编程(一)
  10. Ubuntu报“xxx is not in the sudoers file.This incident will be reported” 错误解决方法