点击上方蓝色字体,选择“设为星标”

回复”面试“获取更多惊喜

我在之前的文章中已经详细的介绍过Flink CDC的原理和实践了。

如果你对Flink CDC 还没有什么概念,可以参考这里:Flink CDC 原理及生产实践

在实际生产中相信已经有很多小伙伴尝试过了,我在这里将一些个人遇到的、搜索到的、官方博客中总结的以及在Flink的邮件组中的看到过的一些常见问题进行了总结。供大家参考。

不同的kafka版本依赖冲突

不同的kafka版本依赖冲突会造成cdc报错,参考这个issue:

http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393

2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

源码如下:

public class CdcTest {public static void main(String[] args) throws Exception {SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("sohay") // monitor all tables under inventory database.username("root").password("123456").deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute();}
}

确实是pom中存在一个Kafka的依赖包,导致冲突,去掉后问题解决。

MySQL CDC源等待超时

在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

数据库切换,重新开启binlog,Mysql全局锁无法释放

原因是因为切换了数据库环境,重新开启binlog,所有的作业都重新同步binlog的全量数据,导致了全局锁一直在等待,所有作业都无法执行。解决方法:记录checkpoint的地址,取消作业,然后根据checkpoint重启作业。

使用Flink SQL CDC模式创建维表异常

CREATE TABLE cdc_test
(id  STRING,ip  STRING,url STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', 'hostname' = '127.0.0.1','port' = '3306','database-name' = 'xx','table-name' = 'xx','username' = 'xx','password' = 'xx'
);

执行查询:

SELECT * FROM cdc_test;

任务无法运行,抛出异常

User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.

原因是连接MySQL的用户缺乏必要的CDC权限。

Flink SQL CDC基于Debezium实现。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。

解决办法:创建一个新的MySQL用户并授予其必要的权限。

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;

Flink作业扫描MySQL全量数据出现fail-over

Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:

原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。

解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:

execution.checkpointing.interval: 10min   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失败容忍次数
restart-strategy: fixed-delay  # 重试策略
restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数

作业在运行时 mysql cdc source 报 no viable alternative at input 'alter table std'

原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。

解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。

多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。

原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。

解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:

FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

CDC source 扫描 MySQL 表期间,发现无法往该表 insert 数据

原因:由于使用的 MySQL 用户未授权 RELOAD 权限,导致无法获取全局读锁(FLUSH TABLES WITH READ LOCK), CDC source 就会退化成表级读锁,而使用表级读锁需要等到全表 scan 完,才能释放锁,所以会发现持锁时间过长的现象,影响其他业务写入数据。

解决方法:给使用的 MySQL 用户授予 RELOAD 权限即可。所需的权限列表详见文档:

https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server

如果出于某些原因无法授予 RELOAD 权限,也可以显式配上 'debezium.snapshot.locking.mode' = 'none'来避免所有锁的获取,但要注意只有当快照期间表的 schema 不会变更才安全。

我们在学习Flink的时候,到底在学习什么?

我们在学习Spark的时候,到底在学习什么?

【面试&个人成长】2021年过半,社招和校招的经验之谈

八千里路云和月 | 从零到大数据专家学习路径指南

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

你好,我是王知无,一个大数据领域的硬核原创作者。

做过后端架构、数据中间件、数据平台&架构、算法工程化。

专注大数据领域实时动态&技术提升&个人成长&职场进阶,欢迎关注。

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点相关推荐

  1. 找准边界,吃定安全 | 高性能硬件防御问题难解?硬件加速引擎闪亮登场

    山石网科 A7600 智能下一代防火墙 高性能.高可靠.轻量化.更便捷 软件灵活性和硬件高效性的统一 [找准边界,吃定安全]往期回顾:  串联边界设备协同,便捷运营思维让安全更有效 流量剧增?看山石网 ...

  2. 找准边界,吃定安全 | 流量剧增?看山石网科如何打破传统限制

    创新的资源管理算法 基于会话的全分布式处理流程 山石网科全分布式架构 打破了传统架构的限制 [找准边界,吃定安全]往期文章: 从访问控制谈起,再看零信任模型 威胁情报加持,泛边界下的全局主动防御体系如 ...

  3. 找准边界,吃定安全 | 云化下的新边界,东西南北流量该如何防护?

    实现业务计算集中模式的云计算数据中心 云内东西向流量不可见不可控 云计算数据中心的安全建设要求再度升级 如何保障云上环境的安全运行? [找准边界,吃定安全]往期文章: 从访问控制谈起,再看零信任模型 ...

  4. 高情商的女人都是怎么吃定男人的

    高情商的女人都是怎么吃定男人的 在爱情里,从来都不是漂亮的女人取胜,而是情商高,会说话的女人称王 那么高情商的女人都是怎么吃定男人的呢? 举个例子,当男人跟你说下个月发了工资,就给你买最爱的包包,大多 ...

  5. 25.Flink监控\什么是Metrics\Metrics分类\Flink性能优化的方法\合理调整并行度\合理调整并行度\Flink内存管理\Spark VS Flink\时间机制\容错机制等

    25.Flink监控 25.1.什么是Metrics 25.2.Metrics分类 25.2.1.Metric Types 25.2.2.代码 25.2.3.操作 26.Flink性能优化 26.1. ...

  6. 趣商宝微信招生方案 ,线上招生难,十大绝招帮您搞定微信吸粉!

    现如今各位校长在招生这一方面应该或多或少都用到了微信,但在实际操作过程中时常会有些不如意的事情发生,比如:报名量比计划中少,粉丝数不达标,积粉过程中吸粉速度不够快. 问题究竟出在哪呢?其实,吸粉的过程 ...

  7. 王者荣耀吃鸡气泡等等头像框DIY在线生成N种风格微信小程序源码下载

    这是一款头像框DIY制作的一款微信小程序源码 内含多种多样化的模板 不同的风格让你更有创意 目前拥有六大分类,每大分类都有N个不同风格模板 主播头像框,微信状态框,气泡提醒框,吃鸡炫酷框,职业头像框等 ...

  8. 线上flink任务重启报错(Hadoop问题)java.lang.NumberFormatException: For input string: “30s“

    线上Flink任务重启时,提交任务失败,具体报错如下: 2021-12-31 18:09:19 java.lang.NumberFormatException: For input string: & ...

  9. Flink线上问题: The assigned slot container_xxx was removed

    Flink线上问题: The assigned slot container_xxx was removed 客户现场使用Flink(on Yarn)进行数据抽取,Source是JDBC,Sink是K ...

最新文章

  1. CSS vs. JS Animation: 哪个更快
  2. IDEA弹出'xxx' is not allowed to run in parallel. Would you like to stop the running one?
  3. 【转】QT中添加的资源文件qrc时的路径问题小结
  4. Java设计模式——代理模式实现及原理
  5. linux使用小米随身wifi热点,【L】小米随身WiFi,Linux下AP热点驱动(开源)
  6. 上海富勒wms_【3PL | 多家三方物流应用富勒WMS,仓配一体助力供应链升级】
  7. max点缓存烘焙帧_3DMAX怎么进行点缓存?
  8. base64格式的pdf预览
  9. 表格,表单,文本域,下拉框,单选框,复选框,按钮
  10. PPT文字怎样做断开效果和穿透效果
  11. Java开发--implement Serializable
  12. ​Spring Cloud中统一异常处理是怎么做的?
  13. 【软件测试】PDM、PTM、IPD介绍(捣鼓一晚上的血泪知识)
  14. Word文档中如何打外国人姓名间隔的那一个小点
  15. Makefile编写和使用技巧
  16. 红杉官网已删长文:伴随SBF一路走来的救世主情结(上)
  17. 第13期 《由量变到质变的过程》3月刊
  18. Android 开发 17 年 5 月份面试问题总结(二)
  19. 2060年软件工程师会像电报报务员那样过时?
  20. MyCobot六轴机械臂(五)--Myblockly拖拽式编程

热门文章

  1. gaussdb200 存储过程
  2. spring解耦_云端时代的解耦:使用Spring Cloud Azure构建云端原生微服务
  3. dtsi与dts_设备树DTS规范
  4. 第二次作业:软件产品分析之网易云音乐
  5. 创维酷开智慧屏P7065英寸4K电视机质量怎么样?
  6. html5抢红包游戏,HTML5手机微信抢红包福利袋游戏代码
  7. SpringBoot入门 HelloWorld
  8. matlab导入s2p,如何将S参数导入matlab中可用的数据文件
  9. webrtc为Google提供动力的技术与视频群聊,Facebook Messenger和不和谐
  10. windows11 如何关闭防火墙