作者:王卓然 花名琸然 阿里云存储服务技术专家


背景介绍

电子商务模式是指在网络环境和大数据环境下基于一定技术基础的商务运作方式和盈利模式,对于数据的分析和可视化是电商运营中最重要的部分之一,而电商大屏提供了数据分析和可视化的完美结合。电商大屏包含有全量订单和实时订单的聚合,全量订单的聚合提供的是全景的综合数据视图,而实时订单的聚合展示的是实时的运营指标数据。本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,其效果图如下。

架构设计

在本次的电商大屏实战中,客户端会实时向Tablestore插入原始订单数据,实时流计算会通过Spark Structured Streaming实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore,最终在DataV大屏上进行展示,而离线批计算通过Spark SQL进行原始订单数据的总金额和用户维度总金额的离线聚合,聚合结果也会写回Tablestore, 并最终在DataV大屏上进行展示,整个场景的架构图如下图所示。

准备工作

1.创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群,https://help.aliyun.com/document_detail/35223.html?spm=a2c6h.12873639.0.0.501837e9edviEi。
2.下载E-MapReduce的最新SDK包,https://github.com/aliyun/aliyun-emapreduce-sdk/tree/master-2.x/jars/datasources/latest?spm=a2c4e.10696291.0.0.46c819a4aCpndW,包名的格式为`js
emr-datasources_shaded_*.jar,` 里面会包含有Tablestore相关的Spark批流Source和Sink。

数据源说明

数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID)和两个属性列price(价格)和timestamp(订单时间),数据示例如下图所示。

批流SQL流程详解

创建数据源表

1.登陆EMR Header机器,执行以下命令,启动sql客户端,该客户端用于批流SQL计算,其中emr-datasources_shaded_*.jar为准备工作中下载的EMR最新版的SDK包。

streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

2.创建原始订单数据表(Source表)的外表order_source,该外表将用于后续的流批SQL执行。

DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderSource",
tunnel.id="2b7bbf3d-d6c4-4cea-89fe-71998bccaf19",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "double"}, "timestamp": {"cols": "timestamp", "type": "long"}}}'
);

参数说明:

参数名 解释
endpoint 表格存储实例的访问地址
access.key.id 阿里云账号AK ID
access.key.secret 阿里云账号AK Secret
instance.name 表格存储实例名
table.name 表格存储表名
tunnel.id 表格存储的增量通道ID, 该参数用于实时的增量SQL, 批量SQL时非必须。
catalog 表的字段Schema定义,上述示例中对应的四个列为UserId(主键), OrderId(主键), price, timestamp,数据类型分别为string, string, double, long。

实时流计算

实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore。首先创建流计算的Sink外表order_stream_sink(对应Tablestore表OrderStreamSink),然后运行流计算SQL进行实时聚合,最后将聚合结果实时写回Tablestore目的表中。
Sink表的各参数含义和Source表一致,其中catalog字段的内容有所不同,对应的Sink表中有四个字段,begin(开始时间,主键列,格式为2019-11-27 14:54:00),end(结束时间,主键列),count(订单数),totalPrice(订单总金额)。

// 创建Sink表order_stream_sink对应Tablestore的表OrderStreamSink(主键为begin和end两列)
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);// 在order_source表上创建视图order_source_stream_view
CREATE SCAN order_source_stream_view ON order_source USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");// 在视图order_source_stream_view上运行STREAM SQL作业,以下样例会按30s粒度进行订单数和订单金额的聚合,
// 聚合结果将写回Tablestore表OrderStreamSink。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");

在运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果存放在OrderStreamSink表中,通过Tablestore和DataV的直连功能,https://helpcdn.aliyun.com/document_detail/97683.html?spm=a2c6h.12873639.0.0.b00037e9NF3oOT,可以很容易的将结果绘制在DataV的大屏上。

离线批计算

离线批计算将进行原始订单数据的总金额和用户维度总金额的离线聚合,首先会创建两张Sink表分别存放历史总金额和用户维度总金额的聚合数据,然后直接在源表order_source上运行批计算SQL,最后得到聚合结果。

// 批计算任务
// 用户维度结果表:OrderBatchSink(主键UserId, 属性列count,totalPrice)
// 总数据维度结果表:OrderTotalSink(主键Count, 属性列totalPrice)
DROP TABLE IF EXISTS order_batch_sink;
CREATE TABLE order_batch_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderBatchSink",
tunnel.id="",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);DROP TABLE IF EXISTS order_totol_sink;
CREATE TABLE order_total_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderTotalSink",
tunnel.id="",
catalog='{"columns": {"count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

运行以下批计算SQL进行用户维度聚合结果的更新。

// SQL命令
INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
// 实际运行
spark-sql> INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
Time taken: 5.107 seconds

运行以下批计算SQL进行总数据维度结果的更新。

// SQL命令
INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
// 实际运行
spark-sql> INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
Time taken: 4.272 seconds

写在最后

本文通过使用一套存储(Tablestore)和一套计算(Spark)完成了批流计算的有效结合,更多有关批流一体的细节和干货可以参见直播Tablestore结合Spark的云上流批一体大数据架构。


对开源大数据和感兴趣的同学可以加小编微信(图一二维码,备注进群)进入技术交流微信群。也可钉钉扫码加入社区的钉钉群

阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

Tablestore结合Spark的流批一体SQL实战相关推荐

  1. 40亿条/秒!Flink流批一体在阿里双11首次落地的背后

    简介:今年的双11,实时计算处理的流量洪峰创纪录地达到了每秒40亿条的记录,数据体量也达到了惊人的每秒7TB,基于Flink的流批一体数据应用开始在阿里巴巴最核心的数据业务场景崭露头角,并在稳定性.性 ...

  2. 基于Flink SQL构建流批一体实时数仓

    基于Flink构建流批一体的实时数仓是目前数据仓库领域比较火的实践方案.随着Flink的不断迭代,其提供的一系列技术特性使得用户构建流批一体的应用变得越来越方便.本文主要分享基于FinkSQL构建实时 ...

  3. 大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  4. 大数据架构如何做到流批一体?【对于Flink等流批一体的概念做了很好的澄清!】

    导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今 ...

  5. pb数据窗口怎么调用视图_大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  6. hive增量表和全量表_基于 Flink + Hive 构建流批一体准实时数仓

    基于 Hive 的离线数仓往往是企业大数据生产系统中不可缺少的一环.Hive 数仓有很高的成熟度和稳定性,但由于它是离线的,延时很大.在一些对延时要求比较高的场景,需要另外搭建基于 Flink 的实时 ...

  7. 数据仓库、数据湖、流批一体,终于有大神讲清楚了!

    摘要 数据仓库,数据湖,包括Flink社区提的流批一体,它们到底能解决什么问题?今天将由阿里云研究员从解决业务问题出发,将问题抽丝剥茧,从技术维度娓娓道来:为什么你需要数据湖或者数据仓库解决方案?它的 ...

  8. 流批一体生产应用!Bigo 实时计算平台建设实践

    简介:本文由 Bigo 计算平台负责人徐帅分享,主要介绍 Bigo 实时计算平台建设实践的介绍 本文由 Bigo 计算平台负责人徐帅分享,主要介绍 Bigo 实时计算平台建设实践的介绍.内容包括: B ...

  9. 数据仓库、数据湖、流批一体

    作者: 蒋晓伟(量仔) 阿里云研究员 金晓军(仙隐) 阿里云高级技术专家 摘要:数据仓库,数据湖,包括Flink社区提的流批一体,它们到底能解决什么问题?今天将由阿里云研究员从解决业务问题出发,将问题 ...

最新文章

  1. mysql 负载 查看_Mysql-命令查询当前正在负载运行的SQL语句
  2. gj11 多线程、多进程和线程池编程
  3. @query传参_VueRouter之query与params两种传参区别
  4. MongoDB学习笔记(三)使用Spring Data操作MongoDB
  5. java rmi 入门实例
  6. (个人总结)Linux命令——任意目录查看穿越
  7. 鲁迅文学院60周年庆
  8. poj3254 Corn Fields
  9. python读取html文件正则替换_Python正则获取和过滤或者替换HTML标签的方法说明
  10. sun-java6-jdk_Ubuntu下安装sun-java6-jdk和eclipse
  11. flink 入门及安装
  12. fabric usage
  13. 平方数之和【leetcode 633】
  14. 贩卖个人信息非法获利300余万,平安惠普、拍拍贷都是买方
  15. 开题报告的前景_开题报告全分析,写出一份满意的答卷
  16. Ceph Async RDMA网络通信性能优化
  17. 【mininet 0x02】如何使用mn工具来操作mininet
  18. java怎么删除一行表格_Java 创建、删除Word表格
  19. 合肥辰工科技有限公司简介及公司产品介绍
  20. 中国玉米面筋行业市场供需与战略研究报告

热门文章

  1. 5.18 优先队列(堆) 滑动窗口(二) 交换链表的节点
  2. 服务器被一堆系统登录_饥荒联机云服务器开档
  3. ES 与关系型数据库的对比
  4. HTML5+CSS3小实例:全屏导航栏菜单
  5. C语言之“拒绝scanf,从我做起”
  6. DJ2-1 进程管理
  7. Mysql复制表结构、表数据以及修改主键
  8. 项目里的UT越来越慢,怎么办?
  9. C termios.h 简单用法
  10. Ubuntu 18.04 vscode 编辑器空格显示过小问题解决方案