简介:Flink+Hologres亿级用户实时UV精确去重最佳实践

UV、PV计算,因为业务需求不同,通常会分为两种场景:

  • 离线计算场景:以T+1为主,计算历史数据
  • 实时计算场景:实时计算日常新增的数据,对用户标签去重

针对离线计算场景,Hologres基于RoaringBitmap,提供超高基数的UV计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合结果表,就能达到亚秒级查询。具体详情可以参见往期文章>>Hologres如何支持超高基数UV计算(基于RoaringBitmap实现)

对于实时计算场景,可以使用Flink+Hologres方式,并基于RoaringBitmap,实时对用户标签去重。这样的方式,可以较细粒度的实时得到用户UV、PV数据,同时便于根据需求调整最小统计窗口(如最近5分钟的UV),实现类似实时监控的效果,更好的在大屏等BI展示。相较于以天、周、月等为单位的去重,更适合在活动日期进行更细粒度的统计,并且通过简单的聚合,也可以得到较大时间单位的统计结果。

主体思想

  1. Flink将流式数据转化为表与维表进行JOIN操作,再转化为流式数据。此举可以利用Hologres维表的insertIfNotExists特性结合自增字段实现高效的uid映射。
  2. Flink把关联的结果数据按照时间窗口进行处理,根据查询维度使用RoaringBitmap进行聚合,并将查询维度以及聚合的uid存放在聚合结果表,其中聚合出的uid结果放入Hologres的RoaringBitmap类型的字段中。
  3. 查询时,与离线方式相似,直接按照查询条件查询聚合结果表,并对其中关键的RoaringBitmap字段做or运算后并统计基数,即可得出对应用户数。
  4. 处理流程如下图所示

方案最佳实践

1.创建相关基础表

1)创建表uid_mapping为uid映射表,用于映射uid到32位int类型。

  • RoaringBitmap类型要求用户ID必须是32位int类型且越稠密越好(即用户ID最好连续)。常见的业务系统或者埋点中的用户ID很多是字符串类型或Long类型,因此需要使用uid_mapping类型构建一张映射表。映射表利用Hologres的SERIAL类型(自增的32位int)来实现用户映射的自动管理和稳定映射。
  • 由于是实时数据, 设置该表为行存表,以提高Flink维表实时JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--将uid设为clustering_key和distribution_key便于快速查找其对应的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;

2)创建表dws_app为基础聚合表,用于存放在基础维度上聚合后的结果。

  • 使用RoaringBitmap前需要创建RoaringBitmap extention,同时也需要Hologres实例为0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;

  • 为了更好性能,建议根据基础聚合表数据量合理的设置Shard数,但建议基础聚合表的Shard数设置不超过计算资源的Core数。推荐使用以下方式通过Table Group来设置Shard数
--新建shard数为16的Table Group,
--因为测试数据量百万级,其中后端计算资源为100core,设置shard数为16
BEGIN;
CREATE TABLE tg16 (a int);                             --Table Group哨兵表
call set_table_property('tg16', 'shard_count', '16');
COMMIT;

  • 相比离线结果表,此结果表增加了时间戳字段,用于实现以Flink窗口周期为单位的统计。结果表DDL如下:
BEGIN;
create table dws_app(
  country text,
  prov text,
  city text,
  ymd text NOT NULL,  --日期字段
  timetz TIMESTAMPTZ,  --统计时间戳,可以实现以Flink窗口周期为单位的统计
  uid32_bitmap roaringbitmap, -- 使用roaringbitmap记录uv
  primary key(country, prov, city, ymd, timetz)--查询维度和时间作为主键,防止重复插入数据
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期字段设为clustering_key和event_time_column,便于过滤
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等价于将表放在shard数为16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by字段设为distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;

2.Flink实时读取数据并更新dws_app基础聚合表

完整示例源码请见alibabacloud-hologres-connectors examples

1)Flink 流式读取数据源(DataStream),并转化为源表(Table)

//此处使用csv文件作为数据源,也可以是kafka等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 与维表join需要添加proctime字段,详见https://help.aliyun.com/document_detail/62506.html
Table odsTable =
    tableEnv.fromDataStream(
    odsStream,
    $("uid"),
    $("country"),
    $("prov"),
    $("city"),
    $("ymd"),
    $("proctime").proctime());
// 注册到catalog环境
tableEnv.createTemporaryView("odsTable", odsTable);

2)将源表与Hologres维表(uid_mapping)进行关联

其中维表使用insertIfNotExists参数,即查询不到数据时自行插入,uid_int32字段便可以利用Hologres的serial类型自增创建。

// 创建Hologres维表,其中nsertIfNotExists表示查询不到则自行插入
String createUidMappingTable =
    String.format(
    "create table uid_mapping_dim("
    + "  uid string,"
    + "  uid_int32 INT"
    + ") with ("
    + "  'connector'='hologres',"
    + "  'dbname' = '%s'," //Hologres DB名
    + "  'tablename' = '%s',"//Hologres 表名
    + "  'username' = '%s'," //当前账号access id
    + "  'password' = '%s'," //当前账号access key
    + "  'endpoint' = '%s'," //Hologres endpoint
    + "  'insertifnotexists'='true'"
    + ")",
    database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表与维表join
String odsJoinDim =
    "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
    + "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
    + "  ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);

3)将关联结果转化为DataStream,通过Flink时间窗口处理,结合RoaringBitmap进行聚合

DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
    source
    // 筛选需要统计的维度(country, prov, city, ymd)
    .keyBy(0, 1, 2, 3)
    // 滚动时间窗口;此处由于使用读取csv模拟输入流,采用ProcessingTime,实际使用中可使用EventTime
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    // 触发器,可以在窗口未结束时获取聚合结果
    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
    .aggregate(
    // 聚合函数,根据key By筛选的维度,进行聚合
    new AggregateFunction<
        Tuple5<String, String, String, String, Integer>,
        RoaringBitmap,
        RoaringBitmap>() {
            @Override
            public RoaringBitmap createAccumulator() {
                return new RoaringBitmap();
            }
            @Override
            public RoaringBitmap add(
                Tuple5<String, String, String, String, Integer> in,
                RoaringBitmap acc) {
                // 将32位的uid添加到RoaringBitmap进行去重
                acc.add(in.f4);
                return acc;
            }
            @Override
            public RoaringBitmap getResult(RoaringBitmap acc) {
                return acc;
            }
            @Override
            public RoaringBitmap merge(
                RoaringBitmap acc1, RoaringBitmap acc2) {
                return RoaringBitmap.or(acc1, acc2);
            }
     },
    //窗口函数,输出聚合结果
    new WindowFunction<
        RoaringBitmap,
        Tuple6<String, String, String, String, Timestamp, byte[]>,
        Tuple,
        TimeWindow>() {
            @Override
            public void apply(
                Tuple keys,
                TimeWindow timeWindow,
                Iterable<RoaringBitmap> iterable,
                Collector<
                Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                throws Exception {
                RoaringBitmap result = iterable.iterator().next();
                // 优化RoaringBitmap
                result.runOptimize();
                // 将RoaringBitmap转化为字节数组以存入Holo中
                byte[] byteArray = new byte[result.serializedSizeInBytes()];
                result.serialize(ByteBuffer.wrap(byteArray));
                // 其中 Tuple6.f4(Timestamp) 字段表示以窗口长度为周期进行统计,以秒为单位
                out.collect(
                    new Tuple6<>(
                        keys.getField(0),
                        keys.getField(1),
                        keys.getField(2),
                        keys.getField(3),
                        new Timestamp(
                            timeWindow.getEnd() / 1000 * 1000),
                        byteArray));
        }
    });

4)写入结果表

需要注意的是,Hologres中RoaringBitmap类型在Flink中对应Byte数组类型

// 计算结果转换为表
Table resTable =
    tableEnv.fromDataStream(
        processedSource,
        $("country"),
        $("prov"),
        $("city"),
        $("ymd"),
        $("timest"),
        $("uid32_bitmap"));
// 创建Hologres结果表, 其中Hologres的RoaringBitmap类型通过Byte数组存入
String createHologresTable =
    String.format(
        "create table sink("
        + "  country string,"
        + "  prov string,"
        + "  city string,"
        + "  ymd string,"
        + "  timetz timestamp,"
        + "  uid32_bitmap BYTES"
        + ") with ("
        + "  'connector'='hologres',"
        + "  'dbname' = '%s',"
        + "  'tablename' = '%s',"
        + "  'username' = '%s',"
        + "  'password' = '%s',"
        + "  'endpoint' = '%s',"
        + "  'connectionSize' = '%s',"
        + "  'mutatetype' = 'insertOrReplace'"
        + ")",
    database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 写入计算结果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);

3.数据查询

查询时,从基础聚合表(dws_app)中按照查询维度做聚合计算,查询bitmap基数,得出group by条件下的用户数

  • 查询某天内各个城市的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好
set hg_experimental_enable_force_three_stage_agg=off
SELECT  country
        ,prov
        ,city
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   ymd = '20210329'
GROUP BY country
         ,prov
         ,city
;

  • 查询某段时间内各个省份的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好
set hg_experimental_enable_force_three_stage_agg=off
SELECT  country
        ,prov
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country
         ,prov
;

原文链接:https://developer.aliyun.com/article/784354?

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

Flink+Hologres亿级用户实时UV精确去重最佳实践相关推荐

  1. Hologres如何支持亿级用户UV计算

    简介: 本文将介绍阿里云Hologres如何基于RoaringBitmap进行UV等高复杂度计算的方案,实现亿级用户万级标签亚秒级分析,帮助用户从Kylin平滑迁移到Hologres,实现更实时.开发 ...

  2. hive建立内部表映射hbase_快手 HBase 在千亿级用户特征数据分析中的应用与实践...

    分享嘉宾:陈杨 快手 编辑整理:Hoh Xil 内容来源:BigData NoSQL 12th Meetup 出品社区:DataFun 注:欢迎转载,转载请注明出处. 快手建设 HBase 差不多有2 ...

  3. Kylin 在满帮集团千亿级用户访问行为分析中的应用

    2019 年 7 月 12 日,国内首届以 Apache Kylin 为主题的大数据领域的前沿盛会 Kylin Data Summit 在上海落幕.在本次大会的制造业分论坛上,来自满帮集团的陈雅婕的分 ...

  4. 亿级用户百TB级数据的 AIOps 技术实践之路(增强版)

    作者简介 周荣,华为消费者BG云运维部 AIOps 负责人,GOPS 2018 深圳站金牌讲师,07年加入华为,先后分别负责下一代智能网.中间件平台.运维工具等产品的研发与规划,在分布式系统.大数据分 ...

  5. 巧用 maxTimeMS 服务端超时,避免承载亿级用户的腾讯云数据库MongoDB服务雪崩

    腾讯云数据库MongoDB作为一款基于开源社区MongoDB版本的文档数据库产品,其承载着公司内外包括微信.看点.QQ音乐在内的亿级用户重量级APP产品.在某些场景的使用过程中,用户在客户端请求超时后 ...

  6. 10分钟搞懂:亿级用户的分布式数据存储解决方案!

    内容提供:李智慧,前阿里巴巴技术专家,<大型网站技术架构>作者6月6日晚,林志玲与Akira公布婚讯.徐蔡坤祝福高考同学超常发挥,粉丝们百万的转发和点赞造成微博短暂宕机.分布式数据库和分布 ...

  7. 亿级用户体量,千万级日活用户,《王者荣耀》高并发背后的故事!

    堪称中国最火爆的手机游戏"王者荣耀",拥有亿级用户体量,千万级日活用户,如何快速.低成本地保障业务突发?本文从该问题出发,论述了问题对应的解决方案,并对其效果做出总结. 作者:黎斌 ...

  8. 为支持亿级用户,短视频应用应该如何打造技术架构?

    本文系美图架构师麦俊生,在Boss直聘主办的直聘学院「对话架构师」活动上的分享整理,介绍短视频社交"美拍"架构实践的总结. 麦俊生,Boss直聘「直聘学院」特邀分享嘉宾.美图架构平 ...

  9. 亿级用户游戏排行榜设计方案

    欢迎大家关注我的公众号[老周聊架构],Java后端主流技术栈的原理.源码分析.架构以及各种互联网高并发.高性能.高可用的解决方案. 一.前言 不管是手游还是端游,貌似都离不开排行榜,没有排行榜的游戏是 ...

最新文章

  1. Styling Alert controls in Flex using the StyleManager class and setStyle() methods
  2. MySQL 解压缩安装
  3. 关于拓扑排序的进一步说明
  4. POJ 3159 Candies
  5. Qt文档阅读笔记-Qt工作笔记-QTableWidget::selectedItems()官方解析与实例(如何进行多选)
  6. 演练:调试 Windows 窗体
  7. Nuget如何自动下载依赖DLL引用
  8. 一条命通关,这个AI算法玩超级马里奥操作秀翻天丨视频+开源代码
  9. python中流程图的基本元素_面试干货:成为Python程序员的终极指南!(内附回答)...
  10. 2021 年电工杯 A 题(第一题、第二题、第四题)
  11. linux自动内存清理
  12. app渗透实战案例—Spring Boot Actuator未授权到脱库
  13. 数字图像学笔记——6. 噪音生成(椒盐噪音、高斯噪音、泊松噪音)
  14. 防火墙第三天——恶意软件、反病毒技术。。。
  15. 2009年经典语录雷人总汇
  16. 送礼蓝牙耳机哪款合适?2021最好的蓝牙耳机排行!
  17. onkeypress、onkeydown、onkeyup
  18. JS(javascript)中this的几种用法实例详解
  19. 機器學習基石 机器学习基石 (Machine Learining Foundations) 作业2 Q16-18 C++实现
  20. 让Element-ui的Container布局容器高度百分百显示

热门文章

  1. java高并发(十四)ReetrantLock 与锁
  2. IT大佬廖雪峰带你玩转Python数据分析(内附资源)
  3. 程序员看片必备神器!包邮送一台!!
  4. 又跌!6月全国程序员工资新统计,太扎心
  5. python中定义变量和数组_Python中的线程和全局变量 - 数组和标准变量之间的区别?...
  6. Oracle11g rac监听,关于oracle11g RAC 监听器使用中出现的no services以及no listener分析...
  7. oracle 创建nchar类型,nchar类型的用法!
  8. python中的pandas怎么安装_如何优雅的安装Python的pandas?
  9. 洛谷 P2704 [NOI2001]炮兵阵地
  10. 如何发布一个npm包?