背景

随着新零售的概念慢慢崛起,互联网电商行业竞争越来越激烈!实时数据信息对于电商行业尤为重要,那如何从实时不断的数据流中获取我们想要的信息呢?以下案例是 流计算的合作伙伴袋鼠云用阿里云流计算来解决电商订单管理案例。

场景案例

统计商铺的订单总数和总的销量

业务架构图

业务流程:

1. 用阿里云的DTS([DTS信息同步](https://help.aliyun.com/document_detail/60037.html?spm=5176.10695662.1996646101.searchclickresult.4148a693oQy3gY))把用户的数据同步到大数据总线(DATAHUB)。
2. 阿里云流计算订阅大数据总线(DATAHUB)的数据进行实时计算。
3. 将实时数据插入到RDS的云数据库
4. 再通过阿里云的DATAV或者是其他的大屏做数据展示。

准备工作

RDS->DataHub 数据实时同步,是将 RDS for MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。
由RDS经过DTS数据同步到大数据总线(DATAHUB)后 DataHub表Schema信息。

可以参考[RDS 到 DataHub 数据实时同步](https://help.aliyun.com/document_detail/45214.html?spm=5176.doc26633.6.602.o8AntI)

1.订单源表

| 字段名 | 数据类型 | 详情 |
| :--- | :--- | :--- |
| dts_ordercodeofsys | varchar | 订单编号 |
| dts_paytime | varchar | 订单付款时间 |
| dts_deliveredtime | varchar | 订单发货时间 |
| dts_storecode | varchar | 店铺编号 |
| dts_warehousecode | varchar | 仓库code |
| dts_cancelled | bigint | 是否取消 |
| dts_delivered | bigint | 是否发货 |
| dts_receivercity | varchar | 收货人城市 |
| dts_receiverprovince | varchar | 收货人省份 |
| dts_record_id | varchar | 记录ID |
| dts_operation_flag | varchar | 操作Flag |
| dts_instance_id | varchar | 数据库instanceId |
| dts_db_name | varchar | 数据库名 |
| dts_table_name | varchar | 数据表 |
| dts_utc_timestamp | varchar | 更新时间 |
| dts_before_flag | varchar | 变更前标识 |
| dts_after_flag | varchar | 变更后标识 |

2.订单详情源表

| 字段名 | 数据类型 | 详情 |
| :--- | :--- | :--- |
| dts_ordercodeofsys | varchar | 订单编号 |
| dts_skuname | varchar | 商品名字 |
| dts_skucode | varchar | 商品编号 |
| dts_quantity | bigint | 数量 |
| dts_dividedamount | double | 发货金额 |
| dts_salechanneldividedamount | double | 渠道销售金额 |
| dts_initialcost | double | 成本 |
| dts_record_id | varchar | 记录ID |
| dts_operation_flag | varchar | 操作Flag |
| dts_instance_id | varchar | 数据库instanceId |
| dts_db_name | varchar | 数据库名字 |
| dts_table_name | varchar | 表名 |
| dts_utc_timestamp | varchar | 更新时间 |
| dts_before_flag | varchar | 变更前标识 |
| dts_after_flag | varchar | 变更后标识 |

业务逻辑

--数据的订单源表
create table orders_real(dts_ordercodeofsys   varchar, dts_paytime          varchar, dts_deliveredtime    varchar, dts_storecode        varchar, dts_warehousecode    varchar, dts_cancelled        bigint, dts_delivered        bigint, dts_receivercity     varchar, dts_receiverprovince varchar, dts_record_id        varchar, dts_operation_flag   varchar, dts_instance_id      varchar, dts_db_name          varchar, dts_table_name       varchar, dts_utc_timestamp    varchar, dts_before_flag      varchar, dts_after_flag       varchar
) with (type='datahub',endPoint='http://dh-cn-XXXXX.com',project='项目名',topic='表名',accessId='自己的ID',accessKey='自己的KEY'
); create table orderdetail_real(dts_ordercodeofsys            varchar, dts_skuname                   varchar,dts_skucode                   varchar,dts_quantity                  bigint ,dts_dividedamount             double,dts_salechanneldividedamount  double,dts_initialcost               double,dts_record_id                 varchar,dts_operation_flag            varchar,dts_instance_id               varchar,dts_db_name                   varchar,dts_table_name                varchar,dts_utc_timestamp             varchar,dts_before_flag               varchar,dts_after_flag                varchar
) with (type='datahub',endPoint='http://dh-cn-XXXX.com',project='项目名',topic='表名',accessId='自己的ID',accessKey='自己的KEY'
); create table ads_all_count_amount(bill_date     varchar,--下单时间bill_count    bigint,--总的订单总数qty           bigint,--总的销售量primary key (bill_date)
) with (type='rds',url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',tableName='数据库表名',userName='数据库的账号',password='数据库的密码'
);--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS
SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytimeFROM orders_realGROUP BY dts_ordercodeofsys--订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag = 'U'AND dts_before_flag = 'Y'AND dts_after_flag = 'N' THEN -1 * dts_quantity WHEN dts_operation_flag = 'U'AND dts_before_flag = 'N'AND dts_after_flag = 'Y' THEN dts_quantity WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity ELSE dts_quantity END AS dts_quantityFROM orderdetail_real--订单总单数,总销售量
INSERT INTO ads_all_count_amount
SELECT from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, SUM(b.dts_quantity) AS qty
from (new_paytime) a
join (new_orderdetail) b
ON a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd')

难点解析

为了方便大家理解结构化代码和代码维护,我们推荐使用View([VIEW的语义](~~62534~~))把业务逻辑差分成三个模块。

模块一

首先根据订单编号做分组,因为同一个编号订单会有多次业务操作(例如下单、付款、发货),会在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。

CREATE VIEW new_paytime AS
SELECT dts_ordercodeofsys, MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys

模块二

数据库日志会获取所有的数据记录的变更,而每个订单是有状态的。如列表所示:

| 字段名 | 数据类型 | 详情 |
| :--- | :--- | :--- |
| dts_record_id | varchar | 记录ID |
| dts_operation_flag | varchar | 操作Flag |
| dts_instance_id | varchar | 数据库instanceId |
| dts_db_name | varchar | 数据库名字 |
| dts_table_name | varchar | 表名 |
| dts_utc_timestamp | varchar | 更新时间 |
| dts_before_flag | varchar | 变更前标识 |
| dts_after_flag | varchar | 变更后标识 |

dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。

dts_instance_id: 这条增量日志所对应的数据库的 server id。

dts_db_name: 这条增量更新日志更新的表所在的数据库库名。

dts_table_name:这条增量更新日志更新的表。

dts_operation_flag: 标示这条增量日志的操作类型。取值包括:

I : insert 操作  
D : delete 操作  
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。

dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.

dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:

1. 操作类型为:insert  
  
   当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y
   ![undefined | center](https://private-alipayobjects.alipay.com/alipay-rmsdeploy-image/skylark/png/78d9a184-b187-4855-8553-33024073e7cb.png "")
2. 操作类型为:update  
  
   当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。  
   第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N  
   第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y
   ![undefined | center](https://private-alipayobjects.alipay.com/alipay-rmsdeploy-image/skylark/png/14e95405-d891-436c-95ae-2580131a6594.png "")

3. 操作类型为:delete  
  
   当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N
   ![undefined | center](https://private-alipayobjects.alipay.com/alipay-rmsdeploy-image/skylark/png/55b5f393-e4c2-4d1e-b820-81237f0f5b93.png "")

CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag = 'U'AND dts_before_flag = 'Y'AND dts_after_flag = 'N' THEN -1 * dts_quantity WHEN dts_operation_flag = 'U'AND dts_before_flag = 'N'AND dts_after_flag = 'Y' THEN dts_quantity WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity ELSE dts_quantity END AS dts_quantityFROM orderdetail_real

怎么判断是有效交易订单呢?

首先是要满足dts_operation_flag=U 或者 dts_operation_flag=I,  
然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态;

所以有效交易订单为:

dts_operation_flag = 'U'AND dts_before_flag = 'N'AND dts_after_flag = 'Y' THEN dts_quantity 

* 为什么THEN -1 * dts_quantity呢?

订单的取消或者是交易没有成功在总的销量里也会记录;为了保证总销量的正确性,所以把没有成交的订单数量设为负数在计算总的销量会减去这个数量。

#### 模块三

为什么订单源表和订单详情要做JOIN操作?

new_paytime查出的是最新交易的时间的所有的订单编号;new_orderdetail查询的是所有的有效的订单的订单编码、商品名称、商品编号、数量的信息;两张表JOIN是为整合成一张大表,方便用户来统计订单总数和总的销量。

SELECT from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, SUM(b.dts_quantity) AS qty
from (new_paytime) a
join (new_orderdetail) b
ON a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd');

【阿里云流计算】- 电商订单和销量统计案例相关推荐

  1. Unirech阿里云国际版电商行业解决方案定制

    阿里云国际站的电商行业解决方案定制可为用户提供业界领先的双11技术,支持直播.跨境电商和大数据分析等前沿业务的场景,重塑未来零售,帮助电商业务更上一层楼.那么阿里云国际版电商解决方案将会为用户提供怎样 ...

  2. SQL分析阿里云淘宝电商数据

    本文使用SQL语句对50w条淘宝用户数据进行分析. 数据库:SQL Server. 文章结构 一.提出问题 1.分析目的 2.分析框架 二.数据准备 1.数据来源 2.数据描述 3.数据预处理 3.1 ...

  3. 如何运用阿里云服务器进行电商直播?

    直播购物是全球零售商和直播服务提供商的最新营收点. 阿里云视频直播服务是一个端到端的流媒体解决方案,可让您通过一个跨社交媒体.应用程序和网站的直播购物平台提供高清和低延迟的直播体验,从而有效吸引购物者 ...

  4. 阿里云流计算中维表join VS 流join

    最近业务上使用blink进行清洗数据,使用到了双流join和维表join,今天有同学问我流join和维表join有什么区别.在此我做个简单的说明,描述不对的地方,欢迎大家纠正,后面补充. 流式计算过程 ...

  5. 独家对话阿里云函数计算负责人不瞋:你所不知道的 Serverless

    作者 | 杨丽 来源 | 雷锋网(ID:leiphone-sz) Serverless 其实离我们并没有那么遥远. 如果你是一名互联网研发人员,那么极有可能了解并应用过 Serverless 这套技术 ...

  6. 乘风破浪,云服务器为电商大促加足“马力”

    摘要:什么样的IT基础设施才能支撑持续一整天甚至多天的高并发流量?如何规避618带来的各种"坑"?细听我分说. 特殊时期让2020年新年伊始就走出异常曲线,社会机器被按下暂停键,恐 ...

  7. 智慧零售2.0时代,容器云已成电商转型利器

    当前,伴随着新基建.5G以及后疫情时代的背景,中国零售业正在经历继实体零售.虚拟零售之后的第三次大变革.而这次零售大变革的主角变成了中国,变革的主体就是虚实融合的智慧零售.之所以说智慧零售是世界零售业 ...

  8. 支付退款流程设计_电商订单系统,你该如何设计

    文章来自:https://baijiahao.baidu.com/s?id=1602959656926168475&wfr=spider&for=pc 作者:人人都是产品经理 点击加入 ...

  9. 基于TableStore的海量电商订单元数据管理

    一.背景 订单系统存在于各行各业,如电商订单.银行流水.运营商话费账单等,是一个非常广泛.通用的系统.对于这类系统,在过去十几年发展中已经形成了经典的做法.但是随着互联网的发展,以及各企业对数据的重视 ...

最新文章

  1. 在线编程题之“明明的随机数”
  2. 成人高考计算机考试技巧,备战2015年成人高考:计算机基础考试经验分享
  3. django入门与实践 3-1 环境搭建
  4. c++ 操作mysql_C++操作mysql方法总结(1)
  5. java游戏暂停弹出字体_小白写了个java的小游戏 想加个暂停的功能 无从下手 求大佬们帮...
  6. 星三角启动的优缺点和内外接的区别
  7. 什么是工程思维和产品思维
  8. linux 拷贝目录报错,Linux复制文件时出现omitting directory错误怎么办
  9. 合肥工业大学C语言提交作业,合肥工业大学C语言题库程序设计.doc
  10. Smart-Link、Monitor-Link介绍与配置举例
  11. 服务器集群有哪些类型
  12. 如何做擦能防止网站被劫持
  13. 歪理邪说解析架构设计师上午考试试题之一(分析2010下半年系统架构设计师上午试题01-05题)
  14. 【Unity】Unity实现鼠标控制摄像机围绕物体旋转镜头 滑轮控制远近
  15. sockboom群_Phonics:自然拼读自然拼读练习表
  16. uni-app 打开外部网页地址 web url
  17. 全志F133(D1s)芯片 如何在Tina下进行显示旋转?
  18. 中国互联网+化妆品行业深度调研及投资机会分析报告
  19. BetaFlight模块设计之三十二:MSP协议模块分析
  20. 傲梅分区助手专业版 v6.2 中文免费版

热门文章

  1. 考研资料 | 西安电子科技大学考研专业课资料(绝密)
  2. 小学生计算机课程的简报,《让故事动起来》——信息技术公开课简报
  3. 绘制地铁线路html,基于HTML5技术绘制上海地铁图
  4. OSG计算并绘制模型中每一个三角面片的法向量
  5. js对以下结构数组处理时遇到的问题(arr:[{class:“a“,team:“hero“,member:[{name:“Tom“,hobby:[“读书“,“看报“]}]}],...])
  6. web3.js链接以太坊并查询钱包u余额
  7. linux glibc 版本查看,三种方法查看glibc的版本号
  8. LaTeX 制作(跨页)长表格
  9. python为什么有gil锁_为什么目前python3的全局锁gil性能远逊于python2
  10. 骑马与砍杀2 自制一个简单MOD