DWD和DIM

  • 1. 需求分析及实现思路
    • 1.1 分层需求分析
    • 1.2 每层的职能
    • 1.3 DWD 层数据准备实现思路
  • 2. 环境搭建
  • 3. 准备用户行为日志DWD层
    • 3.1 主要任务
      • 3.1.1 识别新老用户
      • 3.1.2 利用侧输出流实现数据拆分
      • 3.1.3 将不同流的数据推送下游的 Kafka 的不同 Topic 中
    • 3.2 代码实现
      • 3.2.1 接收 Kafka 数据,并进行转换
      • 3.2.2 识别新老访客
      • 3.2.3 利用侧输出流实现数据拆分
      • 3.2.4 将不同流的数据推送到下游 kafka 的不同 Topic(分流)
    • 3.3 总体代码实现
  • 4. 准备业务数据DWD层
    • 4.1 主要任务
      • 4.1.1 接收 Kafka 数据,过滤空值数据
      • 4.1.2 实现动态分流功能
      • 4.1.3 把分好的流保存到对应表、主题中
    • 4.2 代码实现
      • 4.2.1 接收 Kafka 数据,过滤空值数据
      • 4.2.2 根据 MySQL 的配置表,动态进行分流
      • 4.2.3 分流 Sink 之保存维度到 HBase(Phoenix)
      • 4.2.4 分流 Sink 之保存业务数据到 Kafka 主题
      • 4.2.5 BaseDBApp
  • 5. 总结

1. 需求分析及实现思路

1.1 分层需求分析

在之前介绍实时数仓概念时讨论过,建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。
我们这里从 Kafka 的 ODS 层读取用户行为日志以及业务数据,并进行简单处理,写回到 Kafka 作为 DWD 层

1.2 每层的职能

1.3 DWD 层数据准备实现思路

➢ 功能 1:环境搭建
➢ 功能 2:计算用户行为日志 DWD 层
➢ 功能 3:计算业务数据 DWD 层

2. 环境搭建

gmall2021-realtime

3. 准备用户行为日志DWD层

我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 Kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。
页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流

3.1 主要任务

3.1.1 识别新老用户

本身客户端业务有新老用户的标识,但是不够准确,需要用实时计算再次确认(不涉及业务操作,只是单纯的做个状态确认)。

3.1.2 利用侧输出流实现数据拆分

根据日志数据内容,将日志数据分为 3 类, 页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流

3.1.3 将不同流的数据推送下游的 Kafka 的不同 Topic 中

dwd_page_log
dwd_start_log
dwd_display_log

3.2 代码实现

3.2.1 接收 Kafka 数据,并进行转换

  1. 在 Kafka 的工具类中提供获取 Kafka 消费者的方法(读)
package com.atguigu.utils;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;public class MyKafaUtil {private static String brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092";public static FlinkKafkaProducer<String> getKafkaProducer(String topic){return new FlinkKafkaProducer<String>(brokers,topic,new SimpleStringSchema()   );}public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic , String groupId){Properties properties = new Properties();给配置信息对象添加配置项properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);//获取 KafkaSourcereturn new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);}
}
  1. Flink 调用工具类读取数据的主程序

3.2.2 识别新老访客

保存每个 mid 的首次访问日期,每条进入该算子的访问记录,都会把 mid 对应的首次访问时间读取出来,只有首次访问时间不为空,则认为该访客是老访客,否则是新访客。
同时如果是新访客且没有访问记录的话,会写入首次访问时间。、

3.2.3 利用侧输出流实现数据拆分

根据日志数据内容,将日志数据分为 3 类, 页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流

3.2.4 将不同流的数据推送到下游 kafka 的不同 Topic(分流)

  1. 程序中调用 Kafka 工具类获取 Sink
  2. 测试
    ➢ IDEA 中运行 BaseLogApp 类
    ➢ 运行 logger.sh,启动 Nginx 以及日志处理服务
    ➢ 运行 rt_applog 下模拟生成数据的 jar 包
    ➢ 到 Kafka 不同的主题下查看输出效果

3.3 总体代码实现

package com.atguigu.app.dwd;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;// 数据流:web/app --> Nginx --> SpringBoot --> Kafka(ods) -> FlinkApp -> Kafka(dwd)// 程 序: mocklog -> Nginx --> Logger.sh --> Kafka(ZK) --> BaseLogApp -> Kafkapublic class BaseLogApp {public static void main(String[] args) throws Exception {// TODO 1. 获取执行环境// 1. 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.1 设置checkpoint 和 状态后端// 1.1  开启ck  checkpoint 并且指定状态后端为 FS//Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点//续传,需要从 Checkpoint 或者 Savepoint 启动程序//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink/ck")); // 设置状态后端// 开启 Checkpoint,每隔 5 秒钟做一次 CK//env.enableCheckpointing(5000L);// 5秒钟触发一次checkpoint//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //指定 CK 的一致性语义//nv.getCheckpointConfig().setCheckpointTimeout(10000L); // 10 秒的超时事件//env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);  ////env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);//env.setRestartStrategy(RestartStrategies.failureRateRestart()); 新版本不用配// TODO 2. 消费 ods_base_log 主题数据创建流String sourceTopic = "ods_base_log";String groupId = "base_log_app";DataStreamSource<String> kafkaDS = env.addSource(MyKafaUtil.getKafkaConsumer(sourceTopic, groupId));// TODO 3. 将每行数据转换为JSON对象
//        kafkaDS.map(line -> {//            return JSON.parseObject(line);
//        })// kafkaDS.map(JSON::parseObject);// 用process可以把数据保留下来, 把脏数据 输出到测输出流中// 用到测输出流只能用到processOutputTag<String> outputTag = new OutputTag<String>("Dirty") {};SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {@Overridepublic void processElement(String s, ProcessFunction<String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {try {JSONObject jsonObject = JSON.parseObject(s);collector.collect(jsonObject);} catch (Exception e) {// 发生异常 ,将数据写入测输出流context.output(outputTag, s);}}});// 打印脏数据jsonObjDS.getSideOutput(outputTag).print("Dirty>>>>>>>>>>");// TODO 4. 新老用户校验 状态编程SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = jsonObjDS.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid")).map(new RichMapFunction<JSONObject, JSONObject>() {private ValueState<String> valueState;@Overridepublic void open(Configuration parameters) throws Exception {valueState = getRuntimeContext().getState(new ValueStateDescriptor<String>("value-state", String.class));}@Overridepublic JSONObject map(JSONObject jsonObject) throws Exception {// 获取数据中的“is_new"标记String isNew = jsonObject.getJSONObject("common").getString("is_new");// 判断isnew 标记是否为1if ("1".equals(isNew)) {// 获取状态数据String state = valueState.value();if (state != null) {// 修改isNew标记jsonObject.getJSONObject("common").put("is_new", "0");} else {valueState.update("1");}}return jsonObject;}});// TODO 5. 分流 测输出流 页面: 主流   启动: 测输出流     曝光:测输出流OutputTag<String> startOutPutTag = new OutputTag<String>("start"){};OutputTag<String> displayTag = new OutputTag<String>("display") {};SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {@Overridepublic void processElement(JSONObject value, ProcessFunction<JSONObject, String>.Context context, Collector<String> collector) throws Exception {// 获取启动日志字段String start = value.getString("start");if (start != null && start.length() > 0) {// 将数据写入启动日志测输出流context.output(startOutPutTag, value.toJSONString());} else {// 将数据写入页面日志collector.collect(value.toJSONString());// 取出数据中的曝光数据JSONArray displays = value.getJSONArray("displays");if (displays != null && displays.size() > 0) {// 获取页面idString pageID = value.getJSONObject("page").getString("page_id");for (int i = 0; i < displays.size(); i++) {JSONObject display = displays.getJSONObject(i);// 添加页面IDdisplay.put("page_id", pageID);// 将数据写出到曝光的测输出流context.output(displayTag, display.toJSONString());}}}}});// TODO 6. 提取测输出流DataStream<String> startDS = pageDS.getSideOutput(startOutPutTag);DataStream<String> displayDS = pageDS.getSideOutput(displayTag);// TODO 7. 将三个流进行打印并输出到对应的Kafka主题中startDS.print("start>>>>>>>>>");pageDS.print("page>>>>>>>>>");displayDS.print("display>>>>>>>>>");startDS.addSink(MyKafaUtil.getKafkaProducer("dwd_start_log"));pageDS.addSink(MyKafaUtil.getKafkaProducer("dwd_page_log"));displayDS.addSink(MyKafaUtil.getKafkaProducer("dwd_display_log"));// TODO 8. 启动任务env.execute("BaseLogApp");}
}

4. 准备业务数据DWD层

业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事实数据写回 Kafka 作为业务数据的 DWD 层。

4.1 主要任务

4.1.1 接收 Kafka 数据,过滤空值数据

对 FlinkCDC 抓取数据进行 ETL,有用的部分保留,没用的过滤掉

4.1.2 实现动态分流功能

由于 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这样显然不利于日后的数据处理。

所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
➢ 一种是用 Zookeeper 存储,通过 Watch 感知数据变化;
➢ 另一种是用 mysql 数据库存储,周期性的同步 select * ;
➢ 另一种是用 mysql 数据库存储,使用广播流。

这里选择第二种方案,主要是 MySQL 对于配置数据初始化和维护管理,使用 FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。
所以就有了如下图:

4.1.3 把分好的流保存到对应表、主题中

业务数据保存到 Kafka 的主题中
维度数据保存到 HBase 的表中

4.2 代码实现

4.2.1 接收 Kafka 数据,过滤空值数据

4.2.2 根据 MySQL 的配置表,动态进行分流

  1. 引入 pom.xml 依赖
  2. 在 Mysql 中创建数据库
  3. 在 gmall2021_realtime 库中创建配置表 table_process
  4. 在 MySQL 配置文件中增加 gmall2021_realtime 开启 Binlog
  5. 创建配置表实体类TableProcess
  6. 编写操作读取配置表形成广播流’
  7. 程序流程分析
  8. 定义一个项目中常用的配置常量类 GmallConfig
  9. 自定义函数 TableProcessFunction
  10. 自定义函数 TableProcessFunction-open
  11. 自定义函数 TableProcessFunction-processBroadcastElement
  12. 自定义函数 TableProcessFunction-checkTable
  13. 自定义函数 TableProcessFunction-processElement()
    核心处理方法,根据 MySQL 配置表的信息为每条数据打标签,走 Kafka 还是 HBase
  14. 自定义函数 TableProcessFunction-filterColumn()
    校验字段,过滤掉多余的字段
  15. 主程序 BaseDBApp 中调用 TableProcessFunction 进行分流

GmallConfig

package com.atguigu.common;public class GmallConfig {//Phoenix 库名public static final String HBASE_SCHEMA = "GMALL2021_REALTIME";//Phoenix 驱动public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";//Phoenix 连接参数public static final String PHOENIX_SERVER ="jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
}

TableProcessFunction

package com.atguigu.app.function;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.GmallConfig;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private Connection connection;private OutputTag<JSONObject> objectOutputTag;private MapStateDescriptor<String, TableProcess> mapStateDescriptor;public TableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {this.objectOutputTag = objectOutputTag;this.mapStateDescriptor = mapStateDescriptor;}@Overridepublic void open(Configuration parameters) throws Exception {Class.forName(GmallConfig.PHOENIX_DRIVER);connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}// value: {"db":"","tn":"","before":{}, "after":{}, "type":""}@Overridepublic void processBroadcastElement(String value, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {// 1. 获取并解析数据JSONObject jsonObject = JSON.parseObject(value);String data = jsonObject.getString("after");TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);// 2. 建表if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {checkTable(tableProcess.getSinkTable(), tableProcess.getSinkColumns(), tableProcess.getSinkPk(), tableProcess.getSinkExtend());}// 3. 写入状态,广播出去BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();broadcastState.put(key, tableProcess);}// 建表语句: create table if noe exists ab.tn(id varchar primary key, tm_name varchar) XXX;private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {PreparedStatement preparedStatement = null;try {if (sinkPk == null) {sinkPk = "id";}if (sinkExtend == null) {sinkExtend = "";}StringBuilder create_table_sql = new StringBuilder("create table if not exists ").append(GmallConfig.HBASE_SCHEMA).append(".").append(sinkTable).append("(");String[] fields = sinkColumns.split(",");for (int i = 0; i < fields.length; i++) {String field = fields[i];// 判断是否为主键if (sinkPk.equals(field)) {create_table_sql.append(field).append(" varchar primary key");} else {create_table_sql.append(field).append(" varchar");}// 判断是否为最后一个字段, 如果不是,则添加“,”if (i < fields.length - 1) {create_table_sql.append(",");}}create_table_sql.append(")").append(sinkExtend);// 打印建表语句System.out.println(create_table_sql);// 预编译sqlpreparedStatement=connection.prepareStatement(create_table_sql.toString());// 执行preparedStatement.execute();} catch (SQLException e) {throw new RuntimeException("Phoenix表" + sinkTable+ "建表失败");}finally {if(preparedStatement != null){try {preparedStatement.close();} catch (SQLException e) {e.printStackTrace();}}}}// value: {"db":"","tn":"","before":{}, "after":{}, "type":""}@Overridepublic void processElement(JSONObject value, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {// 1. 获取状态数据ReadOnlyBroadcastState<String, TableProcess> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);String key = value.getString("tableName") + "-" + value.getString("type");TableProcess tableProcess = broadcastState.get(key);if (tableProcess != null) {// 2. 过滤字段JSONObject data = value.getJSONObject("after");filterColumn(data, tableProcess.getSinkColumns());// 3. 分流// 将输出表/主题信息写入Valuevalue.put("sinkTable", tableProcess.getSinkTable());String sinkType = tableProcess.getSinkType();if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {// kafka数组写入主流collector.collect(value);} else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {// Hbase写入测输出流readOnlyContext.output(objectOutputTag, value);}} else {System.out.println("该组合key" + key + "不存在!");}}/*** data            {"id":"11","tm_name":"atguigu","logo_url":"aaa}* sinkSinkColumns id, tm_name* {"id":"11","tm_name":"atguigu"}*/private void filterColumn(JSONObject data, String sinkColumns) {String[] fleids = sinkColumns.split(",");// 数组变成集合List<String> columns = Arrays.asList(fleids);
//        Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator();
//        while (iterator.hasNext()){//            Map.Entry<String, Object> next = iterator.next();
//            if(!columns.contains(next.getKey())){//                iterator.remove();
//            }
//        }data.entrySet().removeIf(next -> !columns.contains(next.getKey()));}}

4.2.3 分流 Sink 之保存维度到 HBase(Phoenix)

  1. 程序流程分析
  2. 因为要用单独的 schema,所以在程序中加入 hbase-site.xml
    注意:为了开启 hbase 的 namespace 和 phoenix 的 schema 的映射,在程序中需要加这
    个配置文件,另外在 linux 服务上,也需要在 hbase 以及 phoenix 的 hbase-site.xml 配置
    文件中,加上以上两个配置,并使用 xsync 进行同步。
  3. 在 phoenix 中执行
create schema GMALL2021_REALTIME;
  1. DimSink
package com.atguigu.app.function;import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Set;public class DimSinkFunction extends RichSinkFunction<JSONObject> {private Connection connection;@Overridepublic void open(Configuration parameters) throws Exception {// 加载驱动Class.forName(GmallConfig.PHOENIX_DRIVER);// 重新赋值connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);connection.setAutoCommit(true);}// value:  {"sinkTable":"dwd_order_info",// "database":"gmall-flink",// "before":{},// "after":{"user_id":2021,"id":26450},// "type":"insert",// "tableName":"order_info"}// SQL: upsert into db.tn(id,tm_name) values(..,...)@Overridepublic void invoke(JSONObject value, Context context) throws Exception {PreparedStatement preparedStatement = null;try {// 获取sql语句String upsertSql =  genUpsertSql(value.getString("sinkTable"),value.getJSONObject("after"));System.out.println(upsertSql);// 预编译sqlpreparedStatement = connection.prepareStatement(upsertSql);// 执行插入操作preparedStatement.executeUpdate();} catch (SQLException e) {e.printStackTrace();}finally {if(preparedStatement != null){preparedStatement.close();}}}// data:  "after":{"user_id":2021,"id":26450},// SQL: upsert into db.tn(id,tm_name) values('..','...')private String genUpsertSql(String sinkTable, JSONObject data) {Set<String> keySet = data.keySet();Collection<Object> values = data.values();return "upsert into " + GmallConfig.HBASE_SCHEMA+"."+sinkTable+"("+StringUtils.join(keySet,",")+")values('"+StringUtils.join(values,"','")+"')";}
}
  1. 测试

4.2.4 分流 Sink 之保存业务数据到 Kafka 主题

  1. 在 MyKafkaUtil 中添加如下方法
private static String brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092";private static String default_topic = "default";public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema){Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);return new FlinkKafkaProducer<T>(default_topic,kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.NONE );}
  1. 在 MyKafkaUtil 中添加属性定义

  2. 两个创建 FlinkKafkaProducer 方法对比
    ➢ 前者给定确定的 Topic
    ➢ 而后者除了缺省情况下会采用 DEFAULT_TOPIC,一般情况下可以根据不同的业务
    数据在 KafkaSerializationSchema 中通过方法实现。

  3. 在主程序 BaseDBApp 中加入新 KafkaSink

  4. 测试

4.2.5 BaseDBApp

BaseDBApp

package com.atguigu.app.dwd;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.app.function.DimSinkFunction;
import com.atguigu.app.function.TableProcessFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.utils.MyKafaUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;import javax.annotation.Nullable;// 数据流 : web/app -- nginx -- springboot -- mysql -- flinkapp -- kafka(ods) -- flinkapp -- kafka(dwd)/phoenix(dim)
// 程序:         mockDB  -- Mysql -- FlinkCDC -- kafka -- BaseDBApp -- -> Kafka/Phoenix(hbase,zk,hdfs)
public class BaseDBApp {public static void main(String[] args) throws Exception {// TODO 1. 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.1 设置状态后端//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/dwd_log/ck"));//1.2 开启 CK//env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE);//env.getCheckpointConfig().setCheckpointTimeout(60000L);// TODO 2. 消费Kafka ods_base_db 主题数据创建流String sourceTopic = "ods_base_db";String groupID = "BaseDBApp";DataStreamSource<String> kafkaDS = env.addSource(MyKafaUtil.getKafkaConsumer(sourceTopic, groupID));// TODO 3. 将每行数据转换为JSON对象并过滤(delete)  主流SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject).filter(new FilterFunction<JSONObject>() {@Overridepublic boolean filter(JSONObject value) throws Exception {// 取出数据的操作类型String type = value.getString("type");return !"delete".equals(type);}});// TODO 4. 使用FlinkCDC消费配置表并处理成   广播流DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("hadoop102").port(3306).username("root").password("000000").databaseList("gmall-realtime").tableList("gmall-realtime.table_process")// 应该有哪些字段:表名,操作类型(为了区分新增及变化数据), 输出类型(hbase,kafka),sinkTable, sinkColumns, pk,//    sourceTable       type        sinkType        sinkTable//     base_trademark   insert        hbase     dim_xxxx(Phnoeix表名)//     order_info       insert        kafka     dwd_xxxx(主题名)//  需要一个主键来查到信息   表名+操作类型 作为联合主键//  Phnoeix表需要自动创建, 还需要以下字段 sinkColumns 字段,pk 主键, extend 额外信息.startupOptions(StartupOptions.initial()).deserializer(new CustomerDeserialization()).build();DataStreamSource<String> tableProcessStrDS = env.addSource(sourceFunction);// 主流根据   表名+操作类型   可以获取到相关信息 tableprocess  拿到一行数据MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state",String.class, TableProcess.class);BroadcastStream<String> broadcastStream = tableProcessStrDS.broadcast(mapStateDescriptor);// TODO 5. 连接主流和广播流BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);// TODO 6. 分流 处理数据  广播流数据,主流数据(根据广播流数据进行处理)OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};SingleOutputStreamOperator<JSONObject> kafka =  connectedStream.process(new TableProcessFunction(hbaseTag,mapStateDescriptor));// TODO 7. 提取Kafka流数据和Hbase数据DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);// TODO 8. 将Kafka数据写入Kafka主题, 将Hbase数据写入Phoenix表中kafka.print("kafka>>>>>>>>>>>");hbase.print("Hbase>>>>>>>>>>>");hbase.addSink(new DimSinkFunction());kafka.addSink(MyKafaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {return new ProducerRecord<byte[], byte[]>(jsonObject.getString("sinkTable"),jsonObject.getString("after").getBytes());}}));// TODO 9. 启动任务env.execute("BaseDBApp");}
}

5. 总结

DWD 的实时计算核心就是数据分流,其次是状态识别。在开发过程中我们实践了几个灵活度较强算子,比如 RichMapFunction, ProcessFunction, RichSinkFunction。 那这几个我们什么时候会用到呢?如何选择?

【学习笔记】尚硅谷大数据项目之Flink实时数仓---DWD和DIM相关推荐

  1. 大数据项目之Flink实时数仓(数据采集/ODS层)

    项目概览 实时大屏效果

  2. 电商数仓描述_笔记-尚硅谷大数据项目数据仓库-电商数仓V1.2新版

    架构 项目框架 数仓架构 存储压缩 Snappy与LZO LZO安装: 读取LZO文件时,需要先创建索引,才可以进行切片. 框架版本选型Apache:运维麻烦,需要自己调研兼容性. CDH:国内使用最 ...

  3. 尚硅谷大数据项目之电商数仓(4即席查询数据仓库)

    尚硅谷大数据项目之电商数仓(即席查询) (作者:尚硅谷大数据研发部) 版本:V4.0 第1章 Presto 1.1 Presto简介 1.1.1 Presto概念 1.1.2 Presto架构 1.1 ...

  4. 架构(B站尚硅谷大数据项目实践 电影推荐系统概述)

    详细版: 整体流程: 数据模型: 数据模型解析: 整体模块: 环境搭建: 数据加载服务:spark(scala) 推荐模块: 后台: 前端: 打包部署: 解决冷启动问题:

  5. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  6. 尚硅谷大数据技术Zookeeper教程-笔记01【Zookeeper(入门、本地安装、集群操作)】

    视频地址:[尚硅谷]大数据技术之Zookeeper 3.5.7版本教程_哔哩哔哩_bilibili 尚硅谷大数据技术Zookeeper教程-笔记01[Zookeeper(入门.本地安装.集群操作)] ...

  7. 尚硅谷大数据技术Spark教程-笔记09【SparkStreaming(概念、入门、DStream入门、案例实操、总结)】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表[课程资料下载] 视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[SparkCore ...

  8. 尚硅谷大数据技术Hadoop教程-笔记02【Hadoop-入门】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

  9. 尚硅谷大数据技术Hadoop教程-笔记03【Hadoop-HDFS】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

  10. 尚硅谷大数据技术Scala教程-笔记04【集合】

    视频地址:尚硅谷大数据技术之Scala入门到精通教程(小白快速上手scala)_哔哩哔哩_bilibili 尚硅谷大数据技术Scala教程-笔记01[Scala课程简介.Scala入门.变量和数据类型 ...

最新文章

  1. 为什么阿里P8、P9技术大牛反复强调“结构化思维”?
  2. Spring Boot 动态数据源(多数据源自己主动切换)
  3. python 深拷贝_Python之类的浅拷贝与深拷贝
  4. openssl工具调试ssl加密ftp
  5. Spring学习(五)bean装配详解之 【XML方式配置】
  6. 使用node.js检查js语法错误
  7. B4010 菜肴制作 拓扑排序(附随机跳题代码)
  8. 二叉堆时间复杂度 php,二叉堆(Binary Heap)
  9. vue在个组件中给body加样式,或者给父级组件元素加样式
  10. How to Be Cool at College
  11. paip.php 配置ZEND DEBUGGER 断点调试for cli..
  12. origin刻度消失_使用Origin制作XRD图基本技巧,你get到了嘛?
  13. java smtp服务器,用Java实现SMTP服务器
  14. C语言循环语句的用法——while循环
  15. 推荐10个免费在线测试网页性能工具
  16. 中国不投美国国债还能投什么?
  17. vue玩转移动端H5微信支付和支付宝支付
  18. 使用CIFAR10数据集完成知识蒸馏(参照了快乐就好_Peng博主的博客)致谢
  19. dhcp服务器自动分配地址思科,cisco dhcp服务器设置方法
  20. 在Linux中各类重要文件被丢失的解决方式

热门文章

  1. 短视频解析技术原理,去水印原理分析整理汇总
  2. python程序员面试宝典 剑指offer_程序员面试宝典+剑指Offer + 算法100题系列 + 15个经典算法下载...
  3. 带通滤波器中心频率带宽matlab,带通滤波器频率设置
  4. 电子计算机教室宣传标语,电子备课室宣传标语有哪些
  5. 网页设计html5实训心得,网页设计实习心得
  6. [转载] 计算机端口详解
  7. win7 怎么干净删除php,Win7环境下彻底清除VBS病毒的教程
  8. Gambit 2.Gambit解释器
  9. 架构蓝图--软件架构 “4+1“ 视图模型
  10. 〖Python 数据库开发实战 - MySQL篇⑫〗- 数据表的字段约束