1 数据仓库概念

数据仓库( Data Warehouse ),可简写为DW或DWH。数据仓库,是为企业所有决策制定过程,提供所有系统数据支持的战略集合。
通过对数据仓库中数据的分析,可以帮助企业,改进业务流程、控制成本、提高产品质量等。
数据仓库,并不是数据的最终目的地,而是为数据最终的目的地做好准备。这些准备包括对数据的:清洗,转义,分类,重组,合并,拆分,统计等等。

2 项目需求

2.1 项目需求分析

1、实时采集埋点的用户行为数据
2、实现数据仓库的分层搭建
3、每天定时导入业务数据
4、根据数据仓库中的数据进行报表分析

2.2 项目框架

2.2.1 技术选型

数据采集传输:Flume,Kafka,Logstash,DataX,Sqoop
数据存储:Hive,MySql,HDFS,HBase,S3
数据计算:Spark,Hive,Tez,Flink,Storm
数据查询:Presto,Impala,Kylin

2.2.2 常见系统架构图设计

2.2.3 常见系统数据流程设计

2.2.4 框架版本选型

软件产品 版本
Hadoop 2.7.2
Flume 1.7.0
Kafka 0.11.0.2
Kafka Manager 1.3.3.22
Hive 1.2.1
Sqoop 1.4.6
Mysql 5.7
Azkaban 2.5.0
Java 1.8
Zookeeper 3.4.10

注意事项:框架选型尽量不要选择最新的框架,选择最新框架半年前左右的稳定版。
2.2.5 集群资源规划设计

服务器1 服务器2 服务器3
HDFS NameNode,DataNode DataNode DataNode
Yarn NodeManager Resourcemanager,NodeManager NodeManager
Zookeeper Zookeeper Zookeeper Zookeeper
Flume(采集日志) Flume Flume
Kafka Kafka Kafka Kafka
Flume(消费Kafka) Flume
Hive Hive
Mysql Mysql

3 数据生成模块

3.1 话单数据字段

private String sysId;               // 平台编码
private String serviceName;         // 接口服务名称
private String homeProvinceCode;    // 归属省
private String visitProvinceCode;   // 访问省
private String channelCode;         // 渠道编码
private String serviceCode;         // 业务流水号
private String cdrGenTime;  // 开始时间
private String duration;            // 时长
private String recordType;          // 话单类型
private String imsi;                // 46000开头15位编码
private String msisdn;              // 通常指手机号码
private String dataUpLinkVolume;    // 上行流量
private String dataDownLinkVolume;  // 下行流量
private String charge;              // 费用
private String resultCode;          // 结果编码

3.2 话单数据格式:

{"cdrGenTime":"20211206165309844","channelCode":"wRFUC","charge":"62.28","dataDownLinkVolume":"2197","dataUpLinkVolume":"467","duration":"1678","homeProvinceCode":"551","imsi":"460007237312954","msisdn":"15636420864","recordType":"gprs","resultCode":"000000","serviceCode":"398814910321850","serviceName":"a9icp8xcNy","sysId":"U1ob6","visitProvinceCode":"451"}

3.3 编写项目模拟生成话单数据

详见项目:cdr_gen_app
对项目进行打包并部署到Hadoop101机器上,执行命令测试数据生成

[hadoop@hadoop101 ~]$ head datas/call.log
{"cdrGenTime":"20211206173630762","channelCode":"QEHVc","charge":"78.59","dataDownLinkVolume":"3007","dataUpLinkVolume":"3399","duration":"1545","homeProvinceCode":"571","imsi":"460002441249614","msisdn":"18981080935","recordType":"gprs","resultCode":"000000","serviceCode":"408688053609568","serviceName":"8lr23kV4C2","sysId":"j4FbV","visitProvinceCode":"931"}
{"cdrGenTime":"20211206173630884","channelCode":"TY4On","charge":"31.17","dataDownLinkVolume":"1853","dataUpLinkVolume":"2802","duration":"42","homeProvinceCode":"891","imsi":"460008474876800","msisdn":"15998955319","recordType":"gprs","resultCode":"000000","serviceCode":"739375515156215","serviceName":"6p87h0OHdC","sysId":"UnI9V","visitProvinceCode":"991"}
...

4 数据采集

4.1 Hadoop环境准备

Hadoop101 Hadoop102 Hadoop103
HDFD NameNode, DataNode DataNode DataNode
Yarn NodeManager ResourceManager,NodeManager NodeManager

4.1.1 添加LZO支持包

1)先下载lzo的jar项目
<u>https://github.com/twitter/hadoop-lzo/archive/master.zip</u>
2)下载后的文件名是hadoop-lzo-master,它是一个zip格式的压缩包,先进行解压,然后用maven编译。生成hadoop-lzo-0.4.20。
3)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/

[hadoop@hadoop101 common]$ pwd
/opt/modules/hadoop-2.7.2/share/hadoop/common
[hadoop@hadoop101 common]$ ls
hadoop-lzo-0.4.20.jar

4)同步hadoop-lzo-0.4.20.jar到hadoop102、hadoop103

[hadoop@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar

4.1.2 添加配置

1)core-site.xml增加配置支持LZO压缩

    <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value></property><property><name>io.compression.codec.lzo.class</name><value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

2)同步core-site.xml到hadoop102、hadoop103

[hadoop@hadoop101 hadoop]$ xsync core-site.xml

4.1.2 启动集群

[hadoop@hadoop101 hadoop-2.7.2]$ sbin/start-dfs.sh
[hadoop@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh

4.1.3 验证

1)web和进程查看

  • Web查看:http://hadoop101:50070
  • 进程查看:jps查看各个节点状态。

4.2 Zookeeper环境准备

Hadoop101 Hadoop102 Hadoop103
Zookeeper Zookeeper Zookeeper Zookeeper

4.2.2 ZK集群启动停止脚本

1)在hadoop101的/home/hadoop/bin目录下创建脚本

[hadoop@hadoop101 bin]$ vim zk.sh

在脚本中编写如下内容

#! /bin/bashcase $1 in
"start"){for i in hadoop101 hadoop102 hadoop103dossh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh start"done};;
"stop"){for i in hadoop101 hadoop102 hadoop103dossh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh stop"done};;
esac

2)增加脚本执行权限

[hadoop@hadoop101 bin]$ chmod +x zk.sh

3)Zookeeper集群启动脚本

[hadoop@hadoop101 modules]$ zk.sh start

4)Zookeeper集群停止脚本

[hadoop@hadoop101 modules]$ zk.sh stop

4.3 Flume环境准备

Hadoop101 Hadoop102 Hadoop103
Flume(采集) Flume

flume配置分析

Flume的具体配置
(1)在/opt/datas/calllogs/flume1/目录下创建flume1.conf文件
[hadoop@hadoop101 conf]$ vim flume1.conf
在文件配置如下内容

a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2 # configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/datas/calllogs/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/datas/calllogs/records/calllogs.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.cmcc.jackyan.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor$Builder# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = recordType
a1.sources.r1.selector.mapping.volte= c1
a1.sources.r1.selector.mapping.cdr= c2# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_volte
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_cdr
a1.sinks.k2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2

注意:com.cmcc.jackyan.flume.interceptor.LogETLInterceptor和com.jackyan.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

4.3.3 自定义Flume拦截器

这里自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和json数据不完整的日志
日志类型区分拦截器主要用于,将volte话单和其他话单区分开来,方便发往kafka的不同topic。
1)创建maven模块flume-interceptor
2)创建包名:com.cmcc.jackyan.flume.interceptor
3)在pom.xml文件中添加如下配置

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.cmcc.jackyan.appclient.AppMain</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

4)在com.cmcc.jackyan.flume.interceptor包下创建LogETLInterceptor类名
Flume ETL拦截器LogETLInterceptor

public class LogETLInterceptor implements Interceptor {//打印日志,用于调试private static final Logger logger = LoggerFactory.getLogger(LogETLInterceptor.class);@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {String body = new String(event.getBody(), Charset.forName("UTF-8"));
//        logger.info("Before:" + body);event.setBody(LogUtils.addTimeStamp(body).getBytes());body = new String(event.getBody(), Charset.forName("UTF-8"));//        logger.info("After:" + body);return event;}@Overridepublic List<Event> intercept(List<Event> events) {ArrayList<Event> intercepts = new ArrayList<>();// 遍历所有的events,过滤掉不合法的for (Event event : events) {Event interceptEvent = intercept(event);if (interceptEvent != null) {intercepts.add(event);}}return intercepts;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new LogETLInterceptor();}@Overridepublic void configure(Context context) {}}
}

4)Flume日志处理工具类

public class LogUtils {private static Logger logger = LoggerFactory.getLogger(LogUtils.class);/*** 给日志拼接一个时间戳** @param log*/public static String addTimeStamp(String log) {//{"cdrGenTime":"20211207091838913","channelCode":"k82Ce","charge":"86.14","dataDownLinkVolume":"3083","dataUpLinkVolume":"2311","duration":"133","homeProvinceCode":"210","imsi":"460004320498004","msisdn":"17268221069","recordType":"mms","resultCode":"000000","serviceCode":"711869363795868","serviceName":"H79cKXuJO4","sysId":"aGIL4","visitProvinceCode":"220"}long t = System.currentTimeMillis();return t + "|" + log;}
}

5)Flume日志类型区分拦截器LogTypeInterceptor

public class LogTypeInterceptor implements Interceptor {//打印日志,用于调试private static final Logger logger = LoggerFactory.getLogger(LogTypeInterceptor.class);@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 1获取flume接收消息头Map<String, String> headers = event.getHeaders();// 2获取flume接收的json数据数组byte[] json = event.getBody();// 将json数组转换为字符串String jsonStr = new String(json);String recordType = "" ;// volteif (jsonStr.contains("volte")) {recordType = "volte";}// cdrelse {recordType = "cdr";}// 3将日志类型存储到flume头中headers.put("recordType", recordType);//        logger.info("recordType:" + recordType);return event;}@Overridepublic List<Event> intercept(List<Event> events) {ArrayList<Event> interceptors = new ArrayList<>();for (Event event : events) {Event interceptEvent = intercept(event);interceptors.add(interceptEvent);}return interceptors;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {public Interceptor build() {return new LogTypeInterceptor();}@Overridepublic void configure(Context context) {}}
}

6)打包
拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面。
7)需要先将打好的包放入到hadoop101的/opt/modules/flume/lib文件夹下面。

[hadoop@hadoop101 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar

8)启动flume

[hadoop@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/datas/calllogs/flume1/flume1.conf &

4.3.4 日志采集Flume启动停止脚本
1)在/home/hadoop/bin目录下创建脚本calllog-flume1.sh

[hadoop@hadoop101 bin]$ vim calllog-flume1.sh

在脚本中填写如下内容

#! /bin/bashcase $1 in
"start"){for i in hadoop101doecho " --------启动 $i 消费flume-------"ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume1/flume1.conf &"done
};;
"stop"){for i in hadoop101doecho " --------停止 $i 消费flume-------"ssh $i "ps -ef | grep flume1 | grep -v grep | awk '{print \$2}' | xargs kill -9"done};;
esac

说明:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思。
2)增加脚本执行权限

[hadoop@hadoop101 bin]$ chmod +x calllog-flume1.sh

3)flume1集群启动脚本

[hadoop@hadoop101 modules]$ calllog-flume1.sh start

4)flume1集群停止脚本

[hadoop@hadoop101 modules]$ calllog-flume1.sh stop

4.4 Kafka环境准备

Hadoop101 Hadoop102 Hadoop103
Kafka Kafka Kafka Kafka

4.4.1 Kafka集群启动停止脚本

1)在/home/hadoop/bin目录下创建脚本kafka.sh

[hadoop@hadoop101 bin]$ vim kafka.sh

在脚本中填写如下内容

#! /bin/bashcase $1 in
"start"){for i in hadoop101 hadoop102 hadoop103doecho " --------启动 $i kafka-------"# 用于KafkaManager监控ssh $i "export JMX_PORT=9988 && /opt/modules/kafka/bin/kafka-server-start.sh -daemon /opt/modules/kafka/config/server.properties "done
};;
"stop"){for i in hadoop101 hadoop102 hadoop103doecho " --------停止 $i kafka-------"ssh $i "ps -ef | grep server.properties | grep -v grep| awk '{print $2}' | xargs kill >/dev/null 2>&1 &"done
};;
esac

注意:启动Kafka时要先开启JMX端口,是用于后续KafkaManager监控。
2)增加脚本执行权限

[hadoop@hadoop101 bin]$ chmod +x kafka.sh

3)kf集群启动脚本

[hadoop@hadoop101 modules]$ kafka.sh start

4)kf集群停止脚本

[hadoop@hadoop101 modules]$ kafka.sh stop

4.4.2 查看所有Kafka topic

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list

4.4.3 创建 Kafka topic

进入到/opt/modules/kafka/目录下分别创建:启动日志主题、事件日志主题。
1)创建volte主题

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 2 --partitions 3 --topic topic-volte

2)创建cdr主题

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 2 --partitions 3 --topic topic-cdr

4.4.4 删除 Kafka topic

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-volte[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-cdr

4.4.5 生产消息

[hadoop@hadoop101 kafka]$ bin/kafka-console-producer.sh \
--bootstrap-server hadoop101:9092 --topic topic-volte
>hello world
>hadoop  hadoop

4.4.6 消费消息

[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092 --from-beginning --topic topic-volte

--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

4.4.7 查看某个Topic的详情

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--describe --topic topic-volte

4.5 Flume消费Kafka数据写到HDFS

Hadoop101 Hadoop102 Hadoop103
Flume(消费kafka) Flume

配置分析

Flume的具体配置
(1)在hadoop103的/opt/datas/calllogs/flume2/目录下创建flume2.conf文件

[hadoop@hadoop101 conf]$ vim flume2.conf

在文件配置如下内容

## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r1.kafka.topics=topic-volte## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r2.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r2.kafka.topics=topic-cdr## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/calllogs/records/topic-volte/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = volte-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/calllogs/records/topic-cdr/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = cdr-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

4.5.2 Flume异常处理
1)问题描述:如果启动消费Flume抛出如下异常

ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解决方案步骤:
(1)在hadoop101服务器的/opt/modules/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop102、hadoop103服务器

[hadoop@hadoop101 conf]$ xsync flume-env.sh

4.5.2 日志消费Flume启动停止脚本
1)在/home/hadoop/bin目录下创建脚本calllog-flume2.sh

[hadoop@hadoop101 bin]$ vim calllog-flume2.sh

在脚本中填写如下内容

#! /bin/bashcase $1 in
"start"){for i in hadoop103doecho " --------启动 $i 消费flume-------"ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume2/flume2.conf &"done
};;
"stop"){for i in hadoop103doecho " --------停止 $i 消费flume-------"ssh $i "ps -ef | grep flume2 | grep -v grep | awk '{print \$2}' | xargs kill -9"done};;
esac

2)增加脚本执行权限

[hadoop@hadoop101 bin]$ chmod +x calllog-flume2.sh

3)f2集群启动脚本

[hadoop@hadoop101 modules]$ calllog-flume2.sh start

4)f2集群停止脚本

[hadoop@hadoop101 modules]$ calllog-flume2.sh stop

4.6 采集通道启动/停止脚本

1)在/opt/datas/calllogs目录下创建脚本cluster.sh

[hadoop@hadoop101 calllogs]$ vim cluster.sh

在脚本中填写如下内容

#! /bin/bashcase $1 in
"start"){echo " -------- 启动 集群 -------"echo " -------- 启动 hadoop集群 -------"/opt/modules/hadoop-2.7.2/sbin/start-dfs.sh ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/start-yarn.sh"#启动 Zookeeper集群zk.sh start#启动 Flume采集集群calllog-flume1.sh start#启动 Kafka采集集群kafka.sh startsleep 4s;#启动 Flume消费集群calllog-flume2.sh start
};;
"stop"){echo " -------- 停止 集群 -------"#停止 Flume消费集群calllog-flume2.sh stop#停止 Kafka采集集群kafka.sh stopsleep 4s;#停止 Flume采集集群calllog-flume1.sh stop#停止 Zookeeper集群zk.sh stopecho " -------- 停止 hadoop集群 -------"ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/stop-yarn.sh"/opt/modules/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac

2)增加脚本执行权限

[hadoop@hadoop101 calllogs]$ chmod +x cluster.sh

3)cluster集群启动脚本

[hadoop@hadoop101 calllogs]$ ./cluster.sh start

4)cluster集群停止脚本

[hadoop@hadoop101 calllogs]$ ./cluster.sh stop

话单数据仓库搭建(1)- 数仓概念及数据采集相关推荐

  1. 离线数仓03——业务数据采集平台

    文章目录 第1章 电商业务简介 1.1 电商业务流程 1.2 电商常识 1.2.1 SKU和SPU 1.2.2 平台属性和销售属性 第2章 业务数据介绍 2.1 电商系统表结构 2.1 MySQL安装 ...

  2. 数据仓库 — 01_项目需求分析与技术选型(数仓概念、项目需求及架构设计、数据生成模块格式要求)

    文章目录 1 数据仓库的概念 2 项目需求分析 3 项目框架 3.1 技术选型 3.2 系统数据流程设计 3.3 框架版本选型 3.4 服务器选型 3.5 集群资源规划设计 3.5.1 集群规模计算 ...

  3. 话单数据仓库搭建(3)- 数据仓库DWS及ADS层

    1 活跃用户 1.1 DWS层 目标:统计当日.当周.当月活跃用户 活跃用户指的是在统计周期内由过通话记录的用户 1.1.1 每日活跃用户明细 1)建表语句 hive (calllogs)> d ...

  4. 数仓之基础概念汇集1

    数仓概念汇集 1.什么叫数据仓库? 数据仓库是一个面向主题的(Subject Oriented).集成的(Integrate).相对稳定的(Non-Volatile).反映历史变化(Time Vari ...

  5. 【HBZ分享】数仓里面的概念-宽表-维度表-事实表概念讲解

    数仓概念 1. 度量值: 可被统计的,比如:次数,销量,营销额,订单表中的下单金额等可以统计的值叫度量值2. 维度表:(1). 对事实描述的信息,每一张表都对应现实世界中的一个对象或概念,比如:用户, ...

  6. 尚硅谷-离线数仓-笔记

    尚硅谷-离线数仓-笔记 一.数仓建模理论 第一章 数仓概述 1.1 数仓概念 数据仓库是一个为数据分析而设计的企业级数据管理系统.数据仓库可集中.整合多个信息源的大量数据,借助数据仓库的分析能力,企业 ...

  7. 数仓01-概念的理解和方法论

    数仓-概念的理解和方法论 大数据相关概念 什么是大数据 大数据主要涉及的行业 对数仓相关概念的初步理解 数仓 数据集市 数据中台(数据仓库和数据中台区别) 数据湖 olap 区别 前置知识-分析.事实 ...

  8. 数仓分层(ODS、DWD、DWS、DWT、ADS)和数仓建模

    文章目录 一.数仓分层 数仓概念 ODS(原始数据层)做了哪些事 DWD(明细数据层)做了哪些事 DWS(服务数据层)做了哪些事 DWT(主题数据层)做了哪些事 ADS(应用数据层)做了哪些事 二.数 ...

  9. 【项目】数仓项目(四)

    总结 1)数仓概念总结 [1]数据仓库的输入数据源和输出系统分别是什么? 输入系统:埋点产生的用户行为数据.JavaEE 后台产生的业务数据 输出系统:报表系统.用户画像系统.推荐系统 2)项目需求及 ...

  10. 实时数仓到底是什么呢?与传统数仓有什么区别?

    数仓,即存放数据的仓库,包括全量数据.历史数据.类型上又分为实时数仓.离线数仓,所谓实时数仓是指数据的实时性更高.延迟性低,一般是统计一天以内的数据,支持毫秒级的统计,在建设工具上一般采用Flink, ...

最新文章

  1. TensorFlow学习笔记:共享变量
  2. BZOJ3808 : Neerc2012 Labyrinth of the Minotaur
  3. 计算机职称excel2007,职称计算机Excel2007中文电子表格考试大纲
  4. obs可以推到中转服务器吗,能否使用OBS(Open Broadcaster Software)、FMLE(Flash Media Live Encoder)等第三方软件进行推流?...
  5. Jmeter设置代理,抓包之app请求
  6. Android艺术探索笔记 - 创建AIDL文件后自动生成的文件分析
  7. 嵌入式成长轨迹52 【Zigbee项目】【CC2430基础实验】【在PC用串口收数并发数】...
  8. js获取当天0点和24点的时间戳
  9. linux rpm安装简要说明
  10. 02. Win32 API简介
  11. 2021蓝桥杯Java复习【史上最详细攻略】【持续更新】
  12. 当exe文件运行时,汉字出现乱码
  13. 以字符串为例,谈谈Python到底要学到什么程度
  14. 绿色便携版谷歌浏览器制作流程
  15. 龙芯Fedora21平台制作feodra21-tools docker镜像
  16. java 创建 777 权限的目录
  17. web前端优化10点总结
  18. centos7安装mysql57--实际操作可行
  19. 计算机更新系统d盘东西还在吗,重装系统其他盘的东西还在吗_重做系统其它盘东西会丢失吗-win7之家...
  20. 最近标杆型自媒体程苓峰发文章说自己的广告效果多么好

热门文章

  1. 发电机机房设计规范_柴油发电机房设计规范.docx
  2. 【数学建模】关联与因果问题
  3. ies文件 vray_VRayIES灯光
  4. Windows华丽变身MAC OS X
  5. DevExpress DXperience 的本地化(汉化)方法
  6. qt如何编写android程序,如何利用Qt开发Android应用程序
  7. 网络蜘蛛C#开源示例
  8. linux操作系统环境搭建实验报告,操作系统实验报告 Linux基本环境
  9. Linux命令大全详解
  10. CodeProject终于迁到Asp.Net了