简介:在实际业务使用中,需要经常实时做一些数据分析,包括实时PV和UV展示,实时销售数据,实时店铺UV以及实时推荐系统等,基于此类需求,Confluent+实时计算Flink版是一个高效的方案。

业务背景

在实际业务使用中,需要经常实时做一些数据分析,包括实时PV和UV展示,实时销售数据,实时店铺UV以及实时推荐系统等,基于此类需求,Confluent+实时计算Flink版是一个高效的方案。

Confluent是基于Apache Kafka提供的企业级全托管流数据服务,由 Apache Kafka 的原始创建者构建,通过企业级功能扩展了 Kafka 的优势,同时消除了 Kafka管理或监控的负担。

实时计算Flink版是阿里云基于 Apache Flink 构建的企业级实时大数据计算商业产品。实时计算 Flink 由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,提供全系列产品矩阵,完全兼容开源 Flink API,并充分基于强大的阿里云平台提供云原生的 Flink 商业增值能力。

一、准备工作-创建Confluent集群和实时计算Flink版集群

  1. 登录Confluent管理控制台,创建Confluent集群,创建步骤参考 Confluent集群开通
  1. 登录实时计算Flink版管理控制台,创建vvp集群。请注意,创建vvp集群选择的vpc跟confluent集群的region和vpc使用同一个,这样可以在vvp内部访问confluent的内部域名。

二、最佳实践-实时统计玩家充值金额-Confluent+实时计算Flink+Hologres

2.1 新建Confluent消息队列

  1. 在confluent集群列表页,登录control center

  1. 在左侧选中Topics,点击Add a topic按钮,创建一个名为confluent-vvp-test的topic,将partition设置为3

2.2 配置结果表 Hologres

  1. 进入Hologres控制台,点击Hologres实例,在DB管理中新增数据库`mydb`

  1. 登录Hologres数据库,新建SQL

  1. Hologres中创建结果表 SQL语句
--用户累计消费结果表CREATE TABLE consume (appkey VARCHAR,serverid VARCHAR,servertime VARCHAR,roleid VARCHAR,amount FLOAT,dt VARCHAR,primary key(appkey,dt));

2.3 创建实时计算vvp作业

  1. 首先登录vvp控制台,选择集群所在region,点击控制台,进入开发界面

  1. 点击作业开发Tab,点击新建文件,文件名称:confluent-vvp-hologres,文件类型选择:流作业/SQL

  1. 在输入框写入以下代码:
create TEMPORARY table kafka_game_consume_source(  appkey STRING,servertime STRING,consumenum DOUBLE,roleid STRING,serverid STRING
) with ('connector' = 'kafka','topic' = 'game_consume_log','properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.xxx:9071[xxx可以找开发同学查看]','properties.group.id' = 'gamegroup','format' = 'json','properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks','properties.ssl.truststore.password' = '[your truststore password]','properties.security.protocol'='SASL_SSL','properties.sasl.mechanism'='PLAIN','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx[集群的用户]" password="xxx[相应的密码]";'
);
-- 创建累计消费hologres sink表
CREATE TEMPORARY TABLE consume(appkey STRING,serverid STRING,servertime STRING,roleid STRING,amount DOUBLE,dt STRING,PRIMARY KEY (appkey,dt) NOT ENFORCED)WITH ('connector' = 'hologres','dbname' = 'mydb','endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80','password' = '[your appkey secret]','tablename' = 'consume','username' = '[your app key]','mutateType' = 'insertorreplace');
--{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}-- 计算每个用户累积消费金额insert into consumeSELECTappkey,LAST_VALUE(serverid) as serverid,LAST_VALUE(servertime) as servertime,LAST_VALUE(roleid) as roleid,sum(consumenum) as amount,substring(servertime,1,10) as dtFROM kafka_game_consume_sourceGROUP BY appkey,substring(servertime,1,10)having sum(consumenum) > 0;
  1. 在高级配置里,增加依赖文件truststore.jks(访问内部域名得添加这个文件,访问公网域名可以不用),访问依赖文件的固定路径前缀都是/flink/usrlib/(这里就是/flink/usrlib/truststore.jks)

  1. 点击上线按钮,完成上线

  1. 在运维作用列表里找到刚上线的作用,点击启动按钮,等待状态更新为running,运行成功。

  1. 在control center的【Topics->Messages】页面,逐条发送测试消息,格式为:
{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}

2.4 查看用户充值金额实时统计效果

三、最佳实践-电商实时PV和UV统计-Confluent+实时计算Flink+RDS

3.1 新建Confluent消息队列

  1. 在confluent集群列表页,登录control center

  1. 在左侧选中Topics,点击Add a topic按钮,创建一个名为pv-uv的topic,将partition设置为3

3.2 创建云数据库RDS结果表

  1. 登录 RDS 管理控制台页面,购买RDS。确保RDS与Flink全托管集群在相同region,相同VPC下

  1. 添加虚拟交换机网段(vswitch IP段)进入RDS白名单,详情参考:设置白名单文档

3.【vswitch IP段】可在 flink的工作空间详情中查询

  1. 在【账号管理】页面创建账号【高权限账号】

  1. 数据库实例下【数据库管理】新建数据库【conflufent_vvp】

  1. 使用系统自带的DMS服务登陆RDS,登录名和密码输入上面创建的高权限账户

  1. 双击【confluent_vvp】数据库,打开SQLConsole,将以下建表语句复制粘贴到 SQLConsole中,创建结果表
CREATE TABLE result_cps_total_summary_pvuv_min(summary_date date NOT NULL COMMENT '统计日期',summary_min varchar(255) COMMENT '统计分钟',pv bigint COMMENT 'pv',uv bigint COMMENT 'uv',currenttime timestamp COMMENT '当前时间',primary key(summary_date,summary_min)
)

3.3 创建实时计算VVP作业

1.【[VVP控制台】新建文件

  1. 在SQL区域输入以下代码:
--数据的订单源表
CREATE TABLE source_ods_fact_log_track_action (account_id VARCHAR,--用户IDclient_ip VARCHAR,--客户端IPclient_info VARCHAR,--设备机型信息platform VARCHAR,--系统版本信息imei VARCHAR,--设备唯一标识`version` VARCHAR,--版本号`action` VARCHAR,--页面跳转描述gpm VARCHAR,--埋点链路c_time VARCHAR,--请求时间target_type VARCHAR,--目标类型target_id VARCHAR,--目标IDudata VARCHAR,--扩展信息,JSON格式session_id VARCHAR,--会话IDproduct_id_chain VARCHAR,--商品ID串cart_product_id_chain VARCHAR,--加购商品IDtag VARCHAR,--特殊标记`position` VARCHAR,--位置信息network VARCHAR,--网络使用情况p_dt VARCHAR,--时间分区天p_platform VARCHAR --系统版本信息
) WITH ('connector' = 'kafka','topic' = 'game_consume_log','properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.c79f69095bc5d4d98b01136fe43e31b93:9071','properties.group.id' = 'gamegroup','format' = 'json','properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks','properties.ssl.truststore.password' = '【your password】','properties.security.protocol'='SASL_SSL','properties.sasl.mechanism'='PLAIN','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="【your user name】" password="【your password】";'
);
--{"account_id":"id1","client_ip":"172.11.1.1","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:10:00"}
CREATE TABLE result_cps_total_summary_pvuv_min (summary_date date,--统计日期summary_min varchar,--统计分钟pv bigint,--点击量uv bigint,--一天内同个访客多次访问仅计算一个UVcurrenttime timestamp,--当前时间primary key (summary_date, summary_min)
) WITH (type = 'rds',url = 'url = 'jdbc:mysql://rm-【your rds clusterId】.mysql.rds.aliyuncs.com:3306/confluent_vvp',',tableName = 'result_cps_total_summary_pvuv_min',userName = 'flink_confluent_vip',password = '【your rds password】'
);
CREATE VIEW result_cps_total_summary_pvuv_min_01 AS
selectcast (p_dt as date) as summary_date --时间分区, count (client_ip) as pv --客户端的IP, count (distinct client_ip) as uv --客户端去重, cast (max (c_time) as TIMESTAMP) as c_time --请求的时间
fromsource_ods_fact_log_track_action
groupby p_dt;
INSERTinto result_cps_total_summary_pvuv_min
selecta.summary_date,--时间分区cast (DATE_FORMAT (c_time, 'HH:mm') as varchar) as summary_min,--取出小时分钟级别的时间a.pv,a.uv,CURRENT_TIMESTAMP as currenttime --当前时间
fromresult_cps_total_summary_pvuv_min_01 AS a;
  1. 点击【上线】之后,在作业运维页面点击启动按钮,直到状态更新为RUNNING状态。

  1. 在control center的【Topics->Messages】页面,逐条发送测试消息,格式为:
{"account_id":"id1","client_ip":"72.11.1.111","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:11:00"}
{"account_id":"id2","client_ip":"72.11.1.112","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:12:00"}
{"account_id":"id3","client_ip":"72.11.1.113","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:13:00"}

3.4 查看PV和UV效果

可以看出rds数据表的pv和uv会随着发送的消息数据,动态的变化,同时还可以通过【数据可视化】来查看相应的图表信息。

pv图表展示:

uv图表展示:

原文链接

本文为阿里云原创内容,未经允许不得转载。

基于Confluent+Flink的实时数据分析最佳实践相关推荐

  1. 基于Flink+ClickHouse构建实时游戏数据分析最佳实践

    简介:本实践介绍如何快速收集海量用户行为数据,实现秒级响应的实时用户行为分析,并通过实时流计算.云数据库ClickHouse等技术进行深入挖掘和分析,得到用户特征和画像,实现个性化系统推荐服务. 直达 ...

  2. 基于函数计算的游戏打包最佳实践

    简介:本文主要介绍了通过Serverless工作流(FNF)+ 函数计算(FC)+ 对象存储(OSS)+ 日志服务(SLS)的组合方案,实现游戏发行过程中,自动化.并行化的一键式构建游戏渠道包.同时也 ...

  3. 趣头条基于 Flink 的实时平台建设实践

    本文由趣头条实时平台负责人席建刚分享趣头条实时平台的建设,整理者叶里君.文章将从平台的架构.Flink 现状,Flink 应用以及未来计划四部分分享. 一.平台架构 1.Flink 应用时间线 首先是 ...

  4. 基于 PAI 搭建企业级个性化推荐系统 最佳实践

    场景描述 本方案结合阿里云 PAI 团队预置的基础版算法方案为例,演示如何以阿里云提供的数据.AI 类产品为基础,离线部分采用Maxcompute&Dataworks&PAI的大数据& ...

  5. 基于AWS的云架构设计最佳实践——万字长文:云架构设计原则|附PDF下载

    译者序 AWS用户广泛,产品线复杂,AWS发布的白皮书<Architecting for the Cloud-AWS Best Practices>介绍了常见场景下云架构的最佳实践,不仅对 ...

  6. 基于AWS的云服务架构最佳实践

    近年来,对于打造高度可扩展的应用程序,软件架构师们挖掘了若干相关理念,并以最佳实践的方式加以实施.在今天的"信息时代",这些理念更加适用于不断增长的数据集,不可预知的流量模式,以及 ...

  7. 基于AWS的云服务架构最佳实践 #CSDN博文精选# #IT# #云服务实践#

    大家好,小C将继续与你们见面,带来精选的CSDN博文~ 在这里,你将收获: 将系统化学习理论运用于实践,系统学习IT技术 学习内容涵盖数据库.软件测试.主流框架.领域驱动设计和第三方生态等,离全栈工程 ...

  8. 文件表单带数据一起提交spring_基于 Spring 实现管道模式的最佳实践

    管道模式(Pipeline Pattern) 是 责任链模式(Chain of Responsibility Pattern) 的常用变体之一.在管道模式中,管道扮演着流水线的角色,将数据传递到一个加 ...

  9. 基于 Spring 实现管道模式的最佳实践

    本篇为设计模式第二篇,第一篇可见设计模式最佳套路 -- 愉快地使用策略模式 管道模式(Pipeline Pattern) 是责任链模式(Chain of Responsibility Pattern) ...

最新文章

  1. 万字长文带你全面认识 Kubernetes 中如何实现蓝绿部署、金丝雀发布和滚动更新...
  2. 35岁危机可能是真的!调查显示在领英上年龄越大越难找工作,得贴年轻头像才行...
  3. 2021年夏季学期“清华大学大数据能力提升项目” 招募《大数据实践课》企业合作项目...
  4. 2018最佳GAN论文回顾(下)
  5. [Unity] 播放 Generic Animation 导致无法移动的解决办法:在 Animator 中勾选 Apply Root Motion
  6. Java中解决继承和接口默认方法冲突
  7. python 拆分list,按照对应位置重组
  8. 主题:spring集成quartz,出现2次重复调用的问题
  9. java io类型_Java NIO之Java中的IO分类
  10. Office系列完全干净卸载工具合集(最全)
  11. 基于正点原子STM32F407开发板源码和贪吃蛇程序
  12. 中文冒号vs英文冒号
  13. 数据结构-二分查找(含全部代码)
  14. Charles 抓包原理与实践
  15. git stash 暂存恢复和文件误删恢复
  16. python运行时关闭硬件信息-Python实现的读取电脑硬件信息功能示例
  17. Data Augment ------TensorFlow 训练图片处理
  18. 基于SSM的高校课程评价系统
  19. 关于四芯网线上网的奇怪问题
  20. 如何用计算机处理图片,修改图片用什么软件 小白必看的图片处理技巧_电脑故障...

热门文章

  1. c++ h cpp文件如何关联_C++核心准则SF.5: .cpp文件必须包含定义它接口的.h文件
  2. a*算法流程图_光伏逆变器MPPT基本算法介绍李星硕
  3. asp单元格合并后宽度没有合并_宅在家里跟着大牛从零开始学excel第五课-合并,边框,列宽行高...
  4. cubemx串口的发送与接收_串口收发模块设计
  5. dataframe 一列的不同值_python数据分析包|Pandas-02之缺失值(NA)处理
  6. 多线程百度网盘爬虫Python完整源码
  7. movielens推荐系统_案例|推荐系统的评估指标
  8. c# 创建委托 消息订阅_C#面向对象之委托和事件
  9. python程序结构框架_Python——Flask框架——程序的基本结构
  10. js 带笔锋 签字版_年轻人的第一支签字笔? ——米家签字笔评测