一、数仓规划

1.1 集群规划

  • 技术选型
位置 框架
数据采集传输 FlumeKafkaSqoop ,Logstash,DataX,
数据存储 MySqlHDFS,HBase,Redis,MongoDB
数据计算 Hive,Tez, Spark, Flink
数据查询 Presto,Druid ,Impala,Kylin
数据可视化 Echarts、Superset、Tableau、QuickBI、DataV
任务调度 Azkaban、Oozie
集群监控 Zabbix
元数据管理 Atlas
  • 框架版本选型
框架 版本(老版本)
Hadoop 3.1.3(2.7.2)
Flume 1.9.0(1.7.0)
Kafka 2.4.1(0.11.0.2)
Hive 3.1(2.3)
Sqoop 1.4.6
MySQL 5.6.24
Azkaban 2.5.0
Java 1.8
Zookeeper 3.5.7(3.4.10)
Presto 0.189
  • 集群资源规划设计
服务名称 子服务 simwor01(6G) simwor02(4G) simwor03(4G)
HDFS NameNode
DataNode
SecondaryNameNode
Yarn NodeManager
Resourcemanager
Zookeeper Quorum
Flume(采集日志) Flume
Kafka Broker
Flume(消费Kafka) Flume
Hive Hive
MySQL MySQL
Sqoop Sqoop
Presto Coordinator
Worker
Azkaban WebServer
ExecutorServer
Druid Druid
Kylin Kylin
Hbase HMaster
HRegionServer
Superset Superset
Atlas Atlas
Solr Jar
服务数总计 19 9 9
  • 系统数据流

1.2 数据模拟

1.2.1 用户行为日志

  • 启动日志格式
标签 含义
entry 入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget =5
open_ad_type 开屏广告类型: 开屏原生广告=1, 开屏插屏广告=2
action 状态:成功=1 失败=2
loading_time 加载时长:计算下拉开始到接口返回数据的时间,(开始加载报0,加载成功或加载失败才上报时间)
detail 失败码(没有则上报空)
extend1 失败的message(没有则上报空)
en 日志类型start
{"action": "1","ar": "MX","ba": "Huawei","detail": "542","en": "start","entry": "4","extend1": "","g": "K0MKH4F4@gmail.com","hw": "640*960","l": "pt","la": "-16.6","ln": "-99.3","loading_time": "3","md": "Huawei-11","mid": "17","nw": "WIFI","open_ad_type": "2","os": "8.1.2","sr": "U","sv": "V2.7.5","t": "1621094955645","uid": "17","vc": "8","vn": "1.1.8"
}
  • 用户行为日志格式
# 样例
{"ap": "xxxxx", //项目数据来源 app pc"cm": { //公共字段"mid": "", // (String) 设备唯一标识"uid": "", // (String) 用户标识"vc": "1", // (String) versionCode,程序版本号"vn": "1.0", // (String) versionName,程序版本名"l": "zh", // (String) language系统语言"sr": "", // (String) 渠道号,应用从哪个渠道来的。"os": "7.1.1", // (String) Android系统版本"ar": "CN", // (String) area区域"md": "BBB100-1", // (String) model手机型号"ba": "blackberry", // (String) brand手机品牌"sv": "V2.2.1", // (String) sdkVersion"g": "", // (String) gmail"hw": "1620x1080", // (String) heightXwidth,屏幕宽高"t": "1506047606608", // (String) 客户端日志产生时的时间"nw": "WIFI", // (String) 网络模式"ln": 0, // (double) lng经度"la": 0 // (double) lat 纬度},"et": [ //事件{"ett": "1506047605364", //客户端事件产生时间"en": "display", //事件名称"kv": { //事件结果,以key-value形式自行定义"goodsid": "236","action": "1","extend1": "1","place": "2","category": "75"}}]
}# 实例
{"cm": {"ln": "-47.2","sv": "V2.4.9","os": "8.1.8","g": "V58ACQ4P@gmail.com","mid": "999","nw": "4G","l": "en","vc": "0","hw": "640*960","ar": "MX","uid": "999","t": "1620912298272","la": "-36.8","md": "sumsung-10","vn": "1.3.9","ba": "Sumsung","sr": "T"},"ap": "app","et": [{"ett": "1620836353194","en": "display","kv": {"goodsid": "248","action": "1","extend1": "2","place": "2","category": "30"}}, {"ett": "1620852165154","en": "newsdetail","kv": {"entry": "3","goodsid": "249","news_staytime": "10","loading_time": "0","action": "4","showtype": "5","category": "51","type1": "201"}}, {"ett": "1620850884761","en": "loading","kv": {"extend2": "","loading_time": "0","action": "2","extend1": "","type": "1","type1": "","loading_way": "1"}}, {"ett": "1620880365609","en": "notification","kv": {"ap_time": "1620848443801","action": "2","type": "1","content": ""}}, {"ett": "1620873678198","en": "error","kv": {"errorDetail": "java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief": "at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}}]
}
  • 代码生成模拟日志

随机生成启动日志和用户行为日志,默认在 ‘/tmp/logs/app-YYYY-MM-dd.log’。

参数一:控制发送每条的延时时间,默认是0

参数二:循环遍历次数(产生日志的条数),默认是1000

[omm@simwor01 ~]$ cat bin/genlog #! /bin/bashfor i in simwor01 simwor02
dossh $i "java -jar /opt/soft/GenLog-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 &>/dev/null &"
done[omm@simwor01 ~]$

1.2.2 业务交互数据

  • 电商业务流程

  • 电商常识
  1. SKU=Stock Keeping Unit(库存量基本单位)。现在已经被引申为产品统一编号的简称,每种产品均对应有唯一的SKU号。
  2. SPU(Standard Product Unit):是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息集合。
  3. 例如:iPhoneX手机就是SPU。一台银色、128G内存的、支持联通网络的iPhoneX,就是SKU。
  • 电商表结构

  • 代码生成业务数据

mock.date : 模拟生成哪一天的数据

mock.clear : 是否清除历史数据(0 否 1 是)

[omm@simwor01 mysql-data-generator]$ pwd
/opt/module/mysql-data-generator
[omm@simwor01 mysql-data-generator]$ vim application.properties
[omm@simwor01 mysql-data-generator]$ grep mock. application.properties
mock.date=2021-03-11
mock.clear=0
mock.user.count=50
mock.user.male-rate=20
mock.favor.cancel-rate=10
mock.favor.count=100
mock.cart.count=10
mock.cart.sku-maxcount-per-cart=3
mock.order.user-rate=80
mock.order.sku-rate=70
mock.order.join-activity=1
mock.order.use-coupon=1
mock.coupon.user-count=10
mock.payment.rate=70
mock.payment.payment-type=30:60:10
mock.comment.appraise-rate=30:10:10:50
mock.refund.reason-rate=30:10:20:5:15:5:5
[omm@simwor01 mysql-data-generator]$ java -jar gmall-mock-db-2020-03-16-SNAPSHOT.jar
*************************** 16. row ***************************id: 3225consignee: 祁波宁consignee_tel: 13559397760final_total_amount: 8384.00order_status: 1004user_id: 6delivery_address: 第8大街第27号楼2单元925门order_comment: 描述927727out_trade_no: 518433858745953trade_body: 联想(Lenovo)Y9000X 2019新款 15.6英寸高性能标压轻薄本笔记本电脑(i5-9300H 16G 512GSSD FHD)深空灰等2件商品create_time: 2021-03-11 00:00:00operate_time: 2021-03-11 00:00:00expire_time: 2021-03-11 00:15:00tracking_no: NULLparent_order_id: NULLimg_url: http://img.gmall.com/362367.jpgprovince_id: 20
benefit_reduce_amount: 6227.00
original_total_amount: 14598.00feight_fee: 13.00
16 rows in set (0.00 sec)mysql> select * from order_info\G

1.3 日志采集

1.3.1 file -> flume -> kafka

  • Source
  1. TailDir Source: 断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
  2. Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
  3. Spooling Directory Source监控目录,不支持断点续传。

  • 日志收集的配置文件
[omm@simwor01 ~]$ cd /opt/module/flume/jobs/
[omm@simwor01 jobs]$ cat file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/jobs/file-flume-kafka.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2# interceptor
a1.sources.r1.interceptors =  i1 i2
a1.sources.r1.interceptors.i1.type = com.simwor.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.simwor.flume.interceptor.LogTypeInterceptor$Buildera1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = simwor01:9092,simwor02:9092,simwor03:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumera1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = simwor01:9092,simwor02:9092,simwor03:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
[omm@simwor01 jobs]$
  • 日志清洗拦截器
package com.simwor.flume.interceptor;import com.simwor.flume.utils.LogUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;public class LogETLInterceptor implements Interceptor {public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new LogETLInterceptor();}@Override public void configure(Context context) { }}@Override public void initialize() { }@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);if(log.contains("start") && LogUtils.validateStart(log))return event;else if(LogUtils.validateEvent(log))return event;return null;}@Overridepublic List<Event> intercept(List<Event> events) {ArrayList<Event> interceptors = new ArrayList<>();Event eventIntercepted;for (Event event : events) {eventIntercepted = intercept(event);if (eventIntercepted != null){interceptors.add(eventIntercepted);}}return interceptors;}@Override public void close() { }
}
  • 日志类型分发拦截器
package com.simwor.flume.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class LogTypeInterceptor implements Interceptor {public static class Builder implements  Interceptor.Builder{@Overridepublic Interceptor build() {return new LogTypeInterceptor();}@Override public void configure(Context context) { }}@Override public void initialize() { }@Overridepublic Event intercept(Event event) {String log = new String(event.getBody(), StandardCharsets.UTF_8);Map<String, String> headers = event.getHeaders();if (log.contains("start")) {headers.put("topic","topic_start");}else {headers.put("topic","topic_event");}return event;}@Overridepublic List<Event> intercept(List<Event> events) {ArrayList<Event> interceptors = new ArrayList<>();for (Event event : events)interceptors.add(intercept(event));return interceptors;}@Override public void close() { }
}
  • 效果验证
  1. 日志采集脚本
[omm@simwor01 ~]$ cat bin/file-flume-kafka#! /bin/bashcase $1 in"start"){for i in simwor01 simwor02doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/jobs/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE &>/opt/module/flume/jobs/file-flume-kafka.log &"done};;"stop"){for i in simwor01 simwor02doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "done};;esac[omm@simwor01 ~]$
  1. 展示
[omm@simwor01 ~]$ kafka-console-consumer.sh --topic topic_start --bootstrap-server simwor01:9092 --from-beginning | tail
^CProcessed a total of 3509 messages[omm@simwor01 ~]$ file-flume-kafka start--------启动 simwor01 采集flume---------------启动 simwor02 采集flume-------
[omm@simwor01 ~]$ genlog
[omm@simwor01 ~]$ kafka-console-consumer.sh --topic topic_start --bootstrap-server simwor01:9092 --from-beginning | tail
^CProcessed a total of 4477 messages[omm@simwor01 ~]$

1.3.2 kafka -> flume -> hdfs

  • FileChannel和MemoryChannel区别
  1. MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
  2. FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
  • FileChannel优化
  1. 通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
  2. checkpointDirbackupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
  • HDFS Sink
  1. hdfs.rollInterval = 3600 文件创建超3600秒时会滚动生成新文件
  2. hdfs.rollSize = 134217728 或文件在达到128M时会滚动生成新文件
  3. hdfs.rollCount = 0 不要根据event数量作为生成新文件的依据

  • 配置文件
[omm@simwor03 jobs]$ pwd
/opt/module/flume/jobs
[omm@simwor03 jobs]$ cat kafka-flume-hdfs.conf
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = simwor01:9092,simwor02:9092,simwor03:9092
a1.sources.r1.kafka.topics=topic_start## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = simwor01:9092,simwor02:9092,simwor03:9092
a1.sources.r2.kafka.topics=topic_event## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/jobs/kafka-flume-hdfs/checkpoint/topic-start
a1.channels.c1.dataDirs = /opt/module/flume/jobs/kafka-flume-hdfs/data/topic-start
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/jobs/kafka-flume-hdfs/checkpoint/topic-event
a1.channels.c2.dataDirs = /opt/module/flume/jobs/kafka-flume-hdfs/data/topic-event
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
[omm@simwor03 jobs]$
# 启动 kafka -> flume -> hdfs 采集任务
[omm@simwor03 ~]$ /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/jobs/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE# 生成新的日志
[omm@simwor01 ~]$ genlog
[omm@simwor01 ~]$ ps -ef | grep file-flume-kafka
...
org.apache.flume.node.Application --conf-file /opt/module/flume/jobs/file-flume-kafka.conf --name a1flume-kafka
...
[omm@simwor01 ~]$ # 验证 HDFS
[omm@simwor03 ~]$ hdfs dfs -ls /origin_data/gmall/log
Found 2 items
drwxr-xr-x   - omm supergroup          0 2021-05-16 15:26 /origin_data/gmall/log/topic_event
drwxr-xr-x   - omm supergroup          0 2021-05-16 15:26 /origin_data/gmall/log/topic_start
[omm@simwor03 ~]$ hdfs dfs -ls /origin_data/gmall/log/topic_event/2021-05-16
Found 1 items
-rw-r--r--   1 omm supergroup     310973 2021-05-16 15:27 /origin_data/gmall/log/topic_event/2021-05-16/logevent-.1621150013089.lzo
[omm@simwor03 ~]$
  • 启停脚本
[omm@simwor03 ~]$ cat bin/kafka-flume-hdfs
#! /bin/bashcase $1 in"start"){for i in simwor03doecho " --------启动 $i 消费flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/jobs/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE &>/opt/module/flume/jobs/kafka-flume-hdfs.log &"done};;"stop"){for i in simwor03doecho " --------停止 $i 消费flume-------"ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"done};;esac
[omm@simwor03 ~]$

1.4 数据导入

1.4.1 安装 Sqoop

  1. 准备软件包
[omm@simwor01 soft]$ ll | grep sqoop
-rw-r--r--. 1 omm omm  16870735 May 11 21:00 sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz
[omm@simwor01 soft]$ tar -zxf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C /opt/module/
[omm@simwor01 soft]$ ln -s /opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha /opt/module/sqoop
[omm@simwor01 soft]$ cd /opt/module/sqoop
[omm@simwor01 sqoop]$
  1. 修改配置文件
[omm@simwor01 sqoop]$ cd conf/
[omm@simwor01 conf]$ cp sqoop-env-template.sh sqoop-env.sh
[omm@simwor01 conf]$ vi sqoop-env.sh
[omm@simwor01 conf]$ cat sqoop-env.sh
#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/opt/module/hadoop#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/opt/module/hadoop#set the path to where bin/hbase is available
#export HBASE_HOME=#Set the path to where bin/hive is available
export HIVE_HOME=/opt/module/hive#Set the path for where zookeper config dir is
export ZOOKEEPER_HOME=/opt/module/zookeeper
export ZOOCFGDIR=/opt/module/zookeeper/conf
[omm@simwor01 conf]$
  1. 准备 mysql jar 包
[omm@simwor01 ~]$ cd /opt/soft/
[omm@simwor01 soft]$ cp mysql-connector-java-5.1.48.jar /opt/module/sqoop/lib/
[omm@simwor01 soft]$
  1. 添加环境变量并验证
[omm@simwor01 ~]$ sudo vim /etc/profile.d/omm_env.sh
[omm@simwor01 ~]$ source /etc/profile.d/omm_env.sh
[omm@simwor01 ~]$ sqoop help
Available commands:codegen            Generate code to interact with database recordscreate-hive-table  Import a table definition into Hiveeval               Evaluate a SQL statement and display the resultsexport             Export an HDFS directory to a database tablehelp               List available commandsimport             Import a table from a database to HDFSimport-all-tables  Import tables from a database to HDFSimport-mainframe   Import datasets from a mainframe server to HDFSjob                Work with saved jobslist-databases     List available databases on a serverlist-tables        List available tables in a databasemerge              Merge results of incremental importsmetastore          Run a standalone Sqoop metastoreversion            Display version informationSee 'sqoop help COMMAND' for information on a specific command.
[omm@simwor01 ~]$
  1. 第一条 sqoop 命令
[omm@simwor01 ~]$ sqoop list-databases --connect jdbc:mysql://simwor01:3306/ --username root --password abcd1234..
information_schema
mysql
performance_schema
sys
[omm@simwor01 ~]$

1.4.2 数据同步策略

  • 类型
  1. 全量表:存储完整的数据。
  2. 增量表:存储新增加的数据。
  3. 新增及变化表:存储新增加的数据和变化的数据。
  4. 特殊表:只需要存储一次。
  • 全量同步策略

每日全量,就是每天存储一份完整数据,作为一个分区。

适用于表数据量不大,且每天既会有新数据插入,也会有旧数据的修改的场景。

  • 增量同步策略

每日增量,就是每天存储一份增量数据,作为一个分区。

适用于表数据量大,且每天只会有新数据插入的场景。

  • 新增及变化策略
  1. 每日新增及变化,就是存储创建时间和操作时间都是今天的数据。
  2. 适用场景为,表的数据量大,既会有新增,又会有变化
  • 特殊策略
  1. 客观世界维度:没变化的客观世界的维度(比如性别,地区,民族,政治成分,鞋子尺码)可以只存一份固定值。
  2. 日期维度:日期维度可以一次性导入一年或若干年的数据。
  3. 地区维度:省份表、地区表。
  • 分析表同步策略

1.4.3 mysql -> sqoop -> hdfs

  1. 数据同步导入脚本
#! /bin/bashsqoop=/opt/module/sqoop/bin/sqoop
do_date=`date -d '-1 day' +%F`# 是否指定导入日期,否则导入前一天的数据。
if [[ -n "$2" ]]; thendo_date=$2
fiimport_data(){$sqoop import \
--connect jdbc:mysql://simwor01:3306/gmall \
--username root \
--password abcd1234.. \
--target-dir /origin_data/gmall/db/$1/$do_date \
--delete-target-dir \
--query "$2 and  \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
# Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null;
# 为了保证数据两端的一致性:
#   在导出数据时采用--input-null-string和--input-null-non-string两个参数。
#   导入数据时采用--null-string和--null-non-string。
--null-string '\\N' \
--null-non-string '\\N'
}import_order_info(){import_data order_info "selectid, final_total_amount, order_status, user_id, out_trade_no, create_time, operate_time,province_id,benefit_reduce_amount,original_total_amount,feight_fee      from order_infowhere (date_format(create_time,'%Y-%m-%d')='$do_date' or date_format(operate_time,'%Y-%m-%d')='$do_date')"
}import_coupon_use(){import_data coupon_use "selectid,coupon_id,user_id,order_id,coupon_status,get_time,using_time,used_timefrom coupon_usewhere (date_format(get_time,'%Y-%m-%d')='$do_date'or date_format(using_time,'%Y-%m-%d')='$do_date'or date_format(used_time,'%Y-%m-%d')='$do_date')"
}import_order_status_log(){import_data order_status_log "selectid,order_id,order_status,operate_timefrom order_status_logwhere date_format(operate_time,'%Y-%m-%d')='$do_date'"
}import_activity_order(){import_data activity_order "selectid,activity_id,order_id,create_timefrom activity_orderwhere date_format(create_time,'%Y-%m-%d')='$do_date'"
}import_user_info(){import_data "user_info" "select id,name,birthday,gender,email,user_level, create_time,operate_timefrom user_info where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$do_date')"
}import_order_detail(){import_data order_detail "select od.id,order_id, user_id, sku_id,sku_name,order_price,sku_num, od.create_time  from order_detail odjoin order_info oion od.order_id=oi.idwhere DATE_FORMAT(od.create_time,'%Y-%m-%d')='$do_date'"
}import_payment_info(){import_data "payment_info"  "select id,  out_trade_no, order_id, user_id, alipay_trade_no, total_amount,  subject, payment_type, payment_time from payment_info where DATE_FORMAT(payment_time,'%Y-%m-%d')='$do_date'"
}import_comment_info(){import_data comment_info "selectid,user_id,sku_id,spu_id,order_id,appraise,comment_txt,create_timefrom comment_infowhere date_format(create_time,'%Y-%m-%d')='$do_date'"
}import_order_refund_info(){import_data order_refund_info "selectid,user_id,order_id,sku_id,refund_type,refund_num,refund_amount,refund_reason_type,create_timefrom order_refund_infowhere date_format(create_time,'%Y-%m-%d')='$do_date'"
}import_sku_info(){import_data sku_info "select id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,create_timefrom sku_info where 1=1"
}import_base_category1(){import_data "base_category1" "select id,name from base_category1 where 1=1"
}import_base_category2(){import_data "base_category2" "selectid,name,category1_id from base_category2 where 1=1"
}import_base_category3(){import_data "base_category3" "selectid,name,category2_idfrom base_category3 where 1=1"
}import_base_province(){import_data base_province "selectid,name,region_id,area_code,iso_codefrom base_provincewhere 1=1"
}import_base_region(){import_data base_region "selectid,region_namefrom base_regionwhere 1=1"
}import_base_trademark(){import_data base_trademark "selecttm_id,tm_namefrom base_trademarkwhere 1=1"
}import_spu_info(){import_data spu_info "selectid,spu_name,category3_id,tm_idfrom spu_infowhere 1=1"
}import_favor_info(){import_data favor_info "selectid,user_id,sku_id,spu_id,is_cancel,create_time,cancel_timefrom favor_infowhere 1=1"
}import_cart_info(){import_data cart_info "selectid,user_id,sku_id,cart_price,sku_num,sku_name,create_time,operate_time,is_ordered,order_timefrom cart_infowhere 1=1"
}import_coupon_info(){import_data coupon_info "selectid,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,spu_id,tm_id,category3_id,limit_num,operate_time,expire_timefrom coupon_infowhere 1=1"
}import_activity_info(){import_data activity_info "selectid,activity_name,activity_type,start_time,end_time,create_timefrom activity_infowhere 1=1"
}import_activity_rule(){import_data activity_rule "selectid,activity_id,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_levelfrom activity_rulewhere 1=1"
}import_base_dic(){import_data base_dic "selectdic_code,dic_name,parent_code,create_time,operate_timefrom base_dicwhere 1=1"
}# 传入要导入哪些表的数据
# 1. first : 全量导入27张表
# 2. all : 去除不变化表
# 3. other : 只导入具体的表case $1 in"order_info")import_order_info
;;"base_category1")import_base_category1
;;"base_category2")import_base_category2
;;"base_category3")import_base_category3
;;"order_detail")import_order_detail
;;"sku_info")import_sku_info
;;"user_info")import_user_info
;;"payment_info")import_payment_info
;;"base_province")import_base_province
;;"base_region")import_base_region
;;"base_trademark")import_base_trademark
;;"activity_info")import_activity_info
;;"activity_order")import_activity_order
;;"cart_info")import_cart_info
;;"comment_info")import_comment_info
;;"coupon_info")import_coupon_info
;;"coupon_use")import_coupon_use
;;"favor_info")import_favor_info
;;"order_refund_info")import_order_refund_info
;;"order_status_log")import_order_status_log
;;"spu_info")import_spu_info
;;"activity_rule")import_activity_rule
;;"base_dic")import_base_dic
;;"first")import_base_category1import_base_category2import_base_category3import_order_infoimport_order_detailimport_sku_infoimport_user_infoimport_payment_infoimport_base_provinceimport_base_regionimport_base_trademarkimport_activity_infoimport_activity_orderimport_cart_infoimport_comment_infoimport_coupon_useimport_coupon_infoimport_favor_infoimport_order_refund_infoimport_order_status_logimport_spu_infoimport_activity_ruleimport_base_dic
;;
"all")import_base_category1import_base_category2import_base_category3import_order_infoimport_order_detailimport_sku_infoimport_user_infoimport_payment_infoimport_base_trademarkimport_activity_infoimport_activity_orderimport_cart_infoimport_comment_infoimport_coupon_useimport_coupon_infoimport_favor_infoimport_order_refund_infoimport_order_status_logimport_spu_infoimport_activity_ruleimport_base_dic
;;
esac
  1. 运行
nohup mysql-sqoop-hdfs first 2021-03-10 &> /tmp/mysql-sqoop-hdfs.log &
...
2021-05-17 11:02:09,772 INFO mapreduce.Job: Running job: job_1621220373303_0002
2021-05-17 11:02:15,930 INFO mapreduce.Job: Job job_1621220373303_0002 running in uber mode : false
2021-05-17 11:02:15,935 INFO mapreduce.Job:  map 0% reduce 0%
2021-05-17 11:02:26,229 INFO mapreduce.Job:  map 100% reduce 0%
2021-05-17 11:02:27,247 INFO mapreduce.Job: Job job_1621220373303_0002 completed successfully
...
  1. 验证

1.5 集群脚本

  • 脚本
[omm@simwor01 ~]$ cat bin/cluster
#! /bin/bashcase $1 in"start"){echo " -------- 启动 集群 -------"echo " -------- 启动 hadoop集群 -------"ssh simwor01 "/opt/module/hadoop/sbin/start-dfs.sh"ssh simwor02 "/opt/module/hadoop/sbin/start-yarn.sh"#启动 Zookeeper集群/home/omm/bin/zk startsleep 4s;#启动 Kafka采集集群/home/omm/bin/kafka startsleep 10s;#启动 Flume采集集群/home/omm/bin/file-flume-kafka start#启动 Flume消费集群/home/omm/bin/kafka-flume-hdfs start
};;"stop"){echo " -------- 停止 集群 -------"#停止 Flume消费集群/home/omm/bin/kafka-flume-hdfs stop#停止 Flume采集集群/home/omm/bin/file-flume-kafka stop#停止 Kafka采集集群/home/omm/bin/kafka stopsleep 10s;#停止 Zookeeper集群/home/omm/bin/zk stopecho " -------- 停止 hadoop集群 -------"ssh simwor02 "/opt/module/hadoop/sbin/stop-yarn.sh"ssh simwor01 "/opt/module/hadoop/sbin/stop-dfs.sh"
};;esac
[omm@simwor01 ~]$
  • 群起
[omm@simwor01 ~]$ cluster start-------- 启动 集群 --------------- 启动 hadoop集群 -------
Starting namenodes on [simwor01]
Starting datanodes
Starting secondary namenodes [simwor03]
Starting resourcemanager
Starting nodemanagers
------------- simwor01 -------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
------------- simwor02 -------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
------------- simwor03 -------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED--------启动 simwor01 Kafka---------------启动 simwor02 Kafka---------------启动 simwor03 Kafka---------------启动 simwor01 采集flume---------------启动 simwor02 采集flume---------------启动 simwor03 消费flume-------
[omm@simwor01 ~]$
[omm@simwor01 ~]$ xcall jps
--------- simwor01 ----------
13361 Kafka
12548 DataNode
12391 NameNode
12827 NodeManager
13453 Application
12974 QuorumPeerMain
13566 Jps
--------- simwor02 ----------
6929 Jps
5666 DataNode
6820 Application
6742 Kafka
5847 ResourceManager
6348 QuorumPeerMain
4990 Application
6159 NodeManager
--------- simwor03 ----------
4742 DataNode
4854 SecondaryNameNode
4940 NodeManager
5468 Kafka
5085 QuorumPeerMain
5551 Application
5663 Jps
[omm@simwor01 ~]$
  • 验证
# 日志产生前
[omm@simwor01 ~]$ hdfs dfs -ls /origin_data/gmall/log/topic_event/2021-05-16
Found 1 items
-rw-r--r--   1 omm supergroup     321161 2021-05-16 16:17 /origin_data/gmall/log/topic_event/2021-05-16/logevent-.1621153059068.lzo# 日志产生
[omm@simwor01 ~]$ genlog# 日志产生后
[omm@simwor01 ~]$ hdfs dfs -ls /origin_data/gmall/log/topic_event/2021-05-16
Found 2 items
-rw-r--r--   1 omm supergroup     321161 2021-05-16 16:17 /origin_data/gmall/log/topic_event/2021-05-16/logevent-.1621153059068.lzo
-rw-r--r--   1 omm supergroup     438807 2021-05-16 16:20 /origin_data/gmall/log/topic_event/2021-05-16/logevent-.1621153218732.lzo
[omm@simwor01 ~]$
  • 群停
[omm@simwor01 ~]$ cluster stop-------- 停止 集群 ---------------停止 simwor03 消费flume---------------停止 simwor02 采集flume---------------停止 simwor01 采集flume-------
/home/omm/bin/cluster: line 3: 18056 Killed                  /home/omm/bin/file-flume-kafka stop--------停止 simwor01 Kafka---------------停止 simwor02 Kafka---------------停止 simwor03 Kafka-------
------------- simwor01 -------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
------------- simwor02 -------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
------------- simwor03 -------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED-------- 停止 hadoop集群 -------
Stopping nodemanagers
Stopping resourcemanager
Stopping namenodes on [simwor01]
Stopping datanodes
Stopping secondary namenodes [simwor03]
[omm@simwor01 ~]$
[omm@simwor01 ~]$ xcall jps
--------- simwor01 ----------
18634 Jps
--------- simwor02 ----------
11106 Jps
--------- simwor03 ----------
8546 Jps
[omm@simwor01 ~]$

1.6 读写压测

1.6.1 HDFS

[omm@simwor02 ~]$ hadoop jar /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
...
2021-05-13 21:15:31,325 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:             Date & time: Thu May 13 21:15:31 CST 2021
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:         Number of files: 10
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:  Total MBytes processed: 1280
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:       Throughput mb/sec: 37.64
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:  Average IO rate mb/sec: 72.93
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:   IO rate std deviation: 51.95
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:      Test exec time sec: 39.9
2021-05-13 21:15:31,325 INFO fs.TestDFSIO:
[omm@simwor02 ~]$
[omm@simwor02 ~]$ hadoop jar /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
...
2021-05-13 21:18:36,111 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
2021-05-13 21:18:36,112 INFO fs.TestDFSIO:             Date & time: Thu May 13 21:18:36 CST 2021
2021-05-13 21:18:36,112 INFO fs.TestDFSIO:         Number of files: 10
2021-05-13 21:18:36,112 INFO fs.TestDFSIO:  Total MBytes processed: 1280
2021-05-13 21:18:36,112 INFO fs.TestDFSIO:       Throughput mb/sec: 76.6
2021-05-13 21:18:36,113 INFO fs.TestDFSIO:  Average IO rate mb/sec: 97.63
2021-05-13 21:18:36,113 INFO fs.TestDFSIO:   IO rate std deviation: 55.87
2021-05-13 21:18:36,113 INFO fs.TestDFSIO:      Test exec time sec: 29.07
2021-05-13 21:18:36,113 INFO fs.TestDFSIO:
[omm@simwor02 ~]$
[omm@simwor02 ~]$ hadoop jar /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
2021-05-13 21:19:25,350 INFO fs.TestDFSIO: TestDFSIO.1.8
2021-05-13 21:19:25,352 INFO fs.TestDFSIO: nrFiles = 1
2021-05-13 21:19:25,352 INFO fs.TestDFSIO: nrBytes (MB) = 1.0
2021-05-13 21:19:25,352 INFO fs.TestDFSIO: bufferSize = 1000000
2021-05-13 21:19:25,352 INFO fs.TestDFSIO: baseDir = /benchmarks/TestDFSIO
2021-05-13 21:19:25,872 INFO fs.TestDFSIO: Cleaning up test files
[omm@simwor02 ~]$

1.6.2 Kafka

  • 写压测
  1. record-size是一条信息有多大,单位是字节。
  2. num-records是总共发送多少条信息。
  3. throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。
[omm@simwor01 logs]$ kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=simwor01:9092,simwor02:9092,simwor03:9092
100000 records sent, 105263.157895 records/sec (10.04 MB/sec), 243.79 ms avg latency, 327.00 ms max latency, 270 ms 50th, 305 ms 95th, 323 ms 99th, 326 ms 99.9th.
[omm@simwor01 logs]$
  • 读压测
  1. --fetch-size 指定每次fetch的数据的大小
  2. --messages 总共要消费的消息个数
[omm@simwor01 logs]$ kafka-consumer-perf-test.sh --broker-list simwor01:9092,simwor02:9092,simwor03:9092 --topic test --fetch-size 10000 --messages 100000 --threads 1
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-05-16 13:33:16:184, 2021-05-16 13:33:17:248, 9.5399, 8.9661, 100033, 94015.9774, 1621143196431, -1621143195367, -0.0000, -0.0001
[omm@simwor01 logs]$

二、数仓概论

2.1 范式

目前业界范式有:第一范式(1NF)、第二范式(2NF)、第三范式(3NF)、巴斯-科德范式(BCNF)、第四范式(4NF)、第五范式(5NF)。

  • 完全函数依赖
  1. 设X,Y是关系R的两个属性集合,X’是X的真子集,存在X→Y,但对每一个X’都有X’!→Y,则称Y完全函数依赖于X。
  2. 比如:通过,(学号,课程) 推出分数 ,但是单独用学号推断不出来分数,那么就可以说:分数 完全依赖于(学号,课程) 。
  3. 即:通过AB能得出C,但是AB单独得不出C,那么说C完全依赖于AB。
  • 部分函数依赖
  1. 假如 Y函数依赖于 X,但同时 Y 并不完全函数依赖于 X,那么我们就称 Y 部分函数依赖于 X,记做:
  2. 比如:通过,(学号,课程) 推出姓名,因为其实直接可以通过,学号推出姓名,所以:姓名 部分依赖于 (学号,课程)
  3. 即:通过AB能得出C,通过A也能得出C,或者通过B也能得出C,那么说C部分依赖于AB。
  • 传递函数依赖
  1. 设X,Y,Z是关系R中互不相同的属性集合,存在X→Y(Y !→X),Y→Z,则称Z传递函数依赖于X。记做:
  2. 比如:学号 推出 系名 , 系名 推出 系主任, 但是,系主任推不出学号,系主任主要依赖于系名。这种情况可以说:系主任 传递依赖于 学号
  3. 通过A得到B,通过B得到C,但是C得不到A,那么说C传递依赖于A。

遵循的范式越高,数据的冗余性越低,数据间的关系越复杂(即获取数据时,需要通过Join拼接出最后的数据)。

  • 第一范式

第一范式1NF核心原则就是:属性不可切割

  • 第二范式

第二范式2NF核心原则:不能存在“部分函数依赖”

以上表格明显存在,部分依赖。比如,这张表的主键是 (学号,课名),分数确实完全依赖于 (学号,课名),但是姓名并不完全依赖于(学号,课名)。

  • 第三范式

第三范式 3NF核心原则:不能存在传递函数依赖。

在上面这张表中,存在传递函数依赖:学号->系名->系主任,但是系主任推不出学号。

2.2 建模

对比属性 OLTP OLAP
读特性 每次查询只返回少量记录 对大量记录进行汇总
写特性 随机、低延时写入用户的输入 批量导入
使用场景 用户,Java EE项目 内部分析师,为决策提供支持
数据表征 最新数据状态 随时间变化的历史状态
数据规模 GB TB到PB
  1. 当今的数据处理大致可以分成两大类:联机事务处理OLTP(on-line transaction processing)、联机分析处理OLAP(On-Line Analytical Processing)
  2. OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,例如银行交易。
  3. OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。
  • 关系建模

  1. 关系模型如图所示,严格遵循第三范式(3NF);
  2. 从图中可以看出,较为松散、零碎,物理表数量多,而数据冗余程度低;
  3. 由于数据分布于众多的表中,这些数据可以更为灵活地被应用,功能性较强;
  4. 关系模型主要应用于OLTP系统中,为了保证数据的一致性以及避免冗余,所以大部分业务系统的表都是遵循第三范式的。
  • 维度建模

  1. 维度模型如图所示,主要应用于OLAP系统中;
  2. 通常以某一个事实表为中心进行表的组织,主要面向业务,特征是可能存在数据的冗余,但是能方便的得到数据
  3. 关系模型虽然冗余少,但是在大规模数据,跨表分析统计查询过程中,会造成多表关联,这会大大降低执行效率;
  4. 所以数仓项目中通常我们采用维度模型建模,把相关各种表整理成两种:事实表和维度表两种。

2.3 建表

  • 维度表

维度表:一般是对事实的描述信息。每一张维表对应现实世界中的一个对象或者概念。 例如:用户、商品、日期、地区等。

  1. 维表的范围很宽(具有多个属性、列比较多)
  2. 跟事实表相比,行数相对较小:通常< 10万条
  3. 内容相对固定:编码表

  • 事实表

事实表中的每行数据代表一个业务事件(下单、支付、退款、评价等)。“事实”这个术语表示的是业务事件的度量值(可统计次数、个数、金额等)。

  1. 每一个事实表的行包括:具有可加性的数值型的度量值、与维表相连接的外键、通常具有两个和两个以上的外键、外键之间表示维表之间多对多的关系;
  2. 事实表的特征:非常的大;内容相对的窄:列数较少;经常发生变化,每天会新增加很多。
  3. 事务型事实表:以每个事务或事件为单位,例如一个销售订单记录,一笔支付记录等,作为事实表里的一行数据。一旦事务被提交,事实表数据被插入,数据就不再进行更改,其更新方式为增量更新。
  4. 周期型快照事实表:周期型快照事实表中不会保留所有数据,只保留固定时间间隔的数据,例如每天或者每月的销售额,或每月的账户余额等。
  5. 累积型快照事实表:累计快照事实表用于跟踪业务事实的变化。例如,数据仓库中可能需要累积或者存储订单从下订单开始,到订单商品被打包、运输、和签收的各个业务阶段的时间点数据来跟踪订单声明周期的进展情况。当这个业务过程进行时,事实表的记录也要不断更新。

2.4 分层

  • ODS层
  1. 保持数据原貌不做任何修改,起到备份数据的作用。
  2. 数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右)
  3. 创建分区表,防止后续的全表扫描
  • DWD层

DWD层需 构建维度模型:选择业务过程 → 声明粒度 → 确认维度 → 确认事实

  1. 选择业务过程:在业务系统中,挑选我们感兴趣的业务线,比如下单业务,支付业务,退款业务,物流业务,一条业务线对应一张事实表
  2. 声明粒度
    2.1 数据粒度指数据仓库的表中保存数据的细化程度或综合程度的级别。
    2.2 声明粒度意味着精确定义事实表中的一行数据表示什么,应该尽可能选择最小粒度,以此来应各种各样的需求。
    2.3 典型的粒度声明如下:订单中,每个商品项作为下单事实表中的一行
  3. 确定维度:维度的主要作用是描述业务是事实,主要表示的是“谁,何处,何时”等信息。
  4. 确定事实
    4.1 此处的**“事实”一词,指的是业务中的度量值**,例如订单金额、下单次数等。
    4.2 在DWD层,以业务过程为建模驱动,基于每个具体业务过程的特点,构建最细粒度的明细层事实表。事实表可做适当的宽表化处理。

至此,数仓的维度建模已经完毕,DWS、DWT和ADS和维度建模已经没有关系了。

DWS和DWT都是建宽表,宽表都是按照主题去建。主题相当于观察问题的角度。对应着维度表。

时间 用户 地区 商品 优惠券 活动 编码 度量值
订单 件数/金额
订单详情 件数/金额
支付 金额
加购 件数/金额
收藏 个数
评价 个数
退款 件数/金额
优惠券领用 个数
  • DWS层

统计各个主题对象的当天行为,服务于DWT层的主题宽表。

  • DWT层

以分析的主题对象为建模驱动,基于上层的应用和产品的指标需求,构建主题对象的全量宽表。

  • ADS层

对电商系统各大主题指标分别进行分析。

三、数仓搭建

3.1 ODS 层

  • 用户行为数据
  1. 用户事件日志转用户事件原始表
hive (default)> create database gmall;hive (default)> use gmall;hive (gmall)> drop table if exists ods_event_log;hive (gmall)> CREATE EXTERNAL TABLE ods_event_log(`line` string)> PARTITIONED BY (`dt` string)> STORED AS>   INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'>   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'> LOCATION '/warehouse/gmall/ods/ods_event_log';hive (gmall)> load data inpath '/origin_data/gmall/log/topic_event/2021-05-16' into table gmall.ods_event_log partition(dt='2021-05-16');hive (gmall)> select * from ods_event_log limit 2;
ods_event_log.line  ods_event_log.dt
1621150009268|{"cm":{"ln":"-50.6","sv":"V2.6.1","os":"8.0.8","g":"WY49HXZ4@gmail.com","mid":"0","nw":"WIFI","l":"pt","vc":"0","hw":"640*960","ar":"MX","uid":"0","t":"1621136246955","la":"24.8","md":"HTC-11","vn":"1.1.8","ba":"HTC","sr":"H"},"ap":"app","et":[{"ett":"1621138028818","en":"display","kv":{"goodsid":"0","action":"2","extend1":"1","place":"2","category":"73"}},{"ett":"1621077773013","en":"loading","kv":{"extend2":"","loading_time":"21","action":"1","extend1":"","type":"3","type1":"","loading_way":"1"}},{"ett":"1621118651067","en":"notification","kv":{"ap_time":"1621112223280","action":"2","type":"4","content":""}},{"ett":"1621058356736","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1621098733764","en":"favorites","kv":{"course_id":9,"id":0,"add_time":"1621147636292","userid":1}},{"ett":"1621096613172","en":"praise","kv":{"target_id":7,"id":4,"type":3,"add_time":"1621113330503","userid":1}}]}    2021-05-16
1621150009275|{"cm":{"ln":"-72.1","sv":"V2.4.2","os":"8.0.5","g":"CZ1249A7@gmail.com","mid":"1","nw":"3G","l":"es","vc":"19","hw":"1080*1920","ar":"MX","uid":"1","t":"1621067543403","la":"14.1","md":"sumsung-15","vn":"1.3.9","ba":"Sumsung","sr":"Y"},"ap":"app","et":[{"ett":"1621084852590","en":"loading","kv":{"extend2":"","loading_time":"25","action":"3","extend1":"","type":"2","type1":"","loading_way":"1"}},{"ett":"1621134380392","en":"active_background","kv":{"active_source":"1"}},{"ett":"1621113098715","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1621065584231","en":"favorites","kv":{"course_id":0,"id":0,"add_time":"1621068899977","userid":1}},{"ett":"1621075696863","en":"praise","kv":{"target_id":8,"id":6,"type":2,"add_time":"1621123273271","userid":3}}]}  2021-05-16
Time taken: 1.2 seconds, Fetched: 2 row(s)
hive (gmall)>

建立 LZO 压缩索引

[omm@simwor01 bin]$ hadoop jar \
> /opt/module/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar \
> com.hadoop.compression.lzo.DistributedLzoIndexer \
> /warehouse/gmall/ods/ods_event_log/dt=2021-05-16

  1. 用户启动日志转用户启动原始表
drop table if exists ods_start_log;CREATE EXTERNAL TABLE ods_start_log (`line` string)
PARTITIONED BY (`dt` string)
STORED ASINPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_start_log';load data inpath '/origin_data/gmall/log/topic_start/2021-05-16' into table gmall.ods_start_log partition(dt='2021-05-16');hadoop jar /opt/module/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_start_log/dt=2021-05-16
  1. 数据加载脚本

执行:hdfs-log-to-ods 2021-05-16

#!/bin/bashdb=gmall
hive=/opt/module/hive/bin/hive
do_date=`date -d '-1 day' +%F`if [[ -n "$1" ]]; thendo_date=$1
fisql="
load data inpath '/origin_data/gmall/log/topic_start/$do_date' into table ${db}.ods_start_log partition(dt='$do_date');
load data inpath '/origin_data/gmall/log/topic_event/$do_date' into table ${db}.ods_event_log partition(dt='$do_date');
"$hive -e "$sql"hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_start_log/dt=$do_date
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /warehouse/gmall/ods/ods_event_log/dt=$do_date
  • 业务数据
  1. 建表
-- 3.1.1 订单表(增量及更新)drop table if exists ods_order_info;
create external table ods_order_info (`id` string COMMENT '订单号',`final_total_amount` decimal(10,2) COMMENT '订单金额',`order_status` string COMMENT '订单状态',`user_id` string COMMENT '用户id',`out_trade_no` string COMMENT '支付流水号',`create_time` string COMMENT '创建时间',`operate_time` string COMMENT '操作时间',`province_id` string COMMENT '省份ID',`benefit_reduce_amount` decimal(10,2) COMMENT '优惠金额',`original_total_amount` decimal(10,2)  COMMENT '原价金额',`feight_fee` decimal(10,2)  COMMENT '运费'
) COMMENT '订单表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_order_info/';-- 3.1.2 订单详情表(增量)drop table if exists ods_order_detail;
create external table ods_order_detail( `id` string COMMENT '订单编号',`order_id` string  COMMENT '订单号', `user_id` string COMMENT '用户id',`sku_id` string COMMENT '商品id',`sku_name` string COMMENT '商品名称',`order_price` decimal(10,2) COMMENT '商品价格',`sku_num` bigint COMMENT '商品数量',
`create_time` string COMMENT '创建时间'
) COMMENT '订单详情表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_order_detail/';-- 3.1.3 SKU商品表(全量)drop table if exists ods_sku_info;
create external table ods_sku_info( `id` string COMMENT 'skuId',`spu_id` string   COMMENT 'spuid', `price` decimal(10,2) COMMENT '价格',`sku_name` string COMMENT '商品名称',`sku_desc` string COMMENT '商品描述',`weight` string COMMENT '重量',`tm_id` string COMMENT '品牌id',`category3_id` string COMMENT '品类id',`create_time` string COMMENT '创建时间'
) COMMENT 'SKU商品表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_sku_info/';-- 3.1.4 用户表(增量及更新)drop table if exists ods_user_info;
create external table ods_user_info( `id` string COMMENT '用户id',`name`  string COMMENT '姓名',`birthday` string COMMENT '生日',`gender` string COMMENT '性别',`email` string COMMENT '邮箱',`user_level` string COMMENT '用户等级',`create_time` string COMMENT '创建时间',`operate_time` string COMMENT '操作时间'
) COMMENT '用户表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_user_info/';-- 3.1.5 商品一级分类表(全量)drop table if exists ods_base_category1;
create external table ods_base_category1( `id` string COMMENT 'id',`name`  string COMMENT '名称'
) COMMENT '商品一级分类表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_category1/';-- 3.1.6 商品二级分类表(全量)drop table if exists ods_base_category2;
create external table ods_base_category2( `id` string COMMENT ' id',`name` string COMMENT '名称',category1_id string COMMENT '一级品类id'
) COMMENT '商品二级分类表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_category2/';-- 3.1.7 商品三级分类表(全量)drop table if exists ods_base_category3;
create external table ods_base_category3(`id` string COMMENT ' id',`name`  string COMMENT '名称',category2_id string COMMENT '二级品类id'
) COMMENT '商品三级分类表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_category3/';-- 3.1.8 支付流水表(增量)drop table if exists ods_payment_info;
create external table ods_payment_info(`id`   bigint COMMENT '编号',`out_trade_no`    string COMMENT '对外业务编号',`order_id`        string COMMENT '订单编号',`user_id`         string COMMENT '用户编号',`alipay_trade_no` string COMMENT '支付宝交易流水编号',`total_amount`    decimal(16,2) COMMENT '支付金额',`subject`         string COMMENT '交易内容',`payment_type`    string COMMENT '支付类型',`payment_time`    string COMMENT '支付时间'
)  COMMENT '支付流水表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_payment_info/';-- 3.1.9 省份表(特殊)drop table if exists ods_base_province;
create external table ods_base_province (`id`   bigint COMMENT '编号',`name`        string COMMENT '省份名称',`region_id`    string COMMENT '地区ID',`area_code`    string COMMENT '地区编码',`iso_code` string COMMENT 'iso编码')  COMMENT '省份表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_province/';-- 3.1.10 地区表(特殊)drop table if exists ods_base_region;
create external table ods_base_region (`id`   bigint COMMENT '编号',`region_name`        string COMMENT '地区名称')  COMMENT '地区表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_region/';-- 3.1.11 品牌表(全量)drop table if exists ods_base_trademark;
create external table ods_base_trademark (`tm_id`   bigint COMMENT '编号',`tm_name` string COMMENT '品牌名称'
)  COMMENT '品牌表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_trademark/';-- 3.1.12 订单状态表(增量)drop table if exists ods_order_status_log;
create external table ods_order_status_log (`id`   bigint COMMENT '编号',`order_id` string COMMENT '订单ID',`order_status` string COMMENT '订单状态',`operate_time` string COMMENT '修改时间'
)  COMMENT '订单状态表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_order_status_log/';-- 3.1.13 SPU商品表(全量)drop table if exists ods_spu_info;
create external table ods_spu_info(`id` string COMMENT 'spuid',`spu_name` string COMMENT 'spu名称',`category3_id` string COMMENT '品类id',`tm_id` string COMMENT '品牌id'
) COMMENT 'SPU商品表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_spu_info/';-- 3.1.14 商品评论表(增量)drop table if exists ods_comment_info;
create external table ods_comment_info(`id` string COMMENT '编号',`user_id` string COMMENT '用户ID',`sku_id` string COMMENT '商品sku',`spu_id` string COMMENT '商品spu',`order_id` string COMMENT '订单ID',`appraise` string COMMENT '评价',`create_time` string COMMENT '评价时间'
) COMMENT '商品评论表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_comment_info/';-- 3.1.15 退单表(增量)drop table if exists ods_order_refund_info;
create external table ods_order_refund_info(`id` string COMMENT '编号',`user_id` string COMMENT '用户ID',`order_id` string COMMENT '订单ID',`sku_id` string COMMENT '商品ID',`refund_type` string COMMENT '退款类型',`refund_num` bigint COMMENT '退款件数',`refund_amount` decimal(16,2) COMMENT '退款金额',`refund_reason_type` string COMMENT '退款原因类型',`create_time` string COMMENT '退款时间'
) COMMENT '退单表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_order_refund_info/';-- 3.1.16 加购表(全量)drop table if exists ods_cart_info;
create external table ods_cart_info(`id` string COMMENT '编号',`user_id` string  COMMENT '用户id',`sku_id` string  COMMENT 'skuid',`cart_price` string  COMMENT '放入购物车时价格',`sku_num` string  COMMENT '数量',`sku_name` string  COMMENT 'sku名称 (冗余)',`create_time` string  COMMENT '创建时间',`operate_time` string COMMENT '修改时间',`is_ordered` string COMMENT '是否已经下单',
`order_time` string  COMMENT '下单时间'
) COMMENT '加购表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_cart_info/';-- 3.1.17 商品收藏表(全量)drop table if exists ods_favor_info;
create external table ods_favor_info(`id` string COMMENT '编号',`user_id` string  COMMENT '用户id',`sku_id` string  COMMENT 'skuid',`spu_id` string  COMMENT 'spuid',`is_cancel` string  COMMENT '是否取消',`create_time` string  COMMENT '收藏时间',`cancel_time` string  COMMENT '取消时间'
) COMMENT '商品收藏表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_favor_info/';-- 3.1.18 优惠券领用表(新增及变化)drop table if exists ods_coupon_use;
create external table ods_coupon_use(`id` string COMMENT '编号',`coupon_id` string  COMMENT '优惠券ID',`user_id` string  COMMENT 'skuid',`order_id` string  COMMENT 'spuid',`coupon_status` string  COMMENT '优惠券状态',`get_time` string  COMMENT '领取时间',`using_time` string  COMMENT '使用时间(下单)',`used_time` string  COMMENT '使用时间(支付)'
) COMMENT '优惠券领用表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_coupon_use/';-- 3.1.19 优惠券表(全量)drop table if exists ods_coupon_info;
create external table ods_coupon_info(`id` string COMMENT '购物券编号',`coupon_name` string COMMENT '购物券名称',`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',`condition_amount` string COMMENT '满额数',`condition_num` string COMMENT '满件数',`activity_id` string COMMENT '活动编号',`benefit_amount` string COMMENT '减金额',`benefit_discount` string COMMENT '折扣',`create_time` string COMMENT '创建时间',`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',`spu_id` string COMMENT '商品id',`tm_id` string COMMENT '品牌id',`category3_id` string COMMENT '品类id',`limit_num` string COMMENT '最多领用次数',`operate_time`  string COMMENT '修改时间',`expire_time`  string COMMENT '过期时间'
) COMMENT '优惠券表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_coupon_info/';-- 3.1.20 活动表(全量)drop table if exists ods_activity_info;
create external table ods_activity_info(`id` string COMMENT '编号',`activity_name` string  COMMENT '活动名称',`activity_type` string  COMMENT '活动类型',`start_time` string  COMMENT '开始时间',`end_time` string  COMMENT '结束时间',`create_time` string  COMMENT '创建时间'
) COMMENT '活动表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_activity_info/';-- 3.1.21 活动订单关联表(增量)drop table if exists ods_activity_order;
create external table ods_activity_order(`id` string COMMENT '编号',`activity_id` string  COMMENT '优惠券ID',`order_id` string  COMMENT 'skuid',`create_time` string  COMMENT '领取时间'
) COMMENT '活动订单关联表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_activity_order/';-- 3.1.22 活动规则表(全量)drop table if exists ods_activity_rule;
create external table ods_activity_rule(`id` string COMMENT '编号',`activity_id` string  COMMENT '活动ID',`condition_amount` string  COMMENT '满减金额',`condition_num` string  COMMENT '满减件数',`benefit_amount` string  COMMENT '优惠金额',`benefit_discount` string  COMMENT '优惠折扣',`benefit_level` string  COMMENT '优惠级别'
) COMMENT '优惠规则表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_activity_rule/';-- 3.1.23 编码字典表(全量)drop table if exists ods_base_dic;
create external table ods_base_dic(`dic_code` string COMMENT '编号',`dic_name` string  COMMENT '编码名称',`parent_code` string  COMMENT '父编码',`create_time` string  COMMENT '创建日期',`operate_time` string  COMMENT '操作日期'
) COMMENT '编码字典表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ods/ods_base_dic/';
  1. 分区数据导入脚本

第一次导入:hdfs-db-to-ods first 2021-03-10

其它导入:hdfs-db-to-ods all 2021-03-12

#!/bin/bashAPP=gmall
hive=/opt/module/hive/bin/hive# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;thendo_date=$2
else do_date=`date -d "-1 day" +%F`
fisql1="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/activity_order/$do_date' OVERWRITE into table ${APP}.ods_activity_order partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date'); load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date');
"sql2="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;
"
case $1 in
"first"){$hive -e "$sql1$sql2"
};;
"all"){$hive -e "$sql1"
};;
esac
  1. 数据验证
hive (gmall)> select count(*) from ods_favor_info;
_c0
700hive (gmall)> select * from ods_favor_info limit 3;
ods_favor_info.id   ods_favor_info.user_id  ods_favor_info.sku_id   ods_favor_info.spu_id   ods_favor_info.is_cancel      ods_favor_info.create_time    ods_favor_info.cancel_time  ods_favor_info.dt
1394110075715887105 26  11  null    0   2021-03-10 00:00:00.0   null    2021-03-10
1394110075715887106 20  10  null    0   2021-03-10 00:00:00.0   null    2021-03-10
1394110075715887107 32  10  null    0   2021-03-10 00:00:00.0   null    2021-03-10

3.2 DWD 层

3.2.1 用户启动日志

  1. 建表
drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`open_ad_type` string,
`action` string,
`loading_time` string,
`detail` string,
`extend1` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_start_log/'
TBLPROPERTIES('parquet.compression'='lzo');
  1. ods -> dwd

get_json_object 可以解析标准的JSON字符串

insert overwrite table dwd_start_logPARTITION (dt='2021-05-16')
select get_json_object(line,'$.mid') mid_id,get_json_object(line,'$.uid') user_id,get_json_object(line,'$.vc') version_code,get_json_object(line,'$.vn') version_name,get_json_object(line,'$.l') lang,get_json_object(line,'$.sr') source,get_json_object(line,'$.os') os,get_json_object(line,'$.ar') area,get_json_object(line,'$.md') model,get_json_object(line,'$.ba') brand,get_json_object(line,'$.sv') sdk_version,get_json_object(line,'$.g') gmail,get_json_object(line,'$.hw') height_width,get_json_object(line,'$.t') app_time,get_json_object(line,'$.nw') network,get_json_object(line,'$.ln') lng,get_json_object(line,'$.la') lat,get_json_object(line,'$.entry') entry,get_json_object(line,'$.open_ad_type') open_ad_type,get_json_object(line,'$.action') action,get_json_object(line,'$.loading_time') loading_time,get_json_object(line,'$.detail') detail,get_json_object(line,'$.extend1') extend1
from ods_start_log
where dt='2021-05-16';
  1. 表转换导入脚本
#!/bin/bash# ods_to_dwd_start_log.sh# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;thendo_date=$1
else do_date=`date -d "-1 day" +%F`
fi # 设置hive.input.format为HiveInputFormat,否则LZO索引文件会被当做数据导入。
sql="
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select get_json_object(line,'$.mid') mid_id,get_json_object(line,'$.uid') user_id,get_json_object(line,'$.vc') version_code,get_json_object(line,'$.vn') version_name,get_json_object(line,'$.l') lang,get_json_object(line,'$.sr') source,get_json_object(line,'$.os') os,get_json_object(line,'$.ar') area,get_json_object(line,'$.md') model,get_json_object(line,'$.ba') brand,get_json_object(line,'$.sv') sdk_version,get_json_object(line,'$.g') gmail,get_json_object(line,'$.hw') height_width,get_json_object(line,'$.t') app_time,get_json_object(line,'$.nw') network,get_json_object(line,'$.ln') lng,get_json_object(line,'$.la') lat,get_json_object(line,'$.entry') entry,get_json_object(line,'$.open_ad_type') open_ad_type,get_json_object(line,'$.action') action,get_json_object(line,'$.loading_time') loading_time,get_json_object(line,'$.detail') detail,get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"$hive -e "$sql"

3.2.2 用户行为日志

  1. 思路分析

用户行为日志字段比较复杂,可以通过自定义函数解析各个字段。

  1. UDF 函数

通过 key 来取出复杂字符串的各个字段。

package com.simwor.ds.hive;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONObject;public class LogUDF extends UDF {public String evaluate(String line, String key) {if(StringUtils.isBlank(line) || StringUtils.isBlank(key))return "";String[] split = line.split("\\|");if(split.length != 2)return "";String serverTime = split[0].trim();JSONObject jsonObject = new JSONObject(split[1].trim());if(key.equals("st"))return serverTime;else if(key.equals("et")) {if(jsonObject.has("et"))return jsonObject.getString("et");} else {JSONObject cm = jsonObject.getJSONObject("cm");if(cm.has(key))return cm.getString(key);}return "";}}
  1. UDTF 函数

用户行为日志中一条日志中包含多个用户行为,此UDTF函数将用户行为数组分割成多条用户行为记录。

package com.simwor.ds.hive;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;import java.util.ArrayList;
import java.util.List;public class LogUDTF extends GenericUDTF {@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {//输入参数校验List<? extends StructField> allStructFieldRefs = argOIs.getAllStructFieldRefs();if(allStructFieldRefs.size() != 1)throw new UDFArgumentException("参数个数不为1");if(!"string".equals(allStructFieldRefs.get(0).getFieldObjectInspector().getTypeName()))throw new UDFArgumentException("参数类型不为string");//规定输出参数ArrayList<String> fieldNames = new ArrayList<>();ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();fieldNames.add("event_name");fieldNames.add("event_json");fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}@Overridepublic void process(Object[] args) throws HiveException {String eventArray = args[0].toString();JSONArray jsonArray = new JSONArray(eventArray);// 遍历事件数组中每个事件,解析其事件类型;// 每个事件输出一行for(int i=0; i<jsonArray.length(); i++) {String[] result = new String[2];result[0] = jsonArray.getJSONObject(i).getString("en");result[1] = jsonArray.getString(i);forward(result);}}@Overridepublic void close() throws HiveException {}
}
  1. 创建永久用户自定义函数
[omm@simwor01 ~]$ hdfs dfs -mkdir -p /user/hive/jars
[omm@simwor01 ~]$ hdfs dfs -put Hive-1.0-SNAPSHOT.jar /user/hive/jars/
[omm@simwor01 ~]$ hdfs dfs -ls /user/hive/jars/
Found 1 items
-rw-r--r--   1 omm supergroup       4643 2021-05-23 14:32 /user/hive/jars/Hive-1.0-SNAPSHOT.jar
[omm@simwor01 ~]$
hive (gmall)> create function base_analizer as 'com.simwor.ds.hive.LogUDF' using jar 'hdfs://simwor01:8020/user/hive/jars/Hive-1.0-SNAPSHOT.jar';hive (gmall)> create function flat_analizer as 'com.simwor.ds.hive.LogUDTF' using jar 'hdfs://simwor01:8020/user/hive/jars/Hive-1.0-SNAPSHOT.jar';
  1. 建表
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/'
TBLPROPERTIES('parquet.compression'='lzo');
  1. 插入数据
insert overwrite table dwd_base_event_log partition(dt='2021-05-16')
selectbase_analizer(line,'mid') as mid_id,base_analizer(line,'uid') as user_id,base_analizer(line,'vc') as version_code,base_analizer(line,'vn') as version_name,base_analizer(line,'l') as lang,base_analizer(line,'sr') as source,base_analizer(line,'os') as os,base_analizer(line,'ar') as area,base_analizer(line,'md') as model,base_analizer(line,'ba') as brand,base_analizer(line,'sv') as sdk_version,base_analizer(line,'g') as gmail,base_analizer(line,'hw') as height_width,base_analizer(line,'t') as app_time,base_analizer(line,'nw') as network,base_analizer(line,'ln') as lng,base_analizer(line,'la') as lat,event_name,event_json,base_analizer(line,'st') as server_time
from ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json
where dt='2021-05-16' and base_analizer(line,'et')<>'';
  1. 插入数据脚本
#!/bin/bash# ods_to_dwd_base_event_log.sh# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;thendo_date=$1
else do_date=`date -d "-1 day" +%F`
fi sql="
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_base_event_log partition(dt='$do_date')
select${APP}.base_analizer(line,'mid') as mid_id,${APP}.base_analizer(line,'uid') as user_id,${APP}.base_analizer(line,'vc') as version_code,${APP}.base_analizer(line,'vn') as version_name,${APP}.base_analizer(line,'l') as lang,${APP}.base_analizer(line,'sr') as source,${APP}.base_analizer(line,'os') as os,${APP}.base_analizer(line,'ar') as area,${APP}.base_analizer(line,'md') as model,${APP}.base_analizer(line,'ba') as brand,${APP}.base_analizer(line,'sv') as sdk_version,${APP}.base_analizer(line,'g') as gmail,${APP}.base_analizer(line,'hw') as height_width,${APP}.base_analizer(line,'t') as app_time,${APP}.base_analizer(line,'nw') as network,${APP}.base_analizer(line,'ln') as lng,${APP}.base_analizer(line,'la') as lat,event_name,event_json,${APP}.base_analizer(line,'st') as server_time
from ${APP}.ods_event_log lateral view ${APP}.flat_analizer(${APP}.base_analizer(line,'et')) tem_flat as event_name,event_json
where dt='$do_date'  and ${APP}.base_analizer(line,'et')<>'';
"$hive -e "$sql";
  1. 每种用户行为拆分成一张表 – 商品曝光表

drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_display_log/'
TBLPROPERTIES('parquet.compression'='lzo');
insert overwrite table dwd_display_log
PARTITION (dt='2021-05-16')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd_base_event_log
where dt='2021-05-16' and event_name='display';
  1. 10种用户行为批量建表
drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_display_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_newsdetail_log;
CREATE EXTERNAL TABLE dwd_newsdetail_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`goodsid` string,
`showtype` string,
`news_staytime` string,
`loading_time` string,
`type1` string,
`category` string,
`server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_newsdetail_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_loading_log;
CREATE EXTERNAL TABLE dwd_loading_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`loading_time` string,
`loading_way` string,
`extend1` string,
`extend2` string,
`type` string,
`type1` string,
`server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_loading_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_ad_log;
CREATE EXTERNAL TABLE dwd_ad_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`contentType` string,
`displayMills` string,
`itemId` string,
`activityId` string,
`server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_ad_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_notification_log;
CREATE EXTERNAL TABLE dwd_notification_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`noti_type` string,
`ap_time` string,
`content` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_notification_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_active_background_log;
CREATE EXTERNAL TABLE dwd_active_background_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`active_source` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_background_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_comment_log;
CREATE EXTERNAL TABLE dwd_comment_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`comment_id` int,
`userid` int,
`p_comment_id` int,
`content` string,
`addtime` string,
`other_id` int,
`praise_count` int,
`reply_count` int,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_comment_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_favorites_log;
CREATE EXTERNAL TABLE dwd_favorites_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` int,
`course_id` int,
`userid` int,
`add_time` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_favorites_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_praise_log;
CREATE EXTERNAL TABLE dwd_praise_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` string,
`userid` string,
`target_id` string,
`type` string,
`add_time` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_praise_log/'
TBLPROPERTIES('parquet.compression'='lzo');drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
`mid_id` string,
`user_id` string,
`version_code`string,
`version_name`string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`errorBrief` string,
`errorDetail` string,
`server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_error_log/'
TBLPROPERTIES('parquet.compression'='lzo');
  1. 10种用户行为批量导表
#!/bin/bash# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;thendo_date=$1
else do_date=`date -d "-1 day" +%F`
fi sql="
set hive.exec.dynamic.partition.mode=nonstrict;insert overwrite table "$APP".dwd_display_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.action') action,get_json_object(event_json,'$.kv.goodsid') goodsid,get_json_object(event_json,'$.kv.place') place,get_json_object(event_json,'$.kv.extend1') extend1,get_json_object(event_json,'$.kv.category') category,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='display';insert overwrite table "$APP".dwd_newsdetail_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.entry') entry,get_json_object(event_json,'$.kv.action') action,get_json_object(event_json,'$.kv.goodsid') goodsid,get_json_object(event_json,'$.kv.showtype') showtype,get_json_object(event_json,'$.kv.news_staytime') news_staytime,get_json_object(event_json,'$.kv.loading_time') loading_time,get_json_object(event_json,'$.kv.type1') type1,get_json_object(event_json,'$.kv.category') category,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='newsdetail';insert overwrite table "$APP".dwd_loading_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.action') action,get_json_object(event_json,'$.kv.loading_time') loading_time,get_json_object(event_json,'$.kv.loading_way') loading_way,get_json_object(event_json,'$.kv.extend1') extend1,get_json_object(event_json,'$.kv.extend2') extend2,get_json_object(event_json,'$.kv.type') type,get_json_object(event_json,'$.kv.type1') type1,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='loading';insert overwrite table "$APP".dwd_ad_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.entry') entry,get_json_object(event_json,'$.kv.action') action,get_json_object(event_json,'$.kv.contentType') contentType,get_json_object(event_json,'$.kv.displayMills') displayMills,get_json_object(event_json,'$.kv.itemId') itemId,get_json_object(event_json,'$.kv.activityId') activityId,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='ad';insert overwrite table "$APP".dwd_notification_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.action') action,get_json_object(event_json,'$.kv.noti_type') noti_type,get_json_object(event_json,'$.kv.ap_time') ap_time,get_json_object(event_json,'$.kv.content') content,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='notification';insert overwrite table "$APP".dwd_active_background_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.active_source') active_source,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='active_background';insert overwrite table "$APP".dwd_comment_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.comment_id') comment_id,get_json_object(event_json,'$.kv.userid') userid,get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,get_json_object(event_json,'$.kv.content') content,get_json_object(event_json,'$.kv.addtime') addtime,get_json_object(event_json,'$.kv.other_id') other_id,get_json_object(event_json,'$.kv.praise_count') praise_count,get_json_object(event_json,'$.kv.reply_count') reply_count,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='comment';insert overwrite table "$APP".dwd_favorites_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.id') id,get_json_object(event_json,'$.kv.course_id') course_id,get_json_object(event_json,'$.kv.userid') userid,get_json_object(event_json,'$.kv.add_time') add_time,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='favorites';insert overwrite table "$APP".dwd_praise_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.id') id,get_json_object(event_json,'$.kv.userid') userid,get_json_object(event_json,'$.kv.target_id') target_id,get_json_object(event_json,'$.kv.type') type,get_json_object(event_json,'$.kv.add_time') add_time,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='praise';insert overwrite table "$APP".dwd_error_log
PARTITION (dt='$do_date')
select mid_id,user_id,version_code,version_name,lang,source,os,area,model,brand,sdk_version,gmail,height_width,app_time,network,lng,lat,get_json_object(event_json,'$.kv.errorBrief') errorBrief,get_json_object(event_json,'$.kv.errorDetail') errorDetail,server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='error';
"$hive -e "$sql"

3.2.3 业务数据

  • 商品维度表(全量表)

  1. 建表语句
DROP TABLE IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (`id` string COMMENT '商品id',`spu_id` string COMMENT 'spuid',`price` double COMMENT '商品价格',`sku_name` string COMMENT '商品名称',`sku_desc` string COMMENT '商品描述',`weight` double COMMENT '重量',`tm_id` string COMMENT '品牌id',`tm_name` string COMMENT '品牌名称',`category3_id` string COMMENT '三级分类id',`category2_id` string COMMENT '二级分类id',`category1_id` string COMMENT '一级分类id',`category3_name` string COMMENT '三级分类名称',`category2_name` string COMMENT '二级分类名称',`category1_name` string COMMENT '一级分类名称',`spu_name` string COMMENT 'spu名称',`create_time` string COMMENT '创建时间'
)
COMMENT '商品维度表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");
  1. 数据装载
insert overwrite table dwd_dim_sku_info partition(dt='2020-03-10')
select  sku.id,sku.spu_id,sku.price,sku.sku_name,sku.sku_desc,sku.weight,sku.tm_id,ob.tm_name,sku.category3_id,c2.id category2_id,c1.id category1_id,c3.name category3_name,c2.name category2_name,c1.name category1_name,spu.spu_name,sku.create_time
from
(select * from ods_sku_info where dt='2020-03-10'
)sku
join
(select * from ods_base_trademark where dt='2020-03-10'
)ob on sku.tm_id=ob.tm_id
join
(select * from ods_spu_info where dt='2020-03-10'
)spu on spu.id = sku.spu_id
join
(select * from ods_base_category3 where dt='2020-03-10'
)c3 on sku.category3_id=c3.id
join
(select * from ods_base_category2 where dt='2020-03-10'
)c2 on c3.category2_id=c2.id
join
(select * from ods_base_category1 where dt='2020-03-10'
)c1 on c2.category1_id=c1.id;
  • 优惠券信息表(全量)
  1. 建表语句
drop table if exists dwd_dim_coupon_info;
create external table dwd_dim_coupon_info(`id` string COMMENT '购物券编号',`coupon_name` string COMMENT '购物券名称',`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',`condition_amount` string COMMENT '满额数',`condition_num` string COMMENT '满件数',`activity_id` string COMMENT '活动编号',`benefit_amount` string COMMENT '减金额',`benefit_discount` string COMMENT '折扣',`create_time` string COMMENT '创建时间',`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',`spu_id` string COMMENT '商品id',`tm_id` string COMMENT '品牌id',`category3_id` string COMMENT '品类id',`limit_num` string COMMENT '最多领用次数',`operate_time`  string COMMENT '修改时间',`expire_time`  string COMMENT '过期时间'
) COMMENT '优惠券信息表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_coupon_info/'
tblproperties ("parquet.compression"="lzo");
  1. 数据加载
insert overwrite table dwd_dim_coupon_info partition(dt='2020-03-10')
selectid,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,spu_id,tm_id,category3_id,limit_num,operate_time,expire_time
from ods_coupon_info
where dt='2020-03-10';
  • 活动维度表(全量)

  1. 建表语句
drop table if exists dwd_dim_activity_info;
create external table dwd_dim_activity_info(`id` string COMMENT '编号',`activity_name` string  COMMENT '活动名称',`activity_type` string  COMMENT '活动类型',`condition_amount` string  COMMENT '满减金额',`condition_num` string  COMMENT '满减件数',`benefit_amount` string  COMMENT '优惠金额',`benefit_discount` string  COMMENT '优惠折扣',`benefit_level` string  COMMENT '优惠级别',`start_time` string  COMMENT '开始时间',`end_time` string  COMMENT '结束时间',`create_time` string  COMMENT '创建时间'
) COMMENT '活动信息表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_activity_info/'
tblproperties ("parquet.compression"="lzo");
  1. 数据装载
insert overwrite table dwd_dim_activity_info partition(dt='2020-03-10')
selectinfo.id,info.activity_name,info.activity_type,rule.condition_amount,rule.condition_num,rule.benefit_amount,rule.benefit_discount,rule.benefit_level,info.start_time,info.end_time,info.create_time
from
(select * from ods_activity_info where dt='2020-03-10'
)info
left join
(select * from ods_activity_rule where dt='2020-03-10'
)rule on info.id = rule.activity_id;
  • 地区维度表(特殊)

  1. 建表语句
DROP TABLE IF EXISTS `dwd_dim_base_province`;
CREATE EXTERNAL TABLE `dwd_dim_base_province` (`id` string COMMENT 'id',`province_name` string COMMENT '省市名称',`area_code` string COMMENT '地区编码',`iso_code` string COMMENT 'ISO编码',`region_id` string COMMENT '地区id',`region_name` string COMMENT '地区名称'
)
COMMENT '地区省市表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_base_province/'
tblproperties ("parquet.compression"="lzo");
  1. 数据装载
insert overwrite table dwd_dim_base_province
select bp.id,bp.name,bp.area_code,bp.iso_code,bp.region_id,br.region_name
from ods_base_province bp
join ods_base_region br
on bp.region_id=br.id;
  • 时间维度表(特殊)
  1. 建表语句
DROP TABLE IF EXISTS `dwd_dim_date_info`;
CREATE EXTERNAL TABLE `dwd_dim_date_info`(`date_id` string COMMENT '日',`week_id` int COMMENT '周',`week_day` int COMMENT '周的第几天',`day` int COMMENT '每月的第几天',`month` int COMMENT '第几月',`quarter` int COMMENT '第几季度',`year` int COMMENT '年',`is_workday` int COMMENT '是否是周末',`holiday_id` int COMMENT '是否是节假日'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_date_info/'
tblproperties ("parquet.compression"="lzo");
  1. 把date_info.txt文件上传到hadoop102的/opt/module/db_log/路径

  2. 创建临时表,非列式存储

DROP TABLE IF EXISTS `dwd_dim_date_info_tmp`;
CREATE EXTERNAL TABLE `dwd_dim_date_info_tmp`(`date_id` string COMMENT '日',`week_id` int COMMENT '周',`week_day` int COMMENT '周的第几天',`day` int COMMENT '每月的第几天',`month` int COMMENT '第几月',`quarter` int COMMENT '第几季度',`year` int COMMENT '年',`is_workday` int COMMENT '是否是周末',`holiday_id` int COMMENT '是否是节假日'
)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_dim_date_info_tmp/';
  1. 将数据导入临时表
load data local inpath '/opt/module/db_log/date_info.txt' into table dwd_dim_date_info_tmp;
  1. 将数据导入正式表
insert overwrite table dwd_dim_date_info select * from dwd_dim_date_info_tmp;
  • 订单明细事实表(事务型事实表)

  1. 建表语句
drop table if exists dwd_fact_order_detail;
create external table dwd_fact_order_detail (`id` string COMMENT '',`order_id` string COMMENT '',`province_id` string COMMENT '',`user_id` string COMMENT '',`sku_id` string COMMENT '',`create_time` string COMMENT '',`total_amount` decimal(20,2) COMMENT '',`sku_num` bigint COMMENT ''
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_detail/'
tblproperties ("parquet.compression"="lzo");
  1. 数据装载
insert overwrite table dwd_fact_order_detail partition(dt='2020-03-10')
selectod.id,od.order_id,oi.province_id,od.user_id,od.sku_id,od.create_time,od.order_price*od.sku_num,od.sku_num
from
(select * from ods_order_detail where dt='2020-03-10'
) od
join
(select * from ods_order_info where dt='2020-03-10'
) oi
on od.order_id=oi.id;
  • 支付事实表(事务型事实表)

  1. 建表语句
drop table if exists dwd_fact_payment_info;
create external table dwd_fact_payment_info (`id` string COMMENT '',`out_trade_no` string COMMENT '对外业务编号',`order_id` string COMMENT '订单编号',`user_id` string COMMENT '用户编号',`alipay_trade_no` string COMMENT '支付宝交易流水编号',`payment_amount`    decimal(16,2) COMMENT '支付金额',`subject`         string COMMENT '交易内容',`payment_type` string COMMENT '支付类型',`payment_time` string COMMENT '支付时间',`province_id` string COMMENT '省份ID'
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_payment_info/'
tblproperties ("parquet.compression"="lzo");
  1. 数据装载
insert overwrite table dwd_fact_payment_info partition(dt='2020-03-10')
selectpi.id,pi.out_trade_no,pi.order_id,pi.user_id,pi.alipay_trade_no,pi.total_amount,pi.subject,pi.payment_type,pi.payment_time,          oi.province_id
from
(select * from ods_payment_info where dt='2020-03-10'
)pi
join
(select id, province_id from ods_order_info where dt='2020-03-10'
)oi
on pi.order_id = oi.id;
  • 退款事实表(事务型事实表)
drop table if exists dwd_fact_order_refund_info;
create external table dwd_fact_order_refund_info(`id` string COMMENT '编号',`user_id` string COMMENT '用户ID',`order_id` string COMMENT '订单ID',`sku_id` string COMMENT '商品ID',`refund_type` string COMMENT '退款类型',`refund_num` bigint COMMENT '退款件数',`refund_amount` decimal(16,2) COMMENT '退款金额',`refund_reason_type` string COMMENT '退款原因类型',`create_time` string COMMENT '退款时间'
) COMMENT '退款事实表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_order_refund_info/';
insert overwrite table dwd_fact_order_refund_info partition(dt='2020-03-10')
selectid,user_id,order_id,sku_id,refund_type,refund_num,refund_amount,refund_reason_type,create_time
from ods_order_refund_info
where dt='2020-03-10';
  • 评价事实表(事务型事实表)
drop table if exists dwd_fact_comment_info;
create external table dwd_fact_comment_info(`id` string COMMENT '编号',`user_id` string COMMENT '用户ID',`sku_id` string COMMENT '商品sku',`spu_id` string COMMENT '商品spu',`order_id` string COMMENT '订单ID',`appraise` string COMMENT '评价',`create_time` string COMMENT '评价时间'
) COMMENT '评价事实表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_comment_info/';
insert overwrite table dwd_fact_comment_info partition(dt='2020-03-10')
selectid,user_id,sku_id,spu_id,order_id,appraise,create_time
from ods_comment_info
where dt='2020-03-10';
  • 加购事实表(周期型快照事实表,每日快照)
  1. 由于购物车的数量是会发生变化,所以导增量不合适。
  2. 每天做一次快照,导入的数据是全量,区别于事务型事实表是每天导入新增。
  3. 周期型快照事实表劣势:存储的数据量会比较大。
  4. 解决方案:周期型快照事实表存储的数据比较讲究时效性,时间太久了的意义不大,可以删除以前的数据。
drop table if exists dwd_fact_cart_info;
create external table dwd_fact_cart_info(`id` string COMMENT '编号',`user_id` string  COMMENT '用户id',`sku_id` string  COMMENT 'skuid',`cart_price` string  COMMENT '放入购物车时价格',`sku_num` string  COMMENT '数量',`sku_name` string  COMMENT 'sku名称 (冗余)',`create_time` string  COMMENT '创建时间',`operate_time` string COMMENT '修改时间',`is_ordered` string COMMENT '是否已经下单。1为已下单;0为未下单',
`order_time` string  COMMENT '下单时间'
) COMMENT '加购事实表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_cart_info/';
insert overwrite table dwd_fact_cart_info partition(dt='2020-03-10')
selectid,user_id,sku_id,cart_price,sku_num,sku_name,create_time,operate_time,
is_ordered,
order_time
from ods_cart_info
where dt='2020-03-10';
  • 收藏事实表(周期型快照事实表,每日快照)
  1. 收藏的标记,是否取消,会发生变化,做增量不合适。
  2. 每天做一次快照,导入的数据是全量,区别于事务型事实表是每天导入新增。
drop table if exists dwd_fact_favor_info;
create external table dwd_fact_favor_info(`id` string COMMENT '编号',`user_id` string  COMMENT '用户id',`sku_id` string  COMMENT 'skuid',`spu_id` string  COMMENT 'spuid',`is_cancel` string  COMMENT '是否取消',`create_time` string  COMMENT '收藏时间',`cancel_time` string  COMMENT '取消时间'
) COMMENT '收藏事实表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_favor_info/';
insert overwrite table dwd_fact_favor_info partition(dt='2020-03-10')
selectid,user_id,sku_id,spu_id,is_cancel,create_time,cancel_time
from ods_favor_info
where dt='2020-03-10';
  • 优惠券领用事实表(累积型快照事实表)
  1. 优惠卷的生命周期:领取优惠卷-》用优惠卷下单-》优惠卷参与支付
  2. 累积型快照事实表使用:统计优惠卷领取次数、优惠卷下单次数、优惠卷参与支付次数
  3. dt是按照优惠卷领用时间get_time做为分区。
drop table if exists dwd_fact_coupon_use;
create external table dwd_fact_coupon_use(`id` string COMMENT '编号',`coupon_id` string  COMMENT '优惠券ID',`user_id` string  COMMENT 'userid',`order_id` string  COMMENT '订单id',`coupon_status` string  COMMENT '优惠券状态',`get_time` string  COMMENT '领取时间',`using_time` string  COMMENT '使用时间(下单)',`used_time` string  COMMENT '使用时间(支付)'
) COMMENT '优惠券领用事实表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_fact_coupon_use/';

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_fact_coupon_use partition(dt)
selectif(new.id is null,old.id,new.id),if(new.coupon_id is null,old.coupon_id,new.coupon_id),if(new.user_id is null,old.user_id,new.user_id),if(new.order_id is null,old.order_id,new.order_id),if(new.coupon_status is null,old.coupon_status,new.coupon_status),if(new.get_time is null,old.get_time,new.get_time),if(new.using_time is null,old.using_time,new.using_time),if(new.used_time is null,old.used_time,new.used_time),date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from
(selectid,coupon_id,user_id,order_id,coupon_status,get_time,using_time,used_timefrom dwd_fact_coupon_usewhere dt in(selectdate_format(get_time,'yyyy-MM-dd')from ods_coupon_usewhere dt='2020-03-10')
)old
full outer join
(selectid,coupon_id,user_id,order_id,coupon_status,get_time,using_time,used_timefrom ods_coupon_usewhere dt='2020-03-10'
)new
on old.id=new.id;
  • 订单事实表(累积型快照事实表)
  1. 订单生命周期:创建时间=》支付时间=》取消时间=》完成时间=》退款时间=》退款完成时间。
  2. 由于ODS层订单表只有创建时间和操作时间两个状态,不能表达所有时间含义,所以需要关联订单状态表。订单事实表里面增加了活动id,所以需要关联活动订单表。
drop table if exists dwd_fact_order_info;
create external table dwd_fact_order_info (`id` string COMMENT '订单编号',`order_status` string COMMENT '订单状态',`user_id` string COMMENT '用户id',`out_trade_no` string COMMENT '支付流水号',`create_time` string COMMENT '创建时间(未支付状态)',`payment_time` string COMMENT '支付时间(已支付状态)',`cancel_time` string COMMENT '取消时间(已取消状态)',`finish_time` string COMMENT '完成时间(已完成状态)',`refund_time` string COMMENT '退款时间(退款中状态)',`refund_finish_time` string COMMENT '退款完成时间(退款完成状态)',`province_id` string COMMENT '省份ID',`activity_id` string COMMENT '活动ID',`original_total_amount` string COMMENT '原价金额',`benefit_reduce_amount` string COMMENT '优惠金额',`feight_fee` string COMMENT '运费',`final_total_amount` decimal(10,2) COMMENT '订单金额'
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_info/'
tblproperties ("parquet.compression"="lzo");

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_fact_order_info partition(dt)
selectif(new.id is null,old.id,new.id),if(new.order_status is null,old.order_status,new.order_status),if(new.user_id is null,old.user_id,new.user_id),if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),if(new.tms['1001'] is null,old.create_time,new.tms['1001']),--1001对应未支付状态if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),if(new.province_id is null,old.province_id,new.province_id),if(new.activity_id is null,old.activity_id,new.activity_id),if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),if(new.feight_fee is null,old.feight_fee,new.feight_fee),if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(selectid,order_status,user_id,out_trade_no,create_time,payment_time,cancel_time,finish_time,refund_time,refund_finish_time,province_id,activity_id,original_total_amount,benefit_reduce_amount,feight_fee,final_total_amountfrom dwd_fact_order_infowhere dtin(selectdate_format(create_time,'yyyy-MM-dd')from ods_order_infowhere dt='2020-03-10')
)old
full outer join
(selectinfo.id,info.order_status,info.user_id,info.out_trade_no,info.province_id,act.activity_id,log.tms,info.original_total_amount,info.benefit_reduce_amount,info.feight_fee,info.final_total_amountfrom(selectorder_id,str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tmsfrom ods_order_status_logwhere dt='2020-03-10'group by order_id)logjoin(select * from ods_order_info where dt='2020-03-10')infoon log.order_id=info.idleft join(select * from ods_activity_order where dt='2020-03-10')acton log.order_id=act.order_id
)new
on old.id=new.id;
  • 用户维度表(拉链表)

用户表中的数据每日既有可能新增,也有可能修改,但修改频率并不高,属于缓慢变化维度,此处采用拉链表存储用户维度数据。

  1. 什么是拉链表

  1. 为什么要做拉链表

  1. 如何使用拉链表

  1. 拉链表形成过程


5. 拉链表制作过程图

  1. 初始化拉链表
drop table if exists dwd_dim_user_info_his;
create external table dwd_dim_user_info_his(`id` string COMMENT '用户id',`name` string COMMENT '姓名', `birthday` string COMMENT '生日',`gender` string COMMENT '性别',`email` string COMMENT '邮箱',`user_level` string COMMENT '用户等级',`create_time` string COMMENT '创建时间',`operate_time` string COMMENT '操作时间',`start_date`  string COMMENT '有效开始日期',`end_date`  string COMMENT '有效结束日期'
) COMMENT '订单拉链表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his/'
tblproperties ("parquet.compression"="lzo");
insert overwrite table dwd_dim_user_info_his
selectid,name,birthday,gender,email,user_level,create_time,operate_time,'2020-03-10','9999-99-99'
from ods_user_info oi
where oi.dt='2020-03-10';
  1. 建立临时表
drop table if exists dwd_dim_user_info_his_tmp;
create external table dwd_dim_user_info_his_tmp(`id` string COMMENT '用户id',`name` string COMMENT '姓名', `birthday` string COMMENT '生日',`gender` string COMMENT '性别',`email` string COMMENT '邮箱',`user_level` string COMMENT '用户等级',`create_time` string COMMENT '创建时间',`operate_time` string COMMENT '操作时间',`start_date`  string COMMENT '有效开始日期',`end_date`  string COMMENT '有效结束日期'
) COMMENT '订单拉链临时表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="lzo");
  1. 数据装载
insert overwrite table dwd_dim_user_info_his_tmp
select * from
(select id,name,birthday,gender,email,user_level,create_time,operate_time,'2020-03-11' start_date,'9999-99-99' end_datefrom ods_user_info where dt='2020-03-11'union all select uh.id,uh.name,uh.birthday,uh.gender,uh.email,uh.user_level,uh.create_time,uh.operate_time,uh.start_date,if(ui.id is not null  and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_datefrom dwd_dim_user_info_his uh left join (select*from ods_user_infowhere dt='2020-03-11') ui on uh.id=ui.id
)his
order by his.id, start_date;
  1. 把临时表覆盖给拉链表
insert overwrite table dwd_dim_user_info_his select * from dwd_dim_user_info_his_tmp;
  • DWD层业务数据导入脚本
  1. 初次导入:
    1.1 时间维度表 - 单独导入
    1.2 用户维度表 - 拉链表初始化
    1.3 其余表 - ods_to_dwd_db.sh first 2020-03-10
  2. 每日定时导入 - ods_to_dwd_db.sh all 2020-03-11
#!/bin/bash# ods_to_dwd_db.shAPP=gmall
hive=/opt/module/hive/bin/hive# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;thendo_date=$2
else do_date=`date -d "-1 day" +%F`
fisql1="
set hive.exec.dynamic.partition.mode=nonstrict;insert overwrite table ${APP}.dwd_dim_sku_info partition(dt='$do_date')
select  sku.id,sku.spu_id,sku.price,sku.sku_name,sku.sku_desc,sku.weight,sku.tm_id,ob.tm_name,sku.category3_id,c2.id category2_id,c1.id category1_id,c3.name category3_name,c2.name category2_name,c1.name category1_name,spu.spu_name,sku.create_time
from
(select * from ${APP}.ods_sku_info where dt='$do_date'
)sku
join
(select * from ${APP}.ods_base_trademark where dt='$do_date'
)ob on sku.tm_id=ob.tm_id
join
(select * from ${APP}.ods_spu_info where dt='$do_date'
)spu on spu.id = sku.spu_id
join
(select * from ${APP}.ods_base_category3 where dt='$do_date'
)c3 on sku.category3_id=c3.id
join
(select * from ${APP}.ods_base_category2 where dt='$do_date'
)c2 on c3.category2_id=c2.id
join
(select * from ${APP}.ods_base_category1 where dt='$do_date'
)c1 on c2.category1_id=c1.id;insert overwrite table ${APP}.dwd_dim_coupon_info partition(dt='$do_date')
selectid,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,spu_id,tm_id,category3_id,limit_num,operate_time,expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';insert overwrite table ${APP}.dwd_dim_activity_info partition(dt='$do_date')
selectinfo.id,info.activity_name,info.activity_type,rule.condition_amount,rule.condition_num,rule.benefit_amount,rule.benefit_discount,rule.benefit_level,info.start_time,info.end_time,info.create_time
from
(select * from ${APP}.ods_activity_info where dt='$do_date'
)info
left join
(select * from ${APP}.ods_activity_rule where dt='$do_date'
)rule on info.id = rule.activity_id;insert overwrite table ${APP}.dwd_fact_order_detail partition(dt='$do_date')
selectod.id,od.order_id,oi.province_id,od.user_id,od.sku_id,od.create_time,od.order_price*od.sku_num,od.sku_num
from
(select * from ${APP}.ods_order_detail where dt='$do_date'
) od
join
(select * from ${APP}.ods_order_info where dt='$do_date'
) oi
on od.order_id=oi.id;insert overwrite table ${APP}.dwd_fact_payment_info partition(dt='$do_date')
selectpi.id,pi.out_trade_no,pi.order_id,pi.user_id,pi.alipay_trade_no,pi.total_amount,pi.subject,pi.payment_type,pi.payment_time,          oi.province_id
from
(select * from ${APP}.ods_payment_info where dt='$do_date'
)pi
join
(select id, province_id from ${APP}.ods_order_info where dt='$do_date'
)oi
on pi.order_id = oi.id;insert overwrite table ${APP}.dwd_fact_order_refund_info partition(dt='$do_date')
selectid,user_id,order_id,sku_id,refund_type,refund_num,refund_amount,refund_reason_type,create_time
from ${APP}.ods_order_refund_info
where dt='$do_date';insert overwrite table ${APP}.dwd_fact_comment_info partition(dt='$do_date')
selectid,user_id,sku_id,spu_id,order_id,appraise,create_time
from ${APP}.ods_comment_info
where dt='$do_date';insert overwrite table ${APP}.dwd_fact_cart_info partition(dt='$do_date')
selectid,user_id,sku_id,cart_price,sku_num,sku_name,create_time,operate_time,is_ordered,order_time
from ${APP}.ods_cart_info
where dt='$do_date';insert overwrite table ${APP}.dwd_fact_favor_info partition(dt='$do_date')
selectid,user_id,sku_id,spu_id,is_cancel,create_time,cancel_time
from ${APP}.ods_favor_info
where dt='$do_date';insert overwrite table ${APP}.dwd_fact_coupon_use partition(dt)
selectif(new.id is null,old.id,new.id),if(new.coupon_id is null,old.coupon_id,new.coupon_id),if(new.user_id is null,old.user_id,new.user_id),if(new.order_id is null,old.order_id,new.order_id),if(new.coupon_status is null,old.coupon_status,new.coupon_status),if(new.get_time is null,old.get_time,new.get_time),if(new.using_time is null,old.using_time,new.using_time),if(new.used_time is null,old.used_time,new.used_time),date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from
(selectid,coupon_id,user_id,order_id,coupon_status,get_time,using_time,used_timefrom ${APP}.dwd_fact_coupon_usewhere dt in(selectdate_format(get_time,'yyyy-MM-dd')from ${APP}.ods_coupon_usewhere dt='$do_date')
)old
full outer join
(selectid,coupon_id,user_id,order_id,coupon_status,get_time,using_time,used_timefrom ${APP}.ods_coupon_usewhere dt='$do_date'
)new
on old.id=new.id;insert overwrite table ${APP}.dwd_fact_order_info partition(dt)
selectif(new.id is null,old.id,new.id),if(new.order_status is null,old.order_status,new.order_status),if(new.user_id is null,old.user_id,new.user_id),if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),if(new.tms['1001'] is null,old.create_time,new.tms['1001']),--1001对应未支付状态if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),if(new.province_id is null,old.province_id,new.province_id),if(new.activity_id is null,old.activity_id,new.activity_id),if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),if(new.feight_fee is null,old.feight_fee,new.feight_fee),if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(selectid,order_status,user_id,out_trade_no,create_time,payment_time,cancel_time,finish_time,refund_time,refund_finish_time,province_id,activity_id,original_total_amount,benefit_reduce_amount,feight_fee,final_total_amountfrom ${APP}.dwd_fact_order_infowhere dtin(selectdate_format(create_time,'yyyy-MM-dd')from ${APP}.ods_order_infowhere dt='$do_date')
)old
full outer join
(selectinfo.id,info.order_status,info.user_id,info.out_trade_no,info.province_id,act.activity_id,log.tms,info.original_total_amount,info.benefit_reduce_amount,info.feight_fee,info.final_total_amountfrom(selectorder_id,str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tmsfrom ${APP}.ods_order_status_logwhere dt='$do_date'group by order_id)logjoin(select * from ${APP}.ods_order_info where dt='$do_date')infoon log.order_id=info.idleft join(select * from ${APP}.ods_activity_order where dt='$do_date')acton log.order_id=act.order_id
)new
on old.id=new.id;
"sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select bp.id,bp.name,bp.area_code,bp.iso_code,bp.region_id,br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br
on bp.region_id=br.id;
"sql3="
insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select * from
(select id,name,birthday,gender,email,user_level,create_time,operate_time,'$do_date' start_date,'9999-99-99' end_datefrom ${APP}.ods_user_info where dt='$do_date'union all select uh.id,uh.name,uh.birthday,uh.gender,uh.email,uh.user_level,uh.create_time,uh.operate_time,uh.start_date,if(ui.id is not null  and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_datefrom ${APP}.dwd_dim_user_info_his uh left join (select*from ${APP}.ods_user_infowhere dt='$do_date') ui on uh.id=ui.id
)his
order by his.id, start_date;insert overwrite table ${APP}.dwd_dim_user_info_his
select * from ${APP}.dwd_dim_user_info_his_tmp;
"case $1 in
"first"){$hive -e "$sql1$sql2"
};;
"all"){$hive -e "$sql1$sql3"
};;
esac

3.3 DWS 层

3.3.1 每日设备行为

drop table if exists dws_uv_detail_daycount;
create external table dws_uv_detail_daycount
(`mid_id` string COMMENT '设备唯一标识',`user_id` string COMMENT '用户标识',`version_code` string COMMENT '程序版本号', `version_name` string COMMENT '程序版本名', `lang` string COMMENT '系统语言', `source` string COMMENT '渠道号', `os` string COMMENT '安卓系统版本', `area` string COMMENT '区域', `model` string COMMENT '手机型号', `brand` string COMMENT '手机品牌', `sdk_version` string COMMENT 'sdkVersion', `gmail` string COMMENT 'gmail', `height_width` string COMMENT '屏幕宽高',`app_time` string COMMENT '客户端日志产生时的时间',`network` string COMMENT '网络模式',`lng` string COMMENT '经度',`lat` string COMMENT '纬度'
)
partitioned by(dt string)
stored as parquet
location '/warehouse/gmall/dws/dws_uv_detail_daycount';
insert overwrite table dws_uv_detail_daycount partition(dt='2020-03-10')
select  mid_id,concat_ws('|', collect_set(user_id)) user_id,concat_ws('|', collect_set(version_code)) version_code,concat_ws('|', collect_set(version_name)) version_name,concat_ws('|', collect_set(lang))lang,concat_ws('|', collect_set(source)) source,concat_ws('|', collect_set(os)) os,concat_ws('|', collect_set(area)) area, concat_ws('|', collect_set(model)) model,concat_ws('|', collect_set(brand)) brand,concat_ws('|', collect_set(sdk_version)) sdk_version,concat_ws('|', collect_set(gmail)) gmail,concat_ws('|', collect_set(height_width)) height_width,concat_ws('|', collect_set(app_time)) app_time,concat_ws('|', collect_set(network)) network,concat_ws('|', collect_set(lng)) lng,concat_ws('|', collect_set(lat)) lat
from dwd_start_log
where dt='2020-03-10'
group by mid_id;

3.3.2 每日会员行为

DWS层的宽表字段,是站在不同维度的视角去看事实表。重点关注事实表的度量值。

drop table if exists dws_user_action_daycount;
create external table dws_user_action_daycount
(   user_id string comment '用户 id',login_count bigint comment '登录次数',cart_count bigint comment '加入购物车次数',order_count bigint comment '下单次数',order_amount    decimal(16,2)  comment '下单金额',payment_count   bigint      comment '支付次数',payment_amount  decimal(16,2) comment '支付金额',order_detail_stats array<struct<sku_id:string,sku_num:bigint,order_count:bigint,order_amount:decimal(20,2)>> comment '下单明细统计'
) COMMENT '每日用户行为'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_user_action_daycount/'
tblproperties ("parquet.compression"="lzo");
with
tmp_login as
(selectuser_id,count(*) login_countfrom dwd_start_logwhere dt='2020-03-10'and user_id is not nullgroup by user_id
),
tmp_cart as
(selectuser_id,count(*) cart_countfrom dwd_fact_cart_infowhere dt='2020-03-10'
and date_format(create_time,'yyyy-MM-dd')='2020-03-10'group by user_id
),
tmp_order as
(selectuser_id,count(*) order_count,sum(final_total_amount) order_amountfrom dwd_fact_order_infowhere dt='2020-03-10'group by user_id
) ,
tmp_payment as
(selectuser_id,count(*) payment_count,sum(payment_amount) payment_amountfrom dwd_fact_payment_infowhere dt='2020-03-10'group by user_id
),
tmp_order_detail as
(selectuser_id,collect_set(named_struct('sku_id',sku_id,'sku_num',sku_num,'order_count',order_count,'order_amount',order_amount)) order_statsfrom(selectuser_id,sku_id,sum(sku_num) sku_num,count(*) order_count,cast(sum(total_amount) as decimal(20,2)) order_amountfrom dwd_fact_order_detailwhere dt='2020-03-10'group by user_id,sku_id)tmpgroup by user_id
)insert overwrite table dws_user_action_daycount partition(dt='2020-03-10')
selectcoalesce(tmp_login.user_id,tmp_cart.user_id,tmp_order.user_id,tmp_payment.user_id,tmp_order_detail.user_id),nvl(login_count,0),nvl(cart_count,0),nvl(order_count,0),nvl(order_amount,0.0),nvl(payment_count,0),nvl(payment_amount,0.0),order_stats
from tmp_login
full outer join tmp_cart on tmp_login.user_id=tmp_cart.user_id
full outer join tmp_order on tmp_login.user_id=tmp_order.user_id
full outer join tmp_payment on tmp_login.user_id=tmp_payment.user_id
full outer join tmp_order_detail on tmp_login.user_id=tmp_order_detail.user_id;

3.3.3 每日商品行为

drop table if exists dws_sku_action_daycount;
create external table dws_sku_action_daycount
(   sku_id string comment 'sku_id',order_count bigint comment '被下单次数',order_num bigint comment '被下单件数',order_amount decimal(16,2) comment '被下单金额',payment_count bigint  comment '被支付次数',payment_num bigint comment '被支付件数',payment_amount decimal(16,2) comment '被支付金额',refund_count bigint  comment '被退款次数',refund_num bigint comment '被退款件数',refund_amount  decimal(16,2) comment '被退款金额',cart_count bigint comment '被加入购物车次数',favor_count bigint comment '被收藏次数',appraise_good_count bigint comment '好评数',appraise_mid_count bigint comment '中评数',appraise_bad_count bigint comment '差评数',appraise_default_count bigint comment '默认评价数'
) COMMENT '每日商品行为'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_sku_action_daycount/'
tblproperties ("parquet.compression"="lzo");
with
tmp_order as
(selectsku_id,count(*) order_count,sum(sku_num) order_num,sum(total_amount) order_amountfrom dwd_fact_order_detailwhere dt='2020-03-10'group by sku_id
),
tmp_payment as
(selectsku_id,count(*) payment_count,sum(sku_num) payment_num,sum(total_amount) payment_amountfrom dwd_fact_order_detailwhere dt='2020-03-10'and order_id in(selectidfrom dwd_fact_order_infowhere (dt='2020-03-10'or dt=date_add('2020-03-10',-1))and date_format(payment_time,'yyyy-MM-dd')='2020-03-10')group by sku_id
),
tmp_refund as
(selectsku_id,count(*) refund_count,sum(refund_num) refund_num,sum(refund_amount) refund_amountfrom dwd_fact_order_refund_infowhere dt='2020-03-10'group by sku_id
),
tmp_cart as
(selectsku_id,count(*) cart_countfrom dwd_fact_cart_infowhere dt='2020-03-10'and date_format(create_time,'yyyy-MM-dd')='2020-03-10'group by sku_id
),
tmp_favor as
(selectsku_id,count(*) favor_countfrom dwd_fact_favor_infowhere dt='2020-03-10'and date_format(create_time,'yyyy-MM-dd')='2020-03-10'group by sku_id
),
tmp_appraise as
(
selectsku_id,sum(if(appraise='1201',1,0)) appraise_good_count,sum(if(appraise='1202',1,0)) appraise_mid_count,sum(if(appraise='1203',1,0)) appraise_bad_count,sum(if(appraise='1204',1,0)) appraise_default_count
from dwd_fact_comment_info
where dt='2020-03-10'
group by sku_id
)insert overwrite table dws_sku_action_daycount partition(dt='2020-03-10')
selectsku_id,sum(order_count),sum(order_num),sum(order_amount),sum(payment_count),sum(payment_num),sum(payment_amount),sum(refund_count),sum(refund_num),sum(refund_amount),sum(cart_count),sum(favor_count),sum(appraise_good_count),sum(appraise_mid_count),sum(appraise_bad_count),sum(appraise_default_count)
from
(selectsku_id,order_count,order_num,order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_orderunion allselectsku_id,0 order_count,0 order_num,0 order_amount,payment_count,payment_num,payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_paymentunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,refund_count,refund_num,refund_amount,0 cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_count        from tmp_refundunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_cartunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_favorunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,0 favor_count,appraise_good_count,appraise_mid_count,appraise_bad_count,appraise_default_countfrom tmp_appraise
)tmp
group by sku_id;

3.3.4 每日活动统计

drop table if exists dws_activity_info_daycount;
create external table dws_activity_info_daycount(`id` string COMMENT '编号',`activity_name` string  COMMENT '活动名称',`activity_type` string  COMMENT '活动类型',`start_time` string  COMMENT '开始时间',`end_time` string  COMMENT '结束时间',`create_time` string  COMMENT '创建时间',`order_count` bigint COMMENT '下单次数',`payment_count` bigint COMMENT '支付次数'
) COMMENT '购物车信息表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dws/dws_activity_info_daycount/'
tblproperties ("parquet.compression"="lzo");
insert overwrite table dws_activity_info_daycount partition(dt='2020-03-10')
selectoi.activity_id,ai.activity_name,ai.activity_type,ai.start_time,ai.end_time,ai.create_time,oi.order_count,oi.payment_count
from
(selectactivity_id,sum(if(date_format(create_time,'yyyy-MM-dd')='2020-03-10',1,0)) order_count,sum(if(date_format(payment_time,'yyyy-MM-dd')='2020-03-10',1,0)) payment_countfrom dwd_fact_order_infowhere (dt='2020-03-10' or dt=date_add('2020-03-10',-1))and activity_id is not nullgroup by activity_id
)oi
join
(select*from dwd_dim_activity_infowhere dt='2020-03-10'
)ai
on oi.activity_id=ai.id;

3.3.5 每日地区统计

drop table if exists dws_area_stats_daycount;
create external table dws_area_stats_daycount(`id` bigint COMMENT '编号',`province_name` string COMMENT '省份名称',`area_code` string COMMENT '地区编码',`iso_code` string COMMENT 'iso编码',`region_id` string COMMENT '地区ID',`region_name` string COMMENT '地区名称',`order_count` bigint COMMENT '下单次数',`order_amount` decimal(20,2) COMMENT '下单金额',`payment_count` bigint COMMENT '支付次数',`payment_amount` decimal(20,2) COMMENT '支付金额'
) COMMENT '购物车信息表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/gmall/dws/dws_area_stats_daycount/'
tblproperties ("parquet.compression"="lzo");
with
tmp_op as
(selectprovince_id,sum(if(date_format(create_time,'yyyy-MM-dd')='2020-03-10',1,0)) order_count,sum(if(date_format(create_time,'yyyy-MM-dd')='2020-03-10',final_total_amount,0)) order_amount,sum(if(date_format(payment_time,'yyyy-MM-dd')='2020-03-10',1,0)) payment_count,sum(if(date_format(payment_time,'yyyy-MM-dd')='2020-03-10',final_total_amount,0)) payment_amountfrom dwd_fact_order_infowhere (dt='2020-03-10' or dt=date_add('2020-03-10',-1))group by province_id
)
insert overwrite table dws_area_stats_daycount partition(dt='2020-03-10')
selectpro.id,pro.province_name,pro.area_code,pro.iso_code,pro.region_id,pro.region_name,nvl(tmp_op.order_count,0),nvl(tmp_op.order_amount,0.0),nvl(tmp_op.payment_count,0),nvl(tmp_op.payment_amount,0.0)
from dwd_dim_base_province pro
left join tmp_op on pro.id=tmp_op.province_id;

3.3.6 DWS 层数据导入脚本

#!/bin/bashAPP=gmall
hive=/opt/module/hive/bin/hive# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fisql="
insert overwrite table ${APP}.dws_uv_detail_daycount partition(dt='$do_date')
select  mid_id,concat_ws('|', collect_set(user_id)) user_id,concat_ws('|', collect_set(version_code)) version_code,concat_ws('|', collect_set(version_name)) version_name,concat_ws('|', collect_set(lang))lang,concat_ws('|', collect_set(source)) source,concat_ws('|', collect_set(os)) os,concat_ws('|', collect_set(area)) area, concat_ws('|', collect_set(model)) model,concat_ws('|', collect_set(brand)) brand,concat_ws('|', collect_set(sdk_version)) sdk_version,concat_ws('|', collect_set(gmail)) gmail,concat_ws('|', collect_set(height_width)) height_width,concat_ws('|', collect_set(app_time)) app_time,concat_ws('|', collect_set(network)) network,concat_ws('|', collect_set(lng)) lng,concat_ws('|', collect_set(lat)) lat
from ${APP}.dwd_start_log
where dt='$do_date'
group by mid_id;with
tmp_login as
(selectuser_id,count(*) login_countfrom ${APP}.dwd_start_logwhere dt='$do_date'and user_id is not nullgroup by user_id
),
tmp_cart as
(selectuser_id,count(*) cart_countfrom ${APP}.dwd_fact_cart_infowhere dt='$do_date'
and date_format(create_time,'yyyy-MM-dd')='$do_date'group by user_id
),
tmp_order as
(selectuser_id,count(*) order_count,sum(final_total_amount) order_amountfrom ${APP}.dwd_fact_order_infowhere dt='$do_date'group by user_id
) ,
tmp_payment as
(selectuser_id,count(*) payment_count,sum(payment_amount) payment_amountfrom ${APP}.dwd_fact_payment_infowhere dt='$do_date'group by user_id
),
tmp_order_detail as
(selectuser_id,collect_set(named_struct('sku_id',sku_id,'sku_num',sku_num,'order_count',order_count,'order_amount',order_amount)) order_statsfrom(selectuser_id,sku_id,sum(sku_num) sku_num,count(*) order_count,cast(sum(total_amount) as decimal(20,2)) order_amountfrom ${APP}.dwd_fact_order_detailwhere dt='$do_date'group by user_id,sku_id)tmpgroup by user_id
)insert overwrite table ${APP}.dws_user_action_daycount partition(dt='$do_date')
selectcoalesce(tmp_login.user_id,tmp_cart.user_id,tmp_order.user_id,tmp_payment.user_id,tmp_order_detail.user_id),nvl(login_count,0),nvl(cart_count,0),nvl(order_count,0),nvl(order_amount,0.0),nvl(payment_count,0),nvl(payment_amount,0.0),order_stats
from tmp_login
full outer join tmp_cart on tmp_login.user_id=tmp_cart.user_id
full outer join tmp_order on tmp_login.user_id=tmp_order.user_id
full outer join tmp_payment on tmp_login.user_id=tmp_payment.user_id
full outer join tmp_order_detail on tmp_login.user_id=tmp_order_detail.user_id;with
tmp_order as
(selectsku_id,count(*) order_count,sum(sku_num) order_num,sum(total_amount) order_amountfrom ${APP}.dwd_fact_order_detailwhere dt='$do_date'group by sku_id
),
tmp_payment as
(selectsku_id,count(*) payment_count,sum(sku_num) payment_num,sum(total_amount) payment_amountfrom ${APP}.dwd_fact_order_detailwhere dt='$do_date'and order_id in(selectidfrom ${APP}.dwd_fact_order_infowhere (dt='$do_date'or dt=date_add('$do_date',-1))and date_format(payment_time,'yyyy-MM-dd')='$do_date')group by sku_id
),
tmp_refund as
(selectsku_id,count(*) refund_count,sum(refund_num) refund_num,sum(refund_amount) refund_amountfrom ${APP}.dwd_fact_order_refund_infowhere dt='$do_date'group by sku_id
),
tmp_cart as
(selectsku_id,count(*) cart_countfrom ${APP}.dwd_fact_cart_infowhere dt='$do_date'and date_format(create_time,'yyyy-MM-dd')='$do_date'group by sku_id
),
tmp_favor as
(selectsku_id,count(*) favor_countfrom ${APP}.dwd_fact_favor_infowhere dt='$do_date'and date_format(create_time,'yyyy-MM-dd')='$do_date'group by sku_id
),
tmp_appraise as
(
selectsku_id,sum(if(appraise='1201',1,0)) appraise_good_count,sum(if(appraise='1202',1,0)) appraise_mid_count,sum(if(appraise='1203',1,0)) appraise_bad_count,sum(if(appraise='1204',1,0)) appraise_default_count
from ${APP}.dwd_fact_comment_info
where dt='$do_date'
group by sku_id
)insert overwrite table ${APP}.dws_sku_action_daycount partition(dt='$do_date')
selectsku_id,sum(order_count),sum(order_num),sum(order_amount),sum(payment_count),sum(payment_num),sum(payment_amount),sum(refund_count),sum(refund_num),sum(refund_amount),sum(cart_count),sum(favor_count),sum(appraise_good_count),sum(appraise_mid_count),sum(appraise_bad_count),sum(appraise_default_count)
from
(selectsku_id,order_count,order_num,order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_orderunion allselectsku_id,0 order_count,0 order_num,0 order_amount,payment_count,payment_num,payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_paymentunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,refund_count,refund_num,refund_amount,0 cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_count        from tmp_refundunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,cart_count,0 favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_cartunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,favor_count,0 appraise_good_count,0 appraise_mid_count,0 appraise_bad_count,0 appraise_default_countfrom tmp_favorunion allselectsku_id,0 order_count,0 order_num,0 order_amount,0 payment_count,0 payment_num,0 payment_amount,0 refund_count,0 refund_num,0 refund_amount,0 cart_count,0 favor_count,appraise_good_count,appraise_mid_count,appraise_bad_count,appraise_default_countfrom tmp_appraise
)tmp
group by sku_id; insert overwrite table ${APP}.dws_activity_info_daycount partition(dt='$do_date')
selectoi.activity_id,ai.activity_name,ai.activity_type,ai.start_time,ai.end_time,ai.create_time,oi.order_count,oi.payment_count
from
(selectactivity_id,sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',1,0)) order_count,sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',1,0)) payment_countfrom ${APP}.dwd_fact_order_infowhere (dt='$do_date' or dt=date_add('$do_date',-1))and activity_id is not nullgroup by activity_id
)oi
join
(select*from ${APP}.dwd_dim_activity_infowhere dt='$do_date'
)ai
on oi.activity_id=ai.id;with
tmp_op as
(selectprovince_id,sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',1,0)) order_count,sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) order_amount,sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',1,0)) payment_count,sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) payment_amountfrom ${APP}.dwd_fact_order_infowhere (dt='$do_date' or dt=date_add('$do_date',-1))group by province_id
)
insert overwrite table ${APP}.dws_area_stats_daycount partition(dt='$do_date')
selectpro.id,pro.province_name,pro.area_code,pro.iso_code,pro.region_id,pro.region_name,nvl(tmp_op.order_count,0),nvl(tmp_op.order_amount,0.0),nvl(tmp_op.payment_count,0),nvl(tmp_op.payment_amount,0.0)
from ${APP}.dwd_dim_base_province pro
left join tmp_op on pro.id=tmp_op.province_id;
"$hive -e "$sql"

3.4 DWT 层

3.4.1 设备主题宽表

drop table if exists dwt_uv_topic;
create external table dwt_uv_topic
(`mid_id` string COMMENT '设备唯一标识',`model` string COMMENT '手机型号',`brand` string COMMENT '手机品牌',`login_date_first` string  comment '首次活跃时间',`login_date_last` string  comment '末次活跃时间',`login_count` bigint comment '累积活跃天数'
)
stored as parquet
location '/warehouse/gmall/dwt/dwt_uv_topic';
insert overwrite table dwt_uv_topic
selectnvl(new.mid_id,old.mid_id),nvl(new.model,old.model),nvl(new.brand,old.brand),nvl(old.login_date_first,'2020-03-10'),if(new.mid_id is not null,'2020-03-10',old.login_date_last),nvl(old.login_count,0)+if(new.mid_id is not null,1,0)
from
(select*from dwt_uv_topic
)old
full outer join
(select*from dws_uv_detail_daycountwhere dt='2020-03-10'
)new
on old.mid_id=new.mid_id;

3.4.2 会员主题宽表

drop table if exists dwt_user_topic;
create external table dwt_user_topic
(user_id string  comment '用户id',login_date_first string  comment '首次登录时间',login_date_last string  comment '末次登录时间',login_count bigint comment '累积登录天数',login_last_30d_count bigint comment '最近30日登录天数',order_date_first string  comment '首次下单时间',order_date_last string  comment '末次下单时间',order_count bigint comment '累积下单次数',order_amount decimal(16,2) comment '累积下单金额',order_last_30d_count bigint comment '最近30日下单次数',order_last_30d_amount bigint comment '最近30日下单金额',payment_date_first string  comment '首次支付时间',payment_date_last string  comment '末次支付时间',payment_count decimal(16,2) comment '累积支付次数',payment_amount decimal(16,2) comment '累积支付金额',payment_last_30d_count decimal(16,2) comment '最近30日支付次数',payment_last_30d_amount decimal(16,2) comment '最近30日支付金额')COMMENT '用户主题宽表'
stored as parquet
location '/warehouse/gmall/dwt/dwt_user_topic/'
tblproperties ("parquet.compression"="lzo");
insert overwrite table dwt_user_topic
selectnvl(new.user_id,old.user_id),if(old.login_date_first is null and new.user_id is not null,'2020-03-10',old.login_date_first),if(new.user_id is not null,'2020-03-10',old.login_date_last),nvl(old.login_count,0)+if(new.user_id is not null,1,0),nvl(new.login_last_30d_count,0),if(old.order_date_first is null and new.order_count>0,'2020-03-10',old.order_date_first),if(new.order_count>0,'2020-03-10',old.order_date_last),nvl(old.order_count,0)+nvl(new.order_count,0),nvl(old.order_amount,0)+nvl(new.order_amount,0),nvl(new.order_last_30d_count,0),nvl(new.order_last_30d_amount,0),if(old.payment_date_first is null and new.payment_count>0,'2020-03-10',old.payment_date_first),if(new.payment_count>0,'2020-03-10',old.payment_date_last),nvl(old.payment_count,0)+nvl(new.payment_count,0),nvl(old.payment_amount,0)+nvl(new.payment_amount,0),nvl(new.payment_last_30d_count,0),nvl(new.payment_last_30d_amount,0)
from
dwt_user_topic old
full outer join
(selectuser_id,sum(if(dt='2020-03-10',order_count,0)) order_count,sum(if(dt='2020-03-10',order_amount,0)) order_amount,sum(if(dt='2020-03-10',payment_count,0)) payment_count,sum(if(dt='2020-03-10',payment_amount,0)) payment_amount,sum(if(login_count>0,1,0)) login_last_30d_count,sum(order_count) order_last_30d_count,sum(order_amount) order_last_30d_amount,sum(payment_count) payment_last_30d_count,sum(payment_amount) payment_last_30d_amountfrom dws_user_action_daycountwhere dt>=date_add( '2020-03-10',-30)group by user_id
)new
on old.user_id=new.user_id;

3.4.3 商品主题宽表

drop table if exists dwt_sku_topic;
create external table dwt_sku_topic
(sku_id string comment 'sku_id',spu_id string comment 'spu_id',order_last_30d_count bigint comment '最近30日被下单次数',order_last_30d_num bigint comment '最近30日被下单件数',order_last_30d_amount decimal(16,2)  comment '最近30日被下单金额',order_count bigint comment '累积被下单次数',order_num bigint comment '累积被下单件数',order_amount decimal(16,2) comment '累积被下单金额',payment_last_30d_count   bigint  comment '最近30日被支付次数',payment_last_30d_num bigint comment '最近30日被支付件数',payment_last_30d_amount  decimal(16,2) comment '最近30日被支付金额',payment_count   bigint  comment '累积被支付次数',payment_num bigint comment '累积被支付件数',payment_amount  decimal(16,2) comment '累积被支付金额',refund_last_30d_count bigint comment '最近三十日退款次数',refund_last_30d_num bigint comment '最近三十日退款件数',refund_last_30d_amount decimal(10,2) comment '最近三十日退款金额',refund_count bigint comment '累积退款次数',refund_num bigint comment '累积退款件数',refund_amount decimal(10,2) comment '累积退款金额',cart_last_30d_count bigint comment '最近30日被加入购物车次数',cart_count bigint comment '累积被加入购物车次数',favor_last_30d_count bigint comment '最近30日被收藏次数',favor_count bigint comment '累积被收藏次数',appraise_last_30d_good_count bigint comment '最近30日好评数',appraise_last_30d_mid_count bigint comment '最近30日中评数',appraise_last_30d_bad_count bigint comment '最近30日差评数',appraise_last_30d_default_count bigint comment '最近30日默认评价数',appraise_good_count bigint comment '累积好评数',appraise_mid_count bigint comment '累积中评数',appraise_bad_count bigint comment '累积差评数',appraise_default_count bigint comment '累积默认评价数')COMMENT '商品主题宽表'
stored as parquet
location '/warehouse/gmall/dwt/dwt_sku_topic/'
tblproperties ("parquet.compression"="lzo");
insert overwrite table dwt_sku_topic
select nvl(new.sku_id,old.sku_id),sku_info.spu_id,nvl(new.order_count30,0),nvl(new.order_num30,0),nvl(new.order_amount30,0),nvl(old.order_count,0) + nvl(new.order_count,0),nvl(old.order_num,0) + nvl(new.order_num,0),nvl(old.order_amount,0) + nvl(new.order_amount,0),nvl(new.payment_count30,0),nvl(new.payment_num30,0),nvl(new.payment_amount30,0),nvl(old.payment_count,0) + nvl(new.payment_count,0),nvl(old.payment_num,0) + nvl(new.payment_count,0),nvl(old.payment_amount,0) + nvl(new.payment_count,0),nvl(new.refund_count30,0),nvl(new.refund_num30,0),nvl(new.refund_amount30,0),nvl(old.refund_count,0) + nvl(new.refund_count,0),nvl(old.refund_num,0) + nvl(new.refund_num,0),nvl(old.refund_amount,0) + nvl(new.refund_amount,0),nvl(new.cart_count30,0),nvl(old.cart_count,0) + nvl(new.cart_count,0),nvl(new.favor_count30,0),nvl(old.favor_count,0) + nvl(new.favor_count,0),nvl(new.appraise_good_count30,0),nvl(new.appraise_mid_count30,0),nvl(new.appraise_bad_count30,0),nvl(new.appraise_default_count30,0)  ,nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(selectsku_id,spu_id,order_last_30d_count,order_last_30d_num,order_last_30d_amount,order_count,order_num,order_amount  ,payment_last_30d_count,payment_last_30d_num,payment_last_30d_amount,payment_count,payment_num,payment_amount,refund_last_30d_count,refund_last_30d_num,refund_last_30d_amount,refund_count,refund_num,refund_amount,cart_last_30d_count,cart_count,favor_last_30d_count,favor_count,appraise_last_30d_good_count,appraise_last_30d_mid_count,appraise_last_30d_bad_count,appraise_last_30d_default_count,appraise_good_count,appraise_mid_count,appraise_bad_count,appraise_default_count from dwt_sku_topic
)old
full outer join
(select sku_id,sum(if(dt='2020-03-10', order_count,0 )) order_count,sum(if(dt='2020-03-10',order_num ,0 ))  order_num, sum(if(dt='2020-03-10',order_amount,0 )) order_amount ,sum(if(dt='2020-03-10',payment_count,0 )) payment_count,sum(if(dt='2020-03-10',payment_num,0 )) payment_num,sum(if(dt='2020-03-10',payment_amount,0 )) payment_amount,sum(if(dt='2020-03-10',refund_count,0 )) refund_count,sum(if(dt='2020-03-10',refund_num,0 )) refund_num,sum(if(dt='2020-03-10',refund_amount,0 )) refund_amount,  sum(if(dt='2020-03-10',cart_count,0 )) cart_count,sum(if(dt='2020-03-10',favor_count,0 )) favor_count,sum(if(dt='2020-03-10',appraise_good_count,0 )) appraise_good_count,  sum(if(dt='2020-03-10',appraise_mid_count,0 ) ) appraise_mid_count ,sum(if(dt='2020-03-10',appraise_bad_count,0 )) appraise_bad_count,  sum(if(dt='2020-03-10',appraise_default_count,0 )) appraise_default_count,sum(order_count) order_count30 ,sum(order_num) order_num30,sum(order_amount) order_amount30,sum(payment_count) payment_count30,sum(payment_num) payment_num30,sum(payment_amount) payment_amount30,sum(refund_count) refund_count30,sum(refund_num) refund_num30,sum(refund_amount) refund_amount30,sum(cart_count) cart_count30,sum(favor_count) favor_count30,sum(appraise_good_count) appraise_good_count30,sum(appraise_mid_count) appraise_mid_count30,sum(appraise_bad_count) appraise_bad_count30,sum(appraise_default_count) appraise_default_count30 from dws_sku_action_daycountwhere dt >= date_add ('2020-03-10', -30)group by sku_id
)new
on new.sku_id = old.sku_id
left join
(select * from dwd_dim_sku_info where dt='2020-03-10') sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;

3.4.4 活动主题宽表

drop table if exists dwt_activity_topic;
create external table dwt_activity_topic(`id` string COMMENT '活动id',`activity_name` string  COMMENT '活动名称',`order_day_count` bigint COMMENT '当日日下单次数',`payment_day_count` bigint COMMENT '当日支付次数',`order_count` bigint COMMENT '累积下单次数',`payment_count` bigint COMMENT '累积支付次数'
) COMMENT '活动主题宽表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwt/dwt_activity_topic/'
tblproperties ("parquet.compression"="lzo");
insert overwrite table dwt_activity_topic
selectnvl(new.id,old.id),nvl(new.activity_name,old.activity_name),nvl(new.order_count,0),nvl(new.payment_count,0),nvl(old.order_count,0)+nvl(new.order_count,0),nvl(old.payment_count,0)+nvl(new.payment_count,0)
from
(select*from dwt_activity_topic
)old
full outer join
(selectid,activity_name,order_count,payment_countfrom dws_activity_info_daycountwhere dt='2020-03-10'
)new
on old.id=new.id;

3.4.5 地区主题宽表

drop table if exists dwt_area_topic;
create external table dwt_area_topic(`id` bigint COMMENT '编号',`province_name` string COMMENT '省份名称',`area_code` string COMMENT '地区编码',`iso_code` string COMMENT 'iso编码',`region_id` string COMMENT '地区ID',`region_name` string COMMENT '地区名称',`order_day_count` bigint COMMENT '当天下单次数',`order_day_amount` decimal(20,2) COMMENT '当天下单金额',`order_last_30d_count` bigint COMMENT '最近30天下单次数',`order_last_30d_amount` decimal(20,2) COMMENT '最近30天下单金额',`payment_day_count` bigint COMMENT '当天支付次数',`payment_day_amount` decimal(20,2) COMMENT '当天支付金额',`payment_last_30d_count` bigint COMMENT '最近30天支付次数',`payment_last_30d_amount` decimal(20,2) COMMENT '最近30天支付金额'
) COMMENT '地区主题宽表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwt/dwt_area_topic/'
tblproperties ("parquet.compression"="lzo");
insert overwrite table dwt_area_topic
selectnvl(old.id,new.id),nvl(old.province_name,new.province_name),nvl(old.area_code,new.area_code),nvl(old.iso_code,new.iso_code),nvl(old.region_id,new.region_id),nvl(old.region_name,new.region_name),nvl(new.order_day_count,0),nvl(new.order_day_amount,0.0),nvl(new.order_last_30d_count,0),nvl(new.order_last_30d_amount,0.0),nvl(new.payment_day_count,0),nvl(new.payment_day_amount,0.0),nvl(new.payment_last_30d_count,0),nvl(new.payment_last_30d_amount,0.0)
from
(select*from dwt_area_topic
)old
full outer join
(selectid,province_name,area_code,iso_code,region_id,region_name,sum(if(dt='2020-03-10',order_count,0)) order_day_count,sum(if(dt='2020-03-10',order_amount,0.0)) order_day_amount,sum(if(dt='2020-03-10',payment_count,0)) payment_day_count,sum(if(dt='2020-03-10',payment_amount,0.0)) payment_day_amount,sum(order_count) order_last_30d_count,sum(order_amount) order_last_30d_amount,sum(payment_count) payment_last_30d_count,sum(payment_amount) payment_last_30d_amountfrom dws_area_stats_daycountwhere dt>=date_add('2020-03-10',-30)group by id,province_name,area_code,iso_code,region_id,region_name
)new
on old.id=new.id;

3.4.6 DWT 层数据导入脚本

#!/bin/bashAPP=gmall
hive=/opt/module/hive/bin/hive# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;thendo_date=$1
else do_date=`date -d "-1 day" +%F`
fisql="
insert overwrite table ${APP}.dwt_uv_topic
selectnvl(new.mid_id,old.mid_id),nvl(new.model,old.model),nvl(new.brand,old.brand),nvl(old.login_date_first,'$do_date'),if(new.mid_id is not null,'$do_date',old.login_date_last),nvl(old.login_count,0)+if(new.mid_id is not null,1,0)
from
(select*from ${APP}.dwt_uv_topic
)old
full outer join
(select*from ${APP}.dws_uv_detail_daycountwhere dt='$do_date'
)new
on old.mid_id=new.mid_id;insert overwrite table ${APP}.dwt_user_topic
selectnvl(new.user_id,old.user_id),if(old.login_date_first is null and new.user_id is not null,'$do_date',old.login_date_first),if(new.user_id is not null,'$do_date',old.login_date_last),nvl(old.login_count,0)+if(new.user_id is not null,1,0),nvl(new.login_last_30d_count,0),if(old.order_date_first is null and new.order_count>0,'$do_date',old.order_date_first),if(new.order_count>0,'$do_date',old.order_date_last),nvl(old.order_count,0)+nvl(new.order_count,0),nvl(old.order_amount,0)+nvl(new.order_amount,0),nvl(new.order_last_30d_count,0),nvl(new.order_last_30d_amount,0),if(old.payment_date_first is null and new.payment_count>0,'$do_date',old.payment_date_first),if(new.payment_count>0,'$do_date',old.payment_date_last),nvl(old.payment_count,0)+nvl(new.payment_count,0),nvl(old.payment_amount,0)+nvl(new.payment_amount,0),nvl(new.payment_last_30d_count,0),nvl(new.payment_last_30d_amount,0)
from
${APP}.dwt_user_topic old
full outer join
(selectuser_id,sum(if(dt='$do_date',order_count,0)) order_count,sum(if(dt='$do_date',order_amount,0)) order_amount,sum(if(dt='$do_date',payment_count,0)) payment_count,sum(if(dt='$do_date',payment_amount,0)) payment_amount,sum(if(login_count>0,1,0)) login_last_30d_count,sum(order_count) order_last_30d_count,sum(order_amount) order_last_30d_amount,sum(payment_count) payment_last_30d_count,sum(payment_amount) payment_last_30d_amountfrom ${APP}.dws_user_action_daycountwhere dt>=date_add( '$do_date',-30)group by user_id
)new
on old.user_id=new.user_id;insert overwrite table ${APP}.dwt_sku_topic
select nvl(new.sku_id,old.sku_id),sku_info.spu_id,nvl(new.order_count30,0),nvl(new.order_num30,0),nvl(new.order_amount30,0),nvl(old.order_count,0) + nvl(new.order_count,0),nvl(old.order_num,0) + nvl(new.order_num,0),nvl(old.order_amount,0) + nvl(new.order_amount,0),nvl(new.payment_count30,0),nvl(new.payment_num30,0),nvl(new.payment_amount30,0),nvl(old.payment_count,0) + nvl(new.payment_count,0),nvl(old.payment_num,0) + nvl(new.payment_count,0),nvl(old.payment_amount,0) + nvl(new.payment_count,0),nvl(new.refund_count30,0),nvl(new.refund_num30,0),nvl(new.refund_amount30,0),nvl(old.refund_count,0) + nvl(new.refund_count,0),nvl(old.refund_num,0) + nvl(new.refund_num,0),nvl(old.refund_amount,0) + nvl(new.refund_amount,0),nvl(new.cart_count30,0),nvl(old.cart_count,0) + nvl(new.cart_count,0),nvl(new.favor_count30,0),nvl(old.favor_count,0) + nvl(new.favor_count,0),nvl(new.appraise_good_count30,0),nvl(new.appraise_mid_count30,0),nvl(new.appraise_bad_count30,0),nvl(new.appraise_default_count30,0)  ,nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(selectsku_id,spu_id,order_last_30d_count,order_last_30d_num,order_last_30d_amount,order_count,order_num,order_amount  ,payment_last_30d_count,payment_last_30d_num,payment_last_30d_amount,payment_count,payment_num,payment_amount,refund_last_30d_count,refund_last_30d_num,refund_last_30d_amount,refund_count,refund_num,refund_amount,cart_last_30d_count,cart_count,favor_last_30d_count,favor_count,appraise_last_30d_good_count,appraise_last_30d_mid_count,appraise_last_30d_bad_count,appraise_last_30d_default_count,appraise_good_count,appraise_mid_count,appraise_bad_count,appraise_default_count from ${APP}.dwt_sku_topic
)old
full outer join
(select sku_id,sum(if(dt='$do_date', order_count,0 )) order_count,sum(if(dt='$do_date',order_num ,0 ))  order_num, sum(if(dt='$do_date',order_amount,0 )) order_amount ,sum(if(dt='$do_date',payment_count,0 )) payment_count,sum(if(dt='$do_date',payment_num,0 )) payment_num,sum(if(dt='$do_date',payment_amount,0 )) payment_amount,sum(if(dt='$do_date',refund_count,0 )) refund_count,sum(if(dt='$do_date',refund_num,0 )) refund_num,sum(if(dt='$do_date',refund_amount,0 )) refund_amount,  sum(if(dt='$do_date',cart_count,0 )) cart_count,sum(if(dt='$do_date',favor_count,0 )) favor_count,sum(if(dt='$do_date',appraise_good_count,0 )) appraise_good_count,  sum(if(dt='$do_date',appraise_mid_count,0 ) ) appraise_mid_count ,sum(if(dt='$do_date',appraise_bad_count,0 )) appraise_bad_count,  sum(if(dt='$do_date',appraise_default_count,0 )) appraise_default_count,sum(order_count) order_count30 ,sum(order_num) order_num30,sum(order_amount) order_amount30,sum(payment_count) payment_count30,sum(payment_num) payment_num30,sum(payment_amount) payment_amount30,sum(refund_count) refund_count30,sum(refund_num) refund_num30,sum(refund_amount) refund_amount30,sum(cart_count) cart_count30,sum(favor_count) favor_count30,sum(appraise_good_count) appraise_good_count30,sum(appraise_mid_count) appraise_mid_count30,sum(appraise_bad_count) appraise_bad_count30,sum(appraise_default_count) appraise_default_count30 from ${APP}.dws_sku_action_daycountwhere dt >= date_add ('$do_date', -30)group by sku_id
)new
on new.sku_id = old.sku_id
left join
(select * from ${APP}.dwd_dim_sku_info where dt='$do_date') sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;insert overwrite table ${APP}.dwt_activity_topic
selectnvl(new.id,old.id),nvl(new.activity_name,old.activity_name),nvl(new.order_count,0),nvl(new.payment_count,0),nvl(old.order_count,0)+nvl(new.order_count,0),nvl(old.payment_count,0)+nvl(new.payment_count,0)
from
(select*from ${APP}.dwt_activity_topic
)old
full outer join
(selectid,activity_name,order_count,payment_countfrom ${APP}.dws_activity_info_daycountwhere dt='$do_date'
)new
on old.id=new.id;insert overwrite table ${APP}.dwt_area_topic
selectnvl(old.id,new.id),nvl(old.province_name,new.province_name),nvl(old.area_code,new.area_code),nvl(old.iso_code,new.iso_code),nvl(old.region_id,new.region_id),nvl(old.region_name,new.region_name),nvl(new.order_day_count,0),nvl(new.order_day_amount,0.0),nvl(new.order_last_30d_count,0),nvl(new.order_last_30d_amount,0.0),nvl(new.payment_day_count,0),nvl(new.payment_day_amount,0.0),nvl(new.payment_last_30d_count,0),nvl(new.payment_last_30d_amount,0.0)
from
(select*from ${APP}.dwt_area_topic
)old
full outer join
(selectid,province_name,area_code,iso_code,region_id,region_name,sum(if(dt='$do_date',order_count,0)) order_day_count,sum(if(dt='$do_date',order_amount,0.0)) order_day_amount,sum(if(dt='$do_date',payment_count,0)) payment_day_count,sum(if(dt='$do_date',payment_amount,0.0)) payment_day_amount,sum(order_count) order_last_30d_count,sum(order_amount) order_last_30d_amount,sum(payment_count) payment_last_30d_count,sum(payment_amount) payment_last_30d_amountfrom ${APP}.dws_area_stats_daycountwhere dt>=date_add('$do_date',-30)group by id,province_name,area_code,iso_code,region_id,region_name
)new
on old.id=new.id;
"$hive -e "$sql"

3.5 ADS 层

3.5.1 设备主题

  • 活跃设备数(日、周、月)
drop table if exists ads_uv_count;
create external table ads_uv_count( `dt` string COMMENT '统计日期',`day_count` bigint COMMENT '当日用户数量',`wk_count`  bigint COMMENT '当周用户数量',`mn_count`  bigint COMMENT '当月用户数量',`is_weekend` string COMMENT 'Y,N是否是周末,用于得到本周最终结果',`is_monthend` string COMMENT 'Y,N是否是月末,用于得到本月最终结果'
) COMMENT '活跃设备数'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_uv_count/';
insert into table ads_uv_count
select  '2020-03-10' dt,daycount.ct,wkcount.ct,mncount.ct,if(date_add(next_day('2020-03-10','MO'),-1)='2020-03-10','Y','N') ,if(last_day('2020-03-10')='2020-03-10','Y','N')
from
(select  '2020-03-10' dt,count(*) ctfrom dwt_uv_topicwhere login_date_last='2020-03-10'
)daycount join
( select  '2020-03-10' dt,count (*) ctfrom dwt_uv_topicwhere login_date_last>=date_add(next_day('2020-03-10','MO'),-7) and login_date_last<= date_add(next_day('2020-03-10','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
( select  '2020-03-10' dt,count (*) ctfrom dwt_uv_topicwhere date_format(login_date_last,'yyyy-MM')=date_format('2020-03-10','yyyy-MM')
)mncount on daycount.dt=mncount.dt;
  • 每日新增设备
drop table if exists ads_new_mid_count;
create external table ads_new_mid_count
(`create_date`     string comment '创建时间' ,`new_mid_count`   BIGINT comment '新增设备数量'
)  COMMENT '每日新增设备信息数量'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_new_mid_count/';
insert into table ads_new_mid_count
selectlogin_date_first,count(*)
from dwt_uv_topic
where login_date_first='2020-03-10'
group by login_date_first;
  • 沉默用户数
drop table if exists ads_silent_count;
create external table ads_silent_count( `dt` string COMMENT '统计日期',`silent_count` bigint COMMENT '沉默设备数'
)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_silent_count';
insert into table ads_silent_count
select'2020-03-15',count(*)
from dwt_uv_topic
where login_date_first=login_date_last
and login_date_last<=date_add('2020-03-15',-7);
  • 本周回流用户

本周回流用户:上周未活跃,本周活跃的设备,且不是本周新增设备。

drop table if exists ads_back_count;
create external table ads_back_count( `dt` string COMMENT '统计日期',`wk_dt` string COMMENT '统计日期所在周',`wastage_count` bigint COMMENT '回流设备数'
)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_back_count';
insert into table ads_back_count
select
'2020-03-15',
concat(date_add(next_day('2020-03-10','MO'),-7),'_', date_add(next_day('2020-03-15','MO'),-1)),count(*)
from
(selectmid_idfrom dwt_uv_topicwhere login_date_last>=date_add(next_day('2020-03-10','MO'),-7) and login_date_last<= date_add(next_day('2020-03-10','MO'),-1)and login_date_first<date_add(next_day('2020-03-10','MO'),-7)
)current_wk
left join
(selectmid_idfrom dws_uv_detail_daycountwhere dt>=date_add(next_day('2020-03-10','MO'),-7*2) and dt<= date_add(next_day('2020-03-10','MO'),-7-1) group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;
  • 流失用户数

流失用户:最近7天未活跃的设备。

drop table if exists ads_wastage_count;
create external table ads_wastage_count( `dt` string COMMENT '统计日期',`wastage_count` bigint COMMENT '流失设备数'
)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_wastage_count';
insert into table ads_wastage_count
select'2020-03-20',count(*)
from
(select mid_idfrom dwt_uv_topicwhere login_date_last<=date_add('2020-03-20',-7)group by mid_id
)t1;
  • 留存率

drop table if exists ads_user_retention_day_rate;
create external table ads_user_retention_day_rate
(`stat_date`          string comment '统计日期',`create_date`       string  comment '设备新增日期',`retention_day`     int comment '截止当前日期留存天数',`retention_count`    bigint comment  '留存数量',`new_mid_count`     bigint comment '设备新增数量',`retention_ratio`   decimal(10,2) comment '留存率'
)  COMMENT '每日用户留存情况'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_user_retention_day_rate/';
insert into table ads_user_retention_day_rate
select'2020-03-10',--统计日期date_add('2020-03-10',-1),--新增日期1,--留存天数sum(if(login_date_first=date_add('2020-03-10',-1) and login_date_last='2020-03-10',1,0)),--2020-03-09的1日留存数sum(if(login_date_first=date_add('2020-03-10',-1),1,0)),--2020-03-09新增sum(if(login_date_first=date_add('2020-03-10',-1) and login_date_last='2020-03-10',1,0))/sum(if(login_date_first=date_add('2020-03-10',-1),1,0))*100
from dwt_uv_topicunion allselect'2020-03-10',--统计日期date_add('2020-03-10',-2),--新增日期2,--留存天数sum(if(login_date_first=date_add('2020-03-10',-2) and login_date_last='2020-03-10',1,0)),--2020-03-08的2日留存数sum(if(login_date_first=date_add('2020-03-10',-2),1,0)),--2020-03-08新增sum(if(login_date_first=date_add('2020-03-10',-2) and login_date_last='2020-03-10',1,0))/sum(if(login_date_first=date_add('2020-03-10',-2),1,0))*100
from dwt_uv_topicunion allselect'2020-03-10',--统计日期date_add('2020-03-10',-3),--新增日期3,--留存天数sum(if(login_date_first=date_add('2020-03-10',-3) and login_date_last='2020-03-10',1,0)),--2020-03-07的3日留存数sum(if(login_date_first=date_add('2020-03-10',-3),1,0)),--2020-03-07新增sum(if(login_date_first=date_add('2020-03-10',-3) and login_date_last='2020-03-10',1,0))/sum(if(login_date_first=date_add('2020-03-10',-3),1,0))*100
from dwt_uv_topic;
  • 最近连续三周活跃用户数
drop table if exists ads_continuity_wk_count;
create external table ads_continuity_wk_count( `dt` string COMMENT '统计日期,一般用结束周周日日期,如果每天计算一次,可用当天日期',`wk_dt` string COMMENT '持续时间',`continuity_count` bigint COMMENT '活跃次数'
)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_continuity_wk_count';
insert into table ads_continuity_wk_count
select'2020-03-10',concat(date_add(next_day('2020-03-10','MO'),-7*3),'_',date_add(next_day('2020-03-10','MO'),-1)),count(*)
from
(selectmid_idfrom(selectmid_idfrom dws_uv_detail_daycountwhere dt>=date_add(next_day('2020-03-10','monday'),-7)and dt<=date_add(next_day('2020-03-10','monday'),-1)group by mid_idunion allselectmid_idfrom dws_uv_detail_daycountwhere dt>=date_add(next_day('2020-03-10','monday'),-7*2)and dt<=date_add(next_day('2020-03-10','monday'),-7-1)group by mid_idunion allselectmid_idfrom dws_uv_detail_daycountwhere dt>=date_add(next_day('2020-03-10','monday'),-7*3)and dt<=date_add(next_day('2020-03-10','monday'),-7*2-1)group by mid_id)t1group by mid_idhaving count(*)=3
)t2
  • 最近七天内连续三天活跃用户数
drop table if exists ads_continuity_uv_count;
create external table ads_continuity_uv_count( `dt` string COMMENT '统计日期',`wk_dt` string COMMENT '最近7天日期',`continuity_count` bigint
) COMMENT '连续活跃设备数'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_continuity_uv_count';
insert into table ads_continuity_uv_count
select'2020-03-12',concat(date_add('2020-03-12',-6),'_','2020-03-12'),count(*)
from
(select mid_idfrom(select mid_id      from(select mid_id,date_sub(dt,rank) date_diffrom(select mid_id,dt,rank() over(partition by mid_id order by dt) rankfrom dws_uv_detail_daycountwhere dt>=date_add('2020-03-12',-6) and dt<='2020-03-12')t1)t2 group by mid_id,date_difhaving count(*)>=3)t3 group by mid_id
)t4;

3.5.2 会员主题

  • 会员主题信息
drop table if exists ads_user_topic;
create external table ads_user_topic(`dt` string COMMENT '统计日期',`day_users` string COMMENT '活跃会员数',`day_new_users` string COMMENT '新增会员数',`day_new_payment_users` string COMMENT '新增消费会员数',`payment_users` string COMMENT '总付费会员数',`users` string COMMENT '总会员数',`day_users2users` decimal(10,2) COMMENT '会员活跃率',`payment_users2users` decimal(10,2) COMMENT '会员付费率',`day_new_users2users` decimal(10,2) COMMENT '会员新鲜度'
) COMMENT '会员主题信息表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_user_topic';
insert into table ads_user_topic
select'2020-03-10',sum(if(login_date_last='2020-03-10',1,0)),sum(if(login_date_first='2020-03-10',1,0)),sum(if(payment_date_first='2020-03-10',1,0)),sum(if(payment_count>0,1,0)),count(*),sum(if(login_date_last='2020-03-10',1,0))/count(*),sum(if(payment_count>0,1,0))/count(*),sum(if(login_date_first='2020-03-10',1,0))/sum(if(login_date_last='2020-03-10',1,0))
from dwt_user_topic
  • 漏斗

统计“首页->购物车->下单->支付”的转化率

思路:统计各个行为的人数,然后计算比值。

drop table if exists ads_user_action_convert_day;
create external  table ads_user_action_convert_day(`dt` string COMMENT '统计日期',`total_visitor_m_count`  bigint COMMENT '总访问人数',`cart_u_count` bigint COMMENT '加入购物车的人数',`visitor2cart_convert_ratio` decimal(10,2) COMMENT '访问到加入购物车转化率',`order_u_count` bigint     COMMENT '下单人数',`cart2order_convert_ratio`  decimal(10,2) COMMENT '加入购物车到下单转化率',`payment_u_count` bigint     COMMENT '支付人数',`order2payment_convert_ratio` decimal(10,2) COMMENT '下单到支付的转化率') COMMENT '用户行为漏斗分析'
row format delimited  fields terminated by '\t'
location '/warehouse/gmall/ads/ads_user_action_convert_day/';
insert into table ads_user_action_convert_day
select '2020-03-10',uv.day_count,ua.cart_count,cast(ua.cart_count/uv.day_count as  decimal(10,2)) visitor2cart_convert_ratio,ua.order_count,cast(ua.order_count/ua.cart_count as  decimal(10,2)) visitor2order_convert_ratio,ua.payment_count,cast(ua.payment_count/ua.order_count as  decimal(10,2)) order2payment_convert_ratio
from
(select dt,sum(if(cart_count>0,1,0)) cart_count,sum(if(order_count>0,1,0)) order_count,sum(if(payment_count>0,1,0)) payment_countfrom dws_user_action_daycount
where dt='2020-03-10'
group by dt
)ua join ads_uv_count uv on uv.dt=ua.dt;

3.5.3 商品主题

  • 商品个数信息
drop table if exists ads_product_info;
create external table ads_product_info(`dt` string COMMENT '统计日期',`sku_num` string COMMENT 'sku个数',`spu_num` string COMMENT 'spu个数'
) COMMENT '商品个数信息'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_info';
insert into table ads_product_info
select'2020-03-10' dt,sku_num,spu_num
from
(select'2020-03-10' dt,count(*) sku_numfromdwt_sku_topic
) tmp_sku_num
join
(select'2020-03-10' dt,count(*) spu_numfrom(selectspu_idfromdwt_sku_topicgroup byspu_id) tmp_spu_id
) tmp_spu_num
ontmp_sku_num.dt=tmp_spu_num.dt;
  • 商品销量排名
drop table if exists ads_product_sale_topN;
create external table ads_product_sale_topN(`dt` string COMMENT '统计日期',`sku_id` string COMMENT '商品ID',`payment_amount` bigint COMMENT '销量'
) COMMENT '商品个数信息'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_sale_topN';
insert into table ads_product_sale_topN
select'2020-03-10' dt,sku_id,payment_amount
fromdws_sku_action_daycount
wheredt='2020-03-10'
order by payment_amount desc
limit 10;
  • 商品收藏排名
drop table if exists ads_product_favor_topN;
create external table ads_product_favor_topN(`dt` string COMMENT '统计日期',`sku_id` string COMMENT '商品ID',`favor_count` bigint COMMENT '收藏量'
) COMMENT '商品收藏TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_favor_topN';
insert into table ads_product_favor_topN
select'2020-03-10' dt,sku_id,favor_count
fromdws_sku_action_daycount
wheredt='2020-03-10'
order by favor_count desc
limit 10;
  • 商品加入购物车排名
drop table if exists ads_product_cart_topN;
create external table ads_product_cart_topN(`dt` string COMMENT '统计日期',`sku_id` string COMMENT '商品ID',`cart_count` bigint COMMENT '加入购物车次数'
) COMMENT '商品加入购物车TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_cart_topN';
insert into table ads_product_cart_topN
select'2020-03-10' dt,sku_id,cart_count
fromdws_sku_action_daycount
wheredt='2020-03-10'
order by cart_count desc
limit 10;
  • 商品退款率排名(最近30天)
drop table if exists ads_product_refund_topN;
create external table ads_product_refund_topN(`dt` string COMMENT '统计日期',`sku_id` string COMMENT '商品ID',`refund_ratio` decimal(10,2) COMMENT '退款率'
) COMMENT '商品退款率TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_product_refund_topN';
insert into table ads_product_refund_topN
select'2020-03-10',sku_id,refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from dwt_sku_topic
order by refund_ratio desc
limit 10;
  • 商品差评率
drop table if exists ads_appraise_bad_topN;
create external table ads_appraise_bad_topN(`dt` string COMMENT '统计日期',`sku_id` string COMMENT '商品ID',`appraise_bad_ratio` decimal(10,2) COMMENT '差评率'
) COMMENT '商品差评率TopN'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_appraise_bad_topN';
insert into table ads_appraise_bad_topN
select'2020-03-10' dt,sku_id,
appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count) appraise_bad_ratio
fromdws_sku_action_daycount
wheredt='2020-03-10'
order by appraise_bad_ratio desc
limit 10;

3.5.4 营销主题(用户+商品+购买行为)

  • 下单数目统计

需求分析:统计每日下单数,下单金额及下单用户数。

drop table if exists ads_order_daycount;
create external table ads_order_daycount(dt string comment '统计日期',order_count bigint comment '单日下单笔数',order_amount bigint comment '单日下单金额',order_users bigint comment '单日下单用户数'
) comment '每日订单总计表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_order_daycount';
insert into table ads_order_daycount
select'2020-03-10',sum(order_count),sum(order_amount),sum(if(order_count>0,1,0))
from dws_user_action_daycount
where dt='2020-03-10';
  • 支付信息统计

每日支付金额、支付人数、支付商品数、支付笔数以及下单到支付的平均时长(取自DWD)

drop table if exists ads_payment_daycount;
create external table ads_payment_daycount(dt string comment '统计日期',order_count bigint comment '单日支付笔数',order_amount bigint comment '单日支付金额',payment_user_count bigint comment '单日支付人数',payment_sku_count bigint comment '单日支付商品数',payment_avg_time double comment '下单到支付的平均时长,取分钟数'
) comment '每日订单总计表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_payment_daycount';
insert into table ads_payment_daycount
selecttmp_payment.dt,tmp_payment.payment_count,tmp_payment.payment_amount,tmp_payment.payment_user_count,tmp_skucount.payment_sku_count,tmp_time.payment_avg_time
from
(select'2020-03-10' dt,sum(payment_count) payment_count,sum(payment_amount) payment_amount,sum(if(payment_count>0,1,0)) payment_user_countfrom dws_user_action_daycountwhere dt='2020-03-10'
)tmp_payment
join
(select'2020-03-10' dt,sum(if(payment_count>0,1,0)) payment_sku_count from dws_sku_action_daycountwhere dt='2020-03-10'
)tmp_skucount on tmp_payment.dt=tmp_skucount.dt
join
(select'2020-03-10' dt,sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60 payment_avg_timefrom dwd_fact_order_infowhere dt='2020-03-10'and payment_time is not null
)tmp_time on tmp_payment.dt=tmp_time.dt
  • 品牌复购率
drop table ads_sale_tm_category1_stat_mn;
create external table ads_sale_tm_category1_stat_mn
(  tm_id string comment '品牌id',category1_id string comment '1级品类id ',category1_name string comment '1级品类名称 ',buycount   bigint comment  '购买人数',buy_twice_last bigint  comment '两次以上购买人数',buy_twice_last_ratio decimal(10,2)  comment  '单次复购率',buy_3times_last   bigint comment   '三次以上购买人数',buy_3times_last_ratio decimal(10,2)  comment  '多次复购率',stat_mn string comment '统计月份',stat_date string comment '统计日期'
)   COMMENT '复购率统计'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_sale_tm_category1_stat_mn/';
with
tmp_order as
(selectuser_id,order_stats_struct.sku_id sku_id,order_stats_struct.order_count order_countfrom dws_user_action_daycount lateral view explode(order_detail_stats) tmp as order_stats_structwhere date_format(dt,'yyyy-MM')=date_format('2020-03-10','yyyy-MM')
),
tmp_sku as
(selectid,tm_id,category1_id,category1_namefrom dwd_dim_sku_infowhere dt='2020-03-10'
)
insert into table ads_sale_tm_category1_stat_mn
selecttm_id,category1_id,category1_name,sum(if(order_count>=1,1,0)) buycount,sum(if(order_count>=2,1,0)) buyTwiceLast,sum(if(order_count>=2,1,0))/sum( if(order_count>=1,1,0)) buyTwiceLastRatio,sum(if(order_count>=3,1,0))  buy3timeLast  ,sum(if(order_count>=3,1,0))/sum( if(order_count>=1,1,0)) buy3timeLastRatio ,date_format('2020-03-10' ,'yyyy-MM') stat_mn,'2020-03-10' stat_date
from
(select tmp_order.user_id,tmp_sku.category1_id,tmp_sku.category1_name,tmp_sku.tm_id,sum(order_count) order_countfrom tmp_orderjoin tmp_skuon tmp_order.sku_id=tmp_sku.idgroup by tmp_order.user_id,tmp_sku.category1_id,tmp_sku.category1_name,tmp_sku.tm_id
)tmp
group by tm_id, category1_id, category1_name

3.5.5 地区主题信息

drop table if exists ads_area_topic;
create external table ads_area_topic(`dt` string COMMENT '统计日期',`id` bigint COMMENT '编号',`province_name` string COMMENT '省份名称',`area_code` string COMMENT '地区编码',`iso_code` string COMMENT 'iso编码',`region_id` string COMMENT '地区ID',`region_name` string COMMENT '地区名称',`order_day_count` bigint COMMENT '当天下单次数',`order_day_amount` decimal(20,2) COMMENT '当天下单金额',`payment_day_count` bigint COMMENT '当天支付次数',`payment_day_amount` decimal(20,2) COMMENT '当天支付金额'
) COMMENT '地区主题宽表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/ads/ads_area_topic/';
insert into table ads_area_topic
select'2020-03-10',id,province_name,area_code,iso_code,region_id,region_name,order_day_count,order_day_amount,payment_day_count,payment_day_amount
from dwt_area_topic;

四、Azkaban

4.1 安装部署

参考如何编译:https://blog.csdn.net/weixin_38822045/article/details/91857425

[omm@simwor01 azkaban]$ pwd
/opt/soft/azkaban
[omm@simwor01 azkaban]$ tar -zxf azkaban-db-3.73.1.tar.gz -C /opt/module/azkaban/
[omm@simwor01 azkaban]$ tar -zxf azkaban-exec-server-3.73.1.tar.gz -C /opt/module/azkaban/
[omm@simwor01 azkaban]$ tar -zxf azkaban-web-server-3.73.1.tar.gz -C /opt/module/azkaban/[omm@simwor01 azkaban]$ cd /opt/module/azkaban/
[omm@simwor01 azkaban]$ ln -s azkaban-web-server-3.73.1 server
[omm@simwor01 azkaban]$ ln -s azkaban-exec-server-3.73.1 executor
[omm@simwor01 ~]$ mysql -uroot -pabcd1234..mysql> create database azkaban;mysql> use azkaban;mysql> source /opt/module/azkaban/azkaban-db-3.73.1/create-all-sql-3.73.1.sql

把配置文件中的相对路径改成绝对路径,这样可以站在任意目录启动服务而不会报文件或目录找不到的错误。

[omm@simwor01 conf]$ pwd
/opt/module/azkaban/server/conf
[omm@simwor01 conf]$ grep "/opt/module/azkaban" azkaban.properties
web.resource.dir=/opt/module/azkaban/server/web/
user.manager.xml.file=/opt/module/azkaban/server/conf/azkaban-users.xml
executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
[omm@simwor01 conf]$ grep Shanghai azkaban.properties
default.timezone.id=Asia/Shanghai
[omm@simwor01 conf]$ grep ^mysql azkaban.properties
mysql.port=3306
mysql.host=simwor01
mysql.database=azkaban
mysql.user=root
mysql.password=abcd1234..
mysql.numconnections=100
[omm@simwor01 conf]$
[omm@simwor01 conf]$ vi azkaban-users.xml
[omm@simwor01 conf]$ cat azkaban-users.xml
<azkaban-users><user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/><user password="metrics" roles="metrics" username="metrics"/><user password="abcd1234.." roles="admin" username="admin"/><role name="admin" permissions="ADMIN"/><role name="metrics" permissions="METRICS"/>
</azkaban-users>
[omm@simwor01 conf]$
[omm@simwor01 conf]$ pwd
/opt/module/azkaban/executor/conf
[omm@simwor01 conf]$ grep "/opt/module" azkaban.properties
web.resource.dir=/opt/module/azkaban/executor/web/
user.manager.xml.file=/opt/module/azkaban/server/conf/azkaban-users.xml
executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
[omm@simwor01 conf]$ grep Shanghai azkaban.properties
default.timezone.id=Asia/Shanghai
[omm@simwor01 conf]$ grep azkaban.webserver.url azkaban.properties
azkaban.webserver.url=http://simwor01:8081
[omm@simwor01 conf]$ grep ^mysql azkaban.properties
mysql.port=3306
mysql.host=simwor01
mysql.database=azkaban
mysql.user=root
mysql.password=abcd1234..
mysql.numconnections=100
[omm@simwor01 conf]$
[omm@simwor01 azkaban]$ pwd
/opt/module/azkaban
[omm@simwor01 azkaban]$ ./server/bin/start-web.sh
[omm@simwor01 azkaban]$ ./executor/bin/start-exec.sh
[omm@simwor01 azkaban]$

会报找不到活跃的 Executor,修改下MySQL再重启下就可以了。

2021/05/30 08:36:42.126 +0800 INFO [ExecutorManager] [Azkaban] Initializing executors from database.
2021/05/30 08:36:42.127 +0800 ERROR [ExecutorManager] [Azkaban] No active executors found
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban] Exception in thread "main"
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban] azkaban.executor.ExecutorManagerException: No active executors found
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban]       at azkaban.executor.ActiveExecutors.setupExecutors(ActiveExecutors.java:52)
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban]       at azkaban.executor.ExecutorManager.setupExecutors(ExecutorManager.java:197)
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban]       at azkaban.executor.ExecutorManager.initialize(ExecutorManager.java:131)
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban]       at azkaban.executor.ExecutorManager.start(ExecutorManager.java:145)
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban]       at azkaban.webapp.AzkabanWebServer.launch(AzkabanWebServer.java:231)
2021/05/30 08:36:42.127 +0800 ERROR [StdOutErrRedirect] [Azkaban]       at azkaban.webapp.AzkabanWebServer.main(AzkabanWebServer.java:224)
[omm@simwor01 ~]$ mysql -uroot -pabcd1234..mysql> update azkaban.executors set active=1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0mysql>

默认端口是 http://192.168.1.101:8081,因为没配SSL所以用http访问。

4.2 HDFS -> Sqoop -> MySQL

  • 创建用户主题
DROP TABLE IF EXISTS `ads_user_topic`;
CREATE TABLE `ads_user_topic`  (`dt` date NOT NULL,`day_users` bigint(255) NULL DEFAULT NULL,`day_new_users` bigint(255) NULL DEFAULT NULL,`day_new_payment_users` bigint(255) NULL DEFAULT NULL,`payment_users` bigint(255) NULL DEFAULT NULL,`users` bigint(255) NULL DEFAULT NULL,`day_users2users` double(255, 2) NULL DEFAULT NULL,`payment_users2users` double(255, 2) NULL DEFAULT NULL,`day_new_users2users` double(255, 2) NULL DEFAULT NULL,PRIMARY KEY (`dt`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
  • 创建地区主题表
DROP TABLE IF EXISTS `ads_area_topic`;
CREATE TABLE `ads_area_topic`  (`dt` date NOT NULL,`id` int(11) NULL DEFAULT NULL,`province_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`area_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`iso_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,`region_id` int(11) NULL DEFAULT NULL,`region_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`order_day_count` bigint(255) NULL DEFAULT NULL,`order_day_amount` double(255, 2) NULL DEFAULT NULL,`payment_day_count` bigint(255) NULL DEFAULT NULL,`payment_day_amount` double(255, 2) NULL DEFAULT NULL,PRIMARY KEY (`dt`, `iso_code`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
  • sqoop 导出脚本
#!/bin/bashhive_db_name=gmall
mysql_db_name=gmall_reportexport_data() {/opt/module/sqoop/bin/sqoop export \
--connect "jdbc:mysql://simwor01:3306/${mysql_db_name}?useUnicode=true&characterEncoding=utf-8"  \
--username root \
--password abcd1234.. \
--table $1 \
--num-mappers 1 \
--export-dir /warehouse/$hive_db_name/ads/$1 \
--input-fields-terminated-by "\t" \
--update-mode allowinsert \
--update-key $2 \
--input-null-string '\\N'    \
--input-null-non-string '\\N'
}case $1 in"ads_uv_count")export_data "ads_uv_count" "dt"
;;"ads_user_action_convert_day") export_data "ads_user_action_convert_day" "dt"
;;"ads_user_topic")export_data "ads_user_topic" "dt"
;;"ads_area_topic")export_data "ads_area_topic" "dt,iso_code"
;;"all")export_data "ads_user_topic" "dt"export_data "ads_area_topic" "dt,iso_code"#其余表省略未写
;;
esac

4.3 全流程调度

  • mysql_to_hdfs.job
type=command
command=/home/omm/bin/mysql_to_hdfs.sh all ${dt}
  • hdfs_to_ods_log.job
type=command
command=/home/omm/bin/hdfs_to_ods_log.sh ${dt}
  • hdfs_to_ods_db.job
type=command
command=/home/omm/bin/hdfs_to_ods_db.sh all ${dt}
dependencies=mysql_to_hdfs
  • ods_to_dwd_start_log.job
type=command
command=/home/omm/bin/ods_to_dwd_start_log.sh ${dt}
dependencies=hdfs_to_ods_log
  • ods_to_dwd_db.job
type=command
command=/home/omm/bin/ods_to_dwd_db.sh all ${dt}
dependencies=hdfs_to_ods_db
  • dwd_to_dws.job
type=command
command=/home/omm/bin/dwd_to_dws.sh ${dt}
dependencies=ods_to_dwd_db,ods_to_dwd_start_log
  • dws_to_dwt.job
type=command
command=/home/omm/bin/dws_to_dwt.sh ${dt}
dependencies=dwd_to_dws
  • dwt_to_ads.job
type=command
command=/home/omm/bin/dwt_to_ads.sh ${dt}
dependencies=dws_to_dwt
  • hdfs_to_mysql.job
type=command
command=/home/omm/bin/hdfs_to_mysql.sh all
dependencies=dwt_to_ads


五、Superset

5.1 环境

  • Miniconda

conda是一个开源的包、环境管理器,可以用于在同一个机器上安装不同Python版本的软件包及其依赖,并能够在不同的Python环境之间切换。

[omm@simwor01 soft]$ bash Miniconda3-latest-Linux-x86_64.sh
[omm@simwor01 soft]$ source /home/omm/.bashrc
(base) [omm@simwor01 soft]$ (base) [omm@simwor01 ~]$ python
Python 3.7.4 (default, Aug 13 2019, 20:35:49)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
(base) [omm@simwor01 ~]$

Miniconda安装完成后,每次打开终端都会激活其默认的base环境,我们可通过以下命令,禁止激活默认base环境。

(base) [omm@simwor01 ~]$ conda config --set auto_activate_base false
(base) [omm@simwor01 ~]$ logout[C:\~]$ [omm@simwor01 ~]$

Superset是由Python语言编写的Web应用,要求Python3.6的环境。

[omm@simwor01 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
[omm@simwor01 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
[omm@simwor01 ~]$ conda config --set show_channel_urls yes
[omm@simwor01 ~]$ conda create --name superset python=3.6
...
#
# To activate this environment, use
#
#     $ conda activate superset
#
# To deactivate an active environment, use
#
#     $ conda deactivate
[omm@simwor01 ~]$ conda activate superset
(superset) [omm@simwor01 ~]$ python
Python 3.6.13 |Anaconda, Inc.| (default, Feb 23 2021, 21:15:04)
[GCC 7.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
(superset) [omm@simwor01 ~]$ conda deactivate
[omm@simwor01 ~]$

5.2 安装

  1. 安装Superset之前,需安装以下所需依赖
[omm@simwor01 ~]$ sudo yum install -y python-setuptools
[omm@simwor01 ~]$ sudo yum install -y gcc gcc-c++ libffi-devel python-devel python-pip python-wheel openssl-devel cyrus-sasl-devel openldap-devel
  1. 安装Superset
[omm@simwor01 ~]$ conda activate superset# 安装(更新)setuptools和pip
# 说明:pip是python的包管理工具,可以和centos中的yum类比
(superset) [omm@simwor01 ~]$ pip install --upgrade setuptools pip -i https://pypi.douban.com/simple/# 安装Supetset
# 说明:-i的作用是指定镜像,这里选择国内镜像
(superset) [omm@simwor01 ~]$ pip install apache-superset -i https://pypi.douban.com/simple/# 初始化Supetset数据库
(superset) [omm@simwor01 ~]$ superset db upgrade# 创建管理员用户
# 说明:flask是一个python web框架,Superset使用的就是flask
(superset) [omm@simwor01 ~]$ export FLASK_APP=superset
(superset) [omm@simwor01 ~]$ flask fab create-admin
Username [admin]: admin
User first name [admin]:
User last name [user]:
Email [admin@fab.org]:
Password:
Repeat for confirmation:
...
Recognized Database Authentications.
Admin User admin created.
(superset) [omm@simwor01 ~]$ # Superset初始化
(superset) [omm@simwor01 ~]$ superset init
  1. 启动Supterset
# 安装gunicorn
# 说明:gunicorn是一个Python Web Server,可以和java中的TomCat类比
(superset) [omm@simwor01 ~]$ pip install gunicorn -i https://pypi.douban.com/simple/# 启动Superset,确保当前conda环境为superset
# --workers:指定进程个数
# --timeout:worker进程超时时间,超时会自动重启
# --bind:绑定本机地址,即为Superset访问地址
# --daemon:后台运行
(superset) [omm@simwor01 ~]$ gunicorn --workers 5 --timeout 120 --bind 192.168.1.101:8787  "superset.app:create_app()" --daemon# 停掉gunicorn进程
# ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | xargs kill -9# 退出superset环境
# conda deactivate

5.3 使用

  • 对接MySQL数据源
# 安装依赖
conda install mysqlclient# 说明:对接不同的数据源,需安装不同的依赖,以下地址为官网说明
# http://superset.apache.org/installation.html#database-dependencies
# 停止Superset
ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | xargs kill -9# 启动Superset
gunicorn --workers 5 --timeout 120 --bind 192.168.1.101:8787  "superset.app:create_app()" --daemon
  • 添加数据源

注:SQL Alchemy URI编写规范:mysql://账号:密码@IP/数据库名称

  • 添加Tables

  • 创建仪表盘

  • 创建图表

大数据实战项目 -- 离线数仓相关推荐

  1. 大数据旅游项目(离线数仓实战)

    文章目录 大数据旅游项目 1 项目分析 1.1 项目分析流程图 2 项目前期准备 2.1 hdfs权限验证 3 数仓前期准备 3.1 本地创建对应账号(root权限) 3.2 hdfs创建分层对应目录 ...

  2. 大数据电商离线数仓项目-上篇

    下一篇:电商数仓项目-下篇 文章目录 第1章 数仓分层 1.1 为什么要分层 1.2 数据集市与数据仓库概念 1.3 数仓命名规范 1.3.1 表命名 1.3.2 脚本命名 1.3.3 表字段类型 第 ...

  3. 大数据电商离线数仓项目-下篇

    上一篇:电商数仓系统1 文章目录 第5章 数仓搭建-DWS层 5.1 业务术语 5.2 系统函数 5.2.1 nvl函数 5.2.2 日期处理函数 5.2.3 复杂数据类型定义 5.3 DWS层 5. ...

  4. 大数据面试演讲稿 离线数仓实时分析

    有关大数据学习资源,请关注微信公众号"码农书斋".回复"大数据",免费获取学习视频.源码及资料! 自我介绍 ​ 面试官好! ​ 我叫xxx,xxx年毕业于xxx ...

  5. 大数据项目离线数仓(全 )一(数据采集平台)

    搭建用户行为数据采集平台.搭建业务数据采集平台.搭建数据仓库系统.制作可视化报表 本篇博客包括搭建用户行为数据采集平台.搭建业务数据采集平台 搭建数据仓库系统在大数据项目离线数仓(全 )二 制作可视化 ...

  6. 大数据架构师——音乐数据中心平台离线数仓综合项目(一)

    文章目录 音乐数据中心平台离线数仓综合项目 数据库与ER建模 数据库 数据库三范式 第一范式(1NF):原子性,字段不可分 第二范式(2NF):唯一性,一个表只能说明一个事物,有主键,非主键字段依赖主 ...

  7. 大数据架构师——音乐数据中心平台离线数仓综合项目(四)

    文章目录 音乐数据中心平台离线数仓综合项目 第四个业务:商户营收统计 需求 模型设计 数据处理流程 1. 将数据导入MySQL业务库 2. 执行第二.三个业务 3. 使用Sqoop抽取mysql数据到 ...

  8. 大数据实战项目之电商数仓(一)

    大数据实战项目之电商数仓(一) 项目介绍 数据仓库概念 ​ 数据仓库是为企业所有决策制定过程,提供所有系统数据支持的战略集合.通过对数据仓库中数据的分析,可以帮助企业改进业务流程,控制成本,提高产品质 ...

  9. 大数据架构详解_【数据如何驱动增长】(3)大数据背景下的数仓建设 amp; 数据分层架构设计...

    背景 了解数据仓库.数据流架构的搭建原理对于合格的数据分析师或者数据科学家来说是一项必不可少的能力.它不仅能够帮助分析人员更高效的开展分析任务,帮助公司或者业务线搭建一套高效的数据处理架构,更是能够从 ...

最新文章

  1. Jquery的跨域传输数据(JSONP)
  2. openStack高可用性和灾备方案
  3. [概统]本科二年级 概率论与数理统计 第二讲 几何概型
  4. React项目动态设置title标题
  5. 07 ORA系列:ORA-01747 或列说明无效 user.table.column, table.column
  6. Android Studio Linking an external C++ project 时候 报Invalid file name. Expected: CMakeLists.txt
  7. 点在多边形内算法,C#判断一个点是否在一个复杂多边形的内部
  8. Win的phpstudy安装VC报错
  9. FireDAC 中文字段过滤问题
  10. 【jquery系列|Jquery总结篇】包含各种实例,文末有彩蛋!
  11. Django之model模型
  12. 略论bs架构设计的几种模式
  13. nfine框架 上传文件,nfine(nfine快速开发框架)
  14. 学MFC的九九八十一难
  15. python编程学习笔记_python学习笔记--python编程基础
  16. 50 Fast Flash MX Techniques
  17. html自动写对联,css实现的对联广告代码_CSS/HTML
  18. 南开hpd openbilibili
  19. Mac下将ISO写入U盘镜像
  20. 图像加噪与滤波处理(python+opencv)

热门文章

  1. Ora2Pg配置文件详解
  2. VBA:新增某个月的FC至Excel版物料Forecast
  3. 我一个插画师给AI打下手,月入3千
  4. CF1635E Cars
  5. 【线性DP】跳格子问题 + 光签题(取石子游戏)
  6. 电脑鼠标不管用但键盘可以用怎么办
  7. SLCP验厂辅导,企业在认证之后便可以将经验证过后的数据信息分享给其它各托管平台
  8. 免费域名和付费域名的几个区别
  9. tex 表格内容换行_{Latex}{Tabular}文本超出表格自动换行
  10. java jwe/jws_一篇文章带你分清楚JWT,JWS与JWE