1 Data Warehouse Layering Concepts

1.1 Why Layer the Warehouse

1. Simplify complex problems
A complex task is broken into multiple steps, and each layer handles only a single step, which is simpler to build and easier to understand.
2. Clear data structure
Each layer has its own scope, so tables are easier to locate and understand when used.
It also makes data quality easier to maintain: when something goes wrong, you only need to repair the data from the faulty step onward rather than rebuild everything.
3. Less duplicated development
Standardized layering and common intermediate-layer data greatly reduce repeated computation and let a result computed once be reused many times.
4. Isolation of raw data
Whether the concern is data anomalies or data sensitivity, layering decouples the real raw data from the derived statistics.

1.2 Layer Structure

1. ODS layer (raw/operational data store)
Stores the raw data: original logs and records are loaded directly and kept exactly as they arrive, with no processing.
2. DWD layer (detail data layer)
Same structure and granularity as ODS. The ODS data is cleansed here (null values, dirty records, and out-of-range values are removed); some companies call this layer DWI.
3. DWS layer (service data layer)
Built on DWD with light aggregation, typically down to grains such as user per day, device per day, merchant per day, or product per day.
This layer often holds cross-subject wide tables keyed on a single dimension, for example one table whose columns are a user's daily check-ins, favorites, comments, lottery draws, subscriptions, likes, products browsed, items added to cart, orders placed, payments, refunds, and ad clicks (a sketch of such a table follows after this list).
4. ADS layer (application data layer)
Also named the APP or DAL layer by some companies and books.
Built from DWD or DWS data to meet concrete reporting needs, producing the various statistical reports.
The final results are synchronized to an RDS for BI tools or application systems to query.
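
As a concrete illustration of such a DWS wide table, here is a minimal HiveQL sketch. The table name dws_user_action_daycount and its columns are hypothetical and only mirror a few of the metrics listed above; they are not tables used later in this project.

drop table if exists dws_user_action_daycount;
create external table dws_user_action_daycount(
    user_id        string comment 'user id, the dimension this wide table is keyed on',
    sign_in_count  bigint comment 'check-ins that day',
    favorite_count bigint comment 'favorites that day',
    comment_count  bigint comment 'comments that day',
    order_count    bigint comment 'orders placed that day',
    payment_count  bigint comment 'payments that day'
)
comment 'daily user action wide table (hypothetical DWS example)'
partitioned by (dt string)
stored as parquet;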

1.3 Data Marts vs. the Data Warehouse

The data mart (Data Mart) has coexisted with the data warehouse since the warehouse was first conceived.
Companies and books today define the data mart in different ways.
In the narrow sense, a data mart can be understood as the application layer of the warehouse that serves data to users, such as the ADS layer described above.
In the broad sense, anything in a subject-oriented data warehouse that can be queried may be called a data mart, including the DWD, DWS, and ADS layers, and even the data synchronized from Hadoop into an RDS.
For example, for the order subject I can expose everything from detail records to aggregate statistics and ratio analyses to one department; beyond orders there are subjects such as users, products, and suppliers serving different teams, and each of these can be called a data mart.

2 Environment Preparation for Building the Warehouse

2.1 Hive & MySQL Installation

Service    hadoop101    hadoop102    hadoop103
Hive       Hive
MySQL      MySQL

2.1.1 Installing Hive & MySQL

See the separate installation post for details.

2.1.2 Modify hive-site.xml

1) Disable metastore schema verification

[hadoop@hadoop101 conf]$ pwd
/opt/modules/hive/conf
[hadoop@hadoop101 conf]$ vim hive-site.xml
Add the following configuration:
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>

2.2 Tez as the Hive Execution Engine
Tez is an execution engine for Hive with better performance than MapReduce. Why is it faster than MR? See the figure below:

When Hive compiles directly to MR programs, suppose there are four MR jobs with dependencies among them. In the figure, green marks the Reduce Tasks and the cloud shapes mark the points where intermediate results must be persisted to HDFS.
Tez converts multiple dependent jobs into a single job, so HDFS is written only once and there are fewer intermediate stages, which greatly improves the performance of DAG workloads.
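
Once Tez is set up (sections 2.2.2 through 2.2.4 below), the engine can also be switched per session with the standard hive.execution.engine property, which is convenient for comparing the same query on both engines. A small sketch, using the student test table created in section 2.2.5:

set hive.execution.engine=mr;    -- run the query on MapReduce
select count(*) from student;
set hive.execution.engine=tez;   -- run the same query on Tez
select count(*) from student;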

2.2.1 Preparing the Installation Package

1) Download the Tez release from http://tez.apache.org
2) Copy apache-tez-0.9.1-bin.tar.gz to the /opt/modules directory on hadoop101

[hadoop@hadoop101 modules]$ ls
apache-tez-0.9.1-bin.tar.gz

3) Extract apache-tez-0.9.1-bin.tar.gz

[hadoop@hadoop101 modules]$ tar -zxvf apache-tez-0.9.1-bin.tar.gz

4) Rename the directory

[hadoop@hadoop101 modules]$ mv apache-tez-0.9.1-bin/ tez-0.9.1

2.2.2 Configure Tez Environment Variables

1) Go to the Hive configuration directory: /opt/modules/hive/conf

[hadoop@hadoop101 conf]$ pwd
/opt/modules/hive/conf

2) Add the Tez environment variable and dependency jar configuration to hive-env.sh

[hadoop@hadoop101 conf]$ vim hive-env.sh

Add the following configuration

# Set HADOOP_HOME to point to a specific hadoop install directory
export HADOOP_HOME=/opt/modules/hadoop-2.7.2
# Hive Configuration Directory can be controlled by:
export HIVE_CONF_DIR=/opt/modules/hive/conf
# Folder containing extra libraries required for hive compilation/execution can be controlled by:
export TEZ_HOME=/opt/modules/tez-0.9.1    # your Tez extraction directory
export TEZ_JARS=""
for jar in `ls $TEZ_HOME | grep jar`; do
    export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/$jar
done
for jar in `ls $TEZ_HOME/lib`; do
    export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/lib/$jar
done
export HIVE_AUX_JARS_PATH=/opt/modules/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar$TEZ_JARS

2.2.3 Configure Tez

Create a tez-site.xml file under Hive's /opt/modules/hive/conf directory

[hadoop@hadoop101 conf]$ pwd
/opt/modules/hive/conf
[hadoop@hadoop101 conf]$ vim tez-site.xml

Add the following content

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>tez.lib.uris</name>
<value>${fs.defaultFS}/tez/tez-0.9.1,${fs.defaultFS}/tez/tez-0.9.1/lib</value>
</property>
<property>
<name>tez.lib.uris.classpath</name>
<value>${fs.defaultFS}/tez/tez-0.9.1,${fs.defaultFS}/tez/tez-0.9.1/lib</value>
</property>
<property>
<name>tez.use.cluster.hadoop-libs</name>
<value>true</value>
</property>
<property>
<name>tez.history.logging.service.class</name>
<value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value>
</property>
</configuration>

2.2.4 Upload Tez to the Cluster

1) Add the following configuration to hive-site.xml

<property>
<name>hive.execution.engine</name>
<value>tez</value>
</property>

2) Upload /opt/modules/tez-0.9.1 to the /tez path on HDFS

[hadoop@hadoop101 conf]$ hadoop fs -mkdir /tez
[hadoop@hadoop101 conf]$ hadoop fs -put /opt/modules/tez-0.9.1/ /tez
[hadoop@hadoop101 conf]$ hadoop fs -ls /tez
/tez/tez-0.9.1

2.2.5 Testing

1) Start Hive

[hadoop@hadoop101 hive]$ bin/hive

2) Create a test table.

hive (default)> create table student(
id int,
name string);

3) Insert data into the table.

hive (default)> insert into student values(1,"zhangsan");

4) If no error is reported, Tez is working

hive (default)> select * from student;
1       zhangsan

2.2.6 Notes

1) When running on Tez, the job may fail because the container uses too much memory and is killed by the NodeManager:

Caused by: org.apache.tez.dag.api.SessionNotRunning: TezSession has already shutdown. Application application_1546781144082_0005 failed 2 times due to AM Container for appattempt_1546781144082_0005_000002 exited with  exitCode: -103
For more detailed output, check application tracking page:http://hadoop102:8088/cluster/app/application_1546781144082_0005Then, click on links to logs of each attempt.
Diagnostics: Container [pid=11116,containerID=container_1546781144082_0005_02_000001] is running beyond virtual memory limits. Current usage: 216.3 MB of 1 GB physical memory used; 2.6 GB of 2.1 GB virtual memory used. Killing container.

This happens when a container running on a worker node tries to use more memory than allowed and the NodeManager kills it.

[Excerpt] The NodeManager is killing your container. It sounds like you are trying to use hadoop streaming which is running as a child process of the map-reduce task. The NodeManager monitors the entire process tree of the task and if it eats up more memory than the maximum set in mapreduce.map.memory.mb or mapreduce.reduce.memory.mb respectively, we would expect the Nodemanager to kill the task, otherwise your task is stealing memory belonging to other containers, which you don't want.

Solutions:
Option 1: set the map and reduce task memory in mapred-site.xml as follows (adjust the actual values to your machine's memory and workload):

<property>
<name>mapreduce.map.memory.mb</name>
<value>1536</value>
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1024M</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>3072</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx2560M</value>
</property>

Option 2: disable the virtual memory check. This is the option used here; modify yarn-site.xml:

<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

3 Building the Warehouse: ODS & DWD Layers

3.1 Create the Database

1) Create the calllogs database

hive (default)> create database calllogs;

Note: if the database already exists and contains data and you need to force-drop it, run:

drop database calllogs cascade;

2) Use the calllogs database

hive (default)> use calllogs;

3.2 ODS Layer

The raw data layer: original logs and records are loaded directly and kept exactly as they arrive, with no processing.

3.2.1 Create the Raw CDR Table ods_calllogs_cdr

1) Create a partitioned table that reads LZO input and writes text output, suitable for JSON parsing

hive (calllogs)>
drop table if exists ods_calllogs_cdr;
CREATE EXTERNAL TABLE  `ods_calllogs_cdr`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
  INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/calllogs/ods/ods_calllogs_cdr';

Note on Hive LZO compression: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
2) Load the data

hive (calllogs)>
load data inpath '/origin_data/calllogs/records/topic-cdr/2021-12-07' into table calllogs.ods_calllogs_cdr partition(dt='2021-12-07');
hive (calllogs)>
load data inpath '/origin_data/calllogs/records/topic-cdr/2021-12-08' into table calllogs.ods_calllogs_cdr partition(dt='2021-12-08');

Note: dates are formatted as yyyy-MM-dd, the date format Hive supports by default
3) Check that the load succeeded

hive (calllogs)> select * from ods_calllogs_cdr where dt='2021-12-07' limit 5;
hive (calllogs)> select * from ods_calllogs_cdr where dt='2021-12-08' limit 5;
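
Beyond sampling rows, a per-partition row count is a quick sanity check that both days were loaded; a simple sketch against the table defined above:

hive (calllogs)> select dt, count(*) from ods_calllogs_cdr group by dt;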

3.2.2 Create the Raw VoLTE Table ods_calllogs_volte

1) Create a partitioned table that reads LZO input and writes text output, suitable for JSON parsing

hive (calllogs)>
drop table if exists ods_calllogs_volte;
CREATE EXTERNAL TABLE  `ods_calllogs_volte`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
  INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/calllogs/ods/ods_calllogs_volte';

2) Load the data

hive (calllogs)>
load data inpath '/origin_data/calllogs/records/topic-volte/2021-12-07' into table calllogs.ods_calllogs_volte partition(dt='2021-12-07');
hive (calllogs)>
load data inpath '/origin_data/calllogs/records/topic-volte/2021-12-08' into table calllogs.ods_calllogs_volte partition(dt='2021-12-08');

Note: dates are formatted as yyyy-MM-dd, the date format Hive supports by default
3) Check that the load succeeded

hive (calllogs)> select * from ods_calllogs_volte where dt='2021-12-07' limit 5;
hive (calllogs)> select * from ods_calllogs_volte where dt='2021-12-08' limit 5;

3.2.3 ODS Layer Data Loading Script

1) Create the script in /home/hadoop/bin on hadoop101

[hadoop@hadoop101 bin]$ vim calllogs_ods.sh

Write the following content in the script

#!/bin/bash
# Define variables for easy modification
DB=calllogs
hive=/opt/modules/hive/bin/hive

# If a date argument is given, use it; otherwise use yesterday's date
if [ -n "$1" ]; then
    calllogs_date=$1
else
    calllogs_date=`date -d "-1 day" +%F`
fi

echo "=== record date: $calllogs_date ==="
$hive -e "load data inpath '/origin_data/calllogs/records/topic-volte/$calllogs_date' into table "$DB".ods_calllogs_volte partition(dt='$calllogs_date')"
$hive -e "load data inpath '/origin_data/calllogs/records/topic-cdr/$calllogs_date' into table "$DB".ods_calllogs_cdr partition(dt='$calllogs_date')"

Note 1:
[ -n "$var" ] tests whether the variable's value is non-empty
-- non-empty value: returns true
-- empty value: returns false
(Quote the variable; with an unquoted empty $1 the test would still return true, which is why the script above uses "$1".)
Note 2:
Check the usage of the date command

[hadoop@hadoop101 ~]$ date --help

2) Make the script executable

[hadoop@hadoop101 bin]$ chmod +x calllogs_ods.sh

3) Run the script

[hadoop@hadoop101 modules]$ calllogs_ods.sh 2021-12-07

4) Check the imported data

hive (calllogs)>
select * from ods_calllogs_volte where dt='2021-12-07' limit 5;
select * from ods_calllogs_cdr where dt='2021-12-07' limit 5;

5) Script schedule
In production this is typically run daily between 00:30 and 01:00
6) Extra
Drop a table partition: alter table table_name drop partition (partition_col='partition value')

hive (calllogs)> alter table ods_calllogs_volte drop partition(dt='2021-12-07');
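
To confirm which partitions remain after dropping one (or after re-loading), Hive's standard partition listing can be used:

hive (calllogs)> show partitions ods_calllogs_volte;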

3.3 DWD Layer Data Parsing

Cleanse the ODS data (remove null values, dirty records, and out-of-range values; switch from row storage to columnar storage; change the compression format).

3.3.1 Create the Base Detail Tables

The detail tables store the detail data converted from the raw ODS tables.
1) Create the VoLTE record detail table

hive (calllogs)>
drop table if exists dwd_calllogs_volte;
CREATE EXTERNAL TABLE `dwd_calllogs_volte`(
`sys_id` string,
`service_name` string,
`home_province_code` string,
`visit_province_code` string,
`channel_code` string,
`service_code` string,
`cdr_gen_time` string,
`duration` string,
`record_type` string,
`imsi` string,
`msisdn` string,
`dataUpLinkVolume` string,
`dataDownLinkVolume` string,
`charge` string,
`resultCode` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/calllogs/dwd/dwd_calllogs_volte/';

2) Create the CDR record detail table

hive (calllogs)>
drop table if exists dwd_calllogs_cdr;
CREATE EXTERNAL TABLE `dwd_calllogs_cdr`(
`sys_id` string,
`service_name` string,
`home_province_code` string,
`visit_province_code` string,
`channel_code` string,
`service_code` string,
`cdr_gen_time` string,
`duration` string,
`record_type` string,
`imsi` string,
`msisdn` string,
`dataUpLinkVolume` string,
`dataDownLinkVolume` string,
`charge` string,
`resultCode` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/calllogs/dwd/dwd_calllogs_cdr/';
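
To verify that both detail tables were created with Parquet storage at the expected HDFS locations, Hive's describe formatted command shows the input/output format and location:

hive (calllogs)> describe formatted dwd_calllogs_volte;
hive (calllogs)> describe formatted dwd_calllogs_cdr;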

3.3.2 Custom UDF (parsing the JSON string)

1) Create a Maven module: hivefunction
2) Create the package: com.cmcc.jackyan.hive.udf
3) Add the following to pom.xml

<properties>
    <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
    <hive.version>1.2.1</hive.version>
</properties>
<dependencies>
    <!-- Hive dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>com.cmcc.jackyan</groupId>
        <artifactId>common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
<repositories>
    <repository>
        <id>spring-plugin</id>
        <url>https://repo.spring.io/plugins-release/</url>
    </repository>
    <repository>
        <id>nexus-aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </repository>
    <repository>
        <id>aliyunmaven</id>
        <url>https://maven.aliyun.com/repository/spring-plugin</url>
    </repository>
</repositories>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4) Write the custom UDF

package com.cmcc.jackyan.hive.udf;

import com.alibaba.fastjson.JSONObject;
import com.cmcc.jackyan.common.bean.GprsCdr;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * Parse a JSON record into individual fields.
 */
public class CalllogsUDF extends UDF {

    // Input example:
    // 1638860533569|{"cdrGenTime":"20211207150213282","channelCode":"OJjm8","charge":"51.26","dataDownLinkVolume":"1933","dataUpLinkVolume":"1774","duration":"897","homeProvinceCode":"991","imsi":"460009343700702","msisdn":"15025854116","recordType":"gsm","resultCode":"000000","serviceCode":"457261306873761","serviceName":"WXP1IUnphh","sysId":"2mrNO","visitProvinceCode":"230"}
    public static String evaluate(String line) {
        StringBuilder sb = new StringBuilder();
        String[] logContents = line.split("\\|");

        // Validity check
        if (logContents.length != 2 || StringUtils.isBlank(logContents[1])) {
            return "";
        }

        GprsCdr gprsCdr = JSONObject.parseObject(logContents[1], GprsCdr.class);
        sb.append(gprsCdr.toString() + logContents[0]);

        return sb.toString();
    }

    public static void main(String[] args) {
        String line = "1638860533569|{\"cdrGenTime\":\"20211207150213282\",\"channelCode\":\"OJjm8\",\"charge\":\"51.26\",\"dataDownLinkVolume\":\"1933\",\"dataUpLinkVolume\":\"1774\",\"duration\":\"897\",\"homeProvinceCode\":\"991\",\"imsi\":\"460009343700702\",\"msisdn\":\"15025854116\",\"recordType\":\"gsm\",\"resultCode\":\"000000\",\"serviceCode\":\"457261306873761\",\"serviceName\":\"WXP1IUnphh\",\"sysId\":\"2mrNO\",\"visitProvinceCode\":\"230\"}";
        System.out.println(evaluate(line));
    }
}

Note: the main method is only for local testing.
5) Package the module into a jar

6) Upload hivefunction-1.0-SNAPSHOT.jar to /opt/modules/hive/lib on hadoop101
7) Add the jars to Hive's classpath

hive (calllogs)>
add jar /opt/modules/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
add jar /opt/modules/hive/lib/common-1.0-SNAPSHOT.jar;
add jar /opt/modules/hive/lib/fastjson-1.2.75.jar;

8) Create a temporary function bound to the UDF

hive (calllogs)>
create temporary function parse_calllogs as 'com.cmcc.jackyan.hive.udf.CalllogsUDF';

9) Check the custom UDF

hive (calllogs)> show functions like '*calllogs*';
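
A quick smoke test of the temporary function against one raw ODS line (table and function names as defined above) confirms that the jars and the class are resolvable:

hive (calllogs)> select parse_calllogs(line) from ods_calllogs_cdr where dt='2021-12-07' limit 1;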

3.3.4 Parse the VoLTE Records

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_volte
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type ,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt  from
(
select
split(parse_calllogs(line),'\t')[0]   as sys_id,
split(parse_calllogs(line),'\t')[1]   as service_name,
split(parse_calllogs(line),'\t')[2]   as home_province_code,
split(parse_calllogs(line),'\t')[3]   as visit_province_code,
split(parse_calllogs(line),'\t')[4]   as channel_code,
split(parse_calllogs(line),'\t')[5]   as service_code,
split(parse_calllogs(line),'\t')[6]   as cdr_gen_time,
split(parse_calllogs(line),'\t')[7]   as duration,
split(parse_calllogs(line),'\t')[8]   as record_type,
split(parse_calllogs(line),'\t')[9]   as imsi,
split(parse_calllogs(line),'\t')[10]   as msisdn,
split(parse_calllogs(line),'\t')[11]   as dataUpLinkVolume,
split(parse_calllogs(line),'\t')[12]   as dataDownLinkVolume,
split(parse_calllogs(line),'\t')[13]   as charge,
split(parse_calllogs(line),'\t')[14]   as resultCode,
split(parse_calllogs(line),'\t')[15]   as server_time,
dt
from ods_calllogs_volte where dt='2021-12-07' and parse_calllogs(line)<>''
) volte;
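
Note that the query above calls parse_calllogs(line) once per output column, i.e. sixteen times per row. A lighter variant, shown here only as a sketch rather than the script used later, materializes the split array once in the subquery and indexes into it in the outer query:

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_volte
PARTITION (dt)
select
cols[0]  as sys_id,
cols[1]  as service_name,
cols[2]  as home_province_code,
cols[3]  as visit_province_code,
cols[4]  as channel_code,
cols[5]  as service_code,
cols[6]  as cdr_gen_time,
cols[7]  as duration,
cols[8]  as record_type,
cols[9]  as imsi,
cols[10] as msisdn,
cols[11] as dataUpLinkVolume,
cols[12] as dataDownLinkVolume,
cols[13] as charge,
cols[14] as resultCode,
cols[15] as server_time,
dt
from
(
select split(parse_calllogs(line),'\t') as cols, dt
from ods_calllogs_volte where dt='2021-12-07' and parse_calllogs(line)<>''
) volte;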

3.3.5 Parse the CDR Records

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_cdr
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type ,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt  from
(
select
split(parse_calllogs(line),'\t')[0]   as sys_id,
split(parse_calllogs(line),'\t')[1]   as service_name,
split(parse_calllogs(line),'\t')[2]   as home_province_code,
split(parse_calllogs(line),'\t')[3]   as visit_province_code,
split(parse_calllogs(line),'\t')[4]   as channel_code,
split(parse_calllogs(line),'\t')[5]   as service_code,
split(parse_calllogs(line),'\t')[6]   as cdr_gen_time,
split(parse_calllogs(line),'\t')[7]   as duration,
split(parse_calllogs(line),'\t')[8]   as record_type,
split(parse_calllogs(line),'\t')[9]   as imsi,
split(parse_calllogs(line),'\t')[10]   as msisdn,
split(parse_calllogs(line),'\t')[11]   as dataUpLinkVolume,
split(parse_calllogs(line),'\t')[12]   as dataDownLinkVolume,
split(parse_calllogs(line),'\t')[13]   as charge,
split(parse_calllogs(line),'\t')[14]   as resultCode,
split(parse_calllogs(line),'\t')[15]   as server_time,
dt
from ods_calllogs_cdr where dt='2021-12-07' and parse_calllogs(line)<>''
) cdr;

3.3.6 DWD Layer Data Parsing Script

1) Create the script in /home/hadoop/bin on hadoop101

[hadoop@hadoop101 bin]$ vim calllogs_dwd.sh

Write the following content in the script

#!/bin/bash
# Define variables for easy modification
DB=calllogs
hive=/opt/modules/hive/bin/hive

# If a date argument is given, use it; otherwise use yesterday's date
if [ -n "$1" ]; then
    calllogs_date=$1
else
    calllogs_date=`date -d "-1 day" +%F`
fi

sql="
add jar /opt/modules/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
add jar /opt/modules/hive/lib/common-1.0-SNAPSHOT.jar;
add jar /opt/modules/hive/lib/fastjson-1.2.75.jar;
create temporary function parse_calllogs as 'com.cmcc.jackyan.hive.udf.CalllogsUDF';
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$DB".dwd_calllogs_volte PARTITION (dt)
select sys_id, service_name, home_province_code, visit_province_code, channel_code, service_code, cdr_gen_time, duration, record_type, imsi, msisdn, dataUpLinkVolume, dataDownLinkVolume, charge, resultCode, server_time, dt
from
(
select
split(parse_calllogs(line),'\t')[0]   as sys_id,
split(parse_calllogs(line),'\t')[1]   as service_name,
split(parse_calllogs(line),'\t')[2]   as home_province_code,
split(parse_calllogs(line),'\t')[3]   as visit_province_code,
split(parse_calllogs(line),'\t')[4]   as channel_code,
split(parse_calllogs(line),'\t')[5]   as service_code,
split(parse_calllogs(line),'\t')[6]   as cdr_gen_time,
split(parse_calllogs(line),'\t')[7]   as duration,
split(parse_calllogs(line),'\t')[8]   as record_type,
split(parse_calllogs(line),'\t')[9]   as imsi,
split(parse_calllogs(line),'\t')[10]  as msisdn,
split(parse_calllogs(line),'\t')[11]  as dataUpLinkVolume,
split(parse_calllogs(line),'\t')[12]  as dataDownLinkVolume,
split(parse_calllogs(line),'\t')[13]  as charge,
split(parse_calllogs(line),'\t')[14]  as resultCode,
split(parse_calllogs(line),'\t')[15]  as server_time,
dt
from "$DB".ods_calllogs_volte where dt='$calllogs_date' and parse_calllogs(line)<>''
) volte;

insert overwrite table "$DB".dwd_calllogs_cdr PARTITION (dt)
select sys_id, service_name, home_province_code, visit_province_code, channel_code, service_code, cdr_gen_time, duration, record_type, imsi, msisdn, dataUpLinkVolume, dataDownLinkVolume, charge, resultCode, server_time, dt
from
(
select
split(parse_calllogs(line),'\t')[0]   as sys_id,
split(parse_calllogs(line),'\t')[1]   as service_name,
split(parse_calllogs(line),'\t')[2]   as home_province_code,
split(parse_calllogs(line),'\t')[3]   as visit_province_code,
split(parse_calllogs(line),'\t')[4]   as channel_code,
split(parse_calllogs(line),'\t')[5]   as service_code,
split(parse_calllogs(line),'\t')[6]   as cdr_gen_time,
split(parse_calllogs(line),'\t')[7]   as duration,
split(parse_calllogs(line),'\t')[8]   as record_type,
split(parse_calllogs(line),'\t')[9]   as imsi,
split(parse_calllogs(line),'\t')[10]  as msisdn,
split(parse_calllogs(line),'\t')[11]  as dataUpLinkVolume,
split(parse_calllogs(line),'\t')[12]  as dataDownLinkVolume,
split(parse_calllogs(line),'\t')[13]  as charge,
split(parse_calllogs(line),'\t')[14]  as resultCode,
split(parse_calllogs(line),'\t')[15]  as server_time,
dt
from "$DB".ods_calllogs_cdr where dt='$calllogs_date' and parse_calllogs(line)<>''
) cdr;
"

$hive -e "$sql"

2) Make the script executable

[hadoop@hadoop101 bin]$ chmod +x calllogs_dwd.sh

3) Run the script

[hadoop@hadoop101 modules]$ calllogs_dwd.sh 2021-12-08

4) Check the imported data

hive (calllogs)>
select * from dwd_calllogs_volte where dt='2021-12-08' limit 5;
select * from dwd_calllogs_cdr where dt='2021-12-08' limit 5;

5) Script schedule
In production this is typically run daily between 00:30 and 01:00

3.4 DWD Layer

The dwd_calllogs_cdr table contains several types of records, and each record type needs its own detail table.

3.4.1 SMS Detail Table

1) Table creation statement

hive (calllogs)>
drop table if exists dwd_calllogs_sms;
CREATE EXTERNAL TABLE `dwd_calllogs_sms`(
sys_id string,
service_name string,
home_province_code string,
visit_province_code string,
channel_code string,
service_code string,
cdr_gen_time string,
duration string,
record_type  string,
imsi string,
msisdn string,
dataUpLinkVolume string,
dataDownLinkVolume string,
charge string,
resultCode string,
server_time string
)
PARTITIONED BY (dt string)
location '/warehouse/calllogs/dwd/dwd_calllogs_sms/';

2) Load the data

hive (calllogs)>
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_sms
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from dwd_calllogs_cdr
where dt='2021-12-07' and record_type='sms';

3) Test

hive (calllogs)> select * from dwd_calllogs_sms limit 5;

3.4.2 MMS Detail Table

1) Table creation statement

hive (calllogs)>
drop table if exists dwd_calllogs_mms;
CREATE EXTERNAL TABLE `dwd_calllogs_mms`(
sys_id string,
service_name string,
home_province_code string,
visit_province_code string,
channel_code string,
service_code string,
cdr_gen_time string,
duration string,
record_type  string,
imsi string,
msisdn string,
dataUpLinkVolume string,
dataDownLinkVolume string,
charge string,
resultCode string,
server_time string
)
PARTITIONED BY (dt string)
location '/warehouse/calllogs/dwd/dwd_calllogs_mms/';

2) Load the data

hive (calllogs)>
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_mms
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from dwd_calllogs_cdr
where dt='2021-12-07' and record_type='mms';

3) Test

hive (calllogs)> select * from dwd_calllogs_mms limit 5;

3.4.3 GSM Detail Table

1) Table creation statement

hive (calllogs)>
drop table if exists dwd_calllogs_gsm;
CREATE EXTERNAL TABLE `dwd_calllogs_gsm`(
sys_id string,
service_name string,
home_province_code string,
visit_province_code string,
channel_code string,
service_code string,
cdr_gen_time string,
duration string,
record_type  string,
imsi string,
msisdn string,
dataUpLinkVolume string,
dataDownLinkVolume string,
charge string,
resultCode string,
server_time string
)
PARTITIONED BY (dt string)
location '/warehouse/calllogs/dwd/dwd_calllogs_gsm/';

2) Load the data

hive (calllogs)>
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_gsm
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from dwd_calllogs_cdr
where dt='2021-12-07' and record_type='gsm';

3) Test

hive (calllogs)> select * from dwd_calllogs_gsm limit 5;

3.4.4 GPRS Detail Table

1) Table creation statement

hive (calllogs)>
drop table if exists dwd_calllogs_gprs;
CREATE EXTERNAL TABLE `dwd_calllogs_gprs`(
sys_id string,
service_name string,
home_province_code string,
visit_province_code string,
channel_code string,
service_code string,
cdr_gen_time string,
duration string,
record_type  string,
imsi string,
msisdn string,
dataUpLinkVolume string,
dataDownLinkVolume string,
charge string,
resultCode string,
server_time string
)
PARTITIONED BY (dt string)
location '/warehouse/calllogs/dwd/dwd_calllogs_gprs/';

2) Load the data

hive (calllogs)>
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_gprs
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from dwd_calllogs_cdr
where dt='2021-12-07' and record_type='gprs';

3) Test

hive (calllogs)> select * from dwd_calllogs_gprs limit 5;
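
The four loads above each scan dwd_calllogs_cdr separately. Hive's multi-table insert can read the source once and fan out to all four type tables in a single pass; this is only an alternative sketch using the tables defined above, not the approach taken by the script in 3.4.6:

set hive.exec.dynamic.partition.mode=nonstrict;
from dwd_calllogs_cdr
insert overwrite table dwd_calllogs_sms PARTITION (dt)
select sys_id, service_name, home_province_code, visit_province_code, channel_code, service_code, cdr_gen_time, duration, record_type, imsi, msisdn, dataUpLinkVolume, dataDownLinkVolume, charge, resultCode, server_time, dt
where dt='2021-12-07' and record_type='sms'
insert overwrite table dwd_calllogs_mms PARTITION (dt)
select sys_id, service_name, home_province_code, visit_province_code, channel_code, service_code, cdr_gen_time, duration, record_type, imsi, msisdn, dataUpLinkVolume, dataDownLinkVolume, charge, resultCode, server_time, dt
where dt='2021-12-07' and record_type='mms'
insert overwrite table dwd_calllogs_gsm PARTITION (dt)
select sys_id, service_name, home_province_code, visit_province_code, channel_code, service_code, cdr_gen_time, duration, record_type, imsi, msisdn, dataUpLinkVolume, dataDownLinkVolume, charge, resultCode, server_time, dt
where dt='2021-12-07' and record_type='gsm'
insert overwrite table dwd_calllogs_gprs PARTITION (dt)
select sys_id, service_name, home_province_code, visit_province_code, channel_code, service_code, cdr_gen_time, duration, record_type, imsi, msisdn, dataUpLinkVolume, dataDownLinkVolume, charge, resultCode, server_time, dt
where dt='2021-12-07' and record_type='gprs';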

3.4.5 Building the Consolidated CDR Tall Table

Store the VoLTE records into dwd_calllogs_cdr as well, forming a single tall table of all record types.
1) Load the data

hive (calllogs)>
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_calllogs_cdr
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from dwd_calllogs_volte
where dt='2021-12-07' and record_type='volte';

2) Test

hive (calllogs)> select * from dwd_calllogs_cdr where record_type='volte' limit 5;
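
A record-type distribution query confirms that the volte records now sit alongside the other types in the tall table; a quick sketch:

hive (calllogs)> select record_type, count(*) from dwd_calllogs_cdr where dt='2021-12-07' group by record_type;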

3.4.6 DWD Layer Detail Data Loading Script

1) Create the script in /home/hadoop/bin on hadoop101

[hadoop@hadoop101 bin]$ vim calllogs_dwd_detail.sh

Write the following content in the script

#!/bin/bash
# Define variables for easy modification
DB=calllogs
hive=/opt/modules/hive/bin/hive

# If a date argument is given, use it; otherwise use yesterday's date
if [ -n "$1" ]; then
    calllogs_date=$1
else
    calllogs_date=`date -d "-1 day" +%F`
fi

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$DB".dwd_calllogs_sms
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from "$DB".dwd_calllogs_cdr
where dt='$calllogs_date' and record_type='sms';

insert overwrite table "$DB".dwd_calllogs_mms
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from "$DB".dwd_calllogs_cdr
where dt='$calllogs_date' and record_type='mms';

insert overwrite table "$DB".dwd_calllogs_gsm
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from "$DB".dwd_calllogs_cdr
where dt='$calllogs_date' and record_type='gsm';

insert overwrite table "$DB".dwd_calllogs_gprs
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from "$DB".dwd_calllogs_cdr
where dt='$calllogs_date' and record_type='gprs';

insert overwrite table "$DB".dwd_calllogs_cdr
PARTITION (dt)
select
sys_id,
service_name,
home_province_code,
visit_province_code,
channel_code,
service_code,
cdr_gen_time,
duration,
record_type,
imsi,
msisdn,
dataUpLinkVolume,
dataDownLinkVolume,
charge,
resultCode,
server_time,
dt
from "$DB".dwd_calllogs_volte
where dt='$calllogs_date' and record_type='volte';
"$hive -e "$sql"

2) Make the script executable

[hadoop@hadoop101 bin]$ chmod +x calllogs_dwd_detail.sh

3) Run the script

[hadoop@hadoop101 modules]$ calllogs_dwd_detail.sh 2021-12-08

4) Check the imported data

hive (calllogs)>
select * from dwd_calllogs_gsm where dt='2021-12-08' limit 5;
select * from dwd_calllogs_gprs where dt='2021-12-08' limit 5;

5) Script schedule
In production this is typically run daily between 00:30 and 01:00
