解决flink、flink-sql去重过程中的热点问题

1、flink-sql解决热点问题

使用Sql去实现一个去重功能,通常会这样实现
SELECT day, COUNT(DISTINCT user_id) FROM T GROUP BY day --sql1
或者
select day,count(*) from( select distinct user_id,day from T ) a group by day --sql2
但是这两种方式都未解决计算热点问题,例如当某一个day 对应的devId 特别大的情况下,那么计算压力都会到该day所在的task,使这个task成为任务的性能瓶颈。

package com.yyds.flink_distinct;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** 去重过程中的热点问题(使用flink sql进行解决)**/
public class _06_DistinctHotpotFlinkSql {public static void main(String[] args) {// 创建表的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 解决热点问题的配置tenv.getConfig().getConfiguration().setString("table.optimizer.distinct-agg.split.enabled", "true");SingleOutputStreamOperator<_06_User> ss1 = env.socketTextStream("hadoop01", 9999).map(new MapFunction<String, _06_User>() {@Overridepublic _06_User map(String line) throws Exception {String[] arr = line.split(",");return new _06_User(arr[0], arr[1]);}});tenv.createTemporaryView("T",ss1);String executeSql = "SELECT `day`, COUNT(DISTINCT user_id) as cnt FROM T  GROUP BY `day`";/*** -- 会转换为这个sqlSELECT day, SUM(cnt)FROM (SELECT day, COUNT(DISTINCT user_id) as cntFROM TGROUP BY day, MOD(HASH_CODE(user_id), 1024))GROUP BY dayMOD(HASH_CODE(user_id), 1024) 表示对取user_id的hashCode然后对1024取余,也就是将user_id划分到1024个桶里面去,那么里层通过对day与桶编号进行去重(cnt)外层只需要对cnt执行sum操作即可,因为分桶操作限制了相同的user_id 一定会在相同的桶里面*/String explainSql = tenv.explainSql(executeSql, ExplainDetail.CHANGELOG_MODE);System.out.println(explainSql);tenv.executeSql(executeSql).print();}
}

2、flink解决热点问题

去重过程中的热点问题(编码实现)

实时计算广告位访客数,流量数据id(广告位ID)、devId(访问ID)、time(访问时间)

实现思路:
• 首先通过对id、设备id分桶编号、小时级别时间分组,使用一个ProcessFunction计算分桶后的去重数(与MapState方式相同)
• 然后通过对id、小时级别时间分组,使用另一个ProcessFunction做sum操作,

  但是这里面需要注意的一个问题是对于相同id与时间其数据可能会来源于上游不同的task,而上游的每个task的数据都会以全量一直往下发送,如果直接做累加操作会导致重复计算,因此得实现一个类似于sql中retract撤回机制,也就是上一个ProcessFunction每发送一条数据都需要先将之前的数据发送一份表示其为撤回。

主程序:

package com.yyds.flink_distinct;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** 去重过程中的热点问题(编码实现)*** 实时计算广告位访客数,流量数据id(广告位ID)、devId(访问ID)、time(访问时间)** 实现思路:* •  首先通过对id、设备id分桶编号、小时级别时间分组,使用一个ProcessFunction计算分桶后的去重数(与MapState方式相同)* •  然后通过对id、小时级别时间分组,使用另一个ProcessFunction做sum操作,***       但是这里面需要注意的一个问题是对于相同id与时间其数据可能会来源于上游不同的task,*          而上游的每个task的数据都会以全量一直往下发送,如果直接做累加操作会导致重复计算,因此得实现一个类似于sql中retract撤回机制,*          也就是上一个ProcessFunction每发送一条数据都需要先将之前的数据发送一份表示其为撤回。**/
public class _07_DistinctHotpot {public static void main(String[] args) throws Exception {// 创建表的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 模拟数据:* 1,001,1000* 1,002,1000* 1,003,1000* 1,004,1000*/// 读取原始数据,转换为javaBeanSingleOutputStreamOperator<_07_AdData> ss1 = env.socketTextStream("hadoop01", 9999).map(new MapFunction<String, _07_AdData>() {@Overridepublic _07_AdData map(String line) throws Exception {String[] arr = line.split(",");return new _07_AdData(Integer.parseInt(arr[0]), arr[1],Long.parseLong( arr[2]));}});//  首先通过对id、设备id分桶编号、小时级别时间分组,使用一个ProcessFunction计算分桶后的去重数(与MapState方式相同)KeyedStream<_07_AdData, _07_AdKey1> keyedStream1 = ss1.keyBy(new KeySelector<_07_AdData, _07_AdKey1>() {@Overridepublic _07_AdKey1 getKey(_07_AdData data) throws Exception {long endTime = TimeWindow.getWindowStartWithOffset(data.getTime(), 0, Time.hours(1).toMilliseconds()) + Time.hours(1).toMilliseconds();return new _07_AdKey1(data.getId(), endTime, data.getDevId().hashCode() % 3);}});SingleOutputStreamOperator<Tuple2<Boolean, Tuple3<Integer, Long, Long>>> processStream1 = keyedStream1.process(new _07_DistinctProcessFunction01());KeyedStream<Tuple2<Boolean, Tuple3<Integer, Long, Long>>, _07_AdKey2> keyedStream2 = processStream1.keyBy(new KeySelector<Tuple2<Boolean, Tuple3<Integer, Long, Long>>, _07_AdKey2>() {@Overridepublic _07_AdKey2 getKey(Tuple2<Boolean, Tuple3<Integer, Long, Long>> tp2) throws Exception {return new _07_AdKey2(tp2.f1.f0, tp2.f1.f1);}});keyedStream2.process(new _07_DistinctProcessFunction02());env.execute("_07_DistinctHotpot");}
}

自定义函数

package com.yyds.flink_distinct;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class _07_DistinctProcessFunction01 extends KeyedProcessFunction<_07_AdKey1,_07_AdData, Tuple2<Boolean, Tuple3<Integer,Long,Long>>> {// 定义第一个状态MapStateMapState<String,Integer> deviceIdState ;// 定义第二个状态ValueStateValueState<Long> countState ;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);countState = getRuntimeContext().getState(countStateDescriptor);}@Overridepublic void processElement(_07_AdData adData, Context context, Collector<Tuple2<Boolean, Tuple3<Integer, Long, Long>>> collector) throws Exception {String devId = adData.getDevId();Integer i = deviceIdState.get(devId);if(i == null){i = 0;}int id = context.getCurrentKey().getId();long time = context.getCurrentKey().getTime();long code = context.getCurrentKey().getBucketCode();Long c = countState.value();if(c == null){c = 0L;}//        System.out.println("id = " +  id  + ",time = " +  time +  ",c = " + c +  ",code = " + code);if(  i == 1  ){// 表示已经存在}else {// 表示不存在,放入到状态中deviceIdState.put(devId,1);// 将统计的数据 + 1Long count = c + 1;countState.update(count);System.out.println("id = " +  id  + ",time = " +  time +  ",count = " + count +  ",code = " + code);if(count > 1){// 认为大于1的需要进行撤回System.out.println("========撤回======");collector.collect(Tuple2.of(false,Tuple3.of(id,time,c)));collector.collect(Tuple2.of(true,Tuple3.of(id,time,count)));}else {collector.collect(Tuple2.of(true,Tuple3.of(id,time,count)));}}}
}
package com.yyds.flink_distinct;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;/*** 重点在于如果收到编码为false 的数据,那么需要从当前计数里面减掉撤回的计数值。*/
public class _07_DistinctProcessFunction02 extends KeyedProcessFunction<_07_AdKey2, Tuple2<Boolean, Tuple3<Integer,Long,Long>>,Void> {// 定义状态ValueStateValueState<Long> countState ;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);countState = getRuntimeContext().getState(countStateDescriptor);}@Overridepublic void processElement(Tuple2<Boolean, Tuple3<Integer, Long, Long>> tp2, Context context, Collector<Void> collector) throws Exception {Long count = countState.value();if(count == null) count = 0L;Boolean bool = tp2.f0;System.out.println(bool);if(bool){countState.update(count + tp2.f1.f2);System.out.println(context.getCurrentKey() + ":" + countState.value());}else {// 发生撤回,那么需要从当前计数里面减掉撤回的计数值。countState.update(count - tp2.f1.f2);System.out.println(context.getCurrentKey() + ":" + countState.value());}}
}

javaBean:

package com.yyds.flink_distinct;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString/*** 原始数据*/
public class _07_AdData {private int id;private String devId;private Long time;
}
package com.yyds.flink_distinct;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString/*** 第一次keyBy的数据*/
public class _07_AdKey1 {private int id;private Long time;private int bucketCode ; // 桶的编码
}
package com.yyds.flink_distinct;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString/*** 第二次keyBy的数据*/
public class _07_AdKey2 {private int id;private Long time;
}

flink去重(二)解决flink、flink-sql去重过程中的热点问题相关推荐

  1. matlab生产计划问题,用MATLAB解决综合生产计划编制过程中的优化问题

    第 18卷第 3期 2005年 6月 常 州 工 学 院 学 报 Journal of Changzhou Institute of Technology Vol. 18 No. 3 Jun. 200 ...

  2. day12_oracle hint——SQL优化过程中常见Oracle中HINT的30个用法

    在SQL语句优化过程中,经常会用到hint, 以下是在SQL优化过程中常见Oracle中"HINT"的30个用法 1. /*+ALL_ROWS*/ 表明对语句块选择基于开销的优化方 ...

  3. 解决Google Drive 大文件下载过程中中途失败问题

    使用Internet Download Manager的断点续传功能进行下载大文件. 解决Google Drive 大文件下载过程中中途失败问题 - 知乎 (zhihu.com)

  4. @以最缓和的方式 解决 K8S 集群搭建过程中遇到的问题

    以最缓和的方式解决 K8S 集群搭建过程中遇到的问题 问题描述 启用 minikube dashboard之后,用 kubectl proxy 命令遇到错误 kubectl proxy error: ...

  5. 解决关于Navicat破解安装过程中出现“rsa public key not find”

    解决关于Navicat破解安装过程中出现"rsa public key not find" 问题描述 解决办法 问题描述 出现"rsa public key not fi ...

  6. 2021年大数据Flink(二):Flink用武之地

    目录 ​​​​​​​Flink用武之地 ​​​​​​​Event-driven Applications[事件驱动] ​​​​​​​Data Analytics Applications[数据分析] ...

  7. SQL执行过程中的性能负载点

    一.SQL执行过程 1.用户连接数据库,执行SQL语句: 2.先在内存进行内存读,找到了所需数据就直接交给用户工作空间: 3.内存读失败,也就说在内存中没找到支持SQL所需数据,就进行物理读,也就是到 ...

  8. 肖哥教你解决安装和运行eNSP过程中遇到的各种问题

    华为ensp 目前不是很稳定,肖哥现将在使用ensp过程中遇到的问题总结如下: 使用ENSP注意事项: ① 建议安装ensp时,安装目录不要出现中文. ② 关闭杀毒软件(尤其是电脑管家)和防火墙. ③ ...

  9. 解决 Metasploit 启动及使用过程中一直出现警告信息的问题

    问题描述 运行 msfconsole,即报如下警告信息: ➜ recon msfconsole /usr/share/metasploit-framework/vendor/bundle/ruby/2 ...

最新文章

  1. Java面试题汇总及答案2021最新(序列化含答案)
  2. Vue打包之后会出现.map文件用处
  3. 第二阶段冲刺--团队站立会议03
  4. IIR+全通滤波器实现相位平衡_matlab仿真
  5. 万字长文带你一文读完Effective C++
  6. React-引领未来的用户界面开发框架-读书笔记(八)
  7. c简单的链表错误及改正
  8. IIS支持下载.config后缀名的文件
  9. 如何设置ListView控件中的列头的颜色!
  10. JAVA + LR实现apache流媒体的性能测试
  11. 【权限设计】如何以“权限”为单位的进行权限设计(二)
  12. 逆向脱壳附加数据处理
  13. 外卖类应用的竞争与趋势
  14. Python自制日常办公辅助工具之:批量视频截图,子集固定尺寸截图+序列化命名
  15. SpringMVC工作原理概述
  16. 十年经验教你如何学习嵌入式
  17. html实现手机截屏,iPhone手机如何实现网页长截图?
  18. 2020测试工具索引
  19. 射击类项目(数据的持久化保存)整理四
  20. 【JSD2209-DAY05】for、while、数组(上)

热门文章

  1. JavaWeb之HelloWord
  2. 测试绝地求生显卡使用率软件,多分辨率《绝地求生:大逃杀》显卡性能测试!...
  3. 一张图看懂SharpCamera
  4. katalon studio录制WebUI自动化脚本
  5. Unity3D简陋版跑酷游戏
  6. 哈勃经典照片:数百万恒星构成宇宙喷泉(一)(图)
  7. 高并发(二)--核心理论
  8. 代理服务器-CCProxy
  9. 百度云管家超级会员过段时间速度几乎为0的解决办法
  10. Windows Server AD域控服务器升级/迁移(AD域控的五大角色转移)