作者:zhisheng

cloud.tencent.com/developer/article/1558372

阿里的双11销量大屏可以说是一道特殊的风景线。实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标。并且在实际操作中,肯定也不会仅仅计算一两个维度。由于Flink的“真·流式计算”这一特点,它比Spark Streaming要更适合大屏应用。本文从笔者的实际工作经验抽象出简单的模型,并简要叙述计算流程(当然大部分都是源码)。

前言

数据格式与接入

简化的子订单消息体如下。

{"userId": 234567,"orderId": 2902306918400,"subOrderId": 2902306918401,"siteId": 10219,"siteName": "site_blabla","cityId": 101,"cityName": "北京市","warehouseId": 636,"merchandiseId": 187699,"price": 299,"quantity": 2,"orderStatus": 1,"isNewOrder": 0,"timestamp": 1572963672217
}

由于订单可能会包含多种商品,故会被拆分成子订单来表示,每条JSON消息表示一个子订单。现在要按照自然日来统计以下指标,并以1秒的刷新频率呈现在大屏上:

  • 每个站点(站点ID即siteId)的总订单数、子订单数、销量与GMV;

  • 当前销量排名前N的商品(商品ID即merchandiseId)与它们的销量。

由于大屏的最大诉求是实时性,等待迟到数据显然不太现实,因此我们采用处理时间作为时间特征,并以1分钟的频率做checkpointing。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);

然后订阅Kafka的订单消息作为数据源。

    Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");DataStream<String> sourceStream = env.addSource(new FlinkKafkaConsumer011<>(ORDER_EXT_TOPIC_NAME,                        // topicnew SimpleStringSchema(),                    // deserializerconsumerProps                                // consumer properties)).setParallelism(PARTITION_COUNT).name("source_kafka_" + ORDER_EXT_TOPIC_NAME).uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);

给带状态的算子设定算子ID(通过调用uid()方法)是个好习惯,能够保证Flink应用从保存点重启时能够正确恢复状态现场。为了尽量稳妥,Flink官方也建议为每个算子都显式地设定ID,参考:https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job

接下来将JSON数据转化为POJO,JSON框架采用FastJSON。

    DataStream<SubOrderDetail> orderStream = sourceStream.map(message -> JSON.parseObject(message, SubOrderDetail.class)).name("map_sub_order_detail").uid("map_sub_order_detail");

JSON已经是预先处理好的标准化格式,所以POJO类SubOrderDetail的写法可以通过Lombok极大地简化。如果JSON的字段有不规范的,那么就需要手写Getter和Setter,并用@JSONField注解来指明。

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {private static final long serialVersionUID = 1L;private long userId;private long orderId;private long subOrderId;private long siteId;private String siteName;private long cityId;private String cityName;private long warehouseId;private long merchandiseId;private long price;private long quantity;private int orderStatus;private int isNewOrder;private long timestamp;
}

统计站点指标

将子订单流按站点ID分组,开1天的滚动窗口,并同时设定ContinuousProcessingTimeTrigger触发器,以1秒周期触发计算。注意处理时间的时区问题,这是老生常谈了。

    WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream.keyBy("siteId").window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))).trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));

接下来写个聚合函数。

    DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream.aggregate(new OrderAndGmvAggregateFunc()).name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
  public static final class OrderAndGmvAggregateFuncimplements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {private static final long serialVersionUID = 1L;@Overridepublic OrderAccumulator createAccumulator() {return new OrderAccumulator();}@Overridepublic OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {if (acc.getSiteId() == 0) {acc.setSiteId(record.getSiteId());acc.setSiteName(record.getSiteName());}acc.addOrderId(record.getOrderId());acc.addSubOrderSum(1);acc.addQuantitySum(record.getQuantity());acc.addGmv(record.getPrice() * record.getQuantity());return acc;}@Overridepublic OrderAccumulator getResult(OrderAccumulator acc) {return acc;}@Overridepublic OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {if (acc1.getSiteId() == 0) {acc1.setSiteId(acc2.getSiteId());acc1.setSiteName(acc2.getSiteName());}acc1.addOrderIds(acc2.getOrderIds());acc1.addSubOrderSum(acc2.getSubOrderSum());acc1.addQuantitySum(acc2.getQuantitySum());acc1.addGmv(acc2.getGmv());return acc1;}}

累加器类OrderAccumulator的实现很简单,看源码就大概知道它的结构了,因此不再多废话。唯一需要注意的是订单ID可能重复,所以需要用名为orderIds的HashSet来保存它。HashSet应付我们目前的数据规模还是没太大问题的,如果是海量数据,就考虑换用HyperLogLog吧。

接下来就该输出到Redis供呈现端查询了。这里有个问题:一秒内有数据变化的站点并不多,而ContinuousProcessingTimeTrigger每次触发都会输出窗口里全部的聚合数据,这样做了很多无用功,并且还会增大Redis的压力。所以,我们在聚合结果后再接一个ProcessFunction,代码如下。

    DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream.keyBy(0).process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {})).name("process_site_gmv_changed").uid("process_site_gmv_changed");
  public static final class OutputOrderGmvProcessFuncextends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {private static final long serialVersionUID = 1L;private MapState<Long, OrderAccumulator> state;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>("state_site_order_gmv",Long.class,OrderAccumulator.class));}@Overridepublic void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {long key = value.getSiteId();OrderAccumulator cachedValue = state.get(key);if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {JSONObject result = new JSONObject();result.put("site_id", value.getSiteId());result.put("site_name", value.getSiteName());result.put("quantity", value.getQuantitySum());result.put("orderCount", value.getOrderIds().size());result.put("subOrderCount", value.getSubOrderSum());result.put("gmv", value.getGmv());out.collect(new Tuple2<>(key, result.toJSONString());state.put(key, value);}}@Overridepublic void close() throws Exception {state.clear();super.close();}}

说来也简单,就是用一个MapState状态缓存当前所有站点的聚合数据。由于数据源是以子订单为单位的,因此如果站点ID在MapState中没有缓存,或者缓存的子订单数与当前子订单数不一致,表示结果有更新,这样的数据才允许输出。

最后就可以安心地接上Redis Sink了,结果会被存进一个Hash结构里。

    // 看官请自己构造合适的FlinkJedisPoolConfigFlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);siteResultStream.addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper())).name("sink_redis_site_gmv").uid("sink_redis_site_gmv").setParallelism(1);
  public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {private static final long serialVersionUID = 1L;private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);}@Overridepublic String getKeyFromData(Tuple2<Long, String> data) {return String.valueOf(data.f0);}@Overridepublic String getValueFromData(Tuple2<Long, String> data) {return data.f1;}@Overridepublic Optional<String> getAdditionalKey(Tuple2<Long, String> data) {return Optional.of(HASH_NAME_PREFIX +new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +"SITES");}}

商品Top N

我们可以直接复用前面产生的orderStream,玩法与上面的GMV统计大同小异。这里用1秒滚动窗口就可以了。

    WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream.keyBy("merchandiseId").window(TumblingProcessingTimeWindows.of(Time.seconds(1)));DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream.aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc()).name("aggregate_merch_sales").uid("aggregate_merch_sales").returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));

聚合函数与窗口函数的实现更加简单了,最终返回的是商品ID与商品销量的二元组。

  public static final class MerchandiseSalesAggregateFuncimplements AggregateFunction<SubOrderDetail, Long, Long> {private static final long serialVersionUID = 1L;@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(SubOrderDetail value, Long acc) {return acc + value.getQuantity();}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long acc1, Long acc2) {return acc1 + acc2;}}public static final class MerchandiseSalesWindowFuncimplements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {private static final long serialVersionUID = 1L;@Overridepublic void apply(Tuple key,TimeWindow window,Iterable<Long> accs,Collector<Tuple2<Long, Long>> out) throws Exception {long merchId = ((Tuple1<Long>) key).f0;long acc = accs.iterator().next();out.collect(new Tuple2<>(merchId, acc));}}

既然数据最终都要落到Redis,那么我们完全没必要在Flink端做Top N的统计,直接利用Redis的有序集合(zset)就行了,商品ID作为field,销量作为分数值,简单方便。不过flink-redis-connector项目中默认没有提供ZINCRBY命令的实现(必须再吐槽一次),我们可以自己加,步骤参照之前写过的那篇加SETEX的命令的文章,不再赘述。RedisMapper的写法如下。

  public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {private static final long serialVersionUID = 1L;private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);}@Overridepublic String getKeyFromData(Tuple2<Long, Long> data) {return String.valueOf(data.f0);}@Overridepublic String getValueFromData(Tuple2<Long, Long> data) {return String.valueOf(data.f1);}@Overridepublic Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {return Optional.of(ZSET_NAME_PREFIX +new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +"MERCHANDISE");}}

后端取数时,用ZREVRANGE命令即可取出指定排名的数据了。只要数据规模不是大到难以接受,并且有现成的Redis,这个方案完全可以作为各类Top N需求的通用实现。


The End

大屏的实际呈现需要保密,截图自然是没有的。以下是提交执行时Flink Web UI给出的执行计划(实际有更多的统计任务,不止3个Sink)。通过复用源数据,可以在同一个Flink job内实现更多统计需求。

END


#关注架构师的点点滴滴#精彩推荐1. 一文让你搞懂分布式事务2. “网红” WebAssembly 与 K8s 如何实现双剑合璧?3. FBI WARNING:架构师不能碰的禁忌
4.Java后端架构之微服务杂谈5. 一个全世界最大成人网站的爬虫
6. 漫画:互联网公司黑话防骗指北7. 囚犯学会编程之后会发生什么?8. 山哥面试心经:想进入BAT WSJ其实很简单

阿里技术:基于Kafka+Flink+Redis的电商大屏实时计算案例相关推荐

  1. 基于Kafka+Flink+Redis的电商大屏实时计算案例

    前言 一年一度的双11又要到了,阿里的双11销量大屏可以说是一道特殊的风景线.实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标.并且在实际操作中, ...

  2. 基于 Kafka + Flink + Redis 的电商大屏实时计算案

    前言 阿里的双11销量大屏可以说是一道特殊的风景线.实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标.并且在实际操作中,肯定也不会仅仅计算一两个维 ...

  3. 计算机毕业设计之SpringBoot+Vue.js+WebMagic电商数据分析 电商大数据 电商数据采集系统 电商大屏 大数据毕业设计 电商知识图谱

    需求 近5年电商企业社会责任数据,数据可视化 (1) 社会责任数据 (2) 电商企业:(30家左右的数据即可) 1-10名:阿里巴巴.美团点评.拼多多.京东.小米集团.滴滴.贝壳找房.京东健康.阿里健 ...

  4. 【ECdataway数据威】2018电商大数据与案例分享会 品牌方免费公开报名开启

    5月中旬开始,电商2大平台已经打响了"年中大促"的大战,并且将战线拉长到接近一个月,2大平台随后也晒出了骄人的战绩,在这战绩的背后是电商迭代的进化与演变,ECdataway数据威用 ...

  5. 基于Flink+Alink构建电商全端智能AI个性化实时推荐系统

    如今随着互联网发展,数据量不断增大,大数据已经成为各个互联网公司的重点方向,而推荐系统成为互联网必不可少的配置,一个好的推荐系统,能为企业带来了可观的用户流量和销售额,特别对于电商系统,好的推荐系统可 ...

  6. 通过Dapr实现一个简单的基于.net的微服务电商系统(十)——一步一步教你如何撸Dapr之绑定...

    如果说Actor是dapr有状态服务的内部体现的话,那绑定应该是dapr对serverless这部分的体现了.我们可以通过绑定极大的扩展应用的能力,甚至未来会成为serverless的基础.最开始接触 ...

  7. 通过Dapr实现一个简单的基于.net的微服务电商系统(五)——一步一步教你如何撸Dapr之状态管理...

    状态管理和上一章的订阅发布都算是Dapr相较于其他服务网格框架来讲提供的比较特异性的内容,今天我们来讲讲状态管理. 目录: 一.通过Dapr实现一个简单的基于.net的微服务电商系统 二.通过Dapr ...

  8. 通过Dapr实现一个简单的基于.net的微服务电商系统(四)——一步一步教你如何撸Dapr之订阅发布...

    之前的章节我们介绍了如何通过dapr发起一个服务调用,相信看过前几章的小伙伴已经对dapr有一个基本的了解了,今天我们来聊一聊dapr的另外一个功能--订阅发布 目录: 一.通过Dapr实现一个简单的 ...

  9. 北大教授呼吁:国家紧急授权调用阿里、京东、顺丰等电商物流企业从事救灾工作!...

    本公众号是垂直类的公众号,一年来只发表关于网络安全技术方面文章,偶尔有二篇随感,如昨日文章"停止恐慌---我的8日新型肺炎惊魂记",但古语有云"天下兴亡匹夫有责" ...

最新文章

  1. 5款没有专利纠葛的Linux发行版
  2. 如何在Mac OSX Yosemite中将Ruby版本2.0.0更新到最新版本?
  3. nullnullDataTable 排序
  4. 什么时候使用webservice1
  5. MySQL实战课程---通过录像手把手带您学会当前互联网流行架构
  6. 基于Ruby的watir-webdriver自动化测试方案与实施(四)
  7. Spinner的使用方法
  8. python-greenlet模块(协程)
  9. Python画图(直方图、多张子图、二维图形、三维图形以及图中图)
  10. 买房后每月还贷是什么感觉?
  11. Java集合详解2:LinkedList和Queue
  12. '__pendingCallbacks[...].async' is null or not an object
  13. this与static
  14. Klipper 不支持中文Gcode文件名的解决办法
  15. TCO14, I bought a watch last year
  16. 大数据发展趋势十个大方向
  17. php微信段子,微信朋友圈有趣的段子 朋友圈配图
  18. 万豪、希尔顿、凯悦、万达、首旅如家旗下酒店年末扎堆开业 | 中国酒店业周刊...
  19. 【leetcode】592.分数加减运算(python)
  20. python蓝桥杯 既约分数

热门文章

  1. c语言 隐式声明,关于C#:隐式函数声明和链接
  2. oracle -12169,很奇怪的错误ORA-12169
  3. 怎样格式化电脑_硬盘数据销毁最安全的步骤是怎样的?有公司可以做吗
  4. (王道408考研数据结构)第六章图-第四节4:最短路径之迪杰斯特拉算法(思想、代码、演示、答题规范)
  5. 【C语言重点难点精讲】C语言预处理
  6. (王道408考研数据结构)第八章排序-第一节:排序综述
  7. 我是如何从双非本科到拿到微软校招offer的?
  8. LeetCode 75 颜色分类
  9. USACO-Section1.4 Wormholes(枚举法)
  10. Please, commit your changes or stash them before you can merge.