一、问题提出
        官方一直称Hive是Hadoop数据仓库解决方案。既然是数据仓库就离不开多维、CDC、SCD这些概念,于是尝试了一把在Hive上实现SCD1和SCD2。这有两个关键点,一个是行级更新,一个是生成代理键。行级更新hive本身就是支持的,但需要一些配置,还有一些限制。具体可参考 http://blog.csdn.net/wzy0623/article/details/51483674。生成代理键在RDBMS上一般都用自增序列。Hive也有一些对自增序列的支持,本实验分别使用了窗口函数ROW_NUMBER()和hive自带的UDFRowSequence实现生成代理键。

二、软件版本
Hadoop 2.7.2
Hive 2.0.0

三、实验步骤
1. 准备初始数据文件a.txt,内容如下:

1,张三,US,CA
2,李四,US,CB
3,王五,CA,BB
4,赵六,CA,BC
5,老刘,AA,AA

2. 用ROW_NUMBER()方法实现初始装载和定期装载

(1)建立初始装载脚本init_row_number.sql,内容如下:

USE test;-- 建立过渡表
DROP TABLE IF EXISTS tbl_stg;
CREATE TABLE tbl_stg (id INT,name STRING,cty STRING,st STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';-- 建立维度表
DROP TABLE IF EXISTS tbl_dim;
CREATE TABLE tbl_dim (sk INT,id INT,name STRING,cty STRING,st STRING,version INT,effective_date DATE,expiry_date DATE)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');-- 向过渡表加载初始数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' INTO TABLE tbl_stg;-- 向维度表装载初始数据
INSERT INTO tbl_dim
SELECTROW_NUMBER() OVER (ORDER BY tbl_stg.id) + t2.sk_max,tbl_stg.*,1,CAST('1900-01-01' AS DATE),CAST('2200-01-01' AS DATE)
from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;

(2)执行初始装载

hive -S -f /home/grid/BigDataDWTest/init_row_number.sql

(3)修改数据文件a.txt,内容如下:

1,张,U,C
3,王五,CA,BB
4,赵六,AC,CB
5,刘,AA,AA
6,老杨,DD,DD

说明:
1. 新增了第6条数据
2. 删除了第2条数据
3. 修改了第1条数据的name列、cty列和st列(name列按SCD2处理,cty列和st列按SCD1处理)
4. 修改了第4条数据的cty列和st列(按SCD1处理)
5. 修改了第5条数据的name列(按SCD2处理)

(4)建立定期装载脚本scd_row_number.sql,内容如下:

USE test;-- 设置日期变量
SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);-- 向过渡表加载更新后的数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' OVERWRITE INTO TABLE tbl_stg;-- 向维度表装载更新后的数据
-- 设置已删除记录和SCD2的过期
UPDATE tbl_dim
SET expiry_date = ${hivevar:pre_date}
WHERE sk IN
(SELECT a.sk FROM (
SELECT sk,id,name FROM tbl_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN tbl_stg b ON a.id=b.id
WHERE b.id IS NULL OR a.name<>b.name);-- 处理SCD2新增行
INSERT INTO tbl_dim
SELECTROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,t1.id,t1.name,t1.cty,t1.st,t1.version,t1.effective_date,t1.expiry_date
FROM
(
SELECTt2.id id,t2.name name,t2.cty cty,t2.st st,t1.version + 1 version,${hivevar:pre_date} effective_date,${hivevar:max_date} expiry_date
FROM tbl_dim t1 INNER JOIN tbl_stg t2
ON t1.id=t2.id AND t1.name<>t2.name AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN tbl_dim t3 ON T1.id = t3.id AND t3.expiry_date = ${hivevar:max_date}
WHERE t3.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;-- 处理SCD1
-- 因为hive的update还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update
-- 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有cty或st改变的记录,而不是仅仅更新当前版本的记录
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.sk,a.id,a.name,b.cty,b.st,a.version,a.effective_date,a.expiry_date FROM tbl_dim a, tbl_stg b
WHERE a.id=b.id AND (a.cty <> b.cty OR a.st <> b.st);
DELETE FROM tbl_dim WHERE sk IN (SELECT sk FROM tmp);
INSERT INTO tbl_dim SELECT * FROM tmp;-- 处理新增记录
INSERT INTO tbl_dim
SELECTROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,t1.id,t1.name,t1.cty,t1.st,1,${hivevar:pre_date},${hivevar:max_date}
FROM
(
SELECT t1.* FROM tbl_stg t1 LEFT JOIN tbl_dim t2 ON t1.id = t2.id
WHERE t2.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;

(5)执行定期装载

hive -S -f /home/grid/BigDataDWTest/scd_row_number.sql

查询维度表结果如图1所示。

select * from tbl_dim order by id,version;
图1

(6)再次执行定期装载,维度表的数据没有变化

hive -S -f /home/grid/BigDataDWTest/scd_row_number.sql

2. 用UDFRowSequence方法实现初始装载和定期装载
实验过程和ROW_NUMBER()方法基本一样,只是先要将hive-contrib-2.0.0.jar传到HDFS上,否则会报错。

hadoop dfs -put /home/grid/hive/lib/hive-contrib-2.0.0.jar /user

初始装载脚本init_UDFRowSequence.sql,内容如下:

USE test;ADD JAR hdfs:///user/hive-contrib-2.0.0.jar;
CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';-- 建立过渡表
DROP TABLE IF EXISTS tbl_stg;
CREATE TABLE tbl_stg (id INT,name STRING,cty STRING,st STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';-- 建立维度表
DROP TABLE IF EXISTS tbl_dim;
CREATE TABLE tbl_dim (sk INT,id INT,name STRING,cty STRING,st STRING,version INT,effective_date DATE,expiry_date DATE)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');-- 向过渡表加载初始数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' INTO TABLE tbl_stg;-- 向维度表装载初始数据
INSERT INTO tbl_dim
SELECTt2.sk_max + row_sequence(),tbl_stg.*,1,CAST('1900-01-01' AS DATE),CAST('2200-01-01' AS DATE)
from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;

定期装载脚本scd_UDFRowSequence.sql,内容如下:

USE test;ADD JAR hdfs:///user/hive-contrib-2.0.0.jar;
CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';-- 设置日期变量
SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);-- 向过渡表加载更新后的数据
LOAD DATA LOCAL INPATH '/home/grid/BigDataDWTest/a.txt' OVERWRITE INTO TABLE tbl_stg;-- 向维度表装载更新后的数据
-- 设置已删除记录和SCD2的过期
UPDATE tbl_dim
SET expiry_date = ${hivevar:pre_date}
WHERE sk IN
(SELECT a.sk FROM (
SELECT sk,id,name FROM tbl_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN tbl_stg b ON a.id=b.id
WHERE b.id IS NULL OR a.name<>b.name);-- 处理SCD2新增行
INSERT INTO tbl_dim
SELECTt2.sk_max + row_sequence(),t1.id,t1.name,t1.cty,t1.st,t1.version,t1.effective_date,t1.expiry_date
FROM
(
SELECTt2.id id,t2.name name,t2.cty cty,t2.st st,t1.version + 1 version,${hivevar:pre_date} effective_date,${hivevar:max_date} expiry_date
FROM tbl_dim t1 INNER JOIN tbl_stg t2
ON t1.id=t2.id AND t1.name<>t2.name AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN tbl_dim t3 ON T1.id = t3.id AND t3.expiry_date = ${hivevar:max_date}
WHERE t3.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;-- 处理SCD1
-- 因为hive的update还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update
-- 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有cty或st改变的记录,而不是仅仅更新当前版本的记录
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.sk,a.id,a.name,b.cty,b.st,a.version,a.effective_date,a.expiry_date FROM tbl_dim a, tbl_stg b
WHERE a.id=b.id AND (a.cty <> b.cty OR a.st <> b.st);
DELETE FROM tbl_dim WHERE sk IN (SELECT sk FROM tmp);
INSERT INTO tbl_dim SELECT * FROM tmp;-- 处理新增记录
INSERT INTO tbl_dim
SELECTt2.sk_max + row_sequence(),t1.id,t1.name,t1.cty,t1.st,1,${hivevar:pre_date},${hivevar:max_date}
FROM
(
SELECT t1.* FROM tbl_stg t1 LEFT JOIN tbl_dim t2 ON t1.id = t2.id
WHERE t2.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;

参考:
http://blog.csdn.net/wzy0623/article/details/49616643
http://www.remay.com.br/blog/hdp-2-2-how-to-create-a-surrogate-key-on-hive/
http://www.aboutechnologies.com/2016/02/hive-auto-increment-column.html

在Hive上实现SCD相关推荐

  1. 在hive上创建数据

    hive将元数据存储在metastore上,将数据存储在hdfs上:二者怎么关联呢? 其实就是:我们在hive客户端建立一个table,当该table的路径与hdfs上的存储数据的路径一致时,二者便可 ...

  2. 【Hadoop Summit Tokyo 2016】LLAP:Hive上的次秒级分析查询

    本讲义出自Yuta Imai在Hadoop Summit Tokyo 2016上的演讲,主要分享了为什么选择LLAP,并对于LLAP的相关概念进行了分享,在演讲中还介绍了Hive 2 与LLAP的架构 ...

  3. 使用Spark SQL读取Hive上的数据

    Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等).Spark SQL的其中一个分支就是Spar ...

  4. hive上亿级别的表关联 调优

    环境:公司决定使用宽表,将10个相关的大表进行全量关联 (1个上亿级别的表,5个上千万的表,剩下的表都不到百万的表) 花了两天的时间研究,测试 例如: a~g这几个表中,a表为上亿级别的表,5个上千万 ...

  5. spark如何正确的删除hive外部表【删除表时同时删除hdfs上的数据】?

    在spark 2.4.x和hive 3.1.x环境下,spark通过sql (drop table xxx)删除hive的外部表,只能删除hive的元数据信息,但没有删除hdfs上的数据,导致hdfs ...

  6. 总结:Hive性能优化上的一些总结

    Hive性能优化上的一些总结 注意,本文百分之九十来源于此文:Hive性能优化,很感谢作者的细心整理,其中有些部分我做了补充和追加,要是有什么写的不对的地方,请留言赐教,谢谢 前言 今天电话面试突然被 ...

  7. 【大数据处理技术】「#1」本地数据集上传到数据仓库Hive

    文章目录 实验数据集下载 下载实验数据集 建立一个用于运行本案例的目录dbtaobao 数据集的预处理 删除文件第一行记录,即字段名称 获取数据集中双11的前100000条数据 导入数据仓库 实验数据 ...

  8. 从技术上解读大数据的应用现状和开源未来

    来源:网络大数据 作者 | 韩锐. Lizy Kurian John.詹剑锋 摘要:近年来,随着大数据系统的快速发展,各式各样的开源基准测试集被开发出来,以评测和分析大数据系统并促进其技术改进.然而, ...

  9. Hive mac 客户端工具DbVisualizer配置

    1.下载连接Hive的GUI SQL工具 下载地址:www.dbvis.com/download/10- 需要jar  hive-jdbc-uber-2.4.0.0-169.jar (github.c ...

最新文章

  1. linux的共享内存,linux共享内存实际在哪里?
  2. SAP MM服务类采购申请里的总账科目的修改
  3. 视频直播中 | 5G到底有多快?现场测速,带你走进5G生活
  4. Java程序员的工资为什么一直那么高?
  5. linux 进入redis 数据库,Linux下Redis数据库的安装方法与自动启动脚本分享
  6. pil python 安装_20行Python代码给微信头像戴帽子
  7. 区块链查比特币_登图区块链课堂——比特币矿机发展史
  8. 活动执行手册_如何从一无所知到独立规划陈列手册?
  9. SoundSource 5 for Mac(音频控制工具)
  10. http 301、304状态码
  11. .Net环境下有关打印页面设置、打印机设置、打印预览对
  12. 【UE】UE4下载安装及测试demo
  13. Reeder for Mac(RSS阅读器)
  14. 规范使用地图,从规范制图开始
  15. 数据分析全国薪酬分布状况
  16. 【5G】5GC网元服务及对应消息
  17. JavaWeb登陆界面
  18. Gentoo Linux+KDE Plasma桌面安装教程
  19. RPG Maker MV 路标,随机移动,随机对话
  20. win10需要修复计算机,Win10开机蓝屏,显示“你的电脑/设备需要修复”怎么办?答案来了...

热门文章

  1. 【Vscode】预览md文件
  2. CV学习笔记-立体视觉(点云模型、Spin image、三维重建)
  3. 一篇让我思考良久的文章
  4. 【LOJ#570】Misaka Network 与任务
  5. Hive-day07_课下练习
  6. 访问学校图书馆资源和内网服务器
  7. Redis学习(一)之redis简介
  8. BatchNormalization
  9. java 关闭jpanel_从JFrame [Java]关闭JPanel窗口
  10. 开源给Windows当家:15款管理工具任选