Notes on Flink 1.11 CDC Connectors
I. Flink 1.11 introduced CDC support. In the official docs we can find:
1. Canal: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/formats/canal.html
2. Debezium: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/formats/debezium.html
And the community additionally provides:
3. mysql-cdc: https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector
4. changelog-json: https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
So here I'll take some notes and walk through a basic canal-json example, a mysql-cdc example, and a changelog-json example.
II. First, the use cases. They all revolve around data changes in MySQL: by reading the binlog we get the data in real time.
1. Read binlog data, aggregate it, and write the results to Kafka.
2. Dimension-table joins used to rely on caching or on periodically reloading the dimension data, which was never truly real-time; with mysql-cdc the dimension data can be kept up to date in real time.
3. Data synchronization.
III. Code examples, one by one
1. canal-json
Required dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Cannal2kafkaTest {

    private static final String PRINT_SINK_SQL = "create table sink_print ( \n" +
            "  aaa DECIMAL(10, 2) \n" +
            ") with ('connector' = 'print' )";

    // Kafka topic that Canal writes binlog changes into, decoded with the canal-json format
    private static final String CANAL_JSON_SQL = "CREATE TABLE topic_products (" +
            "  id BIGINT," +
            "  name STRING," +
            "  description STRING," +
            "  weight DECIMAL(10, 2)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'products_test'," +
            "  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092'," +
            "  'properties.group.id' = 'test1'," +
            "  'format' = 'canal-json'," +
            "  'scan.startup.mode' = 'earliest-offset'" +
            ")";

    // plain-json source kept around for comparison (not used below)
    private static final String ODS_SQL = "CREATE TABLE ods_topic (\n" +
            "  user_id VARCHAR," +
            "  item_id VARCHAR," +
            "  category_id VARCHAR," +
            "  behavior VARCHAR," +
            "  proctime TIMESTAMP(3)," +
            "  ts VARCHAR" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'ods_kafka'," +
            "  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092'," +
            "  'properties.group.id' = 'test1'," +
            "  'format' = 'json'," +
            "  'scan.startup.mode' = 'earliest-offset'" +
            ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv.executeSql(CANAL_JSON_SQL);
        tEnv.executeSql(PRINT_SINK_SQL);

        // aggregate over the changelog produced by canal-json and print the retract stream
        Table table = tEnv.sqlQuery("select id, sum(weight) from topic_products group by id");
        tEnv.toRetractStream(table, Row.class).print("output:");

        bsEnv.execute("canal-json-demo");
    }
}
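For reference, a change record in canal-json on the topic looks roughly like this (an UPDATE event; the field values here are illustrative, the exact envelope is described in the canal-json docs linked above):

{
  "data": [
    { "id": "111", "name": "scooter", "description": "Big 2-wheel scooter", "weight": "5.18" }
  ],
  "old": [
    { "weight": "5.15" }
  ],
  "type": "UPDATE",
  "database": "inventory",
  "table": "products",
  "isDdl": false
}

The Kafka connector with 'format' = 'canal-json' interprets "data"/"old"/"type" as insert, update-before/update-after, and delete rows, which is why the sum over weight stays correct as rows change.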
2. changelog-json: it can write an aggregated/update stream into Kafka, so we can use Kafka as the storage layer end to end and keep the whole pipeline in SQL.
Required dependency:
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-format-changelog-json</artifactId>
    <version>1.0.0</version>
</dependency>
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CdcSinkKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30001)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)

    val source =
      s"""
         |CREATE TABLE kafka_table (
         |  category_id STRING,
         |  user_id INT,
         |  item_id STRING,
         |  behavior STRING,
         |  ts STRING
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'user_behavior',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'properties.group.id' = 'test1',
         |  'format' = 'json',
         |  'scan.startup.mode' = 'earliest-offset'
         |)
       """.stripMargin
    stenv.executeSql(source)

    val sink =
      s"""
         |CREATE TABLE kafka_gmv (
         |  id STRING,
         |  gmv DECIMAL(10, 5)
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'kafka_gmv',
         |  'scan.startup.mode' = 'earliest-offset',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'format' = 'changelog-json'
         |)
       """.stripMargin
    stenv.executeSql(sink)

    // aggregate and write the update stream to Kafka as changelog-json
    val insert =
      s"""
         |INSERT INTO kafka_gmv
         |SELECT behavior, SUM(user_id) as gmv
         |FROM kafka_table
         |GROUP BY behavior
       """.stripMargin
    stenv.executeSql(insert)

    // read the changelog topic back and print it
    val query =
      s"""
         |SELECT * FROM kafka_gmv
       """.stripMargin
    stenv.executeSql(query).print()
  }
}
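To see what the changelog-json sink actually writes, the records on the kafka_gmv topic look roughly like this (values are illustrative): each row is wrapped in a "data" object plus an "op" flag (+I insert, -U update-before, +U update-after, -D delete):

{"data":{"id":"pv","gmv":47.00000},"op":"+I"}
{"data":{"id":"pv","gmv":47.00000},"op":"-U"}
{"data":{"id":"pv","gmv":81.00000},"op":"+U"}

This is how the update stream of the GROUP BY survives the round trip through Kafka: reading the topic back with 'format' = 'changelog-json' replays these flags as retractions and updates.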
3. mysql-cdc (I wouldn't put it in production yet, but it's worth testing; it should be great for dimension-table joins. Note that it first loads the full contents of the matched tables.)
Required dependency:
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.0.0</version>
</dependency>
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Test1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30001)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)

    val mysql_cdc =
      s"""
         |CREATE TABLE orders (
         |  id INT,
         |  name STRING,
         |  region_id INT,
         |  area_code STRING
         |) WITH (
         |  'connector' = 'mysql-cdc',
         |  'hostname' = '192.168.6.143',
         |  'port' = '3306',
         |  'username' = 'root',
         |  'password' = '12345678',
         |  'database-name' = 'flink_test2',
         |  'table-name' = 'base.*'  -- a regex is allowed here
         |)
       """.stripMargin

    val sink_print =
      """
        |create table sink_print (
        |  aaa INT,
        |  bbb STRING,
        |  ccc INT,
        |  ddd STRING
        |) with ('connector' = 'print')
      """.stripMargin

    stenv.executeSql(mysql_cdc)
    stenv.executeSql(sink_print)
    stenv.executeSql("insert into sink_print select * from orders")
  }
}
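One prerequisite worth calling out: mysql-cdc embeds Debezium and reads the binlog directly, so the MySQL server needs row-based binlog enabled. A minimal my.cnf sketch (values are illustrative, adjust to your setup):

[mysqld]
server-id        = 1
log-bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL

The connecting user also needs the usual replication privileges (e.g. SELECT, REPLICATION SLAVE, REPLICATION CLIENT).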
I wrote these in Scala because submitting jobs from Zeppelin doesn't support Java; either way it's essentially the same.
I'll add more here later as things come up.
Here is an added demo of a dimension-table join against mysql-cdc:
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Random;

import static org.apache.flink.table.api.Expressions.$;

// todo: join a stream against a mysql-cdc dimension table
public class Mysql_cdc_join {

    // kept from the standalone test, unused below
    private static final String PRINT_SINK_SQL = "create table sink_print ( \n" +
            "  id INT," +
            "  name STRING," +
            "  region_id INT," +
            "  area_code STRING" +
            ") with ('connector' = 'print' )";

    private static final String PRINT_SINK_SQL2 = "create table sink_print2 (" +
            "  s_order_id INT," +
            "  m_id INT " +
            ") with ('connector' = 'print' )";

    private static final String MYSQL_CDC_SQL = "CREATE TABLE orders (" +
            "  id INT," +
            "  name STRING," +
            "  region_id INT," +
            "  area_code STRING " +
            ") WITH ( " +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = '192.168.6.143'," +
            "  'port' = '3306'," +
            "  'username' = 'root'," +
            "  'password' = '12345678'," +
            "  'database-name' = 'flink_test2'," +
            "  'table-name' = 'base_province'" +
            ")";

    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment stenv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // test source: one order per second with a strictly increasing order id
        DataStream<Tuple4<Integer, String, String, Long>> ds = bsEnv.addSource(
                new SourceFunction<Tuple4<Integer, String, String, Long>>() {
                    @Override
                    public void run(SourceContext<Tuple4<Integer, String, String, Long>> out) throws Exception {
                        Random random = new Random();
                        int id = 0;
                        while (true) {
                            int sale = random.nextInt(1000);
                            id++;
                            out.collect(new Tuple4<>(id, "user", "product", Long.valueOf(sale)));
                            Thread.sleep(1000L);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                });

        // todo: register the DataStream as a table and add a processing-time field
        //       (this could also be set up as rowtime)
        stenv.createTemporaryView("stream_order", ds,
                $("order_id"), $("users"), $("product"), $("number"), $("proctime").proctime());

        // todo: the dimension table
        stenv.executeSql(MYSQL_CDC_SQL);
        stenv.executeSql(PRINT_SINK_SQL2);

        // todo: dimension-table join (a regular join here, not a temporal join;
        //       the temporal variant would be: JOIN orders FOR SYSTEM_TIME AS OF s.proctime as m)
        String joinSql = "insert into sink_print2 SELECT" +
                "  s.order_id," +
                "  m.id " +
                "FROM stream_order s" +
                "  JOIN orders m" +
                "  ON m.id = s.order_id";
        stenv.executeSql(joinSql);
    }
}
Note: previously we did dimension-table joins with the temporal-join syntax (FOR SYSTEM_TIME AS OF, the line commented out in the code above); joining a CDC table that way only becomes possible in Flink 1.12.
In the code above we used a regular join.
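For comparison, the temporal-join version (mirroring the commented-out line above; against a CDC source this only works from Flink 1.12 on) would look like this:

INSERT INTO sink_print2
SELECT s.order_id, m.id
FROM stream_order AS s
JOIN orders FOR SYSTEM_TIME AS OF s.proctime AS m
ON m.id = s.order_id

With the regular join, the join state keeps growing and changes on the dimension side retract earlier results; the temporal join instead looks up the dimension row as of each record's processing time.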
Result demo:
When we started, MySQL held the rows with id = 5, 6, 7.
Along the way I deleted the row with id = 5 and inserted a row with id = 100; after a little while,
+I(100,100) was printed, as expected.
By that point the stream's order_id was well past 100; I then inserted a row with id = 55 into MySQL and the console printed the match immediately.