可以查看大数据测试系列

说明

  • 这章比较重要,是对hadoop和hive以及建模的一次实践

  • 阅读本章,请把hadoop和hive环境搭建好,可以参考如下文章:

    • 第三章 大数据之Hadoop搭建
    • 第四章 大数据之hive搭建
  • 本次实例来自对《大数据测试技术与实践》中实例补充,书中的实例并不能直接使用,有些地方是错误的,我也修改和补充

数据仓库实例

  • 在本节中,我们通过一个简单的实例介绍数据仓库对数据的处理过程。假设有一家连锁超市,它有多家分店。每一个分店都有很多种类的商品,包括日用品、肉类、冷冻食品、烘焙食品和花卉等。所有产
    品在整个连锁超市环境下有一个唯一的产品编号。图3-15为一张顾客结账清单。

  • 经过一段时间的商品销售后,连锁超市积累了大量销售数据,如下图所示,超市分店具有分店名、分店地址
    和开店时间属性,商品有商品类别、商品价格、唯一编号和生产地址属性。当然,地址可以进一步拆分为省、市等。

  • 假设对商品A进行促销,如发放代金券、降价等,现在分析促销活动对商品A销售量的,为了简便,本实例统计超市分店中商品A每天的销售量、到店消费人数和购买商品A的消费者的比例
  • 我们在数据仓的设计与构建文章中的数据仓库的设计中提到过,数据仓库分为数据接入层、数据明细层、数据汇总层和数据集市等。数据接入层负责将业务系统中的商品相关销售数据导入;数据明细层负责对数据接入层的数据进行预处理,过滤"脏”数据等;数据汇总层将数据按照订单进行汇总;数据集市层负责聚合计算相应的指标。
  • 由于要对商品在时间、地点等维度的指标进行汇总计算,因此,我们在数据仓库层使用维度建模方式建表,(我们在数据仓的设计与构建中的数据仓库建模方法也说过相应概念)。显然,我们对日期、超市分店(地址)和商品等维度比较感兴趣。图3-17所示为商品的维度模型实际的建模过程比这复杂。以日期维度为例,在实际建模中,时间维度表一般会会有当天是一个月中的哪一天,当天是一年中的哪天,当前周是一年中的哪周,当前季度是年中哪季度,以及时间视计算肭表示等字段,方便将销售指标在各种时间点上进行同比。

  • 假设超市业务系统中的销售数据是以实际购物清单拆分的形式存放,即在购物清单中,含有品、商品价格和交易时间(清单创建时间)等信息,则超市业务系统的数据库中会有如图下的表关系

  • 由于商品信息表和超市分店信息表的数据量不大,且基本无改动,因此可以选择全量更新的方式将数据加载到数据仓库。而来自各超市分店的商品销售清单的数据量很大,且每天会有新插入的数据记录,因此,在将数据加载到数据仓库时,可以选择增量加载方式

  • 在本实例中,对于数据仓库的存储,采用HDFS和Hive,在ETL过程中,使用 HiveQL。图3-19为各级数据表的关系。

数据接入层ODS

创建接入层的表

首先,在Hive中,创建数据库接入层对应的表,代码如下:

# 切换到hadoop用户
su hadoop
# 进入到hive
hive-- 创建超市分店信息表
DROP TABLE IF EXISTS ods_market_info;
create table ods_market_info(
market_id string comment '超市分店编号',
market_address string comment '超市分店地址',
start_time string comment '有效期起始时间',
end_time string comment '有效期终止时间',
market_name string comment '超市分店名称',
create_time string comment '创建时间',
update_time string comment '更新时间'
)
partitioned by(dt string)
row format delimited fields terminated by '\t';--创建商品信息表
DROP TABLE IF EXISTS ods_product_info;
CREATE TABLE ods_product_info(
product_id int comment '商品id',
type_name string comment '类别名',
supplier_phone string comment '供应商手机号',
supplier_address string comment '供应商地址',
product_price string comment '商品价格',
product_desc string comment '商品说明',
start_time string comment '有效期起始时间',
end_time string comment '有效期终止时间',
product_name string comment '商品名称',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '商品信息表'
partitioned by(dt string)
row format delimited fields terminated by '\t';--创建清单记录表
DROP TABLE IF EXISTS ods_sale_info;
CREATE TABLE ods_sale_info(
order_id string comment '清单号',
order_status string comment '清单状态',
market_id string comment' 超市分店编号',
product_num int comment '商品数量',
product_id int comment '商品id',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '清单记录表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';# 查看到新建成功的表
hive> show tables;
OK
course
ods_market_info
ods_product_info
ods_sale_info
stu
stu1
Time taken: 0.026 seconds, Fetched: 6 row(s)

准备业务数据

  • 批量造mysql表的数据,采用存储过程的方式

  • mysql中创建业务关系表,product_info(商品信息表)、market_info(超市分店信息表)、sale_info(清单记录表)

mysql -uroot -p
use hive-- 创建商品信息表,以id为主键
create table product_info(
id int(10) not null auto_increment primary key,
product_id int comment '商品id',
type_name varchar(100) comment '类别名',
supplier_phone varchar(100) comment '供应商手机号',
supplier_address varchar(100) comment '供应商地址',
product_price varchar(100) comment '商品价格',
product_desc varchar(100) comment '商品说明',
start_time varchar(100) comment '有效期起始时间',
end_time varchar(100) comment '有效期终止时间',
product_name varchar(100) comment '商品名称',
create_time varchar(100) comment '创建时间',
update_time varchar(100) comment '更新时间'
) engine=innodb default charset=utf8;-- 创建超市分店信息表
create table market_info(
id int(10) not null auto_increment primary key,
market_id varchar(100) comment '超市分店编号',
market_address varchar(100) comment '超市分店地址',
start_time varchar(100) comment '有效期起始时间',
end_time varchar(100) comment '有效期终止时间',
market_name varchar(100) comment '超市分店名称',
create_time varchar(100) comment '创建时间',
update_time varchar(100) comment '更新时间'
) engine=innodb default charset=utf8;--创建清单记录表
create table sale_info(
id int(10) not null auto_increment primary key,
order_id varchar(100) comment '清单号',
order_status varchar(100) comment '清单状态',
market_id varchar(100) comment' 超市分店编号',
product_num int comment '商品数量',
product_id int comment '商品id',
create_time varchar(100) comment '创建时间',
update_time varchar(100) comment '更新时间'
) engine=innodb default charset=utf8;# 查看到各个新建的三个表
mysql> show tables;| market_info                   |
| product_info                  |
| sale_info                     |
+-------------------------------+
77 rows in set (0.01 sec)

插入数据

超市分店
mysql -uroot -p
use hiveinsert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000001','湖南省长沙市开福区万达广场1021号','2021-12-16','2028-12-17','大润发开福万达店','2021-12-12','2021-12-12');insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000002','湖南省长沙市岳麓区万达广场1021号','2021-12-16','2028-12-17','大润发岳麓万达店','2021-12-12','2021-12-12');insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000003','湖南省长沙市雨花区万达广场1021号','2021-12-16','2028-12-16','大润发雨花万达店','2021-12-12','2021-12-12');
商品表-存储过程
mysql -uroot -p
use hivedrop procedure insert_product_info;
delimiter //
create procedure insert_product_info(type_name varchar(100),product_price varchar(100),start_time varchar(100),end_time varchar(100),create_time varchar(100),update_time varchar(100),num int)
begin
declare str char(62) default 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
declare product_name char(100);
declare product_id int;
declare i int default 0;
while i<= num DO
-- 生成商品名称随机数
set product_name=concat("商品名称",substring(str,1+floor(rand()*61),2),substring(str,1+floor(rand()*61),3));
-- 生成商品ID随机数
set product_id = floor(rand()*1000);
set i=i+1;
INSERT INTO `hive`.`product_info` (`product_id`, `type_name`, `supplier_phone`, `supplier_address`, `product_price`, `product_desc`, `start_time`, `end_time`, `product_name`, `create_time`, `update_time`) VALUES (product_id, type_name, '18576759590', '湖南省常德市', product_price, '产品描述', start_time, end_time, product_name, create_time, update_time);
end while;
end;
//# 下面这种方式调用,后面的100就是插入100条数据
mysql> call insert_product_info('食品','50','2021-12-16', '2022-12-17', '2021-12-15','2021-12-15',100) //mysql> call insert_product_info('酒水','100','2021-12-16', '2022-12-17', '2021-12-15','2021-12-15',100) //# 查询到各个插入成功的数据
mysql> select count(*) from product_info;//
+----------+
| count(*) |
+----------+
|      202 |
+----------+
  • 数据列表

清单记录表-存储过程
mysql -uroot -p
use hivedrop procedure insert_sale_info;
delimiter //
create procedure insert_sale_info(order_status varchar(10),market_id varchar(100),product_num int,product_id int,create_time varchar(100),update_time varchar(100),num int)
begin
declare order_id int;
declare i int default 0;
while i<= num DO
set i=i+1;
-- 随机生成订单id
set order_id = floor(rand()*100);
INSERT INTO `hive`.`sale_info` (`order_id`, `order_status`, `market_id`, `product_num`, `product_id`, `create_time`, `update_time`) VALUES (order_id, order_status, market_id, product_num, product_id, create_time, update_time);
end while;
end;
//# 注意//这个分隔符,是区分存储过程的,调用存储过程注意market_id,product_id的值,要从相应的超市分店,商品信息表中找到对应数据
mysql> call insert_sale_info('待付款','1000001',5, 221,'2021-12-15','2021-12-15',100) //
mysql> call insert_sale_info('已付款','1000002',10, 182, '2021-12-14','2021-12-14',100) //# 查询到刚刚插入的数据
mysql> select count(*) from sale_info;//
+----------+
| count(*) |
+----------+
|      203 |
+----------+
1 row in set (0.00 sec)
  • 数据列表如下

业务数据导入ODS-datax

datax 环境搭建

  • 建议自己用源代码编译方式,比较稳妥
  • 下载源文件解压
wget https://github.com/alibaba/DataX/archive/master.zip
unzip DataX-master.zip
  • 下载maven
sudo wget --no-check-certificate  https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gztar -zxvf apache-maven-3.8.4-bin.tar.gz
# 配置maven环境变量
vi /etc/profile export M2_HOME=/usr/local/apache-maven-3.8.4 //本地maven安装home目录export PATH=$PATH:$M2_HOME/bin# 生效环境变量设置source /etc/profile

  • 配置maven本地仓库, 进如本地maven安装目录里的conf目录, vi settings.xml进行如下修改
 -- 设置仓库地址<localRepository>/usr/local/apache-maven-3.8.4/repo</localRepository>-- 设置阿里云镜像
<mirror><id>nexus-aliyun</id><mirrorOf>central</mirrorOf><name>Nexus aliyun</name><url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
  • 最后查看maven安装结果 maven -version
[root@VM-24-13-centos resp]# mvn -version
Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537)
Maven home: /usr/local/apache-maven-3.8.4
Java version: 1.8.0_311, vendor: Oracle Corporation, runtime: /usr/local/jdk1.8.0_311/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1160.11.1.el7.x86_64", arch: "amd64", family: "unix"
  • 修改datax的目录中的pom.xml中的内容
   <mysql.driver.version>8.0.26</mysql.driver.version><!-- reader --><module>mysqlreader</module>      <module>hdfsreader</module><module>streamreader</module><!-- writer --><module>mysqlwriter</module><module>hdfswriter</module><module>streamwriter</module><!-- common support module --><module>plugin-rdbms-util</module><module>plugin-unstructured-storage-util</module><module>hbase20xsqlreader</module><module>hbase20xsqlwriter</module><module>kuduwriter</module>
  • hdfswrite目录下面的pom.xml修改hive和hadoop版本
  <properties><hive.version>3.1.2</hive.version><hadoop.version>3.0.3</hadoop.version></properties>
  • 在 datax的目录执行编译命令
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
WARNING] Assembly file: /usr/local/DataX-master/target/datax is not a regular file (it may be a directory). It cannot be attached to the project build for installation or deployment.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO] kuduwriter ......................................... SUCCESS [  2.148 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
  • 把target目录中的datax.tar.gz移动到指定目录,解压
[root@VM-24-13-centos target]# cp datax.tar.gz /usr/local/
cd /usr/local/
tar -zxvf datax.tar.gz
数据超市导入ods表
  • 创建分区信息,手动创建分区路径

  • 不过奇怪的是我用下面命令的方式创建,用datax导入报错找不到创建的分区

    hdfs dfs -mkdir -p /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21
    
    • 用sql原生语句insert插入一条数据后,重新datax导入就成功了
    insert into ods_market_info partition(dt = '2021-12-21')
    
# 切换用户
su hadoop
# 进入到hive模式
hive
# 使用hive数据库
use hive;
# 手动插入分区信息内容
hive>insert into ods_market_info partition(dt = '2021-12-21') values ('111','222','33','44','55','66','77');
# 查看到刚刚插入的信息
hive>select * from ods_market_info;
# 新增完后,可以删除表中数据,也可不删
hive>truncate table ods_market_info;
hive>exit;
  • 查看到分区信息
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_market_info/
drwxr-xr-x   - hadoop supergroup          0 2021-12-22 15:57 /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21
drwxr-xr-x   - hadoop supergroup          0 2021-12-22 15:57 /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-22
  • 在datax的job目录编写一个mysql_hive_ods_market_info.json文件,同步超市分店配置用

    [root@VM-24-13-centos job]# ls
    job.json  mysql_hive_ods_market_info.json
    
  • 编辑mysql_hive_ods_market_info.json文件

    {"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["market_id","market_address","start_time","end_time","market_name","create_time","update_time"                         ],"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/hive"],"table": ["market_info"]}],"password": "hive1234","username": "hive"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://localhost:9000","fileType": "text","path": "/user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21","fileName": "ods_market_info","column": [{"name": "market_id","type": "string"},{"name": "market_address","type": "string"},{"name": "start_time","type": "string"},{"name": "end_time","type": "string"},{"name": "market_name","type": "string"},{"name": "create_time","type": "string"},{"name": "update_time","type": "string"}],"writeMode": "append","fieldDelimiter": "\t",}}}]}
    }
    

    hive> show create table hive.ods_market_info;

    LOCATION
    ‘hdfs://localhost:9000/user/hive/warehouse/hive.db/ods_market_info’

    执行命令后在结果中可以看到LOCATOIN,就是hive在hdfs中的存储目录。填写到writer下的path中,dt就是刚刚创建的分区

  • 运行datax命令

# 回到root用户
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# DHADOOP_USER_NAME一定要用hadoop用户,用其他用户会报错没有权限
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_market_info.json

  • 查看hive中的超市表中是否有数据
[root@VM-24-13-centos bin]# su hadoop
[hadoop@VM-24-13-centos bin]$ hive
hive> select * from hive.ods_market_info;
000001 湖南省长沙市开福区万达广场1021号        2021-12-16 00:00:00     2028-12-16 23:59:59     大润发开福万达店        2021-12-12 16:00:00     2021-12-12 16:00:00   2021-12-21
1000002 湖南省长沙市岳麓万达广场1021号  2021-12-16 00:00:00     2028-12-16 23:59:59     大润发岳麓万达店        2021-12-12 16:00:00     2021-12-12 16:00:00  2021-12-21
商品信息导入ods表
# 切换用户
su hadoop
# 进入到hive模式
hive
# 使用hive数据库
use hive;
# 手动插入分区信息内容
hive>insert into ods_product_info partition(dt = '2021-12-21') VALUES (11, '222', '18576759590', '湖南省常德市', '222', '产品描述', '333', '444', '555', '666', '77');
# 查看到刚刚插入的信息
hive>select * from ods_product_info;
# 新增完后,可以删除表中数据,也可不删
hive>truncate table ods_product_info;
hive>exit;
  • 查看到分区信息
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_product_info/
drwxr-xr-x   - hadoop supergroup          0 2021-12-23 09:31 /user/hive/warehouse/hive.db/ods_product_info/dt=2021-12-21
  • 在datax的job目录编写一个mysql_hive_ods_product_info.json文件,同步超市分店配置用

    [root@VM-24-13-centos job]# ls
    -rwxrwxrwx 1 root root 1587 Dec 21 18:05 job.json
    -rw-r--r-- 1 root root 1861 Dec 22 15:54 mysql_hive_ods_market_info.json
    -rw-r--r-- 1 root root 1861 Dec 23 09:36 mysql_hive_ods_product_info.json
    
  • 编辑mysql_hive_ods_productinfo.json文件

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["product_id","type_name","supplier_phone","supplier_address","product_price","product_desc","start_time","end_time","product_name","create_time","update_time",],"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/hive"],"table": ["product_info"]}],"password": "hive1234","username": "hive"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://localhost:9000","fileType": "text","path": "/user/hive/warehouse/hive.db/ods_product_info/dt=2021-12-21","fileName": "ods_product_info","column": [{"name": "product_id","type": "int"},{"name": "type_name","type": "string"},{"name": "supplier_phone","type": "string"},{"name": "supplier_address","type": "string"},{"name": "product_price","type": "string"},{"name": "product_desc","type": "string"},{"name": "start_time","type": "string"},{"name": "end_time","type": "string"},{"name": "product_name","type": "string"},{"name": "create_time","type": "string"},{"name": "update_time","type": "string"}],"writeMode": "append","fieldDelimiter": "\t",}}}]}
}
  • 运行datax命令
# 回到root用户
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# DHADOOP_USER_NAME一定要用hadoop用户,用其他用户会报错没有权限
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_product_info.json任务启动时刻                    : 2021-12-23 10:30:52
任务结束时刻                    : 2021-12-23 10:31:05
任务总计耗时                    :                 12s
任务平均流量                    :            2.22KB/s
记录写入速度                    :             20rec/s
读出记录总数                    :                 201
读写失败总数                    :                   0
  • 查看hive中的商品表中是否有数据
su hadoop
hive
use hive;
hive> select count(product_id) from ods_product_info;
Total MapReduce CPU Time Spent: 0 msec
OK
201
Time taken: 2.154 seconds, Fetched: 1 row(s)
销售事实导入ods表
# 切换用户
su hadoop
# 进入到hive模式
hive
# 使用hive数据库
use hive;
# 手动插入分区信息内容
hive>insert into ods_sale_info partition(dt = '2021-12-21') values (1, '222', '333', 4, 55, '666', '77');
# 查看到刚刚插入的信息
hive>select * from ods_sale_info;
# 新增完后,可以删除表中数据,也可不删
hive>truncate table ods_sale_info;
hive>exit;
  • 查看到分区信息
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_sale_info/
drwxr-xr-x   - hadoop supergroup          0 2021-12-23 11:09 /user/hive/warehouse/hive.db/ods_sale_info/dt=2021-12-21
  • 在datax的job目录编写一个mysql_hive_ods_sale_info.json文件,同步销售事实表配置用

    [root@VM-24-13-centos job]# ls
    -rwxrwxrwx 1 root root 1587 Dec 21 18:05 job.json
    -rw-r--r-- 1 root root 1861 Dec 22 15:54 mysql_hive_ods_market_info.json
    -rw-r--r-- 1 root root 2267 Dec 23 10:28 mysql_hive_ods_product_info.json
    -rw-r--r-- 1 root root 2267 Dec 23 11:06 mysql_hive_ods_sale_info.json
  • 编辑mysql_hive_ods_productinfo.json文件

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["order_id","order_status","market_id","product_num","product_id","create_time","update_time"],"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/hive"],"table": ["sale_info"]}],"password": "hive1234","username": "hive"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://localhost:9000","fileType": "text","path": "/user/hive/warehouse/hive.db/ods_sale_info/dt=2021-12-21","fileName": "ods_sale_info","column": [{"name": "order_id","type": "string"},{"name": "order_status","type": "string"},{"name": "market_id","type": "string"},{"name": "product_num","type": "int"},{"name": "product_id","type": "int"},{"name": "create_time","type": "string"},{"name": "update_time","type": "string"}],"writeMode": "append","fieldDelimiter": "\t",}}}]}
}
  • 运行datax
# 回到root用户
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# DHADOOP_USER_NAME一定要用hadoop用户,用其他用户会报错没有权限
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_sale_info.json任务启动时刻                    : 2021-12-23 11:17:21
任务结束时刻                    : 2021-12-23 11:17:34
任务总计耗时                    :                 12s
任务平均流量                    :            1.07KB/s
记录写入速度                    :             20rec/s
读出记录总数                    :                 202
读写失败总数                    :                   0
  • 查看hive中的销售事实表是否有数据
su hadoop
hive
use hive;
hive> select count(order_id) from ods_sale_info;Stage-Stage-1:  HDFS Read: 27232 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
202
  • 再次导入一次数据,造成重复的脏数据,为下一步数据清洗例子做准备
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_sale_info.json
  • 查看 hive中的销售事实表存在了404条数据,有一半重复的
su hadoop
hive
use hive;
hive> select count(order_id) from ods_sale_info;Stage-Stage-1:  HDFS Read: 27232 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
404

数据清洗

  • 在业务数据导入到ods层时,可能一些误操作,脏数据等,需要对ods层的数据进行清洗处理,本次就以ods_sale_info表中去重重复的order_id
su hadoop
hive
use hive;hive>drop table if exists tmp_ods_to_dwd_sale_info;
create table tmp_ods_to_dwd_sale_info
as select a.order_id,a.order_status,a.market_id,a.product_num,a.product_id,a.create_time,a.update_time from
(select order_id,order_status,market_id,product_num,product_id,create_time,update_time, ROW_NUMBER() OVER(partition by order_id order BY create_time DESC) rn FROM ods_sale_info) a
WHERE a.rn=1;# 查看到的只有85条数据
hive> select count(*) from  tmp_ods_to_dwd_sale_info;
OK
85
  • 参考数据去重及row_number()

数据明细层DWD

  • 数据清洗完毕后,把ODS层数据导入到OWD层

数据仓库建模

  • 在数据仓库层,采用星形模式创建超市分店维度表、商品维度表、日期维度表和销售事实表

维度建模

# 切换到hadoop用户
su hadoop
# 进入到hive
hive-- 创建超市分维度表
DROP TABLE IF EXISTS dw_dim_market_info;
create table dw_dim_market_info(
market_id string comment '超市分店编号',
market_address string comment '超市分店地址',
effective_date string comment '有效期起始时间',
expriry_date string comment '有效期终止时间',
market_name string comment '超市分店名称'
) comment '创建超市分维度表'
partitioned by(dt string)
row format delimited fields terminated by '\t';--创建商品维度表
DROP TABLE IF EXISTS dw_dim_product_info;
CREATE TABLE dw_dim_product_info(
product_id int comment '商品id',
type_name string comment '类别名',
supplier_phone string comment '供应商手机号',
supplier_address string comment '供应商地址',
product_price string comment '商品价格',
product_desc string comment '商品说明',
effective_date string comment '有效期起始时间',
expriry_date string comment '有效期终止时间',
product_name string comment '商品名称'
) comment '商品维度表'
partitioned by(dt string)
row format delimited fields terminated by '\t';--创建日期维度表
DROP TABLE IF EXISTS dw_dim_date_info;
CREATE TABLE dw_dim_date_info(
date_id string comment '日期id',
year_value string comment '年',
month_value string comment'月',
day_value string comment '日',
date_value string comment '年-月-日',
is_weekend string comment '是否周末', -- 0表示非周末,1表示周末
day_of_week string comment '一周中的周几'
) comment '日期维度表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';--创建销售事实表
DROP TABLE IF EXISTS dwd_sale_fact;
CREATE TABLE dwd_sale_fact(
order_id string comment '清单号',
order_status string comment '清单状态',
market_id string comment' 超市分店编号',
date_id string comment '日期id',
product_num int comment '商品数量',
product_id int comment '商品id',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '销售事实表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';# 查看到新建成功的表
hive> show tables;
OK
course
dw_dim_date_info
dw_dim_market_info
dw_dim_product_info
dwd_sale_fact
ods_market_info
ods_product_info
ods_sale_info
stu
stu1
tmp_ods_to_dwd_sale_info

导入ODS层数据

  • 把数据接入层(ODS)导入到维度表中
日期维度表
  • 初始化一些测试数据,注意date_value这个字段的值,需要和tmp_ods_to_dwd_sale_info中的create_time有关联关系,要造一些相等条件的数据
su hadoop
hive
hive>use hive;
hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122101', '2021', '12','15','2021-12-15', '0', '51');hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122102', '2021', '12', '24','2021-12-24', '0', '51');hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122103', '2021', '12', '25','2021-12-25', '1', '51');hive> select * from dw_dim_date_info;
OK
2021122101      2021    12      15      2021-12-15      0       51      2021-12-21
2021122102      2021    12      24      2021-12-24      0       51      2021-12-21
2021122103      2021    12      25      2021-12-25      1       51      2021-12-21
超市维度表
  • ods_market_info 表数据插入
  • 此次实例中,好像没有用到
hive>insert into dw_dim_market_info partition(dt = '2021-12-21') select market_id,market_address,market_name,start_time as effective_date,end_time as expiry_date
from ods_market_info;hive> select * from dw_dim_market_info;
OK
1000001 湖南省长沙市开福区万达广场1021号        大润发开福万达店        2021-12-16      2028-12-17      2021-12-21
1000002 湖南省长沙市岳麓区万达广场1021号        大润发岳麓万达店        2021-12-16      2028-12-17      2021-12-21
1000003 湖南省长沙市雨花区万达广场1021号        大润发雨花万达店        2021-12-16      2028-12-16      2021-12-21
商品维度表
  • ods_product_info表数据插入
  • 此次实例中,好像没有用到
hive>insert into dw_dim_product_info partition(dt = '2021-12-21') select product_id,product_name,type_name,supplier_phone,supplier_address,product_price,product_desc,start_time as effective_date,end_time as expiry_date
from ods_product_info;hive> select * from dw_dim_product_info;
OK
221     商品名称78ABC   食品    18576759590     湖南省常德市    50      产品描述        2021-12-16      2022-12-17      2021-12-21
545     商品名称XYyzA   食品    18576759590     湖南省常德市    50      产品描述        2021-12-16      2022-12-17      2021-12-21
639     商品名称GHdef   食品    18576759590     湖南省常德市    50      产品描述        2021-12-16      2022-12-17      2021-12-21
459     商品名称cdtuv   食品    18576759590     湖南省常德市    50      产品描述        2021-12-16      2022-12-17      2021-12-21
销售事实表
  • tmp_ods_to_dwd_sale_info表是上述处理重复销售清单记录表的过滤后的临时表
hive>insert into dwd_sale_fact partition(dt = '2021-12-21') select a.order_id,a.order_status,a.market_id,b.date_id,a.product_num,a.product_id,a.create_time,a.update_time
from tmp_ods_to_dwd_sale_info a
inner join dw_dim_date_info b
on a.create_time=b.date_value;#  查询到2021-12-15的关联数据
hive> select * from dwd_sale_fact;
OK
11      待付款  1000001 2021122101      5       221     2021-12-15      2021-12-15      2021-12-21
12      待付款  1000001 2021122101      5       221     2021-12-15      2021-12-15      2021-12-21
14      待付款  1000001 2021122101      5       221     2021-12-15      2021-12-15      2021-12-21
16      待付款  1000001 2021122101      5       221     2021-12-15      2021-12-15      2021-12-21
17      待付款  1000001 2021122101      5       221     2021-12-15      2021-12-15      2021-12-21
20      待付款  1000001 2021122101      5       221     2021-12-15      2021-12-15      2021-12-21
....

数据汇总层DWS

  • 由于我们要统计商品A的销售量,以及商品A的购买比例,因此在数据汇总层,对销售数据按照清单号进行汇总,并添加include_product_a 字段,用于表示该清单是否商品A(本实例中的商品id为221),处理过程如下:
-- 创建DWS层清单记录表
drop table if exists dws_order_info;
create table dws_order_info (
order_id string comment '清单号',
order_status string comment '清单状态',
market_id string comment' 超市分店编号',
include_product_a int comment '是否包括商品A',
date_id string comment '日期id',
a_num int comment '商品A数量',
product_info string comment '商品信息',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '清单记录表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';-- 创建中间表,添加is_product_a字段
drop table if exists tmp_dwd_to_dws_order_info;
create table tmp_dwd_to_dws_order_info as select
order_id,order_status,market_id,date_id,
Case
when product_id=221 then 1
else 0
end as is_product_a, -- 是否为商品A
case
when product_id=221 then product_num
else 0
end as a_num, -- 商品A的数量
product_id,
product_num,
create_time,
update_time
from dwd_sale_fact;# 查询到以及过滤的product_id为221的清单数据
hive> select * from tmp_dwd_to_dws_order_info;
OK
11      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
12      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
14      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
16      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
17      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
20      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
22      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
23      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
24      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15
25      待付款  1000001 2021122101      1       5       221     5       2021-12-15      2021-12-15-- 按照清单号进行清单数据汇总
hive>insert into dws_order_info partition(dt = '2021-12-21')
select order_id,order_status,market_id,date_id,
case
when sum(is_product_a)>0 then 1
else 0
end as include_product_a,
sum(a_num) as a_num,
concat_ws('_',collect_list(cast(product_id as string)),collect_list(cast(product_num as string))) as product_info,
create_time,update_time
from tmp_dwd_to_dws_order_info group by order_id,order_status,market_id,date_id,create_time,update_time;
hive> select * from dws_order_info;
OK
11      待付款  1000001 2021122101      1       5       221_5   2021-12-15      2021-12-15      2021-12-21
12      待付款  1000001 2021122101      1       5       221_5   2021-12-15      2021-12-15      2021-12-21
14      待付款  1000001 2021122101      1       5       221_5   2021-12-15      2021-12-15      2021-12-21
16      待付款  1000001 2021122101      1       5       221_5   2021-12-15      2021-12-15      2021-12-21
17      待付款  1000001 2021122101      1       5       221_5   2021-12-15      2021-12-15      2021-12-21
20      待付款  1000001 2021122101      1       5       221_5   2021-12-15      2021-12-15      2021-12-21
  • 参考Hive中case when的两种语法
  • 参考hive中对多行进行合
  • 发现个奇怪问题,插入的字段居然要和表中的顺序一致,不然为空,比如product_info就有这样的问题

数据集市层DWM

  • 在数据集市层,需要对相关指标进行聚合计算,处理过程如下。
  • 此处商品A的id为221
drop table if exists dwn_order_info_by_day;
create table dwn_order_info_by_day
as select
count(distinct c.order_id) as consumption_num, -- 商品A销售清单
sum(c.a_num) as day_num, -- 商品A消费总数
sum(c.include_product_a)/count(distinct c.order_id) as buy_a_rate -- 购买商品A的消费比例
from
(
select
a.order_id as order_id,
a.a_num as a_num,
a.include_product_a as include_product_a,
b.year_value as year_value,
b.month_value as month_value,
b.day_value as day_value
from dws_order_info a
left join dw_dim_date_info b on a.date_id=b.date_id) c
group by c.day_value;-- 查询到商品A的购买数据记录
hive> select * from dwn_order_info_by_day;
OK
62      310     2.021122101E9
Time taken: 0.098 seconds, Fetched: 1 row(s)

总结

  • 以上就是数据从数据源经过ETL处理最终加载到数据仓库的整个过程。在实际业务过程中,数据规模庞大、业务逻辑复杂,需要生成大量的ETL处理任务,因此在数据仓库设计过程中,需要考虑中间层数据的通用性。在调度系统(如Airflow、Azkaban等)的调度下,这些ETL任务分批有序执行,最终生成报表等应用所需的数据

第五章 数据仓库实例相关推荐

  1. 《NodeJS开发指南》第五章微博实例开发总结

    所有文章搬运自我的个人主页:sheilasun.me <NodeJS开发指南>这本书用来NodeJS入门真是太好了,而且书的附录部分还讲到了闭包.this等JavaScript常用特性.第 ...

  2. Introduction to 3D Game Programming with DirectX 12 学习笔记之 --- 第十五章:第一人称摄像机和动态索引...

    Introduction to 3D Game Programming with DirectX 12 学习笔记之 --- 第十五章:第一人称摄像机和动态索引 原文:Introduction to 3 ...

  3. Programming Entity Framework-dbContext 学习笔记第五章

    ### Programming Entity Framework-dbContext 学习笔记 第五章 将图表添加到Context中的方式及容易出现的错误 方法 结果 警告 Add Root 图标中的 ...

  4. Linux内核探讨-- 第五章

    本文是个人分析<Linux内核设计与实现>而写的总结,欢迎转载,请注明出处: http://blog.csdn.net/dlutbrucezhang/article/details/123 ...

  5. 《Introduction to Tornado》中文翻译计划——第五章:异步Web服务

    http://www.pythoner.com/294.html 本文为<Introduction to Tornado>中文翻译,将在https://github.com/alioth3 ...

  6. mysql第五章项目二_Todo List:Node+Express 搭建服务端毗邻Mysql – 第五章(第1节)

    点击右上方红色按钮关注"web秀",让你真正秀起来 前言 万丈高楼平地起,我们的Todo List项目也是越来越结实了.Todo List的前面4章内容都是在为Client端开发, ...

  7. python 第一行输入n表示一天中有多少人买水果_Python编程:从入门到实践——【作业】——第五章作业...

    第五章作业 5-1条件测试 : 编写一系列条件测试: 将每个测试以及你对其结果的预测和实际结果都打印出来. 你编写的代码应类似于下面这样: car = ' subaru' print("Is ...

  8. 第五章 Python数据结构

    第五章 Python数据结构 本章更详细地讨论一些已经讲过的数据类型的使用,并引入一些新的类型. 5.1 列表 列表数据类型还有其它一些方法.下面是列表对象的所有方法: insert(i, x) -- ...

  9. 人工智能:第五章 计算智能(2)

    第五章计算智能(2) 教学内容:遗传算法的基本机理和求解步骤:进化策略的算法模型.进化策略和遗传算法的区别:进化编程的机理与表示和算法步骤:人工生命的起源.发展.定义和研究意义,及其研究内容和方法. ...

最新文章

  1. “饶毅举报”事件尘埃落定,裴钢表示未发现裴钢造假。网友:我有信心一年发20篇SCI...
  2. docker容器 与 系统时间同步
  3. Javascript模板引擎handlebars使用实例及技巧
  4. matlab仿真计算代码代写,matlab/simulink程序代写
  5. 背景图层和普通图层的区别_新手如何在PS中创建图层?不容错过的7种方法,你值得学习...
  6. 一些不错的C++网站[秋镇菜]
  7. 微信生态圈盈­利模式分析
  8. 多元线性回归(java实现)
  9. python函数isdisjoint方法_Python中的isdisjoint()函数
  10. (四) Docker 常用帮助命令
  11. QBXT Day 4 数学,数论
  12. 关于Windows Boot Manager、Bootmgfw.efi、Bootx64.efi、bcdboot.exe 的详解
  13. u12无线网卡linux驱动装不上,腾达(U12)USB无线网卡Linux驱动安装笔记
  14. 趣头条自媒体审核不过怎么办,趣头条伪原创工具教程
  15. 易烊千玺成为罗莱家纺品牌代言人
  16. python requests 异步调用_python - 如何使用requests_html异步获取()URL列表? - 堆栈内存溢出...
  17. 新疆伊犁山开挖破裂机液压岩石劈裂棒 大型岩石分裂棒行业推荐
  18. 利用代理IP爬取京东网站手机数据
  19. python命令行调试_Linux 黑乎乎的命令行下,如何调试 Python?
  20. 宇宙第一 IDE 霸主,换人了?

热门文章

  1. 云计算技术重塑视频会议系统市场新格局
  2. 【精】mysql行转列的7种方法及列转行
  3. Scale-Equalizing Pyramid Convolution for Object Detection 论文笔记
  4. 腾讯广告/shopee招聘
  5. 跑动大数据的笔记本配置_大数据告诉你,最完美的跑步姿势就是你这样的!
  6. Bootstrap之table列上下移动
  7. python读取数据方法
  8. Java后端 带File文件及其它参数的Post请求
  9. Mac 环境下 java 自带的 Jvisualvm 使用笔记
  10. C++:点赞(团体程序设计天梯赛)