一, Flink 1.11 introduced CDC support. In the official documentation you can find:

1,Canal    https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/formats/canal.html

2,Debezium  https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/formats/debezium.html

In addition, the community provides:

3,mysql-cdc  https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector

4,changelog-json  https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

So here I will write up some notes and walk through a basic canal-json example, a changelog-json example, and a mysql-cdc example.

二, First, the usage scenarios. They all revolve around capturing MySQL data changes: by reading the binlog, the changed data can be consumed in real time.

1, Read the binlog, aggregate the data, and write the results to Kafka.

2, Dimension-table joins used to rely on caching the dimension data or refreshing it on a schedule, which is not very real-time; with mysql-cdc the dimension data can be kept up to date in real time (see the sketch after this list).

3, Data synchronization.
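To make point 2 concrete, here is a rough sketch of the pre-CDC approach (my own illustration, not from the original write-up; the table name and connection details are borrowed from the examples below): a JDBC lookup table whose rows are cached and only refreshed after the cache TTL expires.

-- hypothetical pre-CDC dimension table: JDBC lookup with a cache
CREATE TABLE dim_province (
  id INT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.6.143:3306/flink_test2',
  'table-name' = 'base_province',
  'username' = 'root',
  'password' = '12345678',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '10min'   -- dimension changes only become visible after the TTL expires
);

With mysql-cdc (example 3 in section 三 below) the dimension table is instead materialized from the binlog, so updates show up in the join without waiting for a cache refresh.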

三, The code examples, one by one.

1,canal-json

Required dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
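Since the DDL below uses 'connector' = 'kafka', the Kafka SQL connector presumably needs to be on the classpath as well (an assumption on my part), for example:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>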
public class Cannal2kafkaTest {

    private static final String PRINT_SINK_SQL = "create table sink_print ( \n" +
            " aaa DECIMAL(10, 2) \n" +
            ") with ('connector' = 'print' )";

    private static final String CANAL_JSON_SQL = "CREATE TABLE topic_products (" +
            "  id BIGINT," +
            "  name STRING," +
            "  description STRING," +
            "  weight DECIMAL(10, 2)" +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'products_test'," +
//            " 'topic' = 'products'," +
            " 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092'," +
            " 'properties.group.id' = 'test1'," +
//            " 'format' = 'canal-json'" +
            " 'format' = 'canal-json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    private static final String ODS_SQL = "CREATE TABLE ods_topic (\n" +
            " user_id VARCHAR ," +
            " item_id VARCHAR," +
            " category_id VARCHAR," +
            " behavior VARCHAR," +
            " proctime TIMESTAMP(3)," +
            " ts VARCHAR" +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'ods_kafka'," +
            " 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv.executeSql(CANAL_JSON_SQL);
        tEnv.executeSql(PRINT_SINK_SQL);
//        tEnv.executeSql(ODS_SQL);
//        Table table = tEnv.sqlQuery("select *  from topic_products");
//        tEnv.toRetractStream(table, Row.class).print("$$$$$$$$:");
//        tEnv.executeSql("insert into sink_print select sum(weight) as last_values from topic_products");
//        tEnv.executeSql("select * from topic_products");

        // aggregate over the canal-json changelog and print the resulting retract stream
        Table table = tEnv.sqlQuery("select id ,sum(weight) from topic_products group by id ");
        tEnv.toRetractStream(table, Row.class).print("¥¥¥¥输出:");
        bsEnv.execute("胜多负少方式方法");
    }
}
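For context, the messages on the products_test topic that this table parses are Canal's JSON change records. An UPDATE record looks roughly like the example in the canal-json documentation linked above (abridged, values purely illustrative):

{
  "data": [
    { "id": "111", "name": "scooter", "description": "Big 2-wheel scooter", "weight": "5.18" }
  ],
  "old": [
    { "weight": "5.15" }
  ],
  "database": "inventory",
  "table": "products",
  "pkNames": [ "id" ],
  "isDdl": false,
  "type": "UPDATE",
  "es": 1589362330000,
  "ts": 1589362330904
}

The canal-json format turns each such record into insert/update/delete changelog rows for the topic_products table, which is why the SUM above stays correct as rows are updated or deleted in MySQL.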

2, changelog-json — it can write aggregated / update streams to Kafka, so the whole pipeline can use Kafka as storage and be expressed in SQL.

Required dependency:

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-format-changelog-json</artifactId>
    <version>1.0.0</version>
</dependency>
object CdcSinkKafka {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30001)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)

    val source =
      s"""
         |CREATE TABLE kafka_table (
         | category_id STRING,
         | user_id INT,
         | item_id STRING,
         | behavior STRING,
         | ts STRING
         |) WITH (
         | 'connector' = 'kafka',
         | 'topic' = 'user_behavior',
         | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         | 'properties.group.id' = 'test1',
         | 'format' = 'json',
         | 'scan.startup.mode' = 'earliest-offset'
         |)
         """.stripMargin
    stenv.executeSql(source)

    val sink =
      s"""
         |CREATE TABLE kafka_gmv (
         |  id STRING,
         |  gmv DECIMAL(10, 5)
         |) WITH (
         |    'connector' = 'kafka',
         |    'topic' = 'kafka_gmv',
         |    'scan.startup.mode' = 'earliest-offset',
         |    'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |    'format' = 'changelog-json'
         |)
         """.stripMargin
    stenv.executeSql(sink)

    // continuously write the aggregated (updating) result to Kafka as changelog-json
    val insert =
      s"""
         | INSERT INTO kafka_gmv
         |    SELECT behavior, SUM(user_id) as gmv
         |    FROM kafka_table
         |    GROUP BY behavior
         """.stripMargin
    stenv.executeSql(insert)

    // read the changelog topic back and print it
    val query =
      s"""
         |SELECT * FROM kafka_gmv
         """.stripMargin
    stenv.executeSql(query).print()
  }
}
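For context, here is roughly what the records written to the kafka_gmv topic look like: the changelog-json format wraps each row together with its change kind in an op field (values below are illustrative; the exact layout follows the changelog-json wiki linked above):

{"data":{"id":"buy","gmv":1234.56000},"op":"+I"}
{"data":{"id":"buy","gmv":1234.56000},"op":"-U"}
{"data":{"id":"buy","gmv":2345.67000},"op":"+U"}

The -U/+U pair is the retraction and the new value of an updated aggregate, which is what lets a downstream Flink SQL job read kafka_gmv back as an updating table.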

3, mysql-cdc (not recommended for production yet, but worth testing; it should be great for dimension-table joins, and note that it loads the full data of the matched tables first).

Required dependency:

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.0.0</version>
</dependency>
object Test1 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30001)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)

    val mysql_cdc =
      s"""
         |CREATE TABLE orders (
         |  id INT,
         |  name STRING,
         |  region_id INT,
         |  area_code STRING
         |) WITH (
         |  'connector' = 'mysql-cdc',
         |  'hostname' = '192.168.6.143',
         |  'port' = '3306',
         |  'username' = 'root',
         |  'password' = '12345678',
         |  'database-name' = 'flink_test2',
         |  'table-name' = 'base.*'    -- a regex is allowed here
         |)
         |""".stripMargin

    val sink_print =
      """
        |create table sink_print ( aaa INT,bbb STRING,ccc INT,ddd STRING) with ('connector' = 'print' )
        """.stripMargin

    stenv.executeSql(mysql_cdc)
    stenv.executeSql(sink_print)
    // the snapshot plus the binlog changes of all matched tables are streamed into the print sink
    stenv.executeSql("insert into sink_print select * from orders")
  }
}
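One operational note based on the Debezium engine that this connector uses underneath (my summary, not covered in this post): the MySQL server needs row-based binlog enabled (binlog_format = ROW), and the user handed to the connector needs the replication-related privileges. For a dedicated user ('flink_cdc' is just a placeholder name) that would be roughly:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;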

I wrote these in Scala because submitting jobs from Zeppelin does not support Java; either way it is essentially the same, with little difference.

I will add more later if anything needs to be supplemented.

Addendum: a demo of a dimension-table join against a mysql-cdc table:

// TODO: stream join with mysql cdc
public class Mysql_cdc_join {

    private static final String PRINT_SINK_SQL = "create table sink_print ( \n" +
            " id INT," +
            " name STRING," +
            " region_id INT," +
            " area_code STRING" +
            ") with ('connector' = 'print' )";

    private static final String PRINT_SINK_SQL2 = "create table sink_print2 (" +
            " s_order_id INT," +
            " m_id INT " +
            ") with ('connector' = 'print' )";

    private static final String MYSQL_CDC_SQL = "CREATE TABLE orders (" +
            " id INT," +
            " name STRING," +
            " region_id INT," +
            " area_code STRING " +
            " ) WITH ( " +
            " 'connector' = 'mysql-cdc'," +
            " 'hostname' = '192.168.6.143'," +
            " 'port' = '3306'," +
            " 'username' = 'root'," +
            " 'password' = '12345678'," +
            " 'database-name' = 'flink_test2'," +
            " 'table-name' = 'base_province'" +
            ")";

    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment stenv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // a simple generator source: one order per second with an increasing id
        DataStream<Tuple4<Integer, String, String, Long>> ds = bsEnv.addSource(new SourceFunction<Tuple4<Integer, String, String, Long>>() {
            @Override
            public void run(SourceContext<Tuple4<Integer, String, String, Long>> out) throws Exception {
                Random random = new Random();
                Random random2 = new Random();
                Integer id = 0;
                while (true) {
                    int sale = random.nextInt(1000);
//                    int id = random2.nextInt(100);
                    id++;
                    out.collect(new Tuple4<>(id, "user", "product", Long.valueOf(sale)));
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // todo register the DataStream as a table and add a processing-time field (this could also be declared as rowtime)
        stenv.createTemporaryView("stream_order", ds, $("order_id"), $("users"), $("product"), $("number"), $("proctime").proctime());

        // todo dimension table
        stenv.executeSql(MYSQL_CDC_SQL);
//        stenv.executeSql(PRINT_SINK_SQL);
        stenv.executeSql(PRINT_SINK_SQL2);
//        stenv.executeSql("insert into sink_print select * from orders");

        // todo dimension-table join (an ordinary join here, not a temporal join)
        String joinSql = "insert into sink_print2 SELECT" +
                "  s.order_id," +
                "  m.id " +
                "FROM " +
                "stream_order s " +
//                "  JOIN orders FOR SYSTEM_TIME AS OF s.proctime as m " +
                "  JOIN orders m " +
                "  ON m.id = s.order_id";
        stenv.executeSql(joinSql);
    }
}

Note: previously, dimension-table joins were done with the FOR SYSTEM_TIME AS OF (temporal/lookup join) syntax, i.e. the line that is commented out in the code above; that style of join against a CDC table only becomes possible in Flink 1.12.

The code above therefore uses an ordinary join; a rough sketch of the 1.12-style variant follows below.
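Here is what that commented-out join would look like written in full (my illustration only; per the note above it needs Flink 1.12+, and the CDC table would additionally need a primary key such as PRIMARY KEY (id) NOT ENFORCED declared so it can act as a versioned table):

-- sketch only, not runnable on Flink 1.11
INSERT INTO sink_print2
SELECT s.order_id, m.id
FROM stream_order AS s
JOIN orders FOR SYSTEM_TIME AS OF s.proctime AS m
  ON m.id = s.order_id;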

Result demo:

At the start, MySQL holds the rows with id = 5, 6, 7.

Midway through, I deleted the row with id = 5 and then inserted a row with id = 100; a little later, +I(100,100) was printed, which is correct.

By then the ids in the stream were certainly already above 100; when I inserted a row with id = 55 into MySQL, the console printed it immediately.
