Table of Contents

  • Patterns for Writing SQL
  • Chapter 1: Data Warehouse Layering
    • 1.1 Why Layer the Warehouse
    • 1.2 Data Mart vs. Data Warehouse Concepts
    • 1.3 Warehouse Naming Conventions
      • 1.3.1 Table Naming
      • 1.3.2 Script Naming
  • Chapter 2: Data Warehouse Theory
    • 2.1 Normal Form Theory
      • 2.1.1 Normal Form Concepts
      • 2.1.2 Functional Dependencies
      • 2.1.3 Distinguishing the Three Normal Forms
    • 2.2 Relational Modeling and Dimensional Modeling
      • 2.2.1 Relational Modeling
      • 2.2.2 Dimensional Modeling
    • 2.3 Dimension Tables and Fact Tables (Key Topic)
      • 2.3.1 Dimension Tables
      • 2.3.2 Fact Tables
    • 2.4 Data Warehouse Modeling (Absolutely Key)
      • 2.4.1 The ODS Layer
      • 2.4.2 The DWD Layer
      • 2.4.3 The DWS Layer
      • 2.4.4 The DWT Layer
      • 2.4.5 The ADS Layer
  • Chapter 3: Warehouse Build - ODS Layer
    • 3.1 Creating the Database
    • 3.2 ODS Layer (User Behavior Data)
      • 3.2.1 Creating the Startup Log Table ods_start_log
      • 3.2.2 Creating the Event Log Table ods_event_log
      • 3.2.3 Single vs. Double Quotes in Shell
      • 3.2.4 ODS Layer Data Loading Script
    • 3.3 ODS Layer (Business Data)
      • 3.3.1 Order Table (Incremental & Updated)
      • 3.3.2 Order Detail Table (Incremental)
      • 3.3.3 SKU Product Table (Full)
      • 3.3.4 User Table (Incremental & Updated)
      • 3.3.5 Product Level-1 Category Table (Full)
      • 3.3.6 Product Level-2 Category Table (Full)
      • 3.3.7 Product Level-3 Category Table (Full)
      • 3.3.8 Payment Flow Table (Incremental)
      • 3.3.9 Province Table (Special)
      • 3.3.10 Region Table (Special)
      • 3.3.11 Trademark Table (Full)
      • 3.3.12 Order Status Table (Incremental)
      • 3.3.13 SPU Product Table (Full)
      • 3.3.14 Product Review Table (Incremental)
      • 3.3.15 Order Refund Table (Incremental)
      • 3.3.16 Cart Table (Full)
      • 3.3.17 Product Favorites Table (Full)
      • 3.3.18 Coupon Usage Table (New & Changed)
      • 3.3.19 Coupon Table (Full)
      • 3.3.20 Activity Table (Full)
      • 3.3.21 Activity-Order Link Table (Incremental)
      • 3.3.22 Activity Rule Table (Full)
      • 3.3.23 Code Dictionary Table (Full)
      • 3.3.24 ODS Layer Data Loading Script
      • Summary
  • Chapter 4: Warehouse Build - DWD Layer
    • 4.1 DWD Layer (Parsing the User-Behavior Startup Table)
      • 4.1.1 Creating the Startup Table
      • 4.1.2 Using the get_json_object Function
      • 4.1.3 Loading Data into the Startup Table
      • 4.1.4 DWD Startup Table Loading Script
    • 4.2 DWD Layer (Parsing the User-Behavior Event Table) ****
      • 4.2.1 Creating the Base Detail Table
      • 4.2.2 Custom UDF (Parsing the Common Fields)
      • 4.2.3 Custom UDTF (Parsing the Event Field)
      • 4.2.4 Populating the Event Log Base Detail Table
      • 4.2.5 DWD Layer Data Parsing Script
    • 4.3 DWD Layer (Deriving the User-Behavior Event Tables)
      • 4.3.1 Product Click Table
      • 4.3.2 Product Detail Page Table
      • 4.3.3 Product List Page Table
      • 4.3.4 Ad Table
      • 4.3.5 Notification Table
      • 4.3.6 Background Active Table
      • 4.3.7 Comment Table
      • 4.3.8 Favorites Table
      • 4.3.9 Like Table
      • 4.3.10 Error Log Table
      • 4.3.11 DWD Event Table Loading Script
    • 4.4 DWD Layer (Business Data)
      • 4.4.1 Product Dimension Table (Full)
      • 4.4.2 Coupon Info Table (Full)
      • 4.4.3 Activity Dimension Table (Full)
      • 4.4.4 Region Dimension Table (Special)
      • 4.4.5 Time Dimension Table (Special) (Reserved)
      • 4.4.6 Order Detail Fact Table (Transactional Fact Table)
      • 4.4.7 Payment Fact Table (Transactional Fact Table)
      • 4.4.8 Refund Fact Table (Transactional Fact Table)
      • 4.4.9 Review Fact Table (Transactional Fact Table)
      • 4.4.10 Cart Fact Table (Periodic Snapshot Fact Table, Daily Snapshot)
      • 4.4.11 Favorites Fact Table (Periodic Snapshot Fact Table, Daily Snapshot)
      • 4.4.12 Coupon Usage Fact Table (Accumulating Snapshot Fact Table)
      • 4.4.13 Order Fact Table (Accumulating Snapshot Fact Table)
      • 4.4.14 User Dimension Table (Zipper Table)
        • Step 0: Initialize the zipper table (run once, standalone)
        • Step 2: Merge the changed records, append the new ones, and insert into a temporary table
        • Step 3: Overwrite the zipper table with the temporary table
      • 4.4.15 DWD Layer Data Import Script
  • Chapter 5: Warehouse Build - DWS Layer
    • 5.1 Business Terminology
    • 5.2 Built-in Functions
      • 5.2.1 The collect_set Function
      • 5.2.2 The nvl Function
      • 5.2.3 Date-Handling Functions
    • 5.3 DWS Layer (User Behavior)
      • 5.3.1 Daily Device Behavior
    • 5.4 DWS Layer (Business)
      • 5.4.1 Daily Member Behavior
      • 5.4.2 Daily Product Behavior
      • 5.4.3 Daily Coupon Statistics (Reserved)
      • 5.4.4 Daily Activity Statistics (Reserved)
      • 5.4.5 Daily Purchase Behavior
    • 5.5 DWS Layer Data Import Script
  • Chapter 6: Warehouse Build - DWT Layer
    • 6.1 Device-Topic Wide Table
    • 6.2 Member-Topic Wide Table
    • 6.3 Product-Topic Wide Table
    • 6.4 Coupon-Topic Wide Table (Reserved)
    • 6.5 Activity-Topic Wide Table (Reserved)
    • 6.6 DWT Layer Data Import Script
  • Chapter 7: Warehouse Build - ADS Layer
    • 7.1 Device Topic
      • 7.1.1 Active Devices (Daily, Weekly, Monthly)
      • 7.1.2 Daily New Devices
      • 7.1.3 Silent Users
      • 7.1.4 Users Returning This Week
      • 7.1.5 Churned Users
      • 7.1.6 Retention Rate
      • 7.1.7 Users Active in Each of the Last Three Weeks
      • 7.1.8 Users Active Three Consecutive Days Within the Last Seven
    • 7.2 Member Topic
      • 7.2.1 Member Topic Info
      • 7.2.2 Funnel Analysis (the fact tables are incomplete, so all data is taken from DWS)
    • 7.3 Product Topic
      • 7.3.1 Product Counts for the Day
      • 7.3.2 Top 10 Products by Sales for the Day
      • 7.3.3 Top 10 Products by Favorites for the Day
      • 7.3.4 Top 10 Products by Add-to-Cart Count for the Day
      • 7.3.5 Product Refund-Rate Ranking (Last 30 Days)
      • 7.3.6 Product Negative-Review Rate for the Day
    • 7.4 Marketing Topic (User + Product + Purchase Behavior)
      • 7.4.1 Order Count Statistics for the Day
      • 7.4.2 Payment Statistics for the Day
      • 7.4.3 Monthly Brand Repurchase Rate per Level-1 Category
    • 7.5 ADS Layer Import Script
  • Chapter 8: Azkaban Scheduling
    • 8.1 Azkaban Deployment
      • 8.1.1 Pre-installation Preparation
      • 8.1.2 Installing Azkaban
      • 8.1.3 Generating the Keystore
      • 8.1.4 Time Synchronization Setup
      • 8.1.5 Configuration Files
        • 8.1.5.1 Web Server Configuration
        • 8.1.5.2 Executor Server Configuration
      • 8.1.6 Starting the Executor Server
      • 8.1.7 Starting the Web Server
    • 8.2 Creating the MySQL Database and Tables
    • 8.3 Sqoop Export Script
    • 8.4 End-to-End Scheduling Flow for the Member-Topic Metrics

Patterns for Writing SQL

1. Start from the target table.
2. Work out which tables are needed to cover every field of the target table, i.e. line up all the input tables.
3. Write the logic:
3.1 insert overwrite table <target table name>; check whether the target table needs a partition.
3.2 Use a fixed skeleton. If a multi-table join is needed, lay out the overall frame first, then fill in the subqueries one by one:

    select ... from ( ... ) b1 join ( ... ) b2 on ...

3.3 "How many times" statistics use count.
3.4 "How much money" statistics use sum.
3.5 For an accumulation table, read the old data (the target table itself) first, then the new data (the input table) (see the sketch just after this list).
3.6 For a cumulative value: old + new.
3.7 First-time column in an accumulation table: if the old time is null, take the current date; otherwise keep the old one.
3.8 Last-time (most recent) column in an accumulation table: if the new id is not null, take the current date; otherwise keep the old one.
3.9 Converting counts to days: if(new.login_count>0,1,0).
3.10 With group by, every selected column must be: 1. in the group by list; 2. a constant; or 3. wrapped in an aggregate function.
3.11 Rolling metrics such as a 30-day accumulation are summed over the new-data table with a "last 30 days" where clause.
3.12 When many tables are involved, the with tmp as (...) approach keeps things readable.
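As a minimal illustration of rules 3.5-3.9, an accumulation-table load usually takes this shape. The table and column names here are hypothetical; the project's real accumulation loads appear in the DWT layer and follow the same pattern:

insert overwrite table dwt_user_topic
select
    nvl(new.user_id, old.user_id),
    -- rule 3.7: first time - keep the old value unless it is null
    if(old.first_order_time is null, '2020-03-10', old.first_order_time),
    -- rule 3.8: last time - refresh whenever there is new activity
    if(new.user_id is not null, '2020-03-10', old.last_order_time),
    -- rule 3.6: cumulative value = old + new
    nvl(old.order_count,0) + nvl(new.order_count,0),
    -- rule 3.9: turn a count into a day flag
    nvl(old.order_days,0) + if(new.order_count>0, 1, 0)
from dwt_user_topic old
full outer join
(
    select user_id, count(*) order_count
    from dws_user_action_daycount
    where dt='2020-03-10'
    group by user_id
) new
on old.user_id = new.user_id;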

Chapter 1: Data Warehouse Layering

1.1 Why Layer the Warehouse

1.2 Data Mart vs. Data Warehouse Concepts

1.3 Warehouse Naming Conventions

1.3.1 Table Naming

1.3.2 Script Naming

Chapter 2: Data Warehouse Theory

2.1 Normal Form Theory

2.1.1 Normal Form Concepts

2.1.2 Functional Dependencies

2.1.3 Distinguishing the Three Normal Forms

First Normal Form

Second Normal Form

Third Normal Form

2.2 Relational Modeling and Dimensional Modeling

2.2.1 Relational Modeling

2.2.2 Dimensional Modeling

Dimensional modeling is further divided into three schemas: the star schema, the snowflake schema, and the constellation schema.

2.3 Dimension Tables and Fact Tables (Key Topic)

2.3.1 Dimension Tables

2.3.2 Fact Tables

2.4 Data Warehouse Modeling (Absolutely Key)

2.4.1 The ODS Layer

2.4.2 The DWD Layer

2.4.3 The DWS Layer

2.4.4 The DWT Layer

2.4.5 The ADS Layer

Analyzes each of the e-commerce system's major subject-area metrics.

Chapter 3: Warehouse Build - ODS Layer

3.1 Creating the Database

1) Start Hive
2) Show the databases

hive (default)> show databases;

3) Create the database

hive (default)> create database gmall;

4) Use the database

hive (default)> use gmall;

3.2 ODS Layer (User Behavior Data)

3.2.1 Creating the Startup Log Table ods_start_log

Analysis of creating the ODS startup log table

1) Create a partitioned table whose input is LZO, whose output is text, and which supports JSON parsing

hive (gmall)>
drop table if exists ods_start_log;
CREATE EXTERNAL TABLE ods_start_log
(
`line` string
)
PARTITIONED BY(`dt` string)
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_start_log';

For Hive's LZO compression, see: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
2) Load the data

hive (gmall)> load data inpath '/origin_data/gmall/log/topic_start/2020-03-10' into table gmall.ods_start_log partition(dt='2020-03-10');

Note: all dates are configured in the yyyy-MM-dd format, which Hive supports by default
3) Check that the load succeeded

hive (gmall)> select * from ods_start_log where dt='2020-03-10' limit 2;

4) Build an index for the LZO file (without an index, a large .lzo file cannot be split across map tasks)

[dw@dw1 hadoop-2.7.2]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_start_log/dt=2020-03-10
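If the indexer succeeds, a .index file should appear next to each .lzo file in the partition directory; you can verify with:

[dw@dw1 hadoop-2.7.2]$ hadoop fs -ls /warehouse/gmall/ods/ods_start_log/dt=2020-03-10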

3.2.2 Creating the Event Log Table ods_event_log

Analysis of creating the ODS event log table

1) Create a partitioned table whose input is LZO, whose output is text, and which supports JSON parsing

hive (gmall)> drop table if exists ods_event_log;
CREATE EXTERNAL TABLE ods_event_log(`line` string)
PARTITIONED BY
(
`dt` string
)
STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_event_log';

2) Load the data

hive (gmall)> load data inpath '/origin_data/gmall/log/topic_event/2020-03-10' into table gmall.ods_event_log partition(dt='2020-03-10');

3) Check that the load succeeded

hive (gmall)> select * from ods_event_log where dt="2020-03-10" limit 2;

4) Build an index for the LZO file

[dw@dw1 hadoop-2.7.2]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_event_log/dt=2020-03-10

3.2.3 Single vs. Double Quotes in Shell

1) Create a test.sh file under /home/dw/bin

[dw@dw1 bin]$ vim test.sh
#!/bin/bash
do_date=$1
echo '$do_date'       # single quotes: the content is not expanded
echo "$do_date"       # double quotes: the variable is expanded
echo "'$do_date'"     # nested quotes: the outer layer decides
echo '"$do_date"'
echo `date`           # backticks: run the command

2) Make it executable

chmod 777 test.sh

3) Run it and check the output

[dw@dw1 bin]$ test.sh 2020-03-10
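The expected output, line by line, is shown below (the last line prints the current date, so the value here is only illustrative):

$do_date
2020-03-10
'2020-03-10'
"$do_date"
Tue Mar 10 10:00:00 CST 2020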


Single quotes do not expand what is inside them; double quotes do.
With nested quotes, whichever layer is outermost takes effect.
Backticks ` execute the enclosed command.

3.2.4 ODS Layer Data Loading Script

1) Create the script in /home/dw/bin on dw1

[dw@dw1 bin]$ vim hdfs_to_ods_log.sh
#!/bin/bash
# define variables
hive=/opt/module/hive/bin/hive
APP=gmall

# pick the date: the first argument if given, otherwise yesterday
if [ -n "$1" ] ; then
    do_date=$1
else
    do_date=`date -d '-1 day' +%F`
fi

sql="
load data inpath '/origin_data/gmall/log/topic_event/$do_date' overwrite into table ${APP}.ods_event_log partition(dt='$do_date');
load data inpath '/origin_data/gmall/log/topic_start/$do_date' overwrite into table ${APP}.ods_start_log partition(dt='$do_date');
"
# run the sql
$hive -e "$sql"

# build the lzo indexes
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_start_log/dt=$do_date
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_event_log/dt=$do_date

2) Make the script executable

[dw@dw1 bin]$ chmod 777 hdfs_to_ods_log.sh

3) Usage

[dw@dw1 bin]$ hdfs_to_ods_log.sh 2020-03-11

4) Check the loaded data

hive (gmall)>
select * from ods_start_log where dt='2020-03-11' limit 2;
select * from ods_event_log where dt='2020-03-11' limit 2;

5) Execution time: in enterprise settings this usually runs between 00:30 and 01:00 each morning
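Before the Azkaban scheduling of Chapter 8 is in place, one simple way to get that timing is a cron entry (the log path here is hypothetical):

# run the ODS log load every day at 00:30
30 0 * * * /home/dw/bin/hdfs_to_ods_log.sh >> /home/dw/logs/hdfs_to_ods_log.log 2>&1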

3.3 ODS Layer (Business Data)

3.3.1 Order Table (Incremental & Updated)

hive (gmall)> drop table if exists ods_order_info;
create external table ods_order_info (
`id` string COMMENT '订单号',
`final_total_amount` decimal(10,2) COMMENT '订单金额',
`order_status` string COMMENT '订单状态',
`user_id` string COMMENT '用户 id',
`out_trade_no` string COMMENT '支付流水号',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`province_id` string COMMENT '省份 ID',
`benefit_reduce_amount` decimal(10,2) COMMENT '优惠金额',
`original_total_amount` decimal(10,2) COMMENT '原价金额',
`feight_fee` decimal(10,2) COMMENT '运费'
) COMMENT '订单表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_info/';

3.3.2 Order Detail Table (Incremental)

hive (gmall)> drop table if exists ods_order_detail;
create external table ods_order_detail(
`id` string COMMENT '订单编号',
`order_id` string COMMENT '订单号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT '商品 id',
`sku_name` string COMMENT '商品名称',
`order_price` decimal(10,2) COMMENT '商品价格',
`sku_num` bigint COMMENT '商品数量',
`create_time` string COMMENT '创建时间'
) COMMENT '订单详情表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_detail/';

3.3.3 SKU Product Table (Full)

hive (gmall)> drop table if exists ods_sku_info;
create external table ods_sku_info(
`id` string COMMENT 'skuId',
`spu_id` string COMMENT 'spuid',
`price` decimal(10,2) COMMENT '价格',
`sku_name` string COMMENT '商品名称',
`sku_desc` string COMMENT '商品描述',
`weight` string COMMENT '重量',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品类 id',
`create_time` string COMMENT '创建时间'
) COMMENT 'SKU 商品表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_sku_info/';

3.3.4 User Table (Incremental & Updated)

hive (gmall)> drop table if exists ods_user_info;
create external table ods_user_info(
`id` string COMMENT '用户 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间'
) COMMENT '用户表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_user_info/';

3.3.5 Product Level-1 Category Table (Full)

hive (gmall)> drop table if exists ods_base_category1;
create external table ods_base_category1(
`id` string COMMENT 'id',
`name` string COMMENT '名称'
) COMMENT '商品一级分类表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_category1/';

3.3.6 Product Level-2 Category Table (Full)

hive (gmall)> drop table if exists ods_base_category2;
create external table ods_base_category2(
`id` string COMMENT ' id',
`name` string COMMENT '名称',
category1_id string COMMENT '一级品类 id'
) COMMENT '商品二级分类表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_category2/';

3.3.7 Product Level-3 Category Table (Full)

hive (gmall)> drop table if exists ods_base_category3;
create external table ods_base_category3(
`id` string COMMENT ' id',
`name` string COMMENT '名称',
category2_id string COMMENT '二级品类 id'
) COMMENT '商品三级分类表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_category3/';

3.3.8 Payment Flow Table (Incremental)

hive (gmall)> drop table if exists ods_payment_info;
create external table ods_payment_info(
`id` bigint COMMENT '编号',
`out_trade_no` string COMMENT '对外业务编号',
`order_id` string COMMENT '订单编号',
`user_id` string COMMENT '用户编号',
`alipay_trade_no` string COMMENT '支付宝交易流水编号',
`total_amount` decimal(16,2) COMMENT '支付金额',
`subject` string COMMENT '交易内容',
`payment_type` string COMMENT '支付类型',
`payment_time` string COMMENT '支付时间'
) COMMENT '支付流水表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_payment_info/';

3.3.9 Province Table (Special)

hive (gmall)> drop table if exists ods_base_province;
create external table ods_base_province (
`id` bigint COMMENT '编号',
`name` string COMMENT '省份名称',
`region_id` string COMMENT '地区 ID',
`area_code` string COMMENT '地区编码',
`iso_code` string COMMENT 'iso 编码'
) COMMENT '省份表' row format delimited fields terminated by '\t'
STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_province/';

3.3.10 Region Table (Special)

hive (gmall)> drop table if exists ods_base_region;
create external table ods_base_region (
`id` bigint COMMENT '编号',
`region_name` string COMMENT '地区名称'
) COMMENT '地区表' row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_region/';

3.3.11 Trademark Table (Full)

hive (gmall)> drop table if exists ods_base_trademark;
create external table ods_base_trademark (
`tm_id` bigint COMMENT '编号',
`tm_name` string COMMENT '品牌名称'
) COMMENT '品牌表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_trademark/';

3.3.12 Order Status Table (Incremental)

hive (gmall)> drop table if exists ods_order_status_log;
create external table ods_order_status_log (
`id` bigint COMMENT '编号',
`order_id` string COMMENT '订单 ID',
`order_status` string COMMENT '订单状态',
`operate_time` string COMMENT '修改时间'
) COMMENT '订单状态表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_status_log/';

3.3.13 SPU Product Table (Full)

hive (gmall)> drop table if exists ods_spu_info;
create external table ods_spu_info(
`id` string COMMENT 'spuid',
`spu_name` string COMMENT 'spu 名称',
`category3_id` string COMMENT '品类 id',
`tm_id` string COMMENT '品牌 id'
) COMMENT 'SPU 商品表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_spu_info/';

3.3.14 Product Review Table (Incremental)

hive (gmall)> drop table if exists ods_comment_info;
create external table ods_comment_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`sku_id` string COMMENT '商品 sku',
`spu_id` string COMMENT '商品 spu',
`order_id` string COMMENT '订单 ID',
`appraise` string COMMENT '评价',
`create_time` string COMMENT '评价时间'
) COMMENT '商品评论表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_comment_info/';

3.3.15 Order Refund Table (Incremental)

hive (gmall)> drop table if exists ods_order_refund_info;
create external table ods_order_refund_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`order_id` string COMMENT '订单 ID',
`sku_id` string COMMENT '商品 ID',
`refund_type` string COMMENT '退款类型',
`refund_num` bigint COMMENT '退款件数',
`refund_amount` decimal(16,2) COMMENT '退款金额',
`refund_reason_type` string COMMENT '退款原因类型',
`create_time` string COMMENT '退款时间'
) COMMENT '退单表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_refund_info/';

3.3.16 Cart Table (Full)

hive (gmall)> drop table if exists ods_cart_info;
create external table ods_cart_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`cart_price` string COMMENT '放入购物车时价格',
`sku_num` string COMMENT '数量',
`sku_name` string COMMENT 'sku 名称 (冗余)',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '修改时间',
`is_ordered` string COMMENT '是否已经下单',
`order_time` string COMMENT '下单时间'
) COMMENT '加购表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_cart_info/';

3.3.17 Product Favorites Table (Full)

hive (gmall)> drop table if exists ods_favor_info;
create external table ods_favor_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`spu_id` string COMMENT 'spuid',
`is_cancel` string COMMENT '是否取消',
`create_time` string COMMENT '收藏时间',
`cancel_time` string COMMENT '取消时间'
) COMMENT '商品收藏表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_favor_info/';

3.3.18 Coupon Usage Table (New & Changed)

hive (gmall)> drop table if exists ods_coupon_use;
create external table ods_coupon_use(
`id` string COMMENT '编号',
`coupon_id` string COMMENT '优惠券 ID',
`user_id` string COMMENT '用户 id',
`order_id` string COMMENT '订单 id',
`coupon_status` string COMMENT '优惠券状态',
`get_time` string COMMENT '领取时间',
`using_time` string COMMENT '使用时间(下单)',
`used_time` string COMMENT '使用时间(支付)'
) COMMENT '优惠券领用表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_coupon_use/';

3.3.19 Coupon Table (Full)

hive (gmall)> drop table if exists ods_coupon_info;
create external table ods_coupon_info(
`id` string COMMENT '购物券编号',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` string COMMENT '满额数',
`condition_num` string COMMENT '满件数',
`activity_id` string COMMENT '活动编号',
`benefit_amount` string COMMENT '减金额',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品类 id',
`limit_num` string COMMENT '最多领用次数',
`operate_time` string COMMENT '修改时间',
`expire_time` string COMMENT '过期时间'
) COMMENT '优惠券表' PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_coupon_info/';

3.3.20 Activity Table (Full)

hive (gmall)> drop table if exists ods_activity_info;
create external table ods_activity_info(
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间'
) COMMENT '活动表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_activity_info/';

3.3.21 Activity-Order Link Table (Incremental)

hive (gmall)> drop table if exists ods_activity_order;
create external table ods_activity_order(
`id` string COMMENT '编号',
`activity_id` string COMMENT '活动 id',
`order_id` string COMMENT '订单 id',
`create_time` string COMMENT '创建时间'
) COMMENT '活动订单关联表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_activity_order/';

3.3.22 Activity Rule Table (Full)

hive (gmall)> drop table if exists ods_activity_rule;
create external table ods_activity_rule(
`id` string COMMENT '编号',
`activity_id` string COMMENT '活动 ID',
`condition_amount` string COMMENT '满减金额',
`condition_num` string COMMENT '满减件数',
`benefit_amount` string COMMENT '优惠金额',
`benefit_discount` string COMMENT '优惠折扣',
`benefit_level` string COMMENT '优惠级别'
) COMMENT '优惠规则表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_activity_rule/';

3.3.23 Code Dictionary Table (Full)

hive (gmall)> drop table if exists ods_base_dic;
create external table ods_base_dic(
`dic_code` string COMMENT '编号',
`dic_name` string COMMENT '编码名称',
`parent_code` string COMMENT '父编码',
`create_time` string COMMENT '创建日期',
`operate_time` string COMMENT '操作日期'
) COMMENT '编码字典表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_dic/';

3.3.24 ODS Layer Data Loading Script

1) Create the script hdfs_to_ods_db.sh in /home/dw/bin

[dw@dw1 bin]$ vim hdfs_to_ods_db.sh
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

# use the date passed in; if no date is given, use the day before today
if [ -n "$2" ] ; then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

sql1="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_order/$do_date' OVERWRITE into table ${APP}.ods_activity_order partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date');
"

sql2="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;
"

case $1 in
"first"){
    $hive -e "$sql1"
    $hive -e "$sql2"
};;
"all"){
    $hive -e "$sql1"
};;
esac

2) Make the script executable

[dw@dw1 bin]$ chmod 777 hdfs_to_ods_db.sh

3) First-time load (the "first" mode also loads the two non-partitioned special tables)

[dw@dw1 bin]$ hdfs_to_ods_db.sh first 2020-03-10

4) Daily load

[dw@dw1 bin]$ hdfs_to_ods_db.sh all 2020-03-11

5) Check that the data loaded successfully

hive (gmall)> select * from ods_order_detail where dt='2020-03-11';
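Besides eyeballing a few rows, a per-partition count is a quick way to confirm each day's load landed where expected (a sketch; the counts depend on your generated data):

hive (gmall)> select dt, count(*) from ods_order_info group by dt;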

Summary:




Chapter 4: Warehouse Build - DWD Layer

4.1 DWD Layer (Parsing the User-Behavior Startup Table)

4.1.1 Creating the Startup Table

1) Table DDL (LZO compression on top of columnar Parquet storage)

hive (gmall)> drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`open_ad_type` string,
`action` string,
`loading_time` string,
`detail` string,
`extend1` string
)PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_start_log/'
TBLPROPERTIES('parquet.compression'='lzo');

Note: the data is stored as Parquet, which is splittable on its own, so no extra index needs to be built.
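To confirm the declared storage format and compression property took effect, you can inspect the table metadata:

hive (gmall)> desc formatted dwd_start_log;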

4.1.2 Using the get_json_object Function

1) Input data xjson

Xjson=[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]

2) Extract the first JSON object

SELECT get_json_object(xjson,"$.[0]") FROM person;
Result: {"name":"大郎","sex":"男","age":"25"}

3) Extract the value of the age field of the first JSON object

SELECT get_json_object(xjson,"$.[0].age") FROM person;
Result: 25
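By the same pattern, the second object is addressed with index 1 (same sample data):

SELECT get_json_object(xjson,"$.[1].age") FROM person;
Result: 47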

4.1.3 Loading Data into the Startup Table

hive (gmall)>
insert overwrite table dwd_start_log
PARTITION (dt='2020-03-10')
select get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from ods_start_log
where dt='2020-03-10';

3) Test

hive (gmall)> select * from dwd_start_log where dt='2020-03-10' limit 2;

4.1.4 DWD Startup Table Loading Script

1) Create the script in /home/dw/bin on dw1

[dw@dw1 bin]$ vim ods_to_dwd_log.sh
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

if [ -n "$1" ] ; then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
insert overwrite table ${APP}.dwd_start_log
PARTITION (dt='$do_date')
select get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from ${APP}.ods_start_log
where dt='$do_date';
"
$hive -e "$sql"

2) Make the script executable

[dw@dw1 bin]$ chmod 777 ods_to_dwd_log.sh

3) Usage

[dw@dw1 bin]$ ods_to_dwd_log.sh 2020-03-11

4) Check the result

hive (gmall)> select * from dwd_start_log where dt='2020-03-11' limit 2;

5) Execution time: typically between 00:30 and 01:00 each morning in production

4.2 DWD Layer (Parsing the User-Behavior Event Table) ****

4.2.1 Creating the Base Detail Table

The base detail table stores the detail data converted from the raw ODS-layer table.

1) Create the event log base detail table

hive (gmall)> drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Note: event_name and event_json hold the event name and the whole event payload, respectively.
This is where the 1-to-many structure of the raw log gets split apart.
To do that, the raw log has to be flattened, which requires a UDF and a UDTF.

4.2.2 Custom UDF (Parsing the Common Fields)

A UDF takes one row in and produces one row out: "one in, one out".

1) Create a Maven project: hivefunction
2) Create the package: com.atguigu.udf
3) Add the following to pom.xml

<properties>
    <hive.version>2.3.0</hive.version>
</properties>

<dependencies>
    <!-- hive dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Note 1: if the Hive jars fail to download, add the following parameters in IDEA

-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true
-Dmaven.wagon.http.ssl.ignore.validity.dates=true


Note 2: if pentaho-aggdesigner-algorithm.jar fails to download, add the following repository to the Maven pom

<repositories>
    <repository>
        <id>spring-plugin</id>
        <url>https://repo.spring.io/plugins-release/</url>
    </repository>
</repositories>



4) The UDF that parses the common fields

package com.atguigu.udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {

    public String evaluate(String line, String key) {
        // split the server time from the JSON payload
        String[] log = line.split("\\|");

        // validity check
        if (log.length != 2 || StringUtils.isBlank(log[1].trim())) {
            return "";
        }

        JSONObject json = new JSONObject(log[1].trim());
        String result = "";

        // pick the value according to the key passed in
        if ("st".equals(key)) {
            // return the server time
            result = log[0].trim();
        } else if ("et".equals(key)) {
            if (json.has("et")) {
                result = json.getString("et");
            }
        } else {
            // fetch the value from the "cm" (common fields) object
            JSONObject cm = json.getJSONObject("cm");
            if (cm.has(key)) {
                result = cm.getString(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String line = "1583856085576|{\"cm\":{\"ln\":\"-45.7\",\"sv\":\"V2.9.4\",\"os\":\"8.0.6\",\"g\":\"8B3SSQ73@gmail.com\",\"mid\":\"0\",\"nw\":\"WIFI\",\"l\":\"es\",\"vc\":\"15\",\"hw\":\"1080*1920\",\"ar\":\"MX\",\"uid\":\"0\",\"t\":\"1583848155919\",\"la\":\"9.9\",\"md\":\"sumsung-5\",\"vn\":\"1.2.9\",\"ba\":\"Sumsung\",\"sr\":\"F\"},\"ap\":\"app\",\"et\":[{\"ett\":\"1583816572291\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"2\"}},{\"ett\":\"1583777284105\",\"en\":\"error\",\"kv\":{\"errorDetail\":\"java.lang.NullPointerException\\\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound\",\"errorBrief\":\"at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)\"}},{\"ett\":\"1583825047680\",\"en\":\"comment\",\"kv\":{\"p_comment_id\":0,\"addtime\":\"1583838058208\",\"praise_count\":924,\"other_id\":6,\"comment_id\":5,\"reply_count\":143,\"userid\":7,\"content\":\"樟给堕嘶炸壁茫腹抚孤瘤\"}},{\"ett\":\"1583817223097\",\"en\":\"favorites\",\"kv\":{\"course_id\":0,\"id\":0,\"add_time\":\"1583814567167\",\"userid\":5}}]}";
        String result = new BaseFieldUDF().evaluate(line, "et");
        System.out.println(result);
    }
}

4.2.3 Custom UDTF (Parsing the Event Field)

A UDTF takes one row in and produces multiple rows out: "one in, many out".
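This is the same shape as Hive's built-in explode; a minimal sketch with a hypothetical table whose ids column holds comma-separated values:

-- one row with ids='1,2,3' becomes three rows
select order_id, one_id
from tmp_order_ids
lateral view explode(split(ids, ',')) t as one_id;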

1) Create the package: com.atguigu.udtf
2) In com.atguigu.udtf, create the class EventJsonUDTF
3) The UDTF that expands the event field

package com.atguigu.udtf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;
import java.util.List;

public class EventJsonUDTF extends GenericUDTF {

    public StructObjectInspector initialize(StructObjectInspector argOIs) {
        // declare the UDTF's output column names and types
        List<String> fieldName = new ArrayList<>();
        List<ObjectInspector> fieldType = new ArrayList<>();
        fieldName.add("event_name");
        fieldName.add("event_json");
        fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldType);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        // the input is the JSON array produced by the UDF's 'et' key
        String input = objects[0].toString();

        // validity check
        if (StringUtils.isBlank(input)) {
            return;
        } else {
            JSONArray jsonArray = new JSONArray(input);
            if (jsonArray == null) {
                return;
            }
            // walk the array, emitting one (event_name, event_json) row per element
            for (int i = 0; i < jsonArray.length(); i++) {
                String[] result = new String[2];
                try {
                    // e.g. {"ett":"1583730574374","en":"ad","kv":{"activityId":"1","displayMills":"15926","entry":"1","action":"4","contentType":"0"}}
                    result[0] = jsonArray.getJSONObject(i).getString("en");
                    result[1] = jsonArray.getString(i);
                } catch (JSONException e) {
                    continue;
                }
                // emit the row
                forward(result);
            }
        }
    }

    @Override
    public void close() throws HiveException {
    }
}

4) Package the jar

5) Upload hivefunction-1.0-SNAPSHOT.jar to /opt/module on dw1, then put the jar on HDFS under /user/hive/jars

[dw@dw1 module]$ hadoop fs -mkdir -p /user/hive/jars
[dw@dw1 module]$ hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars

6) Create permanent functions associated with the compiled Java classes

hive (gmall)>
create function base_analizer as 'com.atguigu.udf.BaseFieldUDF' using jar 'hdfs://dw1:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
create function flat_analizer as 'com.atguigu.udtf.EventJsonUDTF' using jar 'hdfs://dw1:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';

7) Note: what if the custom functions change and the jar is rebuilt? Just replace the old jar on the HDFS path and restart the Hive client.
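A quick sanity check that the UDF resolves and runs (a sketch; the actual values depend on your log data):

hive (gmall)> select base_analizer(line,'mid') mid_id from ods_event_log where dt='2020-03-10' limit 3;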

4.2.4 Populating the Event Log Base Detail Table

1) Populate the event log base detail table

hive (gmall)> insert overwrite table dwd_base_event_log
partition(dt='2020-03-10')
select
base_analizer(line,'mid') as mid_id,
base_analizer(line,'uid') as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name,
event_json,
base_analizer(line,'st') as server_time
from ods_event_log lateral view flat_analizer(base_analizer(line,'et'))
tmp_flat as event_name,event_json
where dt='2020-03-10' and base_analizer(line,'et')<>'';

2) Test

hive (gmall)> select * from dwd_base_event_log where dt='2020-03-10' limit 2;

4.2.5 DWD Layer Data Parsing Script

1) Create the script in /home/dw/bin on dw1

[dw@dw1 bin]$ vim ods_to_dwd_base_log.sh
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

if [ -n "$1" ]; then
    do_date=$1
else
    do_date=`date -d '-1 day' +%F`
fi

sql="
use gmall;
insert overwrite table ${APP}.dwd_base_event_log
partition(dt='$do_date')
select
base_analizer(line,'mid') as mid_id,
base_analizer(line,'uid') as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name,
event_json,
base_analizer(line,'st') as server_time
from ods_event_log lateral view flat_analizer(base_analizer(line,'et'))
tmp_flat as event_name,event_json
where dt='$do_date' and base_analizer(line,'et')<>'';
"

$hive -e "$sql"

Note: when calling custom functions, add the target database at the top of the SQL before running the script, e.g. use gmall;
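Alternatively, permanent functions are scoped to the database they were created in, so they can also be referenced with a database prefix instead of switching databases first (a sketch):

hive (default)> select gmall.base_analizer(line,'mid') from gmall.ods_event_log where dt='2020-03-10' limit 1;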
2) Make the script executable

[dw@dw1 bin]$ chmod 777 ods_to_dwd_base_log.sh

3) Usage

[dw@dw1 module]$ ods_to_dwd_base_log.sh 2020-03-11

4) Check the result

hive (gmall)> select * from dwd_base_event_log where dt='2020-03-11' limit 2;

5) Execution time: typically between 00:30 and 01:00 each morning in production

4.3 DWD Layer (Deriving the User-Behavior Event Tables)

4.3.1 Product Click Table

1) Table DDL

hive (gmall)> drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
)PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_display_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)>insert overwrite table dwd_display_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='display';

3) Test

hive (gmall)> select * from dwd_display_log where dt='2020-03-10' limit 2;

4.3.2 Product Detail Page Table

1) Table DDL

hive (gmall)> drop table if exists dwd_newsdetail_log;
CREATE EXTERNAL TABLE dwd_newsdetail_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`goodsid` string,
`showtype` string,
`news_staytime` string,
`loading_time` string,
`type1` string,
`category` string,
`server_time` string
) PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_newsdetail_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_newsdetail_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.showtype') showtype,
get_json_object(event_json,'$.kv.news_staytime') news_staytime,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.type1') type1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='newsdetail';

3) Test

hive (gmall)> select * from dwd_newsdetail_log where dt='2020-03-10' limit 2;

4.3.3 Product List Page Table

1) Table DDL

hive (gmall)> drop table if exists dwd_loading_log;
CREATE EXTERNAL TABLE dwd_loading_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`loading_time` string,
`loading_way` string,
`extend1` string,
`extend2` string,
`type` string,
`type1` string,
`server_time` string
) PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_loading_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_loading_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.loading_way') loading_way,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.extend2') extend2,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.type1') type1,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='loading';

3) Test

hive (gmall)> select * from dwd_loading_log where dt='2020-03-10' limit 2;

4.3.4 Ad Table

1) Table DDL

hive (gmall)> drop table if exists dwd_ad_log;
CREATE EXTERNAL TABLE dwd_ad_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`contentType` string,
`displayMills` string,
`itemId` string,
`activityId` string,
`server_time` string
) PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_ad_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_ad_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.contentType') contentType,
get_json_object(event_json,'$.kv.displayMills') displayMills,
get_json_object(event_json,'$.kv.itemId') itemId,
get_json_object(event_json,'$.kv.activityId') activityId,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='ad';

3) Test

hive (gmall)> select * from dwd_ad_log where dt='2020-03-10' limit 2;

4.3.5 Notification Table

1) Table DDL

hive (gmall)> drop table if exists dwd_notification_log;
CREATE EXTERNAL TABLE dwd_notification_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`noti_type` string,
`ap_time` string,
`content` string,
`server_time` string
) PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_notification_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_notification_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.noti_type') noti_type,
get_json_object(event_json,'$.kv.ap_time') ap_time,
get_json_object(event_json,'$.kv.content') content,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='notification';

3) Test

hive (gmall)> select * from dwd_notification_log where dt='2020-03-10' limit 2;

4.3.6 Background Active Table

1) Table DDL

hive (gmall)> drop table if exists dwd_active_background_log;
CREATE EXTERNAL TABLE dwd_active_background_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`active_source` string,
`server_time` string
) PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_background_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_active_background_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='active_background';

3) Test

hive (gmall)> select * from dwd_active_background_log where dt='2020-03-10' limit 2;

4.3.7 Comment Table

1) Table DDL

hive (gmall)> drop table if exists dwd_comment_log;
CREATE EXTERNAL TABLE dwd_comment_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`comment_id` int,
`userid` int,
`p_comment_id` int,
`content` string,
`addtime` string,
`other_id` int,
`praise_count` int,
`reply_count` int,
`server_time` string
)PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_comment_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_comment_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.comment_id') comment_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
get_json_object(event_json,'$.kv.content') content,
get_json_object(event_json,'$.kv.addtime') addtime,
get_json_object(event_json,'$.kv.other_id') other_id,
get_json_object(event_json,'$.kv.praise_count') praise_count,
get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='comment';

3) Test

hive (gmall)> select * from dwd_comment_log where dt='2020-03-10' limit 2;

4.3.8 Favorites Table

1) Table DDL

hive (gmall)> drop table if exists dwd_favorites_log;
CREATE EXTERNAL TABLE dwd_favorites_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` int,
`course_id` int,
`userid` int,
`add_time` string,
`server_time` string
)PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_favorites_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_favorites_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='favorites';

3) Test

hive (gmall)> select * from dwd_favorites_log where dt='2020-03-10' limit 2;

4.3.9 Like Table

1) Table DDL

hive (gmall)> drop table if exists dwd_praise_log;
CREATE EXTERNAL TABLE dwd_praise_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` string,
`userid` string,
`target_id` string,
`type` string,
`add_time` string,
`server_time` string
)PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_praise_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_praise_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='praise';

3) Test

hive (gmall)> select * from dwd_praise_log where dt='2020-03-10' limit 2;

4.3.10 Error Log Table

1) Table DDL

hive (gmall)> drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`errorBrief` string,
`errorDetail` string,
`server_time` string
) PARTITIONED BY (dt string) stored as parquet
location '/warehouse/gmall/dwd/dwd_error_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data

hive (gmall)> insert overwrite table dwd_error_log
PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief,
get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
from dwd_base_event_log where dt='2020-03-10' and event_name='error';

3) Test

hive (gmall)> select * from dwd_error_log where dt='2020-03-10' limit 2;

4.3.11 DWD Event Table Loading Script

1) Create the script in /home/dw/bin on dw1

[dw@dw1 bin]$ vim ods_to_dwd_event_log.sh
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

if [ -n "$1" ]; then
    do_date=$1
else
    do_date=`date -d '-1 day' +%F`
fi

sql="
insert overwrite table ${APP}.dwd_display_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='display';

insert overwrite table ${APP}.dwd_newsdetail_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.showtype') showtype,
get_json_object(event_json,'$.kv.news_staytime') news_staytime,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.type1') type1,
get_json_object(event_json,'$.kv.category') category,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='newsdetail';

insert overwrite table ${APP}.dwd_loading_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.loading_way') loading_way,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.extend2') extend2,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.type1') type1,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='loading';

insert overwrite table ${APP}.dwd_ad_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.contentType') contentType,
get_json_object(event_json,'$.kv.displayMills') displayMills,
get_json_object(event_json,'$.kv.itemId') itemId,
get_json_object(event_json,'$.kv.activityId') activityId,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='ad';

insert overwrite table ${APP}.dwd_notification_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.noti_type') noti_type,
get_json_object(event_json,'$.kv.ap_time') ap_time,
get_json_object(event_json,'$.kv.content') content,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='notification';

insert overwrite table ${APP}.dwd_active_background_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='active_background';

insert overwrite table ${APP}.dwd_comment_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.comment_id') comment_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
get_json_object(event_json,'$.kv.content') content,
get_json_object(event_json,'$.kv.addtime') addtime,
get_json_object(event_json,'$.kv.other_id') other_id,
get_json_object(event_json,'$.kv.praise_count') praise_count,
get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='comment';

insert overwrite table ${APP}.dwd_favorites_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='favorites';

insert overwrite table ${APP}.dwd_praise_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='praise';

insert overwrite table ${APP}.dwd_error_log
PARTITION (dt='$do_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief,
get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
from ${APP}.dwd_base_event_log where dt='$do_date' and event_name='error';
"

$hive -e "$sql"

2) Grant execute permission to the script

[dw@dw1 bin]$ chmod 777 ods_to_dwd_event_log.sh

3) Run the script

[dw@dw1 module]$ ods_to_dwd_event_log.sh 2020-03-11

4) Query the imported data

hive (gmall)> select * from dwd_comment_log where dt='2020-03-11' limit 2;

5) Scheduling: in production, this script typically runs daily between 00:30 and 01:00.

4.4 DWD Layer (Business Data)

4.4.1 SKU Dimension Table (full load)


1) Table creation statement

hive (gmall)> DROP TABLE IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (
`id` string COMMENT '商品 id',
`spu_id` string COMMENT 'spuid',
`price` double COMMENT '商品价格',
`sku_name` string COMMENT '商品名称',
`sku_desc` string COMMENT '商品描述',
`weight` double COMMENT '重量',
`tm_id` string COMMENT '品牌 id',
`tm_name` string COMMENT '品牌名称',
`category3_id` string COMMENT '三级分类 id',
`category2_id` string COMMENT '二级分类 id',
`category1_id` string COMMENT '一级分类 id',
`category3_name` string COMMENT '三级分类名称',
`category2_name` string COMMENT '二级分类名称',
`category1_name` string COMMENT '一级分类名称',
`spu_name` string COMMENT 'spu 名称',
`create_time` string COMMENT '创建时间'
)COMMENT '商品维度表'
PARTITIONED BY (`dt` string) stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dwd_dim_sku_info
partition(dt='2020-03-10')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
bt.tm_name,
bc3.id category3_id,
bc2.id category2_id,
bc1.id category1_id,
bc3.name category3_name,
bc2.name category2_name,
bc1.name category1_name,
spu.spu_name,
sku.create_time
from
(
select
*
from ods_sku_info
where dt='2020-03-10'
)
sku join
(
select
*
from ods_base_trademark
where dt='2020-03-10'
)bt on sku.tm_id = bt.tm_id
join
(
select
*
from ods_spu_info
where dt='2020-03-10'
)spu on spu.id = sku.spu_id
join
(
select
*
from ods_base_category3
where dt='2020-03-10'
)bc3 on sku.category3_id=bc3.id
join
(
select
*
from ods_base_category2
where dt='2020-03-10'
)bc2 on bc3.category2_id=bc2.id
join (
select
*
from ods_base_category1
where dt='2020-03-10'
)bc1 on bc2.category1_id=bc1.id;

3) Query the load results

hive (gmall)> select * from dwd_dim_sku_info where dt='2020-03-10' limit 2;

4.4.2 Coupon Info Table (full load)

Load the ODS-layer ods_coupon_info data into the DWD-layer coupon info table, applying light cleansing along the way.
1) Table creation statement

hive (gmall)> drop table if exists dwd_dim_coupon_info;
create external table dwd_dim_coupon_info(
`id` string COMMENT '购物券编号',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` string COMMENT '满额数',
`condition_num` string COMMENT '满件数',
`activity_id` string COMMENT '活动编号',
`benefit_amount` string COMMENT '减金额',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品类 id',
`limit_num` string COMMENT '最多领用次数',
`operate_time` string COMMENT '修改时间',
`expire_time` string COMMENT '过期时间'
) COMMENT '优惠券信息表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_coupon_info/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dwd_dim_coupon_info
partition(dt='2020-03-10')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ods_coupon_info
where dt='2020-03-10';

3) Query the load results

hive (gmall)> select * from dwd_dim_coupon_info where dt='2020-03-10';

4.4.3 Activity Dimension Table (full load)


1) Table creation statement

hive (gmall)> drop table if exists dwd_dim_activity_info;
create external table dwd_dim_activity_info(
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`condition_amount` string COMMENT '满减金额',
`condition_num` string COMMENT '满减件数',
`benefit_amount` string COMMENT '优惠金额',
`benefit_discount` string COMMENT '优惠折扣',
`benefit_level` string COMMENT '优惠级别',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间'
) COMMENT '活动信息表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_activity_info/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dwd_dim_activity_info
partition(dt='2020-03-10')
select
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
info.start_time,
info.end_time,
info.create_time from
(
select
*
from
ods_activity_info
where dt='2020-03-10'
)info
left join
(
select
*
from
ods_activity_rule
where dt='2020-03-10'
)rule on info.id = rule.activity_id;

3) Query the load results

hive (gmall)> select * from dwd_dim_activity_info where dt='2020-03-10';

4.4.4 Region Dimension Table (special)


1) Table creation statement

hive (gmall)> DROP TABLE IF EXISTS `dwd_dim_base_province`;
CREATE EXTERNAL TABLE `dwd_dim_base_province` (
`id` string COMMENT 'id',
`province_name` string COMMENT '省市名称',
`area_code` string COMMENT '地区编码',
`iso_code` string COMMENT 'ISO 编码',
`region_id` string COMMENT '地区 id',
`region_name` string COMMENT '地区名称'
)COMMENT '地区省市表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_base_province/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from(
select
*
from
ods_base_province
) bp
join
(
select
*
from
ods_base_region
) br
on bp.region_id=br.id;

3) Query the load results

hive (gmall)> select * from dwd_dim_base_province;

4.4.5 Date Dimension Table (special, reserved)

1) Table creation statement

hive (gmall)> DROP TABLE IF EXISTS `dwd_dim_date_info`;
CREATE EXTERNAL TABLE `dwd_dim_date_info`(
`date_id` string COMMENT '日',
`week_id` int COMMENT '周',
`week_day` int COMMENT '周的第几天',
`day` int COMMENT '每月的第几天',
`month` int COMMENT '第几月',
`quarter` int COMMENT '第几季度',
`year` int COMMENT '年',
`is_workday` int COMMENT '是否是工作日',
`holiday_id` int COMMENT '是否是节假日'
)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_dim_date_info/';

2) Upload date_info.txt to the /opt/module/db_log/ directory on dw1
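
The file is tab-separated, one column per field of the DDL above. Two purely illustrative rows (hypothetical values, not the real dataset) would look like:

2020-03-10	11	2	10	3	1	2020	1	0
2020-03-11	11	3	11	3	1	2020	1	0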
3) Data loading

hive (gmall)> load data local inpath '/opt/module/db_log/date_info.txt' into table dwd_dim_date_info;

4) Query the load results

hive (gmall)> select * from dwd_dim_date_info;

4.4.6 Order Detail Fact Table (transactional snapshot fact table)



1) Table creation statement

hive (gmall)> drop table if exists dwd_fact_order_detail;
create external table dwd_fact_order_detail (
`id` string COMMENT '订单编号',
`order_id` string COMMENT '订单号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'sku 商品 id',
`sku_name` string COMMENT '商品名称',
`order_price` decimal(10,2) COMMENT '商品价格',
`sku_num` bigint COMMENT '商品数量',
`create_time` string COMMENT '创建时间',
`province_id` string COMMENT '省份 ID',
`total_amount` decimal(20,2) COMMENT '订单总金额'
)PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_detail/'
tblproperties ("parquet.compression"="lzo");

2) Data loading (join ods_order_detail to the order table for province_id, and compute the line-item total amount)

hive (gmall)> insert overwrite table dwd_fact_order_detail
partition(dt='2020-03-10')
select od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price*od.sku_num
from (
select
*
from
ods_order_detail where dt='2020-03-10'
) od
join (
select
*
from
ods_order_info
where dt='2020-03-10'
) oi
on od.order_id=oi.id;

3) Query the load results

hive (gmall)> select * from dwd_fact_order_detail where dt='2020-03-10';

4.4.7 Payment Fact Table (transactional snapshot fact table)



1) Table creation statement

hive (gmall)> drop table if exists dwd_fact_payment_info;
create external table dwd_fact_payment_info (
`id` string COMMENT '',
`out_trade_no` string COMMENT '对外业务编号',
`order_id` string COMMENT '订单编号',
`user_id` string COMMENT '用户编号',
`alipay_trade_no` string COMMENT '支付宝交易流水编号',
`payment_amount` decimal(16,2) COMMENT '支付金额',
`subject` string COMMENT '交易内容',
`payment_type` string COMMENT '支付类型',
`payment_time` string COMMENT '支付时间',
`province_id` string COMMENT '省份 ID'
)PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_payment_info/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dwd_fact_payment_info
partition(dt='2020-03-10')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
from (
select
*
from ods_payment_info
where dt='2020-03-10'
)pi
join (
select
id,
province_id
from
ods_order_info
where dt='2020-03-10'
)oi
on pi.order_id = oi.id;

3) Query the load results

hive (gmall)> select * from dwd_fact_payment_info where dt='2020-03-10';

4.4.8 Refund Fact Table (transactional snapshot fact table)


1) Table creation statement

hive (gmall)> drop table if exists dwd_fact_order_refund_info;
create external table dwd_fact_order_refund_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`order_id` string COMMENT '订单 ID',
`sku_id` string COMMENT '商品 ID',
`refund_type` string COMMENT '退款类型',
`refund_num` bigint COMMENT '退款件数',
`refund_amount` decimal(16,2) COMMENT '退款金额',
`refund_reason_type` string COMMENT '退款原因类型',
`create_time` string COMMENT '退款时间'
) COMMENT '退款事实表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_order_refund_info/';

2) Data loading

hive (gmall)> insert overwrite table dwd_fact_order_refund_info
partition(dt='2020-03-10')
select id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ods_order_refund_info
where dt='2020-03-10';

3) Query the load results

hive (gmall)> select * from dwd_fact_order_refund_info where dt='2020-03-10';

4.4.9 Review Fact Table (transactional snapshot fact table)


1) Table creation statement

hive (gmall)> drop table if exists dwd_fact_comment_info;
create external table dwd_fact_comment_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 ID',
`sku_id` string COMMENT '商品 sku',
`spu_id` string COMMENT '商品 spu',
`order_id` string COMMENT '订单 ID',
`appraise` string COMMENT '评价',
`create_time` string COMMENT '评价时间'
) COMMENT '评价事实表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_comment_info/';

2) Data loading

hive (gmall)> insert overwrite table dwd_fact_comment_info
partition(dt='2020-03-10')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ods_comment_info
where dt='2020-03-10';

3) Query the load results

hive (gmall)> select * from dwd_fact_comment_info where dt='2020-03-10';

4.4.10 Cart Fact Table (periodic snapshot fact table, daily snapshot)


1) Table creation statement

hive (gmall)> drop table if exists dwd_fact_cart_info;
create external table dwd_fact_cart_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`cart_price` string COMMENT '放入购物车时价格',
`sku_num` string COMMENT '数量',
`sku_name` string COMMENT 'sku 名称 (冗余)',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '修改时间',
`is_ordered` string COMMENT '是否已经下单。1 为已下单;0 为未下单',
`order_time` string COMMENT '下单时间'
) COMMENT '加购事实表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_cart_info/';

2) Data loading

hive (gmall)> insert overwrite table dwd_fact_cart_info
partition(dt='2020-03-10')
select id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
from ods_cart_info
where dt='2020-03-10';

3) Query the load results

hive (gmall)> select * from dwd_fact_cart_info where dt='2020-03-10';

4.4.11 Favorites Fact Table (periodic snapshot fact table, daily snapshot)


1) Table creation statement

hive (gmall)> drop table if exists dwd_fact_favor_info;
create external table dwd_fact_favor_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户 id',
`sku_id` string COMMENT 'skuid',
`spu_id` string COMMENT 'spuid',
`is_cancel` string COMMENT '是否取消',
`create_time` string COMMENT '收藏时间',
`cancel_time` string COMMENT '取消时间'
) COMMENT '收藏事实表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_favor_info/';

2) Data loading

hive (gmall)> insert overwrite table dwd_fact_favor_info
partition(dt='2020-03-10')
select id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ods_favor_info
where dt='2020-03-10';

3) Query the load results

hive (gmall)> select * from dwd_fact_favor_info where dt='2020-03-10';

4.4.12 Coupon Usage Fact Table (accumulating snapshot fact table)


1) Table creation statement

hive (gmall)> drop table if exists dwd_fact_coupon_use;
create external table dwd_fact_coupon_use(
`id` string COMMENT '编号',
`coupon_id` string COMMENT '优惠券 ID',
`user_id` string COMMENT 'userid',
`order_id` string COMMENT '订单 id',
`coupon_status` string COMMENT '优惠券状态',
`get_time` string COMMENT '领取时间',
`using_time` string COMMENT '使用时间(下单)',
`used_time` string COMMENT '使用时间(支付)'
) COMMENT '优惠券领用事实表'
PARTITIONED BY (`dt` string) row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_coupon_use/';

Note: the dt partition is derived from the coupon acquisition time, get_time.
2) Data loading

Dynamic partitioning is required here because each row's target partition is computed from its own data (get_time); non-strict mode allows an insert with no static partition key.

hive (gmall)> set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_fact_coupon_use
partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from
dwd_fact_coupon_use
where
dt in (
select
date_format(get_time,'yyyy-MM-dd')
from ods_coupon_use
where dt='2020-03-10'
)
)old
full outer join (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ods_coupon_use
where dt='2020-03-10'
)new
on old.id=new.id;

3) Query the load results

hive (gmall)> select * from dwd_fact_coupon_use where dt='2020-03-10';

4.4.13 Order Fact Table (accumulating snapshot fact table)

1) Table creation statement
hive (gmall)> drop table if exists dwd_fact_order_info;
create external table dwd_fact_order_info (
`id` string COMMENT '订单编号',
`order_status` string COMMENT '订单状态',
`user_id` string COMMENT '用户 id',
`out_trade_no` string COMMENT '支付流水号',
`create_time` string COMMENT '创建时间(未支付状态)',
`payment_time` string COMMENT '支付时间(已支付状态)',
`cancel_time` string COMMENT '取消时间(已取消状态)',
`finish_time` string COMMENT '完成时间(已完成状态)',
`refund_time` string COMMENT '退款时间(退款中状态)',
`refund_finish_time` string COMMENT '退款完成时间(退款完成状态)',
`province_id` string COMMENT '省份 ID',
`activity_id` string COMMENT '活动 ID',
`original_total_amount` string COMMENT '原价金额',
`benefit_reduce_amount` string COMMENT '优惠金额',
`feight_fee` string COMMENT '运费',
`final_total_amount` decimal(10,2) COMMENT '订单金额'
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_info/'
tblproperties ("parquet.compression"="lzo");

2) Commonly used functions

The load below pivots ods_order_status_log with collect_set, concat_ws and str_to_map: each order's status-change rows are collapsed into a single map (tms) from status code to timestamp.
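For example, two status rows of one order — ('1001','2020-03-10 00:00:00.0') and ('1002','2020-03-10 00:00:02.0') — pivot into a map that can be probed by status code. A standalone sketch with literal values (for illustration only, not part of the load):

hive (gmall)> select str_to_map('1001=2020-03-10 00:00:00.0,1002=2020-03-10 00:00:02.0', ',', '=')['1002'];
-- returns 2020-03-10 00:00:02.0

In the load, collect_set gathers each order's 'status=time' strings, concat_ws joins them with commas, and str_to_map parses the result into the tms map.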

3) Data loading

The query skeleton full-outer-joins the existing fact rows (old) against the day's new data (new):

from (
...
)old
full outer join (
...
)new
on old.id=new.id

hive (gmall)> set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_fact_order_info
partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']), --1001 is the unpaid status
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from (
select
*
from dwd_fact_order_info
where dt in(
select
date_format(create_time,'yyyy-MM-dd')
from ods_order_info where dt='2020-03-10'
)
)old
full outer join (
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from (
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ods_order_status_log
where dt='2020-03-10'
group by order_id
)log
join (
select
*
from ods_order_info
where dt='2020-03-10'
)info
on log.order_id=info.id
left join (
select
*
from
ods_activity_order
where dt='2020-03-10'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;

4) Query the load results

hive (gmall)> select * from dwd_fact_order_info where dt='2020-03-10';

4.4.14 User Dimension Table (zipper table)


1) What is a zipper table

A zipper table keeps the full history of every record: each row carries a start_date and an end_date marking the period during which that version was valid, and the currently valid version uses the open-ended sentinel end_date (here '9999-99-99').

2) Why build a zipper table

It fits slowly changing dimensions such as users: most rows do not change from one day to the next, so storing a complete daily snapshot wastes space, while a zipper table stores only the changes yet can still reconstruct any day's snapshot.

3) How to use a zipper table

To read the snapshot as of a given day, select the rows whose validity interval covers that day, as in the sketch below; the current snapshot is simply the rows with end_date='9999-99-99'.
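A sketch of the snapshot query, using the dwd_dim_user_info_his table created below:

hive (gmall)> select * from dwd_dim_user_info_his where start_date<='2020-03-10' and end_date>='2020-03-10';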

4) How the zipper table forms over time

5) Build process

Step 0: Initialize the zipper table (run once, standalone)

(1) Create the zipper table

hive (gmall)> drop table if exists dwd_dim_user_info_his;
create external table dwd_dim_user_info_his(
`id` string COMMENT '用户 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '用户拉链表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his/'
tblproperties ("parquet.compression"="lzo");

(2) Initialize the zipper table

hive (gmall)> insert overwrite table dwd_dim_user_info_his
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-03-10',
'9999-99-99'
from ods_user_info
oi
where oi.dt='2020-03-10';

Step 1: Produce the day's changed data (new and updated rows); runs daily

Step 2: Merge the changes into the existing history, then append the new rows, writing the result to a temporary table

1) Create the temporary table

hive (gmall)> drop table if exists dwd_dim_user_info_his_tmp;
create external table dwd_dim_user_info_his_tmp(
`id` string COMMENT '用户 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '用户拉链临时表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="lzo");

2) Load the data

hive (gmall)> insert overwrite table dwd_dim_user_info_his_tmp
select
*
from (
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-03-11' start_date,
'9999-99-99' end_date
from ods_user_info
where dt='2020-03-11'
union all
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date)
end_date
from dwd_dim_user_info_his
uh
left join (
select
*
from
ods_user_info
where dt='2020-03-11'
) ui
on uh.id=ui.id
)his
order by his.id, start_date;

Step 3: Overwrite the zipper table with the temporary table

1) Load the data

hive (gmall)> insert overwrite table dwd_dim_user_info_his
select * from dwd_dim_user_info_his_tmp;

2) Query the imported data

hive (gmall)> select
id,
start_date,
end_date
from dwd_dim_user_info_his;

4.4.15 DWD Layer Data Import Script

1) Create the script ods_to_dwd_db.sh in the /home/dw/bin directory

[dw@dw1 bin]$ vim ods_to_dwd_db.sh
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

# If a date argument is given, use it; otherwise default to yesterday
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

sql1="
insert overwrite table ${APP}.dwd_dim_sku_info
partition(dt='$do_date')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
bt.tm_name,
bc3.id category3_id,
bc2.id category2_id,
bc1.id category1_id,
bc3.name category3_name,
bc2.name category2_name,
bc1.name category1_name,
spu.spu_name,
sku.create_time
from
(
select
*
from ${APP}.ods_sku_info
where dt='$do_date'
)
sku join
(
select
*
from ${APP}.ods_base_trademark
where dt='$do_date'
)bt on sku.tm_id = bt.tm_id
join
(
select
*
from ${APP}.ods_spu_info
where dt='$do_date'
)spu on spu.id = sku.spu_id
join
(
select
*
from ${APP}.ods_base_category3
where dt='$do_date'
)bc3 on sku.category3_id=bc3.id
join
(
select
*
from ${APP}.ods_base_category2
where dt='$do_date'
)bc2 on bc3.category2_id=bc2.id
join (
select
*
from ${APP}.ods_base_category1
where dt='$do_date'
)bc1 on bc2.category1_id=bc1.id;

insert overwrite table ${APP}.dwd_dim_coupon_info
partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_dim_activity_info
partition(dt='$do_date')
select
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
info.start_time,
info.end_time,
info.create_time from
(
select
*
from
${APP}.ods_activity_info
where dt='$do_date'
)info
left join
(
select
*
from
${APP}.ods_activity_rule
where dt='$do_date'
)rule on info.id = rule.activity_id;

insert overwrite table ${APP}.dwd_fact_order_detail
partition(dt='$do_date')
select od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price*od.sku_num
from (
select
*
from
${APP}.ods_order_detail where dt='$do_date'
) od
join (
select
*
from
${APP}.ods_order_info
where dt='$do_date'
) oi
on od.order_id=oi.id;

insert overwrite table ${APP}.dwd_fact_payment_info
partition(dt='$do_date')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
from (
select
*
from ${APP}.ods_payment_info
where dt='$do_date'
)pi
join (
select
id,
province_id
from
${APP}.ods_order_info
where dt='$do_date'
)oi
on pi.order_id = oi.id;

insert overwrite table ${APP}.dwd_fact_order_refund_info
partition(dt='$do_date')
select id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ${APP}.ods_order_refund_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_comment_info
partition(dt='$do_date')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ${APP}.ods_comment_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_cart_info
partition(dt='$do_date')
select id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
from ${APP}.ods_cart_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_favor_info
partition(dt='$do_date')
select id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ${APP}.ods_favor_info
where dt='$do_date';

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_fact_coupon_use
partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from
${APP}.dwd_fact_coupon_use
where
dt in (
select
date_format(get_time,'yyyy-MM-dd')
from ${APP}.ods_coupon_use
where dt='$do_date'
)
)old
full outer join (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.ods_coupon_use
where dt='$do_date'
)new
on old.id=new.id;

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_fact_order_info
partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from (
select
*
from ${APP}.dwd_fact_order_info
where dt in(
select
date_format(create_time,'yyyy-MM-dd')
from ${APP}.ods_order_info where dt='$do_date'
)
)old
full outer join (
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from (
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ${APP}.ods_order_status_log
where dt='$do_date'
group by order_id
)log
join (
select
*
from ${APP}.ods_order_info
where dt='$do_date'
)info
on log.order_id=info.id
left join (
select
*
from
${APP}.ods_activity_order
where dt='$do_date'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;

insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select
*
from (
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'$do_date' start_date,
'9999-99-99' end_date
from ${APP}.ods_user_info
where dt='$do_date'
union all
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date)
end_date
from ${APP}.dwd_dim_user_info_his
uh
left join (
select
*
from
${APP}.ods_user_info
where dt='$do_date'
) ui
on uh.id=ui.id
)his
order by his.id, start_date;

insert overwrite table ${APP}.dwd_dim_user_info_his
select * from ${APP}.dwd_dim_user_info_his_tmp;
"sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from(
select
*
from
${APP}.ods_base_province
) bp
join
(
select
*
from
${APP}.ods_base_region
) br
on bp.region_id=br.id;
"case $1 in
"first"){$hive -e "$sql1"$hive -e "$sql2"
};;
"all"){$hive -e "$sql1"
};;
esac

2) Grant execute permission to the script

[dw@dw1 bin]$ chmod 777 ods_to_dwd_db.sh

3) Run the script to import data

[dw@dw1 bin]$ ods_to_dwd_db.sh all 2020-03-11
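
Judging from the case branch at the end of the script, the very first run should pass first instead of all, so that sql2 — the region dimension, which is not partitioned and only needs loading once — is executed as well:

[dw@dw1 bin]$ ods_to_dwd_db.sh first 2020-03-11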

4) Inspect the imported data

hive (gmall)>
select * from dwd_fact_order_info where dt='2020-03-11';
select * from dwd_fact_order_detail where dt='2020-03-11';
select * from dwd_fact_comment_info where dt='2020-03-11';
select * from dwd_fact_order_refund_info where dt='2020-03-11';

Chapter 5 Warehouse Build: DWS Layer

5.1 Business Terminology





5.2 Built-in Functions

5.2.1 The collect_set function
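
collect_set aggregates a group's values into a deduplicated array. A minimal illustration with literal values:

hive (gmall)> select collect_set(x) from (select explode(array('a','b','b','c')) as x) t;
-- returns something like ["a","b","c"] (element order is not guaranteed)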


5.2.2 The nvl function
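
nvl(value, default_value) returns value unless it is NULL, in which case it returns default_value. A minimal illustration:

hive (gmall)> select nvl(null, 0), nvl(5, 0);
-- returns 0 and 5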

5.2.3 Date functions
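
The date functions this project leans on are date_format, date_add, next_day and last_day. Minimal illustrations:

hive (gmall)> select date_format('2020-03-10','yyyy-MM');   -- 2020-03
hive (gmall)> select date_add('2020-03-10',-1);             -- 2020-03-09
hive (gmall)> select next_day('2020-03-10','MO');           -- 2020-03-16 (the following Monday)
hive (gmall)> select last_day('2020-03-10');                -- 2020-03-31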


5.3 DWS Layer (User Behavior)

5.3.1 Daily Device Activity

Daily device activity, aggregated by device id (mid_id). Each string attribute is rolled up with concat_ws('|', collect_set(...)), so every value observed during the day survives in a single row; see the illustration below.
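
A minimal illustration with literal values:

hive (gmall)> select concat_ws('|', collect_set(x)) from (select explode(array('wifi','4g','wifi')) as x) t;
-- returns wifi|4g (duplicates removed, values joined by '|')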

1) Table creation statement

hive (gmall)> drop table if exists dws_uv_detail_daycount;
create external table dws_uv_detail_daycount (
`mid_id` string COMMENT '设备唯一标识',
`user_id` string COMMENT '用户标识',
`version_code` string COMMENT '程序版本号',
`version_name` string COMMENT '程序版本名',
`lang` string COMMENT '系统语言',
`source` string COMMENT '渠道号',
`os` string COMMENT '安卓系统版本',
`area` string COMMENT '区域',
`model` string COMMENT '手机型号',
`brand` string COMMENT '手机品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '屏幕宽高',
`app_time` string COMMENT '客户端日志产生时的时间',
`network` string COMMENT '网络模式',
`lng` string COMMENT '经度',
`lat` string COMMENT '纬度',
`login_count` bigint COMMENT '活跃次数'
)partitioned by(dt string)
stored as parquet
location '/warehouse/gmall/dws/dws_uv_detail_daycount';

2) Data loading

hive (gmall)> insert overwrite table dws_uv_detail_daycount
partition(dt='2020-03-10')
select mid_id,
concat_ws('|', collect_set(user_id)) user_id,
concat_ws('|', collect_set(version_code)) version_code,
concat_ws('|', collect_set(version_name)) version_name,
concat_ws('|', collect_set(lang))lang,
concat_ws('|', collect_set(source)) source,
concat_ws('|', collect_set(os)) os,
concat_ws('|', collect_set(area)) area,
concat_ws('|', collect_set(model)) model,
concat_ws('|', collect_set(brand)) brand,
concat_ws('|', collect_set(sdk_version)) sdk_version,
concat_ws('|', collect_set(gmail)) gmail,
concat_ws('|', collect_set(height_width)) height_width,
concat_ws('|', collect_set(app_time)) app_time,
concat_ws('|', collect_set(network)) network,
concat_ws('|', collect_set(lng)) lng,
concat_ws('|', collect_set(lat)) lat,
count(*) login_count
from dwd_start_log
where dt='2020-03-10'
group by mid_id;

3) Query the load results

hive (gmall)> select * from dws_uv_detail_daycount where dt='2020-03-10' limit 2;

5.4 DWS Layer (Business)

The fields of a DWS-layer wide table view the fact tables from the perspective of one dimension, focusing on the fact tables' measures.

5.4.1 Daily Member Activity

1) Table creation statement

hive (gmall)> drop table if exists dws_user_action_daycount;
create external table dws_user_action_daycount (
user_id string comment '用户 id',
login_count bigint comment '登录次数',
cart_count bigint comment '加入购物车次数',
cart_amount double comment '加入购物车金额',
order_count bigint comment '下单次数',
order_amount decimal(16,2) comment '下单金额',
payment_count bigint comment '支付次数',
payment_amount decimal(16,2) comment '支付金额'
) COMMENT '每日用户行为'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_user_action_daycount/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)>
with
tmp_login as (
select
user_id,
count(*) login_count
from dwd_start_log
where dt='2020-03-10' and user_id is not null
group by user_id
),
tmp_cart as (
select user_id,
count(*) cart_count,
sum(cart_price*sku_num) cart_amount
from dwd_fact_cart_info
where dt='2020-03-10' and user_id is not null and date_format(create_time,'yyyy-MM-dd')='2020-03-10'
group by user_id
),
tmp_order as (
select
user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from dwd_fact_order_info
where dt='2020-03-10'
group by user_id
) ,
tmp_payment as (
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from dwd_fact_payment_info
where dt='2020-03-10' group by user_id
)
insert overwrite table dws_user_action_daycount
partition(dt='2020-03-10')
select
user_actions.user_id,
sum(user_actions.login_count),
sum(user_actions.cart_count),
sum(user_actions.cart_amount),
sum(user_actions.order_count),
sum(user_actions.order_amount),
sum(user_actions.payment_count),
sum(user_actions.payment_amount)
from (
select
user_id,
login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from tmp_login
union all
select
user_id,
0 login_count,
cart_count,
cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from tmp_cart
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
order_count,
order_amount,
0 payment_count,
0 payment_amount
from tmp_order
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
payment_count,
payment_amount
from tmp_payment
) user_actions
group by user_id;

3) Query the load results

hive (gmall)> select * from dws_user_action_daycount where dt='2020-03-10';

5.4.2 Daily SKU Activity

1) Table creation statement

hive (gmall)> drop table if exists dws_sku_action_daycount;
create external table dws_sku_action_daycount (
sku_id string comment 'sku_id',
order_count bigint comment '被下单次数',
order_num bigint comment '被下单件数',
order_amount decimal(16,2) comment '被下单金额',
payment_count bigint comment '被支付次数',
payment_num bigint comment '被支付件数',
payment_amount decimal(16,2) comment '被支付金额',
refund_count bigint comment '被退款次数',
refund_num bigint comment '被退款件数',
refund_amount decimal(16,2) comment '被退款金额',
cart_count bigint comment '被加入购物车次数',
cart_num bigint comment '被加入购物车件数',
favor_count bigint comment '被收藏次数',
appraise_good_count bigint comment '好评数',
appraise_mid_count bigint comment '中评数',
appraise_bad_count bigint comment '差评数',
appraise_default_count bigint comment '默认评价数'
) COMMENT '每日商品行为'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_sku_action_daycount/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

Note: an order placed at 23:59 may be paid after midnight. The payment metrics therefore take order-detail rows whose order was created today or yesterday, but whose payment_time falls on today.

hive (gmall)>
with
tmp_order as (
select
sku_id,
count(*) order_count,
sum(sku_num) order_num,
sum(total_amount) order_amount
from dwd_fact_order_detail
where dt='2020-03-10' group by sku_id
),
tmp_payment as (
select
sku_id,
count(*) payment_count,
sum(sku_num) payment_num,
sum(total_amount) payment_amount
from dwd_fact_order_detail
where dt='2020-03-10' and order_id in (
select
id
from
dwd_fact_order_info
where (dt='2020-03-10' or dt=date_add('2020-03-10',-1)) and date_format(payment_time,'yyyy-MM-dd')='2020-03-10' )
group by sku_id ),
tmp_refund as (
select
sku_id,
count(*) refund_count,
sum(refund_num) refund_num,
sum(refund_amount) refund_amount
from dwd_fact_order_refund_info
where dt='2020-03-10' group by sku_id ),
tmp_cart as (
select
sku_id,
count(*) cart_count,
sum(sku_num) cart_num
from dwd_fact_cart_info
where dt='2020-03-10' and date_format(create_time,'yyyy-MM-dd')='2020-03-10' group by sku_id ),
tmp_favor as (
select
sku_id,
count(*) favor_count
from dwd_fact_favor_info
where dt='2020-03-10' and date_format(create_time,'yyyy-MM-dd')='2020-03-10' group by sku_id ),
tmp_appraise as (
select sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from dwd_fact_comment_info
where dt='2020-03-10'
group by sku_id
)
insert overwrite table dws_sku_action_daycount
partition(dt='2020-03-10')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(cart_num),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from (
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;

3) Query the load results

hive (gmall)> select * from dws_sku_action_daycount where dt='2020-03-10';

5.4.3 Daily Coupon Statistics (reserved)


1) Table creation statement

hive (gmall)> drop table if exists dws_coupon_use_daycount;
create external table dws_coupon_use_daycount (
`coupon_id` string COMMENT '优惠券 ID',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` string COMMENT '满额数',
`condition_num` string COMMENT '满件数',
`activity_id` string COMMENT '活动编号',
`benefit_amount` string COMMENT '减金额',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品类 id',
`limit_num` string COMMENT '最多领用次数',
`get_count` bigint COMMENT '领用次数',
`using_count` bigint COMMENT '使用(下单)次数',
`used_count` bigint COMMENT '使用(支付)次数'
) COMMENT '每日优惠券统计'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_coupon_use_daycount/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dws_coupon_use_daycount
partition(dt='2020-03-10')
select
cu.coupon_id,
ci.coupon_name,
ci.coupon_type,
ci.condition_amount,
ci.condition_num,
ci.activity_id,
ci.benefit_amount,
ci.benefit_discount,
ci.create_time,
ci.range_type,
ci.spu_id,
ci.tm_id,
ci.category3_id,
ci.limit_num,
cu.get_count,
cu.using_count,
cu.used_count
from (
select
coupon_id,
sum(if(date_format(get_time,'yyyy-MM-dd')='2020-03-10',1,0)) get_count,
sum(if(date_format(using_time,'yyyy-MM-dd')='2020-03-10',1,0)) using_count,
sum(if(date_format(used_time,'yyyy-MM-dd')='2020-03-10',1,0)) used_count
from
dwd_fact_coupon_use
where dt='2020-03-10'
group by coupon_id
)cu left join (
select
*
from dwd_dim_coupon_info
where dt='2020-03-10'
)ci on cu.coupon_id=ci.id;

5.4.4 Daily Activity Statistics (reserved)


1) Table creation statement

hive (gmall)> drop table if exists dws_activity_info_daycount;
create external table dws_activity_info_daycount(
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间',
`order_count` bigint COMMENT '下单次数',
`payment_count` bigint COMMENT '支付次数'
) COMMENT '每日活动统计表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_activity_info_daycount/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dws_activity_info_daycount
partition(dt='2020-03-10')
select
oi.activity_id,
ai.activity_name,
ai.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
oi.order_count,
oi.payment_count
from (
select
activity_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='2020-03-10',1,0)) order_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='2020-03-10',1,0)) payment_count
from dwd_fact_order_info
where (dt='2020-03-10' or dt=date_add('2020-03-10',-1)) and activity_id is not null
group by activity_id
)oi join (
select * from dwd_dim_activity_info
where dt='2020-03-10'
)ai on oi.activity_id=ai.id;

5.4.5 Daily Purchase Behavior


1) Table creation statement

hive (gmall)> drop table if exists dws_sale_detail_daycount;
create external table dws_sale_detail_daycount (
user_id string comment '用户 id',
sku_id string comment '商品 id',
user_gender string comment '用户性别',
user_age string comment '用户年龄',
user_level string comment '用户等级',
order_price decimal(10,2) comment '商品价格',
sku_name string comment '商品名称',
sku_tm_id string comment '品牌 id',
sku_category3_id string comment '商品三级品类 id',
sku_category2_id string comment '商品二级品类 id',
sku_category1_id string comment '商品一级品类 id',
sku_category3_name string comment '商品三级品类名称',
sku_category2_name string comment '商品二级品类名称',
sku_category1_name string comment '商品一级品类名称',
spu_id string comment '商品 spu',
sku_num int comment '购买个数',
order_count bigint comment '当日下单单数',
order_amount decimal(16,2) comment '当日下单金额'
) COMMENT '每日购买行为'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_sale_detail_daycount/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dws_sale_detail_daycount
partition(dt='2020-03-10')
select
op.user_id,
op.sku_id,
ui.gender,
months_between('2020-03-10', ui.birthday)/12 age,
ui.user_level,
si.price,
si.sku_name,
si.tm_id,
si.category3_id,
si.category2_id,
si.category1_id,
si.category3_name,
si.category2_name,
si.category1_name,
si.spu_id,
op.sku_num,
op.order_count,
op.order_amount
from (
select
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
sum(total_amount) order_amount
from dwd_fact_order_detail
where dt='2020-03-10' group by user_id, sku_id
)op join (
select
*
from dwd_dim_user_info_his
where end_date='9999-99-99'
)ui
on op.user_id = ui.id
join (
select * from dwd_dim_sku_info where dt='2020-03-10'
)si
on op.sku_id = si.id;

3) Query the load results

hive (gmall)> select * from dws_sale_detail_daycount where dt='2020-03-10';

5.5 DWS Layer Data Import Script

1) Create the script dwd_to_dws.sh in the /home/dw/bin directory

#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
insert overwrite table ${APP}.dws_uv_detail_daycount
partition(dt='$do_date')
select mid_id,
concat_ws('|', collect_set(user_id)) user_id,
concat_ws('|', collect_set(version_code)) version_code,
concat_ws('|', collect_set(version_name)) version_name,
concat_ws('|', collect_set(lang))lang,
concat_ws('|', collect_set(source)) source,
concat_ws('|', collect_set(os)) os,
concat_ws('|', collect_set(area)) area,
concat_ws('|', collect_set(model)) model,
concat_ws('|', collect_set(brand)) brand,
concat_ws('|', collect_set(sdk_version)) sdk_version,
concat_ws('|', collect_set(gmail)) gmail,
concat_ws('|', collect_set(height_width)) height_width,
concat_ws('|', collect_set(app_time)) app_time,
concat_ws('|', collect_set(network)) network,
concat_ws('|', collect_set(lng)) lng,
concat_ws('|', collect_set(lat)) lat,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date'
group by mid_id;

with
tmp_login as (
select
user_id,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date' and user_id is not null
group by user_id
),
tmp_cart as (
select user_id,
count(*) cart_count,
sum(cart_price*sku_num) cart_amount
from ${APP}.dwd_fact_cart_info
where dt='$do_date' and user_id is not null and date_format(create_time,'yyyy-MM-dd')='$do_date'
group by user_id
),
tmp_order as (
select
user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from ${APP}.dwd_fact_order_info
where dt='$do_date'
group by user_id
) ,
tmp_payment as (
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from ${APP}.dwd_fact_payment_info
where dt='$do_date' group by user_id
)
insert overwrite table ${APP}.dws_user_action_daycount
partition(dt='$do_date')
select
user_actions.user_id,
sum(user_actions.login_count),
sum(user_actions.cart_count),
sum(user_actions.cart_amount),
sum(user_actions.order_count),
sum(user_actions.order_amount),
sum(user_actions.payment_count),
sum(user_actions.payment_amount)
from (
select
user_id,
login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from tmp_login
union all
select
user_id,
0 login_count,
cart_count,
cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from tmp_cart
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
order_count,
order_amount,
0 payment_count,
0 payment_amount
from tmp_order
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
payment_count,
payment_amount
from tmp_payment
) user_actions
group by user_id;

with
tmp_order as (
select
sku_id,
count(*) order_count,
sum(sku_num) order_num,
sum(total_amount) order_amount
from ${APP}.dwd_fact_order_detail
where dt='$do_date' group by sku_id
),
tmp_payment as (
select
sku_id,
count(*) payment_count,
sum(sku_num) payment_num,
sum(total_amount) payment_amount
from ${APP}.dwd_fact_order_detail
where dt='$do_date' and order_id in (
select
id
from
${APP}.dwd_fact_order_info
where (dt='$do_date' or dt=date_add('$do_date',-1)) and date_format(payment_time,'yyyy-MM-dd')='$do_date' )
group by sku_id ),
tmp_refund as (
select
sku_id,
count(*) refund_count,
sum(refund_num) refund_num,
sum(refund_amount) refund_amount
from ${APP}.dwd_fact_order_refund_info
where dt='$do_date' group by sku_id ),
tmp_cart as (
select
sku_id,
count(*) cart_count,
sum(sku_num) cart_num
from ${APP}.dwd_fact_cart_info
where dt='$do_date' and date_format(create_time,'yyyy-MM-dd')='$do_date' group by sku_id ),
tmp_favor as (
select
sku_id,
count(*) favor_count
from ${APP}.dwd_fact_favor_info
where dt='$do_date' and date_format(create_time,'yyyy-MM-dd')='$do_date' group by sku_id ),
tmp_appraise as (
select sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from ${APP}.dwd_fact_comment_info
where dt='$do_date'
group by sku_id
)
insert overwrite table ${APP}.dws_sku_action_daycount
partition(dt='$do_date')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(cart_num),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from (
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;

insert overwrite table ${APP}.dws_coupon_use_daycount
partition(dt='$do_date')
select
cu.coupon_id,
ci.coupon_name,
ci.coupon_type,
ci.condition_amount,
ci.condition_num,
ci.activity_id,
ci.benefit_amount,
ci.benefit_discount,
ci.create_time,
ci.range_type,
ci.spu_id,
ci.tm_id,
ci.category3_id,
ci.limit_num,
cu.get_count,
cu.using_count,
cu.used_count
from (
select
coupon_id,
sum(if(date_format(get_time,'yyyy-MM-dd')='$do_date',1,0)) get_count,
sum(if(date_format(using_time,'yyyy-MM-dd')='$do_date',1,0)) using_count,
sum(if(date_format(used_time,'yyyy-MM-dd')='$do_date',1,0)) used_count
from
${APP}.dwd_fact_coupon_use
where dt='$do_date'
group by coupon_id
)cu left join (
select
*
from ${APP}.dwd_dim_coupon_info
where dt='$do_date'
)ci on cu.coupon_id=ci.id;

insert overwrite table ${APP}.dws_activity_info_daycount
partition(dt='$do_date')
select
oi.activity_id,
ai.activity_name,
ai.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
oi.order_count,
oi.payment_count
from (
select
activity_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',1,0)) order_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',1,0)) payment_count
from ${APP}.dwd_fact_order_info
where (dt='$do_date' or dt=date_add('$do_date',-1)) and activity_id is not null
group by activity_id
)oi join (
select * from ${APP}.dwd_dim_activity_info
where dt='$do_date'
)ai on oi.activity_id=ai.id;

insert overwrite table ${APP}.dws_sale_detail_daycount
partition(dt='$do_date')
select
op.user_id,
op.sku_id,
ui.gender,
months_between('$do_date', ui.birthday)/12 age,
ui.user_level,
si.price,
si.sku_name,
si.tm_id,
si.category3_id,
si.category2_id,
si.category1_id,
si.category3_name,
si.category2_name,
si.category1_name,
si.spu_id,
op.sku_num,
op.order_count,
op.order_amount
from (
select
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
sum(total_amount) order_amount
from ${APP}.dwd_fact_order_detail
where dt='$do_date' group by user_id, sku_id
)op join (
select
*
from ${APP}.dwd_dim_user_info_his
where end_date='9999-99-99'
)ui
on op.user_id = ui.id
join (
select * from ${APP}.dwd_dim_sku_info where dt='$do_date'
)si
on op.sku_id = si.id;
"$hive -e "$sql"

2) Grant execute permission to the script

chmod 777 dwd_to_dws.sh

3) Run the script to import data

dwd_to_dws.sh 2020-03-11

4) Inspect the imported data

hive (gmall)> select * from dws_uv_detail_daycount where dt='2020-03-11' limit 2;
select * from dws_user_action_daycount where dt='2020-03-11' limit 2;
select * from dws_sku_action_daycount where dt='2020-03-11' limit 2;
select * from dws_sale_detail_daycount where dt='2020-03-11' limit 2;
select * from dws_coupon_use_daycount where dt='2020-03-11' limit 2;
select * from dws_activity_info_daycount where dt='2020-03-11' limit 2;

Chapter 6 Warehouse Build: DWT Layer

6.1 Device Topic Wide Table


1) Table creation statement

hive (gmall)> drop table if exists dwt_uv_topic;
create external table dwt_uv_topic (
`mid_id` string COMMENT '设备唯一标识',
`user_id` string COMMENT '用户标识',
`version_code` string COMMENT '程序版本号',
`version_name` string COMMENT '程序版本名',
`lang` string COMMENT '系统语言',
`source` string COMMENT '渠道号',
`os` string COMMENT '安卓系统版本',
`area` string COMMENT '区域',
`model` string COMMENT '手机型号',
`brand` string COMMENT '手机品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '屏幕宽高',
`app_time` string COMMENT '客户端日志产生时的时间',
`network` string COMMENT '网络模式',
`lng` string COMMENT '经度',
`lat` string COMMENT '纬度',
`login_date_first` string comment '首次活跃时间',
`login_date_last` string comment '末次活跃时间',
`login_day_count` bigint comment '当日活跃次数',
`login_count` bigint comment '累积活跃天数'
)
stored as
parquet location '/warehouse/gmall/dwt/dwt_uv_topic';

2) Data loading

hive (gmall)> insert overwrite table dwt_uv_topic
select
nvl(new.mid_id,old.mid_id),
nvl(new.user_id,old.user_id),
nvl(new.version_code,old.version_code),
nvl(new.version_name,old.version_name),
nvl(new.lang,old.lang),
nvl(new.source,old.source),
nvl(new.os,old.os),
nvl(new.area,old.area),
nvl(new.model,old.model),
nvl(new.brand,old.brand),
nvl(new.sdk_version,old.sdk_version),
nvl(new.gmail,old.gmail),
nvl(new.height_width,old.height_width),
nvl(new.app_time,old.app_time),
nvl(new.network,old.network),
nvl(new.lng,old.lng),
nvl(new.lat,old.lat),
if(old.login_date_first is null,'2020-03-10',old.login_date_first),
if(new.mid_id is not null,'2020-03-10',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from (
select
*
from dwt_uv_topic
)old
full outer join (
select
*
from dws_uv_detail_daycount
where dt='2020-03-10'
)new
on old.mid_id=new.mid_id;

3) Query the load results

hive (gmall)> select * from dwt_uv_topic limit 5;

6.2 Member Topic Wide Table

Where do the wide-table fields come from? For each dimension, take the measures of its related fact tables and derive: first and last occurrence dates, lifetime accumulations, and accumulations over a recent window (here, the last 30 days).
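For instance, the cumulative login-day counter in the load below, nvl(old.login_count,0)+if(new.login_count>0,1,0), carries the running total forward from the topic table and adds one on any day with at least one login: a user with 20 accumulated login days who logs in three times today ends at 21, not 23 — the measure counts days, not sessions.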

1) Table creation statement

hive (gmall)> drop table if exists dwt_user_topic;
create external table dwt_user_topic (
user_id string comment '用户 id',
login_date_first string comment '首次登录时间',
login_date_last string comment '末次登录时间',
login_count bigint comment '累积登录天数',
login_last_30d_count bigint comment '最近 30 日登录天数',
order_date_first string comment '首次下单时间',
order_date_last string comment '末次下单时间',
order_count bigint comment '累积下单次数',
order_amount decimal(16,2) comment '累积下单金额',
order_last_30d_count bigint comment '最近 30 日下单次数',
order_last_30d_amount decimal(16,2) comment '最近 30 日下单金额',
payment_date_first string comment '首次支付时间',
payment_date_last string comment '末次支付时间',
payment_count bigint comment '累积支付次数',
payment_amount decimal(16,2) comment '累积支付金额',
payment_last_30d_count bigint comment '最近 30 日支付次数',
payment_last_30d_amount decimal(16,2) comment '最近 30 日支付金额'
)COMMENT '用户主题宽表'
stored as parquet
location '/warehouse/gmall/dwt/dwt_user_topic/'
tblproperties ("parquet.compression"="lzo");

2) Data loading

hive (gmall)> insert overwrite table dwt_user_topic
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and new.login_count>0,'2020-03-10',old.login_date_first),
if(new.login_count>0,'2020-03-10',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and new.order_count>0,'2020-03-10',old.order_date_first),
if(new.order_count>0,'2020-03-10',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and new.payment_count>0,'2020-03-10',old.payment_date_first),
if(new.payment_count>0,'2020-03-10',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
dwt_user_topic
old
full outer join (
select
user_id,
sum(if(dt='2020-03-10',login_count,0)) login_count,
sum(if(dt='2020-03-10',order_count,0)) order_count,
sum(if(dt='2020-03-10',order_amount,0)) order_amount,
sum(if(dt='2020-03-10',payment_count,0)) payment_count,
sum(if(dt='2020-03-10',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from dws_user_action_daycount
where dt>=date_add( '2020-03-10',-30)
group by user_id
)new
on old.user_id=new.user_id;
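Note how the new subquery produces both the "today" and the "last 30 days" aggregates in a single scan: the where clause keeps 30 days of rows, a plain sum() totals the window, and sum(if(dt=...)) isolates today. The pattern in isolation, with the same columns as above:

select
user_id,
sum(if(dt='2020-03-10',order_count,0)) order_count_today,
sum(order_count) order_count_30d
from dws_user_action_daycount
where dt>=date_add('2020-03-10',-30)
group by user_id;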

3) Verify the load

hive (gmall)> select * from dwt_user_topic limit 5;

6.3 Product (SKU) topic wide table

1) Table creation statement

hive (gmall)> drop table if exists dwt_sku_topic;
create external table dwt_sku_topic (
sku_id string comment 'sku_id',
spu_id string comment 'spu_id',
order_last_30d_count bigint comment '最近 30 日被下单次数',
order_last_30d_num bigint comment '最近 30 日被下单件数',
order_last_30d_amount decimal(16,2) comment '最近 30 日被下单金额',
order_count bigint comment '累积被下单次数',
order_num bigint comment '累积被下单件数',
order_amount decimal(16,2) comment '累积被下单金额',
payment_last_30d_count bigint comment '最近 30 日被支付次数',
payment_last_30d_num bigint comment '最近 30 日被支付件数',
payment_last_30d_amount decimal(16,2) comment '最近 30 日被支付金额',
payment_count bigint comment '累积被支付次数',
payment_num bigint comment '累积被支付件数',
payment_amount decimal(16,2) comment '累积被支付金额',
refund_last_30d_count bigint comment '最近三十日退款次数',
refund_last_30d_num bigint comment '最近三十日退款件数',
refund_last_30d_amount decimal(10,2) comment '最近三十日退款金额',
refund_count bigint comment '累积退款次数',
refund_num bigint comment '累积退款件数',
refund_amount decimal(10,2) comment '累积退款金额',
cart_last_30d_count bigint comment '最近 30 日被加入购物车次数',
cart_last_30d_num bigint comment '最近 30 日被加入购物车件数',
cart_count bigint comment '累积被加入购物车次数',
cart_num bigint comment '累积被加入购物车件数',
favor_last_30d_count bigint comment '最近 30 日被收藏次数',
favor_count bigint comment '累积被收藏次数',
appraise_last_30d_good_count bigint comment '最近 30 日好评数',
appraise_last_30d_mid_count bigint comment '最近 30 日中评数',
appraise_last_30d_bad_count bigint comment '最近 30 日差评数',
appraise_last_30d_default_count bigint comment '最近 30 日默认评价数',
appraise_good_count bigint comment '累积好评数',
appraise_mid_count bigint comment '累积中评数',
appraise_bad_count bigint comment '累积差评数',
appraise_default_count bigint comment '累积默认评价数'
)COMMENT '商品主题宽表'
stored as parquet
location '/warehouse/gmall/dwt/dwt_sku_topic/'
tblproperties ("parquet.compression"="lzo");

2) Load the data. Note the extra left join to dwd_dim_sku_info: it back-fills spu_id, and it joins on nvl(new.sku_id, old.sku_id) because either side of the full outer join can be null.

hive (gmall)> insert overwrite table dwt_sku_topic
select
nvl(new.sku_id,old.sku_id),
sku_info.spu_id, nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0), nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(new.cart_num30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(old.cart_num,0) + nvl(new.cart_num,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0),
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from (
select
sku_id,
spu_id,
order_last_30d_count,
order_last_30d_num,
order_last_30d_amount,
order_count, order_num,
order_amount,
payment_last_30d_count,
payment_last_30d_num,
payment_last_30d_amount,
payment_count, payment_num,
payment_amount,
refund_last_30d_count,
refund_last_30d_num,
refund_last_30d_amount,
refund_count,
refund_num,
refund_amount,
cart_last_30d_count,
cart_last_30d_num,
cart_count, cart_num,
favor_last_30d_count,
favor_count,
appraise_last_30d_good_count,
appraise_last_30d_mid_count,
appraise_last_30d_bad_count,
appraise_last_30d_default_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from dwt_sku_topic
)old
full outer join (
select
sku_id,
sum(if(dt='2020-03-10', order_count,0 )) order_count,
sum(if(dt='2020-03-10',order_num ,0 )) order_num,
sum(if(dt='2020-03-10',order_amount,0 )) order_amount ,
sum(if(dt='2020-03-10',payment_count,0 )) payment_count,
sum(if(dt='2020-03-10',payment_num,0 )) payment_num,
sum(if(dt='2020-03-10',payment_amount,0 )) payment_amount,
sum(if(dt='2020-03-10',refund_count,0 )) refund_count,
sum(if(dt='2020-03-10',refund_num,0 )) refund_num,
sum(if(dt='2020-03-10',refund_amount,0 )) refund_amount,
sum(if(dt='2020-03-10',cart_count,0 )) cart_count,
sum(if(dt='2020-03-10',cart_num,0 )) cart_num,
sum(if(dt='2020-03-10',favor_count,0 )) favor_count,
sum(if(dt='2020-03-10',appraise_good_count,0 )) appraise_good_count,
sum(if(dt='2020-03-10',appraise_mid_count,0 ) ) appraise_mid_count ,
sum(if(dt='2020-03-10',appraise_bad_count,0 )) appraise_bad_count,
sum(if(dt='2020-03-10',appraise_default_count,0 )) appraise_default_count,
sum(order_count) order_count30 , sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(cart_num) cart_num30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from dws_sku_action_daycount
where dt >= date_add ('2020-03-10', -30)
group by sku_id
)new
on new.sku_id = old.sku_id
left join (
select
*
from dwd_dim_sku_info
where dt='2020-03-10')
sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;

3) Verify the load

hive (gmall)> select * from dwt_sku_topic limit 5;

6.4 Coupon topic wide table (reserved)


1) Table creation statement

hive (gmall)> drop table if exists dwt_coupon_topic;
create external table dwt_coupon_topic (
`coupon_id` string COMMENT '优惠券 ID',
`get_day_count` bigint COMMENT '当日领用次数',
`using_day_count` bigint COMMENT '当日使用(下单)次数',
`used_day_count` bigint COMMENT '当日使用(支付)次数',
`get_count` bigint COMMENT '累积领用次数',
`using_count` bigint COMMENT '累积使用(下单)次数',
`used_count` bigint COMMENT '累积使用(支付)次数'
)COMMENT '购物券主题宽表'
stored as parquet
location '/warehouse/gmall/dwt/dwt_coupon_topic/'
tblproperties ("parquet.compression"="lzo");

2) Load the data

hive (gmall)> insert overwrite table dwt_coupon_topic
select
nvl(new.coupon_id,old.coupon_id),
nvl(new.get_count,0),
nvl(new.using_count,0),
nvl(new.used_count,0),
nvl(old.get_count,0)+nvl(new.get_count,0),
nvl(old.using_count,0)+nvl(new.using_count,0),
nvl(old.used_count,0)+nvl(new.used_count,0)
from (
select * from dwt_coupon_topic
)old
full outer join (
select
coupon_id,
get_count,
using_count,
used_count
from dws_coupon_use_daycount
where dt='2020-03-10'
)new
on old.coupon_id=new.coupon_id;

3) Verify the load

hive (gmall)> select * from dwt_coupon_topic limit 5;

6.5 Activity topic wide table (reserved)


1) Table creation statement

hive (gmall)> drop table if exists dwt_activity_topic;
create external table dwt_activity_topic(
`id` string COMMENT '活动 id',
`activity_name` string COMMENT '活动名称',
`order_day_count` bigint COMMENT '当日下单次数',
`payment_day_count` bigint COMMENT '当日支付次数',
`order_count` bigint COMMENT '累积下单次数',
`payment_count` bigint COMMENT '累积支付次数'
) COMMENT '活动主题宽表'
stored as parquet
location '/warehouse/gmall/dwt/dwt_activity_topic/'
tblproperties ("parquet.compression"="lzo");

2) Load the data

hive (gmall)> insert overwrite table dwt_activity_topic
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.order_count,0),
nvl(new.payment_count,0),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.payment_count,0)+nvl(new.payment_count,0)
from (
select * from dwt_activity_topic
)old
full outer join (
select
id,
activity_name,
order_count,
payment_count
from dws_activity_info_daycount
where dt='2020-03-10'
)new
on old.id=new.id;

3) Verify the load

hive (gmall)> select * from dwt_activity_topic limit 5;

6.6 DWT layer data-load script

1) Create the script dws_to_dwt.sh under /home/dw/bin

vim dws_to_dwt.sh
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive

if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
insert overwrite table ${APP}.dwt_uv_topic
select
nvl(new.mid_id,old.mid_id),
nvl(new.user_id,old.user_id),
nvl(new.version_code,old.version_code),
nvl(new.version_name,old.version_name),
nvl(new.lang,old.lang),
nvl(new.source,old.source),
nvl(new.os,old.os),
nvl(new.area,old.area),
nvl(new.model,old.model),
nvl(new.brand,old.brand),
nvl(new.sdk_version,old.sdk_version),
nvl(new.gmail,old.gmail),
nvl(new.height_width,old.height_width),
nvl(new.app_time,old.app_time),
nvl(new.network,old.network),
nvl(new.lng,old.lng),
nvl(new.lat,old.lat),
if(old.login_date_first is null,'$do_date',old.login_date_first),
if(new.mid_id is not null,'$do_date',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from (
select
*
from ${APP}.dwt_uv_topic
)old
full outer join (
select
*
from ${APP}.dws_uv_detail_daycount
where dt='$do_date'
)new
on old.mid_id=new.mid_id;

insert overwrite table ${APP}.dwt_user_topic
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and new.login_count>0,'$do_date',old.login_date_first),
if(new.login_count>0,'$do_date',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and new.order_count>0,'$do_date',old.order_date_first),
if(new.order_count>0,'$do_date',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and new.payment_count>0,'$do_date',old.payment_date_first),
if(new.payment_count>0,'$do_date',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
${APP}.dwt_user_topic
old
full outer join (
select
user_id,
sum(if(dt='$do_date',login_count,0)) login_count,
sum(if(dt='$do_date',order_count,0)) order_count,
sum(if(dt='$do_date',order_amount,0)) order_amount,
sum(if(dt='$do_date',payment_count,0)) payment_count,
sum(if(dt='$do_date',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from ${APP}.dws_user_action_daycount
where dt>=date_add( '$do_date',-30)
group by user_id
)new
on old.user_id=new.user_id;

insert overwrite table ${APP}.dwt_sku_topic
select
nvl(new.sku_id,old.sku_id),
sku_info.spu_id, nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0), nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(new.cart_num30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(old.cart_num,0) + nvl(new.cart_num,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0),
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from (
select
sku_id,
spu_id,
order_last_30d_count,
order_last_30d_num,
order_last_30d_amount,
order_count, order_num,
order_amount,
payment_last_30d_count,
payment_last_30d_num,
payment_last_30d_amount,
payment_count, payment_num,
payment_amount,
refund_last_30d_count,
refund_last_30d_num,
refund_last_30d_amount,
refund_count,
refund_num,
refund_amount,
cart_last_30d_count,
cart_last_30d_num,
cart_count, cart_num,
favor_last_30d_count,
favor_count,
appraise_last_30d_good_count,
appraise_last_30d_mid_count,
appraise_last_30d_bad_count,
appraise_last_30d_default_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from ${APP}.dwt_sku_topic
)old
full outer join (
select
sku_id,
sum(if(dt='$do_date', order_count,0 )) order_count,
sum(if(dt='$do_date',order_num ,0 )) order_num,
sum(if(dt='$do_date',order_amount,0 )) order_amount ,
sum(if(dt='$do_date',payment_count,0 )) payment_count,
sum(if(dt='$do_date',payment_num,0 )) payment_num,
sum(if(dt='$do_date',payment_amount,0 )) payment_amount,
sum(if(dt='$do_date',refund_count,0 )) refund_count,
sum(if(dt='$do_date',refund_num,0 )) refund_num,
sum(if(dt='$do_date',refund_amount,0 )) refund_amount,
sum(if(dt='$do_date',cart_count,0 )) cart_count,
sum(if(dt='$do_date',cart_num,0 )) cart_num,
sum(if(dt='$do_date',favor_count,0 )) favor_count,
sum(if(dt='$do_date',appraise_good_count,0 )) appraise_good_count,
sum(if(dt='$do_date',appraise_mid_count,0 ) ) appraise_mid_count ,
sum(if(dt='$do_date',appraise_bad_count,0 )) appraise_bad_count,
sum(if(dt='$do_date',appraise_default_count,0 )) appraise_default_count,
sum(order_count) order_count30 , sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(cart_num) cart_num30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from ${APP}.dws_sku_action_daycount
where dt >= date_add ('$do_date', -30)
group by sku_id
)new
on new.sku_id = old.sku_id
left join (
select
*
from ${APP}.dwd_dim_sku_info
where dt='$do_date')
sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;

insert overwrite table ${APP}.dwt_coupon_topic
select
nvl(new.coupon_id,old.coupon_id),
nvl(new.get_count,0),
nvl(new.using_count,0),
nvl(new.used_count,0),
nvl(old.get_count,0)+nvl(new.get_count,0),
nvl(old.using_count,0)+nvl(new.using_count,0),
nvl(old.used_count,0)+nvl(new.used_count,0)
from (
select * from ${APP}.dwt_coupon_topic
)old
full outer join (
select
coupon_id,
get_count,
using_count,
used_count
from ${APP}.dws_coupon_use_daycount
where dt='$do_date'
)new
on old.coupon_id=new.coupon_id;

insert overwrite table ${APP}.dwt_activity_topic
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.order_count,0),
nvl(new.payment_count,0),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.payment_count,0)+nvl(new.payment_count,0)
from (
select * from ${APP}.dwt_activity_topic
)old
full outer join (
select
id,
activity_name,
order_count,
payment_count
from ${APP}.dws_activity_info_daycount
where dt='$do_date'
)new
on old.id=new.id;
"$hive -e "$sql"

2) Make the script executable

chmod 777 dws_to_dwt.sh

3) Run the script to load the data

dws_to_dwt.sh 2020-03-11

4) Check the loaded data

hive (gmall)> select * from dwt_uv_topic limit 5;
select * from dwt_user_topic limit 5;
select * from dwt_sku_topic limit 5;
select * from dwt_coupon_topic limit 5;
select * from dwt_activity_topic limit 5;

Chapter 7 Data Warehouse Build: ADS Layer

7.1 Device topic

7.1.1 Active device counts (daily, weekly, monthly)


1) Table creation statement

hive (gmall)> drop table if exists ads_uv_count;
create external table ads_uv_count(
`dt` string COMMENT '统计日期',
`day_count` bigint COMMENT '当日用户数量',
`wk_count` bigint COMMENT '当周用户数量',
`mn_count` bigint COMMENT '当月用户数量',
`is_weekend` string COMMENT 'Y,N 是否是周末,用于得到本周最终结果',
`is_monthend` string COMMENT 'Y,N 是否是月末,用于得到本月最终结果'
) COMMENT '活跃设备数'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_uv_count/';

2) Load the data

hive (gmall)> insert into table ads_uv_count
select
'2020-03-10',
sum(if(login_date_last='2020-03-10',1,0)),
sum(if(login_date_last >= date_add(next_day('2020-03-10','monday'),-7) and login_date_last <= date_add(next_day('2020-03-10','monday'),-1),1,0)),
sum(if(date_format(login_date_last,'yyyy-MM') = date_format('2020-03-10','yyyy-MM'),1,0)),
if('2020-03-10' = date_add(next_day('2020-03-10','monday'),-1),'Y','N'),
if('2020-03-10' = last_day('2020-03-10'),'Y','N')
from
dwt_uv_topic;
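The week window comes from next_day(): for any date, next_day(d,'monday') is the following Monday, so offsets of -7 and -1 give the current week's Monday and Sunday. A quick sanity check (2020-03-10 was a Tuesday):

select
next_day('2020-03-10','monday'),               -- 2020-03-16, the next Monday
date_add(next_day('2020-03-10','monday'),-7),  -- 2020-03-09, Monday of the current week
date_add(next_day('2020-03-10','monday'),-1),  -- 2020-03-15, Sunday of the current week
last_day('2020-03-10');                        -- 2020-03-31, month end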

3) Verify the load

hive (gmall)> select * from ads_uv_count;

7.1.2 New devices per day

1) Table creation statement

hive (gmall)> drop table if exists ads_new_mid_count;
create external table ads_new_mid_count (
`create_date` string comment '创建时间' ,
`new_mid_count` BIGINT comment '新增设备数量'
) COMMENT '每日新增设备信息数量'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_new_mid_count/';

2) Load the data. Two equivalent formulations are shown; the second derives the date column by grouping on login_date_first instead of hard-coding it:

hive (gmall)>insert into table ads_new_mid_count
select
'2020-03-10',
count(*)
from dwt_uv_topic
where login_date_first='2020-03-10';

insert into table ads_new_mid_count
select
login_date_first,
count(*)
from dwt_uv_topic
where login_date_first='2020-03-10'
group by login_date_first;

3) Verify the load

hive (gmall)> select * from ads_new_mid_count;

7.1.3 Silent user count

Requirement definition:

Silent user: a device that started the app only on its install day, and that day is at least 7 days ago.

1) Table creation statement

hive (gmall)> drop table if exists ads_silent_count;
create external table ads_silent_count(
`dt` string COMMENT '统计日期',
`silent_count` bigint COMMENT '沉默设备数'
)row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_silent_count';

2) Load the data

hive (gmall)> insert into table ads_silent_count
select
'2020-03-10',
count(*)
from dwt_uv_topic
where login_date_first=login_date_last and login_date_last<=date_add('2020-03-10',-7);

3) Verify the load

hive (gmall)> select * from ads_silent_count;

7.1.4 Returning users this week

Requirement definition: a returning user this week is a device that was inactive last week,
is active this week, and is not a new device this week.
1) Table creation statement

hive (gmall)> drop table if exists ads_back_count;
create external table ads_back_count(
`dt` string COMMENT '统计日期',
`wk_dt` string COMMENT '统计日期所在周',
`wastage_count` bigint COMMENT '回流设备数'
)row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_back_count';

2) Load the data

hive (gmall)> insert into table ads_back_count
select
'2020-03-10',
concat(date_add(next_day('2020-03-10','MO'),-7),'_',date_add(next_day('2020-03-10','MO'),-1)),
count(*)
from
(
select
mid_id
from dwt_uv_topic
where login_date_last>=date_add(next_day('2020-03-10','MO'),-7)
and login_date_last<= date_add(next_day('2020-03-10','MO'),-1)
and login_date_first<date_add(next_day('2020-03-10','MO'),-7)
-- optional tightening: and login_date_first<date_add(next_day('2020-03-10','MO'),-7*2)
)current_wk
left join
(
select
mid_id
from dws_uv_detail_daycount
where dt>=date_add(next_day('2020-03-10','MO'),-7*2)
and dt<= date_add(next_day('2020-03-10','MO'),-7-1)
group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;

7.1.5 Churned user count

Requirement definition: a churned user is a device with no activity in the last 7 days.
1) Table creation statement

hive (gmall)> drop table if exists ads_wastage_count;
create external table ads_wastage_count(
`dt` string COMMENT '统计日期',
`wastage_count` bigint COMMENT '流失设备数'
)row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_wastage_count';

2) Load the data

hive (gmall)> insert into table ads_wastage_count
select
'2020-03-10',
count(*)
from dwt_uv_topic
where login_date_last <= date_add('2020-03-10',-7);
-- no group by needed here: each row of the DWT table is already one distinct device id

An equivalent (but unnecessary) deduplicating variant, followed by the load for the next day:

hive (gmall)> insert into table ads_wastage_count
select
'2020-03-10',
count(*)
from
(
select
mid_id
from dwt_uv_topic
where login_date_last<=date_add('2020-03-10',-7)
group by mid_id
)t1;

insert into table ads_wastage_count
select
'2020-03-11',
count(*)
from dwt_uv_topic
where login_date_last <= date_add('2020-03-11',-7);

7.1.6 Retention rate


Be explicit about what one day's run computes: on stat date 2020-03-13, for example, the job computes 3-day retention for the 2020-03-10 cohort, 2-day retention for the 03-11 cohort, and 1-day retention for the 03-12 cohort.
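A worked example with assumed numbers: if 200 devices have login_date_first = '2020-03-10' and 60 of them have login_date_last = '2020-03-13', the 3-day retention computed on 2020-03-13 is 60/200 = 0.30.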

1) Table creation statement

hive (gmall)> drop table if exists ads_user_retention_day_rate;
create external table ads_user_retention_day_rate (
`stat_date` string comment '统计日期',
`create_date` string comment '设备新增日期',
`retention_day` int comment '截止当前日期留存天数',
`retention_count` bigint comment '留存数量',
`new_mid_count` bigint comment '设备新增数量',
`retention_ratio` decimal(10,2) comment '留存率'
) COMMENT '每日用户留存情况'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_user_retention_day_rate/';

2) Load the data

hive (gmall)>
insert into table ads_user_retention_day_rate
select
'2020-03-13',
'2020-03-10',
3,
sum(if(login_date_first='2020-03-10' and login_date_last='2020-03-13',1,0)),
sum(if(login_date_first='2020-03-10',1,0)),
sum(if(login_date_first='2020-03-10' and login_date_last='2020-03-13',1,0))/sum(if(login_date_first='2020-03-10',1,0))
from dwt_uv_topic
union all
select
'2020-03-13',
'2020-03-11',
2,
sum(if(login_date_first='2020-03-11' and login_date_last='2020-03-13',1,0)),
sum(if(login_date_first='2020-03-11',1,0)),
sum(if(login_date_first='2020-03-11' and login_date_last='2020-03-13',1,0))/sum(if(login_date_first='2020-03-11',1,0))
from dwt_uv_topic
union all
select
'2020-03-13',
'2020-03-12',
1,
sum(if(login_date_first='2020-03-12' and login_date_last='2020-03-13',1,0)),
sum(if(login_date_first='2020-03-12',1,0)),
sum(if(login_date_first='2020-03-12' and login_date_last='2020-03-13',1,0))/sum(if(login_date_first='2020-03-12',1,0))
from dwt_uv_topic;

A parameterized template for the daily script (the 3-day branch; $do_date is substituted by the scheduler):
select
'$do_date',
date_add('$do_date',-3),
3,
sum(if(login_date_first=date_add('$do_date',-3) and login_date_last='$do_date',1,0)),
sum(if(login_date_first=date_add('$do_date',-3),1,0)),
sum(if(login_date_first=date_add('$do_date',-3) and login_date_last='$do_date',1,0))/sum(if(login_date_first=date_add('$do_date',-3),1,0))
from dwt_uv_topic

3) Verify the load

hive (gmall)>select * from ads_user_retention_day_rate;

7.1.7 Users active in each of the last three consecutive weeks

1) Table creation statement

hive (gmall)> drop table if exists ads_continuity_wk_count;
create external table ads_continuity_wk_count(
`dt` string COMMENT '统计日期,一般用结束周周日日期,如果每天计算一次,可用当天日期',
`wk_dt` string COMMENT '持续时间',
`continuity_count` bigint COMMENT '活跃次数'
)row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_continuity_wk_count';

2) Load the data. Each union all branch deduplicates one week's active devices, so a device active in all three weeks contributes exactly three rows; having count(*)=3 keeps those devices.

hive (gmall)> insert into table ads_continuity_wk_count
select '2020-03-10',
concat(date_add(next_day('2020-03-10','MO'),-7*3),'_',date_add(next_day('2020-03-10','MO'),-1)),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from
dws_uv_detail_daycount
where dt>=date_add(next_day('2020-03-10','monday'),-7) and dt<=date_add(next_day('2020-03-10','monday'),-1)
group by mid_id
union all
select
mid_id
from dws_uv_detail_daycount
where dt>=date_add(next_day('2020-03-10','monday'),-7*2) and dt<=date_add(next_day('2020-03-10','monday'),-7-1)
group by mid_id
union all
select
mid_id
from
dws_uv_detail_daycount
where dt>=date_add(next_day('2020-03-10','monday'),-7*3) and dt<=date_add(next_day('2020-03-10','monday'),-7*2-1)
group by mid_id
)t1
group by mid_id
having count(*)=3
)t2;

3) Verify the load

hive (gmall)> select * from ads_continuity_wk_count;

7.1.8 Users active on three consecutive days within the last seven days

1) Table creation statement

hive (gmall)> drop table if exists ads_continuity_uv_count;
create external table ads_continuity_uv_count(
`dt` string COMMENT '统计日期',
`wk_dt` string COMMENT '最近 7 天日期',
`continuity_count` bigint
) COMMENT '连续活跃设备数'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_continuity_uv_count';

2.1) Load SQL, approach 1 (window function)

Rank each device's active dates in order: across a run of consecutive dates, dt minus its rank is a constant, so grouping by (mid_id, date_add(dt,-rk)) and keeping groups with count(*)>=3 finds the devices with a three-day streak.

hive (gmall)>
insert into table ads_continuity_uv_count
select
'2020-03-10',
concat(date_add('2020-03-10',-6),'_','2020-03-10'),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from
(
select
mid_id,
date_add(dt,-rk) diff
from
(
select
mid_id,
dt,
rank() over(partition by mid_id order by dt) rk
from dws_uv_detail_daycount
where dt>=date_add('2020-03-10',-6) and dt<='2020-03-10'
)t1
)t2
group by mid_id,diff
having count(*) >=3
)t3
group by mid_id
)t4;

2.2) Load SQL, approach 2 — see the sketch below
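The original leaves approach 2 unwritten; here is a minimal sketch, assuming the lead() window function is used instead of the rank() trick. Since dws_uv_detail_daycount holds at most one row per device per day, a device has three consecutive active days exactly when some active date dt satisfies lead(dt,2) = date_add(dt,2):

insert into table ads_continuity_uv_count
select
'2020-03-10',
concat(date_add('2020-03-10',-6),'_','2020-03-10'),
count(*)
from
(
select
mid_id
from
(
select
mid_id,
dt,
lead(dt,2,'9999-12-31') over(partition by mid_id order by dt) dt2
from dws_uv_detail_daycount
where dt>=date_add('2020-03-10',-6) and dt<='2020-03-10'
)t1
where datediff(dt2,dt)=2
group by mid_id
)t2;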

7.2 Member topic

7.2.1 Member topic summary

1) Create the table

hive (gmall)> drop table if exists ads_user_topic;
create external table ads_user_topic(
`dt` string COMMENT '统计日期',
`day_users` string COMMENT '活跃会员数',
`day_new_users` string COMMENT '新增会员数',
`day_new_payment_users` string COMMENT '新增消费会员数',
`payment_users` string COMMENT '总付费会员数',
`users` string COMMENT '总会员数',
`day_users2users` decimal(10,2) COMMENT '会员活跃率',
`payment_users2users` decimal(10,2) COMMENT '会员付费率',
`day_new_users2users` decimal(10,2) COMMENT '会员新鲜度'
) COMMENT '会员主题信息表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_user_topic';

2) Load the data

hive (gmall)> insert into table ads_user_topic
select
'2020-03-10',
sum(if(login_date_last='2020-03-10',1,0)),
sum(if(login_date_first='2020-03-10',1,0)),
sum(if(payment_date_first='2020-03-10',1,0)),
sum(if(payment_count>0,1,0)),
count(*),
sum(if(login_date_last='2020-03-10',1,0))/count(*),
sum(if(payment_count>0,1,0))/count(*),
sum(if(login_date_first='2020-03-10',1,0))/sum(if(login_date_last='2020-03-10',1,0))
from dwt_user_topic;

7.2.2 Funnel analysis (the fact tables are incomplete, so all figures come from DWS)


1) Table creation statement

hive (gmall)> drop table if exists ads_user_action_convert_day;
create external table ads_user_action_convert_day(
`dt` string COMMENT '统计日期',
`total_visitor_m_count` bigint COMMENT '总访问人数',
`cart_u_count` bigint COMMENT '加入购物车的人数',
`visitor2cart_convert_ratio` decimal(10,2) COMMENT '访问到加入购物车转化率',
`order_u_count` bigint COMMENT '下单人数',
`cart2order_convert_ratio` decimal(10,2) COMMENT '加入购物车到下单转化率',
`payment_u_count` bigint COMMENT '支付人数',
`order2payment_convert_ratio` decimal(10,2) COMMENT '下单到支付的转化率'
) COMMENT '用户行为漏斗分析'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_user_action_convert_day/';

2) Load the data

insert into table ads_user_action_convert_day
select
ua.dt,
uv.day_count,
cart_count,
cart_count/uv.day_count*100,
order_count,
order_count/cart_count*100,
payment_count,
payment_count/order_count*100
from
(
select
'2020-03-10' dt,
sum(if(cart_count>0,1,0)) cart_count,
sum(if(order_count>0,1,0)) order_count,
sum(if(payment_count>0,1,0)) payment_count
from dws_user_action_daycount
where dt='2020-03-10'
) ua
join ads_uv_count uv
on ua.dt=uv.dt;

7.3 Product topic

7.3.1 Product counts for the day

1) Table creation statement

hive (gmall)> drop table if exists ads_product_info;
create external table ads_product_info(
`dt` string COMMENT '统计日期',
`sku_num` string COMMENT 'sku 个数',
`spu_num` string COMMENT 'spu 个数'
) COMMENT '商品个数信息'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_info';

2) Load the data

Avoid the naive single-pass version: count(distinct ...) pushes every row through a single reducer and does not scale.

select
count(*),
count(distinct spu_id) -- do not write it this way
from dwt_sku_topic;

insert into table ads_product_info
select
'2020-03-10' dt,
sku_num,
spu_num
from
(
select
'2020-03-10' dt,
count(*) sku_num
from
dwt_sku_topic
) tmp_sku_num
join
(
select
'2020-03-10' dt,
count(*) spu_num
from
(
select
spu_id
from
dwt_sku_topic
group by
spu_id
) tmp_spu_id
) tmp_spu_num
on tmp_sku_num.dt = tmp_spu_num.dt;

7.3.2 Top 10 products by sales for the day

1) Table creation statement

hive (gmall)> drop table if exists ads_product_sale_topN;
create external table ads_product_sale_topN(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`payment_amount` bigint COMMENT '当日销量'
) COMMENT '商品销量 TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_sale_topN';

2) Load the data

insert into ads_product_sale_topN
select
'2020-03-10',
sku_id,
payment_amount
from dws_sku_action_daycount
where dt='2020-03-10'
order by payment_amount desc limit 10;

7.3.3 Top 10 products by favorites for the day

1) Table creation statement

hive (gmall)> drop table if exists ads_product_favor_topN;
create external table ads_product_favor_topN(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`favor_count` bigint COMMENT '当日收藏量'
) COMMENT '商品收藏 TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_favor_topN';

2) Load the data

insert into ads_product_favor_topN
select
'2020-03-10',
sku_id,
favor_count
from dws_sku_action_daycount
where dt='2020-03-10'
order by favor_count desc
limit 10;

7.3.4 Top 10 products by add-to-cart count for the day

1) Table creation statement

hive (gmall)> drop table if exists ads_product_cart_topN;
create external table ads_product_cart_topN(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`cart_num` bigint COMMENT '加入购物车数量'
) COMMENT '商品加入购物车 TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_cart_topN';

2) Load the data

insert into ads_product_cart_topN
select
'2020-03-10',
sku_id,
cart_num
from dws_sku_action_daycount
where dt='2020-03-10'
order by cart_num desc
limit 10;

7.3.5 Product refund-rate ranking (last 30 days)

1) Table creation statement

hive (gmall)> drop table if exists ads_product_refund_topN;
create external table ads_product_refund_topN(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`refund_ratio` decimal(10,2) COMMENT '退款率'
) COMMENT '商品退款率 TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_refund_topN';

2) Load the data

insert into table ads_product_refund_topN
select
'2020-03-10',
sku_id,
refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from dwt_sku_topic
order by refund_ratio desc
limit 10;

7.3.6 Product negative-review rate for the day

1) Table creation statement

hive (gmall)> drop table if exists ads_appraise_bad_topN;
create external table ads_appraise_bad_topN(
`dt` string COMMENT '统计日期',
`sku_id` string COMMENT '商品 ID',
`appraise_bad_ratio` decimal(10,2) COMMENT '差评率'
) COMMENT '商品差评率 TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_appraise_bad_topN';

2) Load the data

insert into table ads_appraise_bad_topN
select
'2020-03-10',
sku_id,
appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count)*100 appraise_bad_ratio
from dws_sku_action_daycount
where dt='2020-03-10'
order by appraise_bad_ratio desc
limit 10;

7.4 Marketing topic (users + products + purchase behavior)

7.4.1 Order statistics for the day

Requirement: compute the daily order count, order amount, and number of ordering users.
1) Table creation statement

hive (gmall)> drop table if exists ads_order_daycount;
create external table ads_order_daycount(
dt string comment '统计日期',
order_count bigint comment '单日下单笔数',
order_amount decimal(16,2) comment '单日下单金额',
order_users bigint comment '单日下单用户数'
) comment '每日订单总计表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_order_daycount';

2) Load the data

insert into table ads_order_daycount
select
'2020-03-10',
sum(order_count),
sum(order_amount),
sum(if(order_count>0,1,0))
from
dws_user_action_daycount
where dt='2020-03-10';

7.4.2 Payment statistics for the day

Daily payment amount, paying-user count, paid-SKU count, payment count, and the average order-to-payment latency in minutes (taken from DWD).
1) Create the table

hive (gmall)> drop table if exists ads_payment_daycount;
create external table ads_payment_daycount(
dt string comment '统计日期',
payment_count bigint comment '单日支付笔数',
payment_amount decimal(10,2) comment '单日支付金额',
payment_user_count bigint comment '单日支付人数',
payment_sku_count bigint comment '单日支付商品数',
payment_avg_time double comment '下单到支付的平均时长,取分钟数'
) comment '每日订单总计表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_payment_daycount';

2) Load the data

insert into table ads_payment_daycount
select
tmp_payment.dt,
tmp_payment.payment_count,
tmp_payment.payment_amount,
tmp_payment.payment_user_count,
tmp_skucount.payment_sku_count,
tmp_time.payment_avg_time
from
(
select
'2020-03-10' dt,
sum(payment_count) payment_count,
sum(payment_amount) payment_amount,
sum(if(payment_count>0,1,0)) payment_user_count
from dws_user_action_daycount
where dt='2020-03-10'
) tmp_payment
join
(
select
'2020-03-10' dt,
sum(if(payment_amount>0,1,0)) payment_sku_count
from dws_sku_action_daycount
where dt='2020-03-10'
) tmp_skucount on tmp_payment.dt = tmp_skucount.dt
join
(
select
'2020-03-10' dt,
sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60 payment_avg_time
from dwd_fact_order_info
where dt='2020-03-10' and payment_time is not null
) tmp_time on tmp_payment.dt=tmp_time.dt;
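The latency term works because unix_timestamp() parses 'yyyy-MM-dd HH:mm:ss' strings into epoch seconds; summing the per-order differences, then dividing by the row count and by 60, yields minutes. A quick check:

select (unix_timestamp('2020-03-10 12:34:56')
      - unix_timestamp('2020-03-10 12:30:56'))/60;  -- 4.0 minutes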

7.4.3 Monthly brand repurchase rate per first-level category


1) Table creation statement

hive (gmall)> drop table if exists ads_sale_tm_category1_stat_mn;
create external table ads_sale_tm_category1_stat_mn (
tm_id string comment '品牌 id',
category1_id string comment '1 级品类 id ',
category1_name string comment '1 级品类名称 ',
buycount bigint comment '购买人数',
buy_twice_last bigint comment '两次以上购买人数',
buy_twice_last_ratio decimal(10,2) comment '单次复购率',
buy_3times_last bigint comment '三次以上购买人数',
buy_3times_last_ratio decimal(10,2) comment '多次复购率',
stat_mn string comment '统计月份',
stat_date string comment '统计日期'
) COMMENT '复购率统计'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_sale_tm_category1_stat_mn/';

2) Load the data. The inner query totals each user's monthly order count per (brand, first-level category); the outer query then counts how many of those buyers bought at least once, twice, and three times. Note that the select order must match the DDL: tm_id first, then the category columns, with each ratio right after its count.

insert into table ads_sale_tm_category1_stat_mn
select
sku_tm_id,
sku_category1_id,
sku_category1_name,
sum(if(order_count>=1,1,0)) buycount,
sum(if(order_count>=2,1,0)) buy_twice_last,
sum(if(order_count>=2,1,0))/sum(if(order_count>=1,1,0)) buy_twice_last_ratio,
sum(if(order_count>=3,1,0)) buy_3times_last,
sum(if(order_count>=3,1,0))/sum(if(order_count>=1,1,0)) buy_3times_last_ratio,
date_format('2020-03-10','yyyy-MM') stat_mn,
'2020-03-10' stat_date
from
(
select
sku_category1_id,
sku_category1_name,
user_id,
sku_tm_id,
sum(order_count) order_count
from dws_sale_detail_daycount
where date_format(dt,'yyyy-MM')=date_format('2020-03-10','yyyy-MM')
group by user_id,sku_tm_id,sku_category1_id,sku_category1_name
)t1
group by sku_tm_id,sku_category1_id,sku_category1_name;

7.5 ADS layer load script

1) Create the script dwt_to_ads.sh under /home/dw/bin

vim dwt_to_ads.sh
#!/bin/bash
hive=/opt/module/hive/bin/hive

if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
use gmall;

insert into table ads_uv_count
select
'$do_date',
sum(if(login_date_last='$do_date',1,0)),
sum(if(login_date_last >= date_add(next_day('$do_date','monday'),-7) and login_date_last <= date_add(next_day('$do_date','monday'),-1),1,0)),
sum(if(date_format(login_date_last,'yyyy-MM') = date_format('$do_date','yyyy-MM'),1,0)),
if('$do_date' = date_add(next_day('$do_date','monday'),-1),'Y','N'),
if('$do_date' = last_day('$do_date'),'Y','N')
from
dwt_uv_topic;

insert into table ads_new_mid_count
select
'$do_date',
count(*)
from dwt_uv_topic
where login_date_first='$do_date';

insert into table ads_new_mid_count
select
login_date_first,
count(*)
from dwt_uv_topic
where login_date_first='$do_date'
group by login_date_first;

insert into table ads_silent_count
select
'$do_date',
count(*)
from dwt_uv_topic
where login_date_first=login_date_last and login_date_last<=date_add('$do_date',-7);

insert into table ads_back_count
select
'$do_date',
concat(date_add(next_day('$do_date','MO'),-7),'_',date_add(next_day('$do_date','MO'),-1)),
count(*)
from
(
select
mid_id
from dwt_uv_topic
where login_date_last>=date_add(next_day('$do_date','MO'),-7)
and login_date_last<= date_add(next_day('$do_date','MO'),-1)
and login_date_first<date_add(next_day('$do_date','MO'),-7)
)current_wk
left join
(
select
mid_id
from dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','MO'),-7*2)
and dt<= date_add(next_day('$do_date','MO'),-7-1)
group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;

insert into table ads_wastage_count
select
'$do_date',
count(*)
from dwt_uv_topic
where login_date_last <= date_add('$do_date',-7);

insert into table ads_user_retention_day_rate
select
'$do_date',
date_add('$do_date',-3),
3,
sum(if(login_date_first=date_add('$do_date',-3) and login_date_last='$do_date',1,0)),
sum(if(login_date_first=date_add('$do_date',-3),1,0)),
sum(if(login_date_first=date_add('$do_date',-3) and login_date_last='$do_date',1,0))/sum(if(login_date_first=date_add('$do_date',-3),1,0))
from dwt_uv_topic
union all
select
'$do_date',
date_add('$do_date',-2),
2,
sum(if(login_date_first=date_add('$do_date',-2) and login_date_last='$do_date',1,0)),
sum(if(login_date_first=date_add('$do_date',-2),1,0)),
sum(if(login_date_first=date_add('$do_date',-2) and login_date_last='$do_date',1,0))/sum(if(login_date_first=date_add('$do_date',-2),1,0))
from dwt_uv_topic
union all
select
'$do_date',
date_add('$do_date',-1),
1,
sum(if(login_date_first=date_add('$do_date',-1) and login_date_last='$do_date',1,0)),
sum(if(login_date_first=date_add('$do_date',-1),1,0)),
sum(if(login_date_first=date_add('$do_date',-1) and login_date_last='$do_date',1,0))/sum(if(login_date_first=date_add('$do_date',-1),1,0))
from dwt_uv_topic;

insert into table ads_continuity_wk_count
select '$do_date',
concat(date_add(next_day('$do_date','MO'),-7*3),'_',date_add(next_day('$do_date','MO'),-1)),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from
dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','monday'),-7) and dt<=date_add(next_day('$do_date','monday'),-1)
group by mid_id
union all
select
mid_id
from dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','monday'),-7*2) and dt<=date_add(next_day('$do_date','monday'),-7-1)
group by mid_id
union all
select
mid_id
from
dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','monday'),-7*3) and dt<=date_add(next_day('$do_date','monday'),-7*2-1)
group by mid_id
)t1
group by mid_id
having count(*)=3
)t2;

insert into table ads_continuity_uv_count
select
'$do_date',
concat(date_add('$do_date',-6),'_','$do_date'),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from
(
select
mid_id,
date_add(dt,-rk) diff
from
(
select
mid_id,
dt,
rank() over(partition by mid_id order by dt) rk
from dws_uv_detail_daycount
where dt>=date_add('$do_date',-6) and dt<='$do_date'
)t1
)t2
group by mid_id,diff
having count(*)>=3
)t3
group by mid_id
)t4;

insert into table ads_user_topic
select
'$do_date',
sum(if(login_date_last='$do_date',1,0)),
sum(if(login_date_first='$do_date',1,0)),
sum(if(payment_date_first='$do_date',1,0)),
sum(if(payment_count>0,1,0)),
count(*),
sum(if(login_date_last='$do_date',1,0))/count(*),
sum(if(payment_count>0,1,0))/count(*),
sum(if(login_date_first='$do_date',1,0))/sum(if(login_date_last='$do_date',1,0))
from dwt_user_topic;

insert into table ads_user_action_convert_day
select
ua.dt,
uv.day_count,
cart_count,
cart_count/uv.day_count*100,
order_count,
order_count/cart_count*100,
payment_count,
payment_count/order_count*100
from
(
select
'$do_date' dt,
sum(if(cart_count>0,1,0)) cart_count,
sum(if(order_count>0,1,0)) order_count,
sum(if(payment_count>0,1,0)) payment_count
from dws_user_action_daycount
where dt='$do_date'
) ua
join ads_uv_count uv
on ua.dt=uv.dt;

insert into table ads_product_info
select
'$do_date' dt,
sku_num,
spu_num
from
(
select
'$do_date' dt,
count(*) sku_num
from
dwt_sku_topic
) tmp_sku_num
join
(
select
'$do_date' dt,
count(*) spu_num
from
(
select
spu_id
from
dwt_sku_topic
group by
spu_id
) tmp_spu_id
) tmp_spu_num
on tmp_sku_num.dt = tmp_spu_num.dt;

insert into ads_product_sale_topN
select
'$do_date',
sku_id,
payment_amount
from dws_sku_action_daycount
where dt='$do_date'
order by payment_amount desc limit 10;

insert into ads_product_favor_topN
select
'$do_date',
sku_id,
favor_count
from dws_sku_action_daycount
where dt='$do_date'
order by favor_count desc
limit 10;

insert into ads_product_cart_topN
select
'$do_date',
sku_id,
cart_num
from dws_sku_action_daycount
where dt='$do_date'
order by cart_num desc
limit 10;

insert into table ads_product_refund_topN
select
'$do_date',
sku_id,
refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from dwt_sku_topic
order by refund_ratio desc
limit 10;

insert into table ads_appraise_bad_topN
select
'$do_date',
sku_id,
appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count)*100 appraise_bad_ratio
from dws_sku_action_daycount
where dt='$do_date'
order by appraise_bad_ratio desc
limit 10;

insert into table ads_order_daycount
select
'$do_date',
sum(order_count),
sum(order_amount),
sum(if(order_count>0,1,0))
from
dws_user_action_daycount
where dt='$do_date';

insert into table ads_payment_daycount
select
tmp_payment.dt,
tmp_payment.payment_count,
tmp_payment.payment_amount,
tmp_payment.payment_user_count,
tmp_skucount.payment_sku_count,
tmp_time.payment_avg_time
from
(
select
'$do_date' dt,
sum(payment_count) payment_count,
sum(payment_amount) payment_amount,
sum(if(payment_count>0,1,0)) payment_user_count
from dws_user_action_daycount
where dt='$do_date'
) tmp_payment
join
(
select
'$do_date' dt,
sum(if(payment_amount>0,1,0)) payment_sku_count
from dws_sku_action_daycount
where dt='$do_date'
) tmp_skucount on tmp_payment.dt = tmp_skucount.dt
join
(
select
'$do_date' dt,
sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60 payment_avg_time
from dwd_fact_order_info
where dt='$do_date' and payment_time is not null
) tmp_time on tmp_payment.dt=tmp_time.dt;

insert into table ads_sale_tm_category1_stat_mn
select
sku_tm_id,
sku_category1_id,
sku_category1_name,
sum(if(order_count>=1,1,0)) buycount,
sum(if(order_count>=2,1,0)) buy_twice_last,
sum(if(order_count>=2,1,0))/sum(if(order_count>=1,1,0)) buy_twice_last_ratio,
sum(if(order_count>=3,1,0)) buy_3times_last,
sum(if(order_count>=3,1,0))/sum(if(order_count>=1,1,0)) buy_3times_last_ratio,
date_format('$do_date' ,'yyyy-MM') stat_mn,
'$do_date' stat_date
from
(
select
sku_category1_id,
sku_category1_name,
user_id,
sku_tm_id,
sum(order_count) order_count
from dws_sale_detail_daycount
where date_format(dt,'yyyy-MM')=date_format('$do_date','yyyy-MM')
group by user_id,sku_tm_id,sku_category1_id,sku_category1_name
)t1
group by sku_tm_id,sku_category1_id,sku_category1_name;
"
$hive -e "$sql"

Chapter 8 Azkaban Scheduling

8.1 Azkaban deployment

8.1.1 Pre-install preparation

Copy the Azkaban web server package, the Azkaban executor server package, the Azkaban SQL scripts, and the MySQL installer to /opt/software on the dw1 VM.

8.1.2 Install Azkaban

1) Create an azkaban directory under /opt/module/
2) Untar
azkaban-web-server-2.5.0.tar.gz,
azkaban-executor-server-2.5.0.tar.gz, and
azkaban-sql-script-2.5.0.tar.gz
into /opt/module/azkaban
3) Rename the extracted directories

[dw@dw1 azkaban]$ mv azkaban-executor-2.5.0/ executor
[dw@dw1 azkaban]$ mv azkaban-web-2.5.0/ server

4) Import the Azkaban SQL scripts

Enter MySQL, create the azkaban database, and source the extracted SQL script into it.
mysql -uroot -p123456
create database azkaban;
use azkaban;
source /opt/module/azkaban/azkaban-2.5.0/create-all-sql-2.5.0.sql;

8.1.3 Generate the keystore


1) Generate a keystore (setting its password and certificate details) and place it in the Azkaban web server root directory (server)

[dw@dw1 server]$ keytool -keystore keystore -alias jetty -genkey -keyalg RSA
Enter keystore password: 123456
Re-enter new password:
What is your first and last name? [Unknown]:
What is the name of your organizational unit? [Unknown]:
What is the name of your organization? [Unknown]:
What is the name of your City or Locality? [Unknown]:
What is the name of your State or Province? [Unknown]:
What is the two-letter country code for this unit? [Unknown]:
Is CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown correct?
[no]: y
Enter key password for <jetty>
(RETURN if same as keystore password):
Re-enter new password:


2) Inspect the generated key pair

[dw@dw1 server]$ keytool -keystore keystore -list

8.1.4 Time synchronization

8.1.5 Configuration files

8.1.5.1 Web server configuration

1) Go to the conf directory under the Azkaban web server install directory and open azkaban.properties

[dw@dw1 conf]$ cd /opt/module/azkaban/server/conf
[dw@dw1 conf]$ vim azkaban.properties

2) Modify azkaban.properties as follows

#Azkaban Personalization Settings
#Server UI name, shown at the top of the web UI
azkaban.name=Test
#Description
azkaban.label=My Local Azkaban
#UI color
azkaban.color=#FF3601
azkaban.default.servlet.path=/index
#Directory where the web server keeps its web files
web.resource.dir=/opt/module/azkaban/server/web/
#Default time zone, changed to Asia/Shanghai (default is America)
default.timezone.id=Asia/Shanghai

#Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager
#User permission management file (absolute path)
user.manager.xml.file=/opt/module/azkaban/server/conf/azkaban-users.xml

#Loader for projects
#Location of the global properties file (absolute path)
executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
azkaban.project.dir=projects

#Database type
database.type=mysql
#Port
mysql.port=3306
#Database host
mysql.host=dw1
#Database name
mysql.database=azkaban
#Database user
mysql.user=root
#Database password
mysql.password=123456
#Maximum connections
mysql.numconnections=100

# Velocity dev mode
velocity.dev.mode=false

# Azkaban Jetty server properties
#Maximum threads
jetty.maxThreads=25
#Jetty SSL port
jetty.ssl.port=8443
#Jetty port
jetty.port=8081
#Keystore file (absolute path)
jetty.keystore=/opt/module/azkaban/server/keystore
#Keystore password
jetty.password=123456
#Key password, same as the keystore password
jetty.keypassword=123456
#Truststore file (absolute path)
jetty.truststore=/opt/module/azkaban/server/keystore
#Truststore password
jetty.trustpassword=123456

# Azkaban Executor settings
executor.port=12321

# mail settings
mail.sender=
mail.host=
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache

3) Web server user configuration
In the conf directory under the Azkaban web server install directory, modify azkaban-users.xml as follows to add an administrator user.

<azkaban-users>
  <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />
  <user username="metrics" password="metrics" roles="metrics"/>
  <user username="admin" password="admin" roles="admin,metrics" />
  <role name="admin" permissions="ADMIN" />
  <role name="metrics" permissions="METRICS"/>
</azkaban-users>

8.1.5.2 Executor server configuration

Modify the executor's configuration file (/opt/module/azkaban/executor/conf/azkaban.properties) as follows:

#Azkaban
default.timezone.id=Asia/Shanghai

# Azkaban JobTypes Plugins
azkaban.jobtype.plugin.dir=plugins/jobtypes

#Loader for projects
executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
azkaban.project.dir=projects

database.type=mysql
mysql.port=3306
mysql.host=dw1
mysql.database=azkaban
mysql.user=root
mysql.password=123456
mysql.numconnections=100

# Azkaban Executor settings
executor.maxThreads=50
executor.port=12321
executor.flow.threads=30

8.1.6 Start the executor server

Run the start command in the executor server directory

cd /opt/module/azkaban/executor
bin/azkaban-executor-start.sh

8.1.7 Start the web server

Run the start command in the Azkaban web server directory

cd /opt/module/azkaban/server
bin/azkaban-web-start.sh

8.2 Create the MySQL database and table

(1) Create the gmall_report database, either through a GUI client or with the SQL below:

CREATE DATABASE `gmall_report` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

(2) Create the table

DROP TABLE IF EXISTS `ads_user_topic`;
CREATE TABLE `ads_user_topic` (
`dt` date NOT NULL,
`day_users` bigint(255) NULL DEFAULT NULL,
`day_new_users` bigint(255) NULL DEFAULT NULL,
`day_new_payment_users` bigint(255) NULL DEFAULT NULL,
`payment_users` bigint(255) NULL DEFAULT NULL,
`users` bigint(255) NULL DEFAULT NULL,
`day_users2users` double(255, 2) NULL DEFAULT NULL,
`payment_users2users` double(255, 2) NULL DEFAULT NULL,
`day_new_users2users` double(255, 2) NULL DEFAULT NULL,
PRIMARY KEY (`dt`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci
ROW_FORMAT = Compact;

8.3 Sqoop export script

1) Write the Sqoop export script
Create the script hdfs_to_mysql.sh under /home/dw/bin

vim hdfs_to_mysql.sh
#!/bin/bash
hive_db_name=gmall
mysql_db_name=gmall_report

export_data() {
/opt/module/sqoop/bin/sqoop export \
--connect "jdbc:mysql://dw1:3306/${mysql_db_name}?useUnicode=true&characterEncoding=utf-8"  \
--username root \
--password 123456 \
--table $1 \
--num-mappers 1 \
--export-dir /warehouse/$hive_db_name/ads/$1 \
--input-fields-terminated-by "\t" \
--update-mode allowinsert \
--update-key $2 \
--input-null-string '\\N' \
--input-null-non-string '\\N'
}

case $1 in
"ads_uv_count")
export_data "ads_uv_count" "dt"
;;
"ads_user_action_convert_day")
export_data "ads_user_action_convert_day" "dt"
;;
"ads_user_topic")
export_data "ads_user_topic" "dt"
;;
"all")
export_data "ads_uv_count" "dt"
export_data "ads_user_action_convert_day" "dt"
export_data "ads_user_topic" "dt"
;;
esac
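Pairing --update-key dt with --update-mode allowinsert makes the export idempotent: re-running a day replaces that day's row instead of duplicating it (dt is the table's primary key). Against MySQL, Sqoop generates an upsert to roughly this effect (an illustrative sketch, not the literal generated statement):

INSERT INTO ads_user_topic (dt, day_users /* ... remaining columns ... */)
VALUES ('2020-03-10', 100 /* ... */)
ON DUPLICATE KEY UPDATE day_users = VALUES(day_users) /* ... */;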



2) Grant execute permission and run the export script

[dw@dw1 bin]$ chmod 777 hdfs_to_mysql.sh
[dw@dw1 bin]$ hdfs_to_mysql.sh all

8.4 End-to-end scheduled flow for the member topic metrics

Generate the logs and business data for day 12. If a Kafka time-index problem prevents writing to HDFS, start Kafka and delete and recreate the topics (stop the Flume consumers first).

[dw@dw1 kafka]$ bin/kafka-topics.sh --zookeeper dw1:2181 --delete --topic topic_event
[dw@dw1 kafka]$ bin/kafka-topics.sh --zookeeper dw1:2181 --delete --topic topic_start
bin/kafka-topics.sh --zookeeper dw1:2181 --list
[dw@dw1 kafka]$ bin/kafka-topics.sh --zookeeper dw1:2181 --create --partitions 3 --replication-factor 1 --topic topic_start
[dw@dw1 kafka]$ bin/kafka-topics.sh --zookeeper dw1:2181 --create --partitions 3 --replication-factor 1 --topic topic_event
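(Note: topic deletion only takes effect if delete.topic.enable=true in Kafka's server.properties; otherwise the topics are merely marked for deletion.)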


(1)mysql_to_hdfs.job

type=command
command=/home/dw/bin/mysql_to_hdfs.sh all ${dt}

(2)hdfs_to_ods_log.job

type=command
command=/home/dw/bin/hdfs_to_ods_log.sh ${dt}

(3)hdfs_to_ods_db.job

type=command
command=/home/dw/bin/hdfs_to_ods_db.sh all ${dt}
dependencies=mysql_to_hdfs

(4)ods_to_dwd_start_log.job

type=command
command=/home/dw/bin/ods_to_dwd_start_log.sh ${dt}
dependencies=hdfs_to_ods_log

(5)ods_to_dwd_db.job

type=command
command=/home/dw/bin/ods_to_dwd_db.sh ${dt}
dependencies=hdfs_to_ods_db

(6)dwd_to_dws.job

type=command
command=/home/dw/bin/dwd_to_dws.sh ${dt}
dependencies=ods_to_dwd_db,ods_to_dwd_start_log

(7)dws_to_dwt.job

type=command
command=/home/dw/bin/dws_to_dwt.sh ${dt}
dependencies=dwd_to_dws

(8)dwt_to_ads.job

type=command
command=/home/dw/bin/dwt_to_ads.sh ${dt}
dependencies=dws_to_dwt

(9)hdfs_to_mysql.job

type=command
command=/home/dw/bin/hdfs_to_mysql.sh ads_user_topic
dependencies=dwt_to_ads
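With the nine .job files written, package them and run the flow. A minimal sketch, assuming the job files sit in a local directory named azkaban-jobs (the directory and project names are illustrative):

cd ~/azkaban-jobs
zip gmall.zip *.job
# In the web UI (https://dw1:8443): create a project, upload gmall.zip,
# then Execute Flow with the runtime parameter dt=2020-03-12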

