前言

使用 flink 很长一段时间了,突然发现竟然没有计算过 topN,这可是 flink 常见的计算场景了, 故自己想了一个场景来计算一下。
基于 Flink 1.12

场景

外卖员听单的信息会发到单独一个 topic 中,计算一个每天有多少个 外卖员听单以及总共的听单次数。

kafka 中消息类型

{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}

locTime:事件发生的时间,courierId 外卖员id

计算一天中 听单次数 top5 的外卖员

代码

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);FlinkHelp.setOffset(parameter, consumer);consumer.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {String locTime = "";try {Map<String, Object> map = Json2Others.json2map(element);locTime = map.get("locTime").toString();} catch (IOException e) {}LocalDateTime startDateTime =LocalDateTime.parse(locTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));long milli = startDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();return milli;}}).withIdleness(Duration.ofSeconds(1)));SingleOutputStreamOperator<CourierListenInfos> process = env.addSource(consumer).filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return true;}}).keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String value) throws Exception {Map<String, Object> map = Json2Others.json2map(value);String courierId = map.get("courierId").toString();String day = map.get("locTime").toString().split(" ")[0].replace("-", "");return day + "-" + courierId;}}).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).allowedLateness(Time.minutes(1))
//              .trigger(CountTrigger.of(5))// 其实多个 trigger 就是下一个 trigger 覆盖上一个 trigger.trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))//追历史数据的时候会有问题
//              .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))//处理完毕后将 window state 中的数据清除掉.evictor(TimeEvictor.of(Time.seconds(0), true)).process(new ProcessWindowFunction<String, CourierListenInfos, String, TimeWindow>() {private JedisCluster jedisCluster;private ReducingStateDescriptor<Long> reducingStateDescriptor;private ReducingState<Long> listenCount;@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(25))//default,不支持 eventTime 1.12.0.setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime).cleanupInRocksdbCompactFilter(1000).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();reducingStateDescriptor =new ReducingStateDescriptor<Long>("listenCount", new Sum(), TypeInformation.of(Long.class));reducingStateDescriptor.enableTimeToLive(ttlConfig);listenCount = getRuntimeContext().getReducingState(reducingStateDescriptor);jedisCluster = RedisUtil.getJedisCluster(redisHp);}@Overridepublic void close() throws Exception {RedisUtil.closeConn(jedisCluster);}@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<CourierListenInfos> out) throws Exception {Iterator<String> iterator = elements.iterator();long l = context.currentProcessingTime();long watermark = context.currentWatermark();TimeWindow window = context.window();String endDay = DateUtils.millisecondsToDateStr(window.getEnd(), "yyyyMMdd HH:mm:ss");String startDay = DateUtils.millisecondsToDateStr(window.getStart(), "yyyyMMdd HH:mm:ss");System.out.println("currentProcessingTime:" + l + " watermark:" + watermark + " windowTime:" + startDay + "-" + endDay);while (iterator.hasNext()) {iterator.next();listenCount.add(1L);}iterator = elements.iterator();Map<String, Object> map = Json2Others.json2map(iterator.next());String courierId = map.get("courierId").toString();String day = map.get("locTime").toString().split(" ")[0].replace("-", "");out.collect(new CourierListenInfos(day, courierId, listenCount.get()));}});process.keyBy(new KeySelector<CourierListenInfos, String>() {@Overridepublic String getKey(CourierListenInfos value) throws Exception {return value.getDay();}}).process(new KeyedProcessFunction<String, CourierListenInfos, String>() {private JedisCluster jedisCluster;private MapStateDescriptor<String, Long> mapStateCountDescriptor;private MapState<String, Long> courierInfoCountMapState;private boolean mucalc = false;@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(25))//default,不支持 eventTime 1.12.0.setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime).cleanupInRocksdbCompactFilter(1000).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();mapStateCountDescriptor =new MapStateDescriptor<String, Long>("courierInfoCountMapState", TypeInformation.of(String.class), TypeInformation.of(Long.class));mapStateCountDescriptor.enableTimeToLive(ttlConfig);courierInfoCountMapState = getRuntimeContext().getMapState(mapStateCountDescriptor);jedisCluster = RedisUtil.getJedisCluster(redisHp);}@Overridepublic void close() throws Exception {RedisUtil.closeConn(jedisCluster);}@Overridepublic void processElement(CourierListenInfos value, Context ctx, Collector<String> out) throws Exception {courierInfoCountMapState.put(value.getDay() + "#" + value.getCourierId(), value.getListenCount());
//              System.out.println("ctx.timerService().currentWatermark() = " + DateUtils.millisecondsToDateStr(ctx.timerService().currentWatermark(), "yyyyMMdd HH:mm:ss"));
//              System.out.println("ctx.timestamp() = " + DateUtils.millisecondsToDateStr(ctx.timestamp(), "yyyyMMdd HH:mm:ss"));ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() / 1000 + 1000);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {String day = ctx.getCurrentKey();PriorityQueue<CourierListenInfos> courierListenInfos = new PriorityQueue<>(new Comparator<CourierListenInfos>() {@Overridepublic int compare(CourierListenInfos o1, CourierListenInfos o2) {return (int) (o1.listenCount - o2.listenCount);}});Iterable<Map.Entry<String, Long>> entries = courierInfoCountMapState.entries();for (Map.Entry<String, Long> entry : entries) {//                  System.out.println("entry.getKey() " + entry.getKey());String[] split = entry.getKey().split("#", -1);courierListenInfos.offer(new CourierListenInfos(split[0], split[1], entry.getValue()));if (courierListenInfos.size() > 5) {courierListenInfos.poll();}}courierInfoCountMapState.clear();String tops = "";int size = courierListenInfos.size();for (int i = 0; i < size; i++) {CourierListenInfos courierListenInfos1 = courierListenInfos.poll();System.out.println("courierListenInfos1 " + courierListenInfos1);courierInfoCountMapState.put(courierListenInfos1.getDay() + "#" + courierListenInfos1.getCourierId(), courierListenInfos1.listenCount);tops = tops + courierListenInfos1.courierId + "#" + courierListenInfos1.listenCount;if (i != size - 1) {tops += ",";}}
//              System.out.println("courierListenInfos.poll() = " + tops);jedisCluster.hset("test_courier_tops", day + "-top5", tops);System.out.println("============");}}).setParallelism(1);

结果样例

‘20201227-top5’:‘1#1111,2#2222,3#3333’
‘20201227-top5’:‘1#1111,2#2222,3#3333’

Flink 计算 TopN相关推荐

  1. flink计算交通事故概率

    目录 flink计算交通事故概率 数据模型 总结 flink计算交通事故概率 要计算交通事故概率,我们需要有一些数据作为输入,包括交通违法记录.车辆信息.天气信息.道路信息等.为了简化问题,我们以一个 ...

  2. Flink计算pv和uv的通用方法

    关注公众号:大数据技术派,回复"资料",领取`1024G`资料. Flink系列教程 PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次. UV ...

  3. 大型分布式系统监控平台(六)-- 第一个flink应用topN

    聊的不止技术.跟着小帅写代码,还原和技术大牛一对一真实对话,剖析真实项目筑成的一砖一瓦,了解最新最及时的资讯信息,还可以学到日常撩妹小技巧哦,让我们开始探索主人公小帅的职场生涯吧! (PS:本系列文章 ...

  4. SQL 分组计算 topN

    文章目录 在线运行SQL 建表: 分组 topN row_number() union all 自关联 在线运行SQL 首先安利这款免费在线 SQL 运行平台 sql fiddle: 建表: crea ...

  5. Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现

    TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜.流式的 TopN 不同于批处理的 TopN,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算 TopN 排行榜,然后当排行榜 ...

  6. 实战 | flink sql 实时 TopN

    实战 | flink sql 实时 TopN 1.背景篇 2.难点剖析篇-此类指标建设.保障的难点 2.1.数据建设 2.2.数据保障 2.3.数据服务保障 3.数据建设篇-具体实现方案详述 3.1. ...

  7. 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    来自:DBAplus社群 作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人. 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发.实时数据关联平台.实时算法特征数据计算.实 ...

  8. 基于实时计算Flink版的场景解决方案demo

    简介:通过两个demo分享技术实时计算flink版的解决方案 本文整理自阿里云智能行业解决方案专家GIN的直播分享 直播链接:https://developer.aliyun.com/learning ...

  9. 基于实时计算(Flink)与高斯模型构建实时异常检测系统

    案例与解决方案汇总页: 阿里云实时计算产品案例&解决方案汇总 1. 概述 异常检测(anomaly detection)指的是对不符合预期模式或数据集(英语:dataset)中其他项目的项目. ...

最新文章

  1. python怎么画形状_python – matplotlib – 如何绘制随机导向的矩形(或任何形状)?...
  2. python 异常和弹出框
  3. Nginx+keepalived从入门到集群搭建(手把手教学,建议收藏)
  4. 关于CI的服务器与最佳实践,这里有一些思考 1
  5. 在 SAP Kyma 上使用 Redis 服务
  6. 干货:产品经理怎么做才能在需求评审中少挨打?
  7. Panabit安装配置笔记
  8. 如何用java写单链表_如何使用Java实现单链表?
  9. nlp基础—7.隐马尔可夫模型(HMM算法)
  10. java基于ssm数据库原理及应用题库管理系统
  11. HG5520A型多用表校准仪
  12. 感量越大抑制频率约低_EDA365:开关电源 LC 滤波器设计
  13. python 字节码_32.12. dis — Python 字节码反汇编器 — Python 2.7.18 文档
  14. RROR: [XSIM 43-3238] Failed to link the design.
  15. TeXstudio编译提示缺少.sty文件
  16. (干货)各大AI竞赛 Top 解决方案开源汇总+大牛经验(Kaggle,Ali,Tencent、JD、KDD Cup...)
  17. 【NI Multisim 14.0 操作实例——音量控制电路】
  18. 触摸屏登录的几种方式
  19. WebBrowser或IE物件分析網頁表格/自動登入網頁
  20. XMPP - Socket 实现创建聊天室

热门文章

  1. ik分词器-添加新的词汇和停止一些词汇
  2. TCP/UDP 端口及部分端口的作用
  3. oracle删除重复数据-百万级别数据以上情况
  4. 线上服务器内存飙升怎么排查?
  5. 从0部署Tekton之Tekton安装
  6. Linux 系统设置 : dmesg 命令详解
  7. C字符串操作strlen/strnlen_s详解
  8. 奇数页 偶数页 页脚不一致怎么处理
  9. 【SQLite】C++链接SQLite数据库
  10. matlab控制理论学习