1、Mysql数据准备

(1)创建实时同步数据库

create database flink_gmall

(2)将Mysql.sql文件导入到Mysql中

source /opt/data/Mysql.sql

(3)查看数据库表

show tables;

2、开启数据库的binlog

(1)在mysql中对需要进行实时数据监测的库开启binlog

sudo vim /etc/my.cnf#添加数据库的binlog
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=flink_gmall#重启MySQL服务
sudo systemctl restart mysqld

(2)查询生成日志

cd /var/lib/mysql

3、编写脚本

(1)创建项目

  • 项目目录详述
目录 作用
app 产生各层的flink任务
bean 数据对象
common 公共常量
utils 工具类

(2)编写ODS代码

  • 创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 3 --partitions 6 --topic ods_behavior_db
  • 工具类MyKafkaUtil
package com.lhw.utils;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;public class MyKafkaUtil {public static FlinkKafkaProducer<String> getKafkaProducer(String topic){return new FlinkKafkaProducer<String>("hadoop102:9092,hadoop103:9092,hadoop104:9092",topic,new SimpleStringSchema());}
}
  • 反序列化函数CustomerDeserialization
package com.lhw.app.function;import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {/*** 封装的数据格式* {* "database":"",* "tableName":"",* "before":{"id":"","tm_name":""....},* "after":{"id":"","tm_name":""....},* "type":"c u d",* //"ts":156456135615* }*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {//1.创建JSON对象用于存储最终数据JSONObject result = new JSONObject();//2.获取库名&表名String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value = (Struct) sourceRecord.value();//3.获取"before"数据Struct before = value.getStruct("before");JSONObject beforeJson = new JSONObject();if (before != null) {Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);beforeJson.put(field.name(), beforeValue);}}//4.获取"after"数据Struct after = value.getStruct("after");JSONObject afterJson = new JSONObject();if (after != null) {Schema afterSchema = after.schema();List<Field> afterFields = afterSchema.fields();for (Field field : afterFields) {Object afterValue = after.get(field);afterJson.put(field.name(), afterValue);}}//5.获取操作类型  CREATE UPDATE DELETEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toLowerCase();if ("create".equals(type)) {type = "insert";}//6.将字段写入JSON对象result.put("database", database);result.put("tableName", tableName);result.put("before", beforeJson);result.put("after", afterJson);result.put("type", type);//7.输出数据collector.collect(result.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
  • FlinkCDC的mainclass
package com.lhw.app.ods;import com.lhw.app.function.CustomerDeserialization;
import com.lhw.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class MysqlFlinkCDC {public static void main(String[] args) throws Exception {// 1、获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//        // 2、开启CK并指定状态后端为FS
//        // 2.1、制定存储ck地址
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
//        // 2.2、指定ck储存触发间隔时间
//        env.enableCheckpointing(5000);
//        // 2.3、指定ck模式
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        // 2.4、指定超时时间
//        env.getCheckpointConfig().setAlignmentTimeout(Duration.ofSeconds(1000));
//        // 2.5、CK最小触发间隔时间
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2000);// 3、通过FlinkCDC构建SourceFunctionDebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder().hostname("hadoop102").port(3306).databaseList("flink_gmall")  // set captured database.tableList("flink_gmall.base_category1") // 如果不添加该参数,则消费指定数据库中的所有表.username("root").password("123456").startupOptions(StartupOptions.initial()).deserializer(new CustomerDeserialization()).build();// 4、使用CDC Source方式从mysql中读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);// 5、打印数据并将数据输入到kafkamysqlDS.print();String sinkTopic = "ods_behavior_db";mysqlDS.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));// 5、执行任务env.execute("flinkcdcmysql3");}
}

(3)运行测试

  • 本地IDEA打印输出 & kafka集群消费
bin/kafka-console-consumer.sh \--bootstrap-server hadoop102:9092 --topic ods_behavior_db

(4)检测变化

  • Mysql进行增改删工作
#新增数据
insert into flink_gmall.base_category1 values(18,"奢侈品");#修改数据
update flink_gmall.base_category1 set name="轻奢品" where id = 18;#删除数据
delete from flink_gmall.base_category1 where id=18;
  • IDEA检测变化 & Kakfa集群检测变化

(5)模拟数据生成(检测整个库的变化)

①文件上传并运行

#文件上传
rz application.propertiesgmall2020-mock-db-2020-11-27.jar

②修改配置文件

修改对应的库名,用户名&密码

③运行数据生成项目文件

java -jar gmall2020-mock-db-2020-11-27.jar

④IDEA检测变化 & kafka消费数据变化

4、打包部署

(1)修改代码

  • 添加CK保存位置
package com.lhw.app.ods;import com.lhw.app.function.CustomerDeserialization;
import com.lhw.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class MysqlFlinkCDC {public static void main(String[] args) throws Exception {// 1、获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 2、开启CK并指定状态后端为FS// 2.1、制定存储ck地址env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC/flinkcdc-gmall-db"));// 2.2、指定ck储存触发间隔时间env.enableCheckpointing(5000);// 2.3、指定ck模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 2.4、指定超时时间env.getCheckpointConfig().setAlignmentTimeout(Duration.ofSeconds(1000));// 2.5、CK最小触发间隔时间env.getCheckpointConfig().setMaxConcurrentCheckpoints(2000);// 3、通过FlinkCDC构建SourceFunctionDebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder().hostname("hadoop102").port(3306).databaseList("flink_gmall")  // set captured database.username("root").password("123456").startupOptions(StartupOptions.initial()).deserializer(new CustomerDeserialization()).build();// 4、使用CDC Source方式从mysql中读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);// 5、打印数据并将数据输入到kafkamysqlDS.print();String sinkTopic = "ods_behavior_db";mysqlDS.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));// 6、执行任务env.execute("flinkcdcmysql3");}
}

(2)打全量包

  • maven–>package

    (3)上传脚本并运行
bin/flink run -m hadoop102:8081 -c com.lhw.MysqlFlinkCDC /opt/data/gmall-flinkcdc-mysql.jar
  • 查看运行结果

  • 查看WEB端的打印输出

    (4)变更业务数据

  • 运行业务数据的变更包

java -jar gmall2020-mock-db-2020-11-27.jar
  • 查看运行结果(WEB端运行状态 & Kafka消费者消费消息)

  • 查看WEB端的打印输出

5、代码运行步骤

  • 代码路径: /opt/module/data/mysql_cdc

  • 启动步骤
#1、启动Flink
bin/start-culter#2、启动kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_behavior_db#3、启动flink的WEB端
http://hadoop102:8081/#4、启动flink任务
bin/flink run -m hadoop102:8081 -c com.lhw.MysqlFlinkCDC /opt/data/gmall-flinkcdc-mysql.jar
  • 运行界面

6、源码&资料下载

链接:https://pan.baidu.com/s/1mVRzQ4G5wqKkCAW74IzDOA
提取码:eni1

第二章 数据采集模块之FlinkCDC实时采集Mysql业务数据(源码资料见文末)相关推荐

  1. 第二章 数据采集模块之SpringBoot埋点数据采集(源码 资料见文末)

    1.模拟生成数据 (1)在hadoop102上创建目录 [atguigu@hadoop102 module]$ mkdir data_make cd data_make/ (2)上传数据生成脚本 rz ...

  2. 基于JAVA机票实时比价系统计算机毕业设计源码+数据库+lw文档+系统+部署

    基于JAVA机票实时比价系统计算机毕业设计源码+数据库+lw文档+系统+部署 基于JAVA机票实时比价系统计算机毕业设计源码+数据库+lw文档+系统+部署 本源码技术栈: 项目架构:B/S架构 开发语 ...

  3. 采用Flume实时采集和处理数据

    它已成功安装Flume在...的基础上.本文将总结使用Flume实时采集和处理数据,详细过程,如下面: 第一步,在$FLUME_HOME/conf文件夹下,编写Flume的配置文件,命名为flume_ ...

  4. Flume实时采集mysql数据到kafka中并输出

    环境说明 centos7 flume1.9.0(flume-ng-sql-source插件版本1.5.3) jdk1.8 kafka 2.1.1 zookeeper(这个我用的kafka内置的zk) ...

  5. DirectSound播放PCM(可播放实时采集的音频数据)

    前言 该篇整理的原始来源为http://blog.csdn.net/leixiaohua1020/article/details/40540147.非常感谢该博主的无私奉献,写了不少关于不同多媒体库的 ...

  6. 基于HDP使用Flume实时采集MySQL中数据传到Kafka

    注意:HDP中Kafka broker的端口是6667,不是9092 如有需要请看:基于HDP使用Flume实时采集MySQL中数据传到Kafka+Hive/HDFS 1.将flume-ng-sql- ...

  7. Android实现手部检测和手势识别(可实时运行,含Android源码)

    Android实现手部检测和手势识别(可实时运行,含Android源码) 目录 Android实现手部检测和手势识别(可实时运行,含Android源码) 1. 前言 2. 手势识别的方法 (1)基于多 ...

  8. Apache Storm 实时流处理系统通信机制源码分析

    我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...

  9. github可视化_Cesium数据可视化-仓储调度系统可视化部分(附github源码)

    Cesium数据可视化-仓储调度系统可视化部分 目的 仓储调度系统需要一个可视化展示物资运输实况的界面,需要配合GPS设备发送的位置信息,实时绘制物资运输情况和仓储仓库.因此,使用Cesium可视化该 ...

最新文章

  1. HDU5154拓扑排序模版题
  2. 鸿蒙可以和安卓抗衡吗,鸿蒙手机系统正式登场!继承EMUI的衣钵,能抗衡安卓系统吗?...
  3. MySQL存储引擎类别
  4. planahead 动态重构_部分动态可重构
  5. 计算机室是学校重要的教学设施,计算机室治理规章.doc
  6. ASP.NET中实现MSN通知消息功能
  7. 日本第四次产业革命瞄准物联网
  8. MySQL可视化工具使用
  9. CentOS 7.4 64位 .tar.bz2 解压
  10. 晴天的魔法乐园——谢尔宾斯基地毯(递归打印图形)
  11. js两只手指控制div图片放大缩小功能(2)
  12. mysql输出max函数_MySQL中的max()函数使用教程
  13. 读书笔记:《程序员修炼之道:通向务实的最高境界》
  14. 爬取东方财富网数据笔记
  15. 聚焦到Windows的窗口,激活窗口获取键盘输入,不需要鼠标点击,C++接口,focus, active, foreground
  16. mysql wal_我所理解的MySQL之一:基础架构
  17. LeetCode刷题记录(1)
  18. 华为 MateBook E GO 标准版评测
  19. 博士申请 | 美国佛罗里达大学陈世刚教授招收人工智能/机器学习方向博士生
  20. 腾讯云整体产品矩阵及生态圈

热门文章

  1. 打印20以内的素数c语言,c语言编程输出2~100之间的所有素数(每行输出10个),并将它们打印出来....
  2. golang gorm增删改查db.Model db.Where db.Table
  3. 短视频、直播平台——电商直播源码第三方SDK接入教程
  4. 华为云空间費用_华为云空间有什么用
  5. alipay 证书 java_alipay-sdk-java
  6. linux netstat 命令 (转载)
  7. 道客巴巴免费下载文档技巧
  8. 2021年语音合成年度总结
  9. arcgis 栅格计算器,img叠加运算,con函数,img转txt
  10. 诶嘿,~~o(╥﹏╥)o~~大东北我又来了