电商分析之会员活跃度

第 1 节 需求分析

会员数据是后期营销的很重要的数据。网店会专门针对会员进行一系列营销活动。

电商会员一般门槛较低,注册网站即可加入。有些电商平台的高级会员具有时效性,需要购买VIP会员卡或一年内消费额达到多少才能成为高级会员。

计算指标:

新增会员:每日新增会员数

活跃会员:每日,每周,每月的活跃会员数

会员留存:1日,2日,3日会员留存数、1日,2日,3日会员留存率

指标口径业务逻辑:

会员:以设备为判断标准,每个独立设备认为是一个会员。Android系统通常根据IMEI号,IOS系统通常根据OpenUDID来标识一个独立会员,每部移动设备是一个会员;

活跃会员:打开应用的会员即为活跃会员,暂不考虑用户的实际使用情况。一台设备每天多次打开计算为一个活跃会员。在自然周内启动过应用的会员为周活跃会员,同理还有月活跃会员;

会员活跃率:一天内活跃会员数与总会员数的比率是日活跃率;还有周活跃率(自 然周)、月活跃率(自然月);

新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被算作一次新增。新增用户包括日新增会员、周(自然周)新增会员、月(自然 月)新增会员;

留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;这部分会员占当时新增会员的比例为留存率。

已知条件:
1、明确了需求
2、输入:启动日志(OK)、事件日志
3、输出:新增会员、活跃会员、留存会员
4、日志文件、ODS、DWD、DWS、ADS(输出)
下一步做什么?
数据采集:日志文件 => Flume => HDFS => ODS

第 2 节 日志数据采集

原始日志数据(一条启动日志)

2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529} ,"attr":{"area":"泰 安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","dev ice_id":"1FB872- 9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","br and":"iphone-9"}}

数据采集的流程:

选择Flume作为采集日志数据的工具:

  • Flume 1.6

    • 无论是Spooling Directory Source、Exec Source均不能很好的满足动态实时收集的需求
  • Flume 1.8+

    • 提供了一个非常好用的 Taildir Source
    • 使用该source,可以监控多个目录,对目录中新写入的数据进行实时采集

2.1、taildir source配置

taildir Source的特点:

  • 使用正则表达式匹配目录中的文件名
  • 监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink
  • 高可靠,不会丢失数据不会对跟踪文件有任何处理,不会重命名也不会删除
  • 不支持Windows,不能读二进制文件。支持按行读取文本文件

taildir source配置

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log
  • positionFile

配置检查点文件的路径,检查点文件会以 json 格式保存已经读取文件的位置,解决断点续传的问题

  • filegroups

指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)

  • filegroups.

配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配

2.2、hdfs sink配置

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 100# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true

HDFS Sink 都会采用滚动生成文件的方式,滚动生成文件的策略有:

  • 基于时间。hdfs.rollInterval 30秒
  • 基于文件大小。hdfs.rollSize 1024字节
  • 基于event数量。hdfs.rollCount 10个event
  • 基于文件空闲时间。hdfs.idleTimeout 0
  • minBlockReplicas。默认值与 hdfs 副本数一致。设为1是为了让 Flume 感知不到hdfs的块复制,此时其他的滚动方式配置(时间间隔、文件大小、events数量)才不会受影响

2.3、Agent的配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

/data/lagoudw/conf/flume-log2hdfs.conf

2.4、Flume的优化配置

1、启动agent

flume-ng agent --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 -Dflume.roog.logger=INFO,console

2、向 /data/lagoudw/logs/ 目录中放入日志文件

报错: java.lang.OutOfMemoryError: GC overhead limit exceeded

缺省情况下 Flume jvm堆最大分配20m,这个值太小,需要调整。

3、解决方案:在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 - Dflume.roog.logger=INFO,console
  • Flume内存参数设置及优化: 根据日志数据量的大小,Jvm堆一般要设置为4G或更高
  • -Xms -Xmx 最好设置一致,减少内存抖动带来的性能影响

存在的问题:Flume放数据时,使用本地时间;不理会日志的时间戳

2.5、自定义拦截器

前面 Flume Agent 的配置使用了本地时间,可能导致数据存放的路径不正确。

要解决以上问题需要使用自定义拦截器。

agent用于测试自定义拦截器。netcat source =>logger sink

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux122
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.CustomerInterceptor$Builder# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100# sink
a1.sinks.k1.type = logger# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器的原理:

1、自定义拦截器要集成Flume 的 Interceptor

2、Event 分为header 和 body(接收的字符串)

3、获取header和body

4、从body中获取"time":1596382570539,并将时间戳转换为字符串 "yyyy-MM- dd"

5、将转换后的字符串放置header中

自定义拦截器的实现:

1、获取 event 的 header

2、获取 event 的 body

3、解析body获取json串

4、解析json串获取时间戳

5、将时间戳转换为字符串 "yyyy-MM-dd"

6、将转换后的字符串放置header中

7、返回event

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
</dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
package cn.lagou.dw.flume.interceptor;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class CustomerInterceptor implements Interceptor {private static DateTimeFormatter formatter =DateTimeFormatter.ofPattern("yyyy-MM-dd");@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {String eventbody = new String(event.getBody(), Charsets.UTF_8);Map<String, String> headerMap = event.getHeaders();String[] bodyArr = eventbody.split("\\s+");try {String jsonStr = bodyArr[6];if (Strings.isNullOrEmpty(jsonStr)) {return null;}JSONObject jsonObject = JSON.parseObject(jsonStr).getJSONObject("app_active");String timestampStr = jsonObject.getString("time");// 将 timestamp 转为 时间日期类型(格式:yyyy-MM-dd)long timeStamp = Long.parseLong(timestampStr);String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));headerMap.put("logtime", date);event.setHeaders(headerMap);} catch (Exception e) {headerMap.put("logtime", "unknown");event.setHeaders(headerMap);}return event;}@Overridepublic List<Event> intercept(List<Event> events) {List<Event> out = new ArrayList<>();for(Event event : events) {Event outEvent = intercept(event);if (outEvent != null) {out.add(outEvent);}}return out;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomerInterceptor();}@Overridepublic void configure(Context context) {}}
}

将程序打包,放在 flume/lib目录下;

启动Agent测试

flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ --conf-file /root/data/lagoudw/conf/flumetest1.conf -name a1 -Dflume.root.logger=INFO,console

2.6、采集启动日志(使用自定义拦截器)

1、定义配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.CustomerInterceptor$Builder# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
v
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

修改:

  • 给source增加自定义拦截器
  • 去掉本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true
  • 根据header中的logtime写文件

2、启动服务

# 测试
flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ --conf-file /root/data/lagoudw/conf/flume-log2hdfs2.conf -name a1 -Dflume.root.logger=INFO,consolet.logger=INFO,console

3、拷贝日志

4、检查HDFS文件

2.7、采集启动日志和事件日志

本系统中要采集两种日志:启动日志、事件日志,不同的日志放置在不同的目录下。要想一次拿到全部日志需要监控多个目录。

总体思路

1、taildir监控多个目录

2、修改自定义拦截器,不同来源的数据加上不同标志

3、hdfs sink 根据标志写文件

Agent配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /root/data/lagoudw/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /root/data/lagoudw/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = %{logtype}log
a1.sinks.k1.hdfs.fileType = DataStream# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地时间
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • filegroups

指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)

  • headers.<filegroupName>.<headerKey>

给event增加header key。不同的filegroup,可配置不同的value

自定义拦截器

编码完成后打包上传服务器,放置在$FLUME_HOME/lib 下

package cn.lagou.dw.flume.interceptor;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class LogTypeInterceptor implements Interceptor {private static DateTimeFormatter formatter =DateTimeFormatter.ofPattern("yyyy-MM-dd");@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {String eventbody = new String(event.getBody(), Charsets.UTF_8);Map<String, String> headerMap = event.getHeaders();String[] bodyArr = eventbody.split("\\s+");try {String jsonStr = bodyArr[6];if (Strings.isNullOrEmpty(jsonStr)) {return null;}String timestampStr = "";JSONObject jsonObject = JSON.parseObject(jsonStr);if (headerMap.getOrDefault("logtype", "").equals("start")){// 取启动日志的时间戳timestampStr = jsonObject.getJSONObject("app_active").getString("time");} else if (headerMap.getOrDefault("logtype","").equals("event")) {// 取事件日志第一条记录的时间戳JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");if (jsonArray.size() > 0){timestampStr = jsonArray.getJSONObject(0).getString("time");}}// 将 timestamp 转为 时间日期类型(格式:yyyy-MM-dd)long timeStamp = Long.parseLong(timestampStr);String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));headerMap.put("logtime", date);event.setHeaders(headerMap);} catch (Exception e) {headerMap.put("logtime", "unknown");event.setHeaders(headerMap);}return event;}@Overridepublic List<Event> intercept(List<Event> events) {List<Event> out = new ArrayList<>();for(Event event : events) {Event outEvent = intercept(event);if (outEvent != null) {out.add(outEvent);}}return out;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new LogTypeInterceptor();}@Overridepublic void configure(Context context) {}}@Testpublic void testJunit(){String str = new String("2020-08-20 12:00:58.400 [main] INFO  com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"3\",\"goodsid\":\"0\",\"loading_time\":\"100\",\"action\":\"3\",\"staytime\":\"34\",\"showtype\":\"4\"},\"time\":1595340530671},{\"name\":\"praise\",\"json\":{\"id\":4,\"type\":2,\"add_time\":\"1597827924588\",\"userid\":5,\"target\":9},\"time\":1595301323236}],\"attr\":{\"area\":\"文登\",\"uid\":\"2F10092A1\",\"app_v\":\"1.1.9\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1001\",\"os_type\":\"0.93\",\"channel\":\"BB\",\"language\":\"chinese\",\"brand\":\"xiaomi-9\"}}");Map<String,String> map = new HashMap<>();Event event = new SimpleEvent();map.put("logtype","event");event.setHeaders(map);event.setBody(str.getBytes(Charsets.UTF_8));LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();Event outEvent = customerInterceptor.intercept(event);Map<String, String> headers = outEvent.getHeaders();System.out.println(JSON.toJSONString(headers));}}

测试

启动Agent,拷贝日志,检查HDFS文件

# 清理环境
rm -f /data/lagoudw/conf/startlog_position.json
rm -f /data/lagoudw/logs/start/*.log
rm -f /data/lagoudw/logs/event/*.log# 启动 Agent
flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ --conf-file /root/data/lagoudw/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,console
# 拷贝日志
cd /data/lagoudw/logs/source cp event0802.log ../event/ cp start0802.log ../start/# 检查HDFS文件
hdfs dfs -ls /user/data/logs/event hdfs dfs -ls /user/data/logs/start# 生产环境中用以下方式启动Agent
nohup flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf -name a1 - Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &
  • nohup,该命令允许用户退出帐户/关闭终端之后继续运行相应的进程
  • /dev/null,代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞
  • 标准输入0,从键盘获得输入 /proc/self/fd/0
  • 标准输出1,输出到屏幕(控制台) /proc/self/fd/1
  • 错误输出2,输出到屏幕(控制台) /proc/self/fd/2
  • >/dev/null 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容
  • 2>&1 错误输出将会和标准输出输出到同一个地方
  • >/dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中

2.8 日志数据采集小结

  • 使用taildir source 监控指定的多个目录,可以给不同目录的日志加上不同header
  • 在每个目录中可以使用正则匹配多个文件
  • 使用自定义拦截器,主要功能是从json串中获取时间戳,加到event的header中
  • hdfs sink使用event header中的信息写数据(控制写文件的位置)
  • hdfs文件的滚动方式(基于文件大小、基于event数量、基于时间)
  • 调节flume jvm内存的分配

第 3 节 ODS建表和数据加载

ODS层的数据与源数据的格式基本相同。

创建ODS层表:

use ODS;
create external table ods.ods_start_log( `str` string)
comment '用户启动日志信息'
partitioned by (`dt` string)
location '/user/data/logs/start';-- 加载数据的功能(测试时使用)
alter table ods.ods_start_log add partition(dt='2020-08-02');
alter table ods.ods_start_log drop partition (dt='2020-08-02');

加载启动日志数据:

script/member_active/ods_load_log.sh

可以传参数确定日志,如果没有传参使用昨天日期

#!/bin/bash
APP=ODS
hive=/opt/lagou/servers/hive-2.3.7/bin/hive# 可以输入日期;如果未输入日期取昨天的时间
if [ -n "$1" ]
thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
# 定义要执行的SQL
sql="
alter table "$APP".ods_start_log add partition(dt='$do_date'); "
$hive -e "$sql"

第 4 节 json数据处理

数据文件中每行必须是一个完整的 json 串,一个 json串不能跨越多行。

Hive 处理json数据总体来说有三个办法:

  • 使用内建的函数get_json_object、json_tuple
  • 使用自定义的UDF
  • 第三方的SerDe

4.1、使用内建函数处理

get_json_object(string json_string, string path)

返回值:String

说明:解析json字符串json_string,返回path指定的内容;如果输入的json字符串无效,那么返回NUll;函数每次只能返回一个数据项;

json_tuple(jsonStr, k1, k2, ...)

返回值:所有的输入参数、输出参数都是String;

说明:参数为一组键k1,k2,。。。。。和json字符串,返回值的元组。该方法比 get_json_object高效,因此可以在一次调用中输入多个键;

explode,使用explod将Hive一行中复杂的 array 或 map 结构拆分成多行。

测试数据:

user1;18;male;{"id": 1,"ids": [101,102,103],"total_number": 3}
user2;20;female;{"id": 2,"ids": [201,202,203,204],"total_number":4}
user3;23;male;{"id": 3,"ids":[301,302,303,304,305],"total_number": 5}
user4;17;male;{"id": 4,"ids": [401,402,403,304],"total_number":5}
user5;35;female;{"id": 5,"ids": [501,502,503],"total_number": 3}

建表加载数据:

CREATE TABLE IF NOT EXISTS jsont1(username string,age int,sex string,json string
)
row format delimited fields terminated by ';';
load data local inpath '/root/data/lagoudw/test/weibo.json' overwrite into table jsont1;

json的处理:

-- get 单层值
select username, age, sex,
get_json_object(json, "$.id") id,
get_json_object(json, "$.ids") ids,
get_json_object(json, "$.total_number") num
from jsont1;-- get 数组
select username, age, sex, get_json_object(json, "$.id") id, get_json_object(json, "$.ids[0]") ids0, get_json_object(json, "$.ids[1]") ids1, get_json_object(json, "$.ids[2]") ids2, get_json_object(json, "$.ids[3]") ids3,get_json_object(json, "$.total_number") num
from jsont1;-- 使用 json_tuple 一次处理多个字段
select json_tuple(json, 'id', 'ids', 'total_number')
from jsont1;-- 有语法错误,只能单独处理json串。
select username, age, sex, json_tuple(json, 'id', 'ids', 'total_number')
from jsont1;-- 使用 explode + lateral view
-- 在上一步的基础上,再将数据展开
-- 第一步,将 [101,102,103] 中的 [ ] 替换掉
-- select "[101,102,103]"
-- select "101,102,103"select regexp_replace("[101,102,103]", "\\[|\\]", "");
-- 第二步,将上一步的字符串变为数组
select split(regexp_replace("[101,102,103]", "\\[|\\]", ""), ",");
-- 第三步,使用explode + lateral view 将数据展开 select username, age, sex, id, ids, numfrom jsont1
lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num;-- 完整代码
with tmp as(select username, age, sex, id, ids, numfrom jsont1lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num
)
select username, age, sex, id, ids1, numfrom tmp
lateral view explode(split(regexp_replace(ids, "\\[|\\]", ""), ",")) t1 as ids1;

小结:json_tuple 优点是一次可以解析多个json字段,对嵌套结果的解析操作复杂;

4.2、使用UDF处理

自定义UDF处理json串中的数组。

自定义UDF函数:

输入:json串、数组的key

输出:字符串数组

pom文件增加依赖

<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>2.3.7</version><scope>provided</scope>
</dependency>
package cn.lagou.dw.hive.udf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.junit.Test;
import java.util.ArrayList;
public class ParseJsonArray extends UDF {public ArrayList<String> evaluate(String jsonStr, String arrKey){if (Strings.isNullOrEmpty(jsonStr)) {return null;}try{JSONObject object = JSON.parseObject(jsonStr);JSONArray jsonArray = object.getJSONArray(arrKey);ArrayList<String> result = new ArrayList<>();for (Object o: jsonArray){result.add(o.toString());}return result;} catch (JSONException e){return null;}}@Testpublic void JunitParseJsonArray(){String str = "{\"id\": 1,\"ids\":[101,102,103],\"total_number\": 3}";String key = "ids";ArrayList<String> evaluate = evaluate(str, key);System.out.println(JSON.toJSONString(evaluate));}
}

使用自定义 UDF 函数:

-- 添加开发的jar包(在Hive命令行中)
add jar /root/data/lagoudw/jars/cn.lagou.dw-1.0-SNAPSHOT-jar-with-dependencies.jar;-- 创建临时函数。指定类名一定要完整的路径,即包名加类名
create temporary function lagou_json_array as "cn.lagou.dw.hive.udf.ParseJsonArray";-- 执行查询
-- 解析json串中的数组
select username, age, sex, lagou_json_array(json, "ids") ids from jsont1;-- 解析json串中的数组,并展开
select username, age, sex, ids1from jsont1
lateral view explode(lagou_json_array(json, "ids")) t1 as ids1;-- 解析json串中的id、num
select username, age, sex, id, numfrom jsont1
lateral view json_tuple(json, 'id', 'total_number') t1 as id, num;-- 解析json串中的数组,并展开
select username, age, sex, ids1, id, numfrom jsont1
lateral view explode(lagou_json_array(json, "ids")) t1 as ids1
lateral view json_tuple(json, 'id', 'total_number') t1 as id, num;

4.3、使用SerDe处理

序列化是对象转换为字节序列的过程;反序列化是字节序列恢复为对象的过程;

对象的序列化主要有两种用途:

  • 对象的持久化,即把对象转换成字节序列后保存到文件中
  • 对象数据的网络传送

SerDe 是Serializer 和 Deserializer 的简写形式。Hive使用Serde进行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型。SerDe包括 Serialize/Deserilize 两个功能:

  • Serialize把Hive使用的java object转换成能写入HDFS字节序列,或者其他系统能识别的流文件
  • Deserialize把字符串或者二进制流转换成Hive能识别的java object对象

Read : HDFS files => InputFileFormat => <key, value> => Deserializer => Row object

Write : Row object => Seriallizer => <key, value> => OutputFileFormat => HDFS files

常见:https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#Deve loperGuide-HiveSerDe

Hive本身自带了几个内置的SerDe,还有其他一些第三方的SerDe可供选择。

create table t11(id string)
stored as parquet;
create table t12(id string)
stored as ORC;desc formatted t11;
desc formatted t12;

LazySimpleSerDe(默认的SerDe)

ParquetHiveSerDe

OrcSerde

对于纯 json 格式的数据,可以使用 JsonSerDe 来处理。

{"id": 1,"ids": [101,102,103],"total_number": 3}
{"id": 2,"ids": [201,202,203,204],"total_number": 4}
{"id": 3,"ids": [301,302,303,304,305],"total_number": 5}
{"id": 4,"ids": [401,402,403,304],"total_number": 5}
{"id": 5,"ids": [501,502,503],"total_number": 3}
create table jsont2(id int,ids array<string>,total_number int
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
load data local inpath '/root/data/lagoudw/test/json.dat' into table jsont2;

各种Json格式处理方法小结:

1、简单格式的json数据,使用get_json_object、json_tuple处理

2、对于嵌套数据类型,可以使用UDF

3、纯json串可使用JsonSerDe处理更简单

第 5 节 DWD层建表和数据加载

2020-08-02 18:19:32.966 [main] INFO com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json": {"entry":"1","action":"1","error_code":"0"},"time":1596309585861} ,"attr":{"area":"绍 兴","uid":"2F10092A10","app_v":"1.1.16","event_type":"common","de vice_id":"1FB872- 9A10010","os_type":"3.0","channel":"ML","language":"chinese","bra nd":"Huawei-2"}}

主要任务:ODS(包含json串) => DWD json数据解析,丢弃无用数据(数据清洗),保留有效信息,并将数据展开,形成每日启动明细表。

5.1、创建DWD层表

use DWD;
drop table if exists dwd.dwd_start_log;
CREATE TABLE dwd.dwd_start_log(`device_id` string,`area` string,`uid` string,`app_v` string,`event_type` string,`os_type` string,`channel` string,`language` string,`brand` string,`entry` string,`action` string,`error_code` string
)
PARTITIONED BY (dt string)
STORED AS parquet;

表的格式:parquet、分区表

5.2、加载DWD层数据

script/member_active/dwd_load_start.sh

#!/bin/bash
source /etc/profile
# 可以输入日期;如果未输入日期取昨天的时间
if [ -n "$1" ]
thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
# 定义要执行的SQL
sql="with tmp as(select split(str, ' ')[7] linefrom ods.ods_start_logwhere dt='$do_date')insert overwrite table dwd.dwd_start_logpartition(dt='$do_date')select get_json_object(line, '$.attr.device_id'),get_json_object(line, '$.attr.area'),get_json_object(line, '$.attr.uid'),get_json_object(line, '$.attr.app_v'),get_json_object(line, '$.attr.event_type'),get_json_object(line, '$.attr.os_type'),get_json_object(line, '$.attr.channel'),get_json_object(line, '$.attr.language'),get_json_object(line, '$.attr.brand'),get_json_object(line, '$.app_active.json.entry'),get_json_object(line, '$.app_active.json.action'),get_json_object(line, '$.app_active.json.error_code')from tmp;
"
hive -e "$sql"

日志文件 =》 Flume =》 HDFS =》 ODS =》 DWD ODS =》 DWD;

json数据的解析;数据清洗

下一步任务:DWD(会员的每日启动信息明细) => DWS(如何建表,如何加载数据)

活跃会员 ===> 新增会员 ===> 会员留存

第 6 节 活跃会员

活跃会员:打开应用的会员即为活跃会员;

新增会员:第一次使用应用的用户,定义为新增会员;

留存会员:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;

活跃会员指标需求:每日、每周、每月的活跃会员数

DWD:会员的每日启动信息明细(会员都是活跃会员;某个会员可能会出现多次)

DWS:每日活跃会员信息(关键)、每周活跃会员信息、每月活跃会员信息

每日活跃会员信息 ===> 每周活跃会员信息

每日活跃会员信息 ===> 每月活跃会员信息

ADS:每日、每周、每月活跃会员数(输出)

ADS表结构:
daycnt weekcnt monthcnt dt

备注:周、月为自然周、自然月

处理过程:

1、建表(每日、每周、每月活跃会员信息)

2、每日启动明细 ===> 每日活跃会员

3、每日活跃会员 => 每周活跃会员;每日活跃会员 => 每月活跃会员

4、汇总生成ADS层的数据

6.1、创建DWS层表

use dws;
drop table if exists dws.dws_member_start_day;
create table dws.dws_member_start_day
(`device_id` string, `uid` string,`app_v` string, `os_type` string,`language` string, `channel` string,`area` string,`brand` string
) COMMENT '会员日启动汇总' partitioned by(dt string) stored as parquet;drop table if exists dws.dws_member_start_week;
create table dws.dws_member_start_week(`device_id` string,`uid` string,`app_v` string,`os_type` string,`language` string,`channel` string,`area` string,`brand` string,`week` string
) COMMENT '会员周启动汇总'
PARTITIONED BY (`dt` string) stored as parquet;drop table if exists dws.dws_member_start_month;
create table dws.dws_member_start_month(`device_id` string,`uid` string,`app_v` string,`os_type` string,`language` string,`channel` string,`area` string,`brand` string,`month` string
) COMMENT '会员月启动汇总' PARTITIONED BY (`dt` string) stored as parquet;

6.2、加载DWS层数据

script/member_active/dws_load_member_start.sh

#!/bin/bash
source /etc/profile
# 可以输入日期;如果未输入日期取昨天的时间
if [ -n "$1" ]
thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
# 定义要执行的SQL
# 汇总得到每日活跃会员信息;每日数据汇总得到每周、每月数据
sql="
-- 汇总得到每日活跃会员
insert overwrite table dws.dws_member_start_day partition(dt='$do_date')
select device_id,concat_ws('|', collect_set(uid)), concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand))
from dwd.dwd_start_log
where dt='$do_date'
group by device_id;-- 汇总得到每周活跃会员
insert overwrite table dws.dws_member_start_week partition(dt='$do_date')
select device_id,concat_ws('|', collect_set(uid)),concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)), concat_ws('|', collect_set(brand)), date_add(next_day('$do_date', 'mo'), -7)
from dws.dws_member_start_day
where dt >= date_add(next_day('$do_date', 'mo'), -7)and dt <= '$do_date'
group by device_id;-- 汇总得到每月活跃会员
insert overwrite table dws.dws_member_start_month partition(dt='$do_date')
select device_id,concat_ws('|', collect_set(uid)),concat_ws('|', collect_set(app_v)), concat_ws('|', collect_set(os_type)), concat_ws('|', collect_set(language)), concat_ws('|', collect_set(channel)), concat_ws('|', collect_set(area)),concat_ws('|', collect_set(brand)), date_format('$do_date', 'yyyy-MM')
from dws.dws_member_start_day
where dt >= date_format('$do_date', 'yyyy-MM-01')and dt <= '$do_date'
group by device_id;
"
hive -e "$sql"

注意shell的引号

ODS => DWD => DWS(每日、每周、每月活跃会员的汇总表)

6.3、创建ADS层表

计算当天、当周、当月活跃会员数量

drop table if exists ads.ads_member_active_count;
create table ads.ads_member_active_count( `day_count` int COMMENT '当日会员数量', `week_count` int COMMENT '当周会员数量', `month_count` int COMMENT '当月会员数量'
) COMMENT '活跃会员数'
partitioned by(dt string)
row format delimited fields terminated by ',';

6.4、加载ADS层数据

script/member_active/ads_load_member_active.sh

#!/bin/bash
source /etc/profileif [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fisql="
with tmp as(select 'day' datelabel, count(*) cnt, dtfrom dws.dws_member_start_daywhere dt='$do_date'group by dtunion allselect 'week' datelabel, count(*) cnt, dtfrom dws.dws_member_start_weekwhere dt='$do_date'group by dtunion allselect 'month' datelabel, count(*) cnt, dtfrom dws.dws_member_start_monthwhere dt='$do_date'group by dt
)
insert overwrite table ads.ads_member_active_count
partition(dt='$do_date')
select sum(case when datelabel='day' then cnt end) as day_count,sum(case when datelabel='week' then cnt end) as week_count,sum(case when datelabel='month' then cnt end) as month_count
from tmp
group by dt;
"
hive -e "$sql"
#!/bin/bash
source /etc/profileif [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fisql="
insert overwrite table ads.ads_member_active_count partition(dt='$do_date')
select daycnt, weekcnt, monthcntfrom (select dt, count(*) daycntfrom dws.dws_member_start_daywhere dt='$do_date'group by dt) day join(select dt, count(*) weekcntfrom dws.dws_member_start_weekwhere dt='$do_date'group by dt) week on day.dt=week.dtjoin(select dt, count(*) monthcntfrom dws.dws_member_start_monthwhere dt='$do_date'group by dt) month on day.dt=month.dt;
"
hive -e "$sql"

6.5、小结

脚本执行次序:

ods_load_startlog.sh
dwd_load_startlog.sh
dws_load_member_start.sh
ads_load_member_active.sh

第7节 新增会员

留存会员:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;

新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被算作一次新增。

新增会员先计算 => 计算会员留存

需求:每日新增会员数

08-02: DWD:会员每日启动明细(95-110);所有会员的信息(1-100)???

  • 新增会员:101-110
  • 新增会员数据 + 旧的所有会员的信息 = 新的所有会员的信息(1-110)

08-03: DWD:会员每日启动明细(100-120);所有会员的信息(1-110)

  • 新增会员:111-120
  • 新增会员数据 + 旧的所有会员的信息 = 新的所有会员的信息(1-120)

计算步骤:

  • 计算新增会员
  • 更新所有会员信息

改进后方法:

  • 在所有会员信息中增加时间列,表示这个会员是哪一天成为新增会员
  • 只需要一张表:所有会员的信息(id,dt)
  • 将新增会员 插入 所有会员表中

案例:如何计算新增会员

-- 日启动表 => DWS
drop table t1;
create table t1(id int, dt string)
row format delimited fields terminated by ',';
load data local inpath '/data/lagoudw/data/t1.dat' into table t1;
4,2020-08-02
5,2020-08-02
6,2020-08-02
7,2020-08-02
8,2020-08-02
9,2020-08-02
-- 全量数据 => DWS
drop table t2;
create table t2(id int, dt string)
row format delimited fields terminated by ',';
load data local inpath '/data/lagoudw/data/t2.dat' into table t2;
1,2020-08-01
2,2020-08-01
3,2020-08-01
4,2020-08-01
5,2020-08-01
6,2020-08-01
-- 找出 2020-08-02 的新用户
select t1.id, t1.dt, t2.id, t2.dtfrom t1 left join t2 on t1.id=t2.idwhere t1.dt="2020-08-02";
select t1.id, t1.dtfrom t1 left join t2 on t1.id=t2.idwhere t1.dt="2020-08-02"and t2.id is null;
-- 将找到 2020-08-02 新用户数据插入t2表中 insert into table t2
select t1.id, t1.dtfrom t1 left join t2 on t1.id=t2.idwhere t1.dt="2020-08-02"
and t2.id is null; -- 检查结果
select * from t2;
-- t1 加载 2020-08-03 的数据
14,2020-08-03
15,2020-08-03
16,2020-08-03
17,2020-08-03
18,2020-08-03
19,2020-08-03
load data local inpath '/data/lagoudw/data/t3.dat' into table t1;
-- 将找到 2020-08-03 新用户数据插入t2表中
insert into table t2
select t1.id, t1.dtfrom t1 left join t2 on t1.id=t2.idwhere t1.dt="2020-08-03"and t2.id is null;
-- 检查结果
select * from t2;

7.1、创建DWS层表

drop table if exists dws.dws_member_add_day;
create table dws.dws_member_add_day
(`device_id` string,`uid` string,`app_v` string,`os_type` string,`language` string,`channel` string,`area` string,`brand` string,`dt` string
) COMMENT '每日新增会员明细' stored as parquet;

7.2、加载DWS层数据

script/member_active/dws_load_member_add_day.sh

#!/bin/bash
source /etc/profileif [ -n "$1" ]
thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fisql="
insert into table dws.dws_member_add_day
select t1.device_id,t1.uid,t1.app_v,t1.os_type,t1.language,t1.channel,t1.area,t1.brand,'$do_date'
from dws.dws_member_start_day t1 left join
dws.dws_member_add_day t2on t1.device_id=t2.device_idwhere t1.dt='$do_date'and t2.device_id is null;
"
hive -e "$sql"

7.3、创建ADS层表

drop table if exists ads.ads_new_member_cnt;
create table ads.ads_new_member_cnt
(
`cnt` string
)
partitioned by(dt string)
row format delimited fields terminated by ',';

7.4、加载ADS层数据

script/member_active/ads_load_member_add.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ads.ads_new_member_cnt
partition (dt='$do_date')
select count(1)from dws.dws_member_add_daywhere dt = '$do_date'"
hive -e "$sql"

7.5、小结

调用脚本次序:

dws_load_member_add_day.sh
ads_load_member_add.sh

第8节 留存会员

留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;这部分会员占当时新增会员的比例为留存率。

需求:1日、2日、3日的会员留存数和会员留存率

30

31

1

2

 
   

10W新会员

3W

1日留存数

 

20W

 

5W

2日留存数

30W

   

4W

3日留存数

10W新会员:dws_member_add_day(dt=08-01)明细

3W:特点 在1号是新会员,在2日启动了(2日的启动日志)

dws_member_start_day

8.1、创建DWS层表

-- 会员留存明细
drop table if exists dws.dws_member_retention_day;
create table dws.dws_member_retention_day
(`device_id` string,`uid` string,`app_v` string,`os_type` string,`language` string,`channel` string,`area` string,`brand` string,`add_date` string comment '会员新增时间', `retention_date` int comment '留存天数'
)COMMENT '每日会员留存明细'
PARTITIONED BY (`dt` string)
stored as parquet;

8.2、加载DWS层数据

script/member_active/dws_load_member_retention_day.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table dws.dws_member_retention_day partition(dt='$do_date')
(
select t2.device_id,t2.uid,t2.app_v,t2.os_type,t2.language,t2.channel,t2.area,t2.brand,t2.dt add_date,1
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2
on t1.device_id=t2.device_idwhere t2.dt=date_add('$do_date', -1)and t1.dt='$do_date'
union all
select t2.device_id,t2.uid,t2.app_v,t2.os_type,t2.language,t2.channel,t2.area,t2.brand,t2.dt add_date,2from dws.dws_member_start_day t1 join dws.dws_member_add_day t2
on t1.device_id=t2.device_idwhere t2.dt=date_add('$do_date', -2)and t1.dt='$do_date'
union all
select t2.device_id,t2.uid,t2.app_v,t2.os_type,t2.language,t2.channel,t2.area,t2.brand,t2.dt add_date,3from dws.dws_member_start_day t1 join dws.dws_member_add_day t2
on t1.device_id=t2.device_id
where t2.dt=date_add('$do_date', -3)and t1.dt='$do_date'
);
"
hive -e "$sql"

return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

一般是内部错误

1、找日志(hive.log【简略】 / MR的日志【详细】)

hive.log ===> 缺省情况下 /tmp/root/hive.log (hive-site.conf)

MR的日志 ===> 启动historyserver、日志聚合 + SQL运行在集群模式

2、改写SQL

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
sql="
drop table if exists tmp.tmp_member_retention;
create table tmp.tmp_member_retention as
(
select t2.device_id,t2.uid,t2.app_v,t2.os_type,t2.language,t2.channel,t2.area,t2.brand,t2.dt add_date,1
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2
on t1.device_id=t2.device_idwhere t2.dt=date_add('$do_date', -1)and t1.dt='$do_date'
union all
select t2.device_id,t2.uid,t2.app_v,t2.os_type,t2.language,t2.channel,t2.area,t2.brand,t2.dt add_date,2from dws.dws_member_start_day t1 join dws.dws_member_add_day t2
on t1.device_id=t2.device_idwhere t2.dt=date_add('$do_date', -2)and t1.dt='$do_date'
union all
select t2.device_id,t2.uid,t2.app_v,t2.os_type,t2.language,t2.channel,t2.area,t2.brand,t2.dt add_date,3from dws.dws_member_start_day t1 join dws.dws_member_add_day t2
on t1.device_id=t2.device_idwhere t2.dt=date_add('$do_date', -3)and t1.dt='$do_date'
);
insert overwrite table dws.dws_member_retention_day partition(dt='$do_date')
select * from tmp.tmp_member_retention;
"
hive -e "$sql"

8.3、创建ADS层表

-- 会员留存数
drop table if exists ads.ads_member_retention_count;
create table ads.ads_member_retention_count
(`add_date` string comment '新增日期', `retention_day` int comment '截止当前日期留存天数', `retention_count` bigint comment '留存数'
) COMMENT '会员留存数'
partitioned by(dt string)
row format delimited fields terminated by ',';-- 会员留存率
drop table if exists ads.ads_member_retention_rate;
create table ads.ads_member_retention_rate
(`add_date` string comment '新增日期',`retention_day` int comment '截止当前日期留存天数',`retention_count` bigint comment '留存数',`new_mid_count` bigint comment '当日会员新增数',`retention_ratio` decimal(10,2) comment '留存率'
) COMMENT '会员留存率'
partitioned by(dt string)
row format delimited fields terminated by ',';

8.4、加载ADS层数据

script/member_active/ads_load_member_retention.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ads.ads_member_retention_count partition (dt='$do_date')
select add_date, retention_date,count(*) retention_countfrom dws.dws_member_retention_daywhere dt='$do_date'
group by add_date, retention_date;
insert overwrite table ads.ads_member_retention_rate partition (dt='$do_date')
select t1.add_date,t1.retention_day,t1.retention_count,t2.cnt,t1.retention_count/t2.cnt*100from ads.ads_member_retention_count t1 join
ads.ads_new_member_cnt t2 on t1.dt=t2.dtwhere t1.dt='$do_date';
"
hive -e "$sql"

备注:最后一条SQL的连接条件应为:t1.add_date=t2.dt。在10.4 节中有详细说明。

8.5、小结

会员活跃度--活跃会员数、新增会员、留存会员

脚本调用次序:

# 加载ODS / DWD 层采集
ods_load_startlog.sh
dwd_load_startlog.sh# 活跃会员
dws_load_member_start.sh
ads_load_member_active.sh# 新增会员
dws_load_member_add_day.sh
ads_load_member_add.sh# 会员留存
dws_load_member_retention_day.sh
ads_load_member_retention.sh

第9节 Datax 数据导出

基本概念及安装参见DataX快速入门

ADS有4张表需要从数据仓库的ADS层导入MySQL,即:Hive => MySQL

ads.ads_member_active_count
ads.ads_member_retention_count
ads.ads_member_retention_rate
ads.ads_new_member_cnt
-- MySQL 建表
-- 活跃会员数
create database dwads;
drop table if exists dwads.ads_member_active_count;
create table dwads.ads_member_active_count(`dt` varchar(10) COMMENT '统计日期',`day_count` int COMMENT '当日会员数量', `week_count` int COMMENT '当周会员数量', `month_count` int COMMENT '当月会员数量', primary key (dt)
);
-- 新增会员数
drop table if exists dwads.ads_new_member_cnt;
create table dwads.ads_new_member_cnt(`dt` varchar(10) COMMENT '统计日期',`cnt` VARCHAR(10),primary key (dt)
);
-- 会员留存数
drop table if exists dwads.ads_member_retention_count;
create table dwads.ads_member_retention_count
(`dt` varchar(10) COMMENT '统计日期',`add_date` VARCHAR(10) comment '新增日期',`retention_day` int comment '截止当前日期留存天数',`retention_count` bigint comment '留存数'
)
COMMENT '会员留存情况';-- 会员留存率
drop table if exists dwads.ads_member_retention_rate;
create table dwads.ads_member_retention_rate
(`dt` varchar(10) COMMENT '统计日期',`add_date` VARCHAR(10) comment '新增日期',`retention_day` int comment '截止当前日期留存天数', `retention_count` bigint comment '留存数',`new_mid_count` bigint comment '当日会员新增数',`retention_ratio` decimal(10,2) comment '留存率') COMMENT '会员留存率';

导出活跃会员数(ads_member_active_count)

export_member_active_count.json

hdfsreader => mysqlwriter

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "hdfsreader","parameter": {"path": "/user/hive/warehouse/ads.db/ads_member_active_count/dt=$do_date/*","defaultFS": "hdfs://linux121:9000","column": [{"type": "string","value": "$do_date"}, {"index": 0,"type": "string"},{"index": 1,"type": "string"},{"index": 2,"type": "string"}],"fileType": "text","encoding": "UTF-8","fieldDelimiter": ","}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "replace","username": "hive","password": "12345678","column": ["dt", "day_count", "week_count", "month_count"],"preSql": [""],"connection": [{"jdbcUrl": "jdbc:mysql://linux123:3306/dwads?useUnicode=true&characterEncoding=utf-8","table": ["ads_member_active_count"]}]}}}]}
}

执行命令:

python datax.py -p "-Ddo_date=2020-08-02" /data/lagoudw/script/member_active/t1.json

export_member_active_count.sh

#!/bin/bash
JSON=/data/lagoudw/script/member_active
source /etc/profile
if [ -n "$1" ] ;thendo_date=$1
elsedo_date=`date -d "-1 day" +%F`
fi
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" $JSON/export_member_active_count.json

其他三张表的数据导出类似!!!

第三章 电商分析之会员活跃度相关推荐

  1. 第三章-电商项目-优化评论分页查询

    1优化评论分页查询 建立索引 1111.如何删除重复数据 222备份表 33删除同一订单的重复评论 4检查是否有相同评论 5如何进行分区间统计 6捕获有问题的SQL 启用mysql慢查询的日志

  2. Python + 大数据 - 数仓实战之智能电商分析平台

    Python + 大数据 - 数仓实战之智能电商分析平台 1. 项目架构 2. 数据仓库维度模型设计-事实表 事实表的特征:表里没有存放实际的内容,他是一堆主键的集合,这些ID分别能对应到维度表中的一 ...

  3. spark企业级电商分析平台项目实践(一)项目介绍和需求分析

    前言 这个专栏的系列文章,是一个电商分析平台项目实践过程中的记录和总结. 基于 spark2.4.x 和 scala2.11.x 一. 项目概述 访问电商网站时,我们的一些访问行为会产生相应的埋点日志 ...

  4. 牛客网SQL大厂真题二刷小白白话总结(三)电商场景(某东商城)

    三.电商场景(某东商城) 目录 SQL13 计算商城中2021年每月的GMV(简单) SQL14 统计2021年10月每个退货率不大于0.5的商品各项指标(中等) SQL15 某店铺的各商品毛利率及店 ...

  5. 大数据之电商分析系统(一)

    大数据之电商分析系统(一) 一:项目介绍 ​ 本项目来源于企业级电商网站的大数据统计分析平台, 该平台以 Spark 框架为核心, 对电商网站的日志进行离线和实时分析.该大数据分析平台对电商网站的各种 ...

  6. 软件架构-解密电商系统营销-会员模块业务

    上次说了商品,商品分类,品牌,分类的属性,规格.很多电商网站例如:京东,国美,苏宁他们的商品都是存储在redis中的,通过请求获取到的redis进行js的渲染动态的展示商品的信息. (一)营销模块 营 ...

  7. matlab趋势面分析,第三章趋势面分析.ppt

    <第三章趋势面分析.ppt>由会员分享,可在线阅读,更多相关<第三章趋势面分析.ppt(61页珍藏版)>请在人人文库网上搜索. 1.第三章 地理学中的经典统计分析方法,第6节 ...

  8. 电商平台运用会员体系运营的好处以及注意事项

    随着互联网的不断发展,各种电商平台都出现了.但是由于电商平台越来越大,各大平台之间的竞争也变得越来越激烈,因此各大电商平台都开始想着去做营销工作,提升自己的竞争力. 如今,比较好的一种营销方法就是会员 ...

  9. spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)

    spark项目实战:电商分析平台之各个范围Session步长.访问时长占比统计(需求一) 项目基本信息,架构,需要一览 各个范围Session步长.访问时长占比统计概述 各个范围Session步长.访 ...

  10. spark项目实战:电商分析平台之项目概述

    spark项目实战:电商分析平台之项目概述 目录 项目概述 程序架构分析 需求解析 初始代码和完成代码存放在github上面 1. 项目概述 在访问电商网站时,我们的一些访问行为会产生相应的埋点日志( ...

最新文章

  1. 《算法图解》第四章笔记与课后练习_快速排序算法
  2. 雅思作文未来计算机的应用,9分考官级雅思大作文范文之电脑技术的忧虑
  3. Jena文档《An Introduction to RDF and the Jena RDF API》的译文
  4. ruby中exec,system,%x的区别
  5. Linux/Unix下tar命令详解
  6. linux清缓存命令多节点,Liunx手动释放buffers/cache内存_linux,缓存,
  7. 【转载】Android S5PV210 fimc驱动分析 - fimc_regs.c
  8. windows 弹出 api-ms-win-crt-runtime-l1-1-0.dll 丢失的问题
  9. Python学习之Python入门知识(一)
  10. 【小5聊】重装系统之台式电脑BIOSTAR映泰主板,启动U盘PE系统以及重装后无法启动情况
  11. 致敬mentohust,路由器使用Socket认证华科校园网
  12. 【FPGA】Vivado综合停滞、死机(PID Not Specified)解决方法
  13. 什么是SAP Analytics Cloud
  14. 深度神经网络算法有哪些,深度神经网络算法原理
  15. msn一直登陆不上,没有办法只好启用meebo!
  16. 空间中直线与直线之间的位置关系
  17. 视音频编解码技术零基础学习方法(向雷神致敬)
  18. 软件企业税收优惠政策
  19. [翻译]在Windows版或MacOS版的Microsoft Edge上安装一个谷歌浏览器拓展
  20. CentOS下Oracle11g部署

热门文章

  1. 西部世界:生存(WestLand Survival) 游戏攻略
  2. macOS + andriod studio + NDK
  3. 孙式太极拳的站桩(作者:孙剑云)
  4. 《Linux内核 学习笔记》--- 第二章 内存管理 2.9 mmap
  5. 基于FPGA的ROM-VGA图像处理(老师好帅系列)
  6. R语言 重命名指定列
  7. 微信授权文件放到域名根目录下
  8. dns配置异常怎么修复_电脑出现dns错误不能上网怎么办?dns错误修复方法
  9. 微博微信QQ等开发者平台注册应用时提交签名信息的坑点。
  10. 使用diskpart制作U盘启动盘