一. 场景

使用dinky自动savepoint、checkpoint恢复
组件 版本
flink 1.14.4
Flink-mysql-cdc 2.2.1
Mysql 5.7+
Dinky 0.6.6
温馨提示: dinky暂时不支持flink1.15.X 版本做savepoint处理, 请等待后续更新支持, 或者使用小于flink1.15的版本

二. 运行模式

flink on yarn — perjob

三. 部署

将 flink-sql-connector-mysql-cdc-2.2.1.jar 添加到 dinky家目录plugins和hdfs集群配置路径上

依赖图:

3.1. mysql 数据源准备

create database emp_1;use emp_1;CREATE TABLE IF NOT EXISTS `employees_1` (`emp_no` int(11) NOT NULL,`birth_date` date NOT NULL,`first_name` varchar(50) NOT NULL,`last_name` varchar(50) NOT NULL,`gender` enum('M','F') NOT NULL,`hire_date` date NOT NULL,PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");

3.2. Flink sql 准备

-- 测试参数, 生成环境不需设置
SET pipeline.operator-chaining = false;SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout =600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10 ;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
--SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots=1;
SET yarn.application.queue= default;EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH ('connector' = 'mysql-cdc','hostname' = 'hadoop102','port' = '3306','username' = 'root','password' = 'xxxxxx','checkpoint' = '3000','scan.startup.mode' = 'initial','parallelism' = '1','table-name' = 'emp_1\.employees_[0-9]+','sink.connector' = 'print',
)
补充说明:
flink需要开启checkpoint, 配置好状态后端参数

3.3. dinky 面板

savepoint 策略选择 最近一次

3.4. 任务提交

因为作业是第一次运行, 之前没有做过savepoint, 所以作业是一个新的程序,消费两条数据

flink web ui

taskmanger 成功输出两条数据

四. 使用savepoint自动恢复功能

查看作业详情栏, 如下图右上角所示, 他们的含义分别为:

名称 含义
智能停止 触发一次Savepoint, 并停止作业
Savepoint 触发 触发一次Savepoint. 作业继续运行
Savepoint 暂停 触发一次Savepoint. 并暂停运行
Savepoint 停止 触发一次Savepoint, 并停止作业
4.1. 所以这里我们点击 ‘智能停止’ 或者 ‘Savepoint停止’, 触发一次Savepoint, 并停止作业.

4.2. 等作业停止后, 在作业快照 --> Savepoint 栏中, 查看到刚刚成功保存的Savepoint记录

4.3. 在dlink数据库中, 也可以查看到保存的Savepoint元数据

4.4. 同时, 在’数据开发’ 面板对应的作业中, 右边栏也可以查看到savepoint记录

4.5. 接下来, 往表中插入一条新的数据
insert into employees_1 VALUES ("55", "2020-09-15", "huang", "meiji", "F", "2022-04-12");
4.6. 重启作业
作业会自动从之前保存的savepoint处 启动

4.7. 观察到作业, 成功做到断点续传, 只消费到一条记录

flink web ui

Taskmanager 成功输出一条记录

五. 使用checkpoint自动恢复功能

dinky的checkpoint恢复功能使用非常方便, 只需要点击一个按钮即可恢复, 整体过程如下所示:

5.1. 准备数据源 sql

create database emp_2;use emp_2;CREATE TABLE IF NOT EXISTS `employees_2` (`emp_no` int(11) NOT NULL,`birth_date` date NOT NULL,`first_name` varchar(50) NOT NULL,`last_name` varchar(50) NOT NULL,`gender` enum('M','F') NOT NULL,`hire_date` date NOT NULL,PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- flink sql
-- 测试参数, 生成环境不需设置
SET pipeline.operator-chaining = false;SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout =600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10 ;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
--SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots=1;
SET yarn.application.queue= default;EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH ('connector' = 'mysql-cdc','hostname' = 'hadoop102','port' = '3306','username' = 'root','password' = 'xxxxxx','checkpoint' = '3000','scan.startup.mode' = 'initial','parallelism' = '1','table-name' = 'emp_2\.employees_[0-9]+','sink.connector' = 'print',
)

5.2. 提交作业

5.3. 往 employees_2 表插入两条数据
insert into employees_2 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_2 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");
5.4. 成功消费两条数据

5.5. 这次我们点击 ‘普通停止’, 不做savepoint, 从checkpoint 处启动

5.6. 停止之后, 我们可以从 '作业快照’中, 查看到作业保存的checkpoint记录

这跟hdfs 上保存的checkpoint记录 是一致的

然后点击 ‘此处恢复’ 按钮, 恢复最新的checkpoint

5.7. 等作业重新启动后, 往 employees_2 表插入一条数据
insert into employees_2 VALUES ("13", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
5.8. 成功断点续传, 消费到一条数据

温馨提示

运行perjob、 app 模式的作业, 如果作业被强行kill掉、内部错误等原因导致作业意外退出, 会导致ck信息不能及时同步, 可能导致dinky保存的checkpoint记录, 跟hdfs上保存的记录不一致, 有可能晚几个版本, 所以线上作业恢复ck时, 需要查看hdfs上保存的ck记录, 跟dinky作比较

六. 手动指定某处checkpoint 恢复

6.1. 在上一个步骤中, 点击 ‘此处恢复’ 之后, 作业能 ‘断点续传’, 实际原理是, dinky将checkpoint的记录填充到了作业的右边栏, 选项为 ‘指定一次’ 然后运行的.

6.2 所以, dinky也是支持手动指定某处checkpoint 恢复, 只需
‘SavePoin策略’ 选择 ‘指定一次’, 将ck路径粘贴到 ‘SavePointPath’, 运行即可恢复checkpoint

温馨提示

运行完毕, 如查看到成功恢复ck之后, 还请将 ‘SavePoin策略’ 还原回 ‘最近一次’, 避免后续从这个检查点再次恢复

总结

优点: 使用dinky, 简化了线上作业的部署、运维、作业恢复等操作, 增强了flink作业的健壮性
不足: 如果线上作业过多, ‘运维中心’ 找到指定的作业会比较费力, 所以期待 ‘运维中心’, 增加能按照 ‘数据开发’ 面板的分目录、分层级查看作业的功能, 这样就能快速找到对应的作业

使用dinky自动savepoint、checkpoint恢复flink sql作业相关推荐

  1. 数据分析小结:使用流计算 Oceanus(Flink) SQL 作业进行数据类型转换

    作者:吴云涛,腾讯 CSIG 高级工程师 在这个数据爆炸的时代,企业做数据分析也面临着新的挑战, 如何能够更高效地做数据准备,从而缩短整个数据分析的周期,让数据更有时效性,增加数据的价值,就变得尤为重 ...

  2. 网易游戏 Flink SQL 平台化实践

    摘要:本文整理自网易游戏资深开发工程师林小铂在 Flink Forward Asia 2021 平台建设专场的演讲.主要内容包括: 网易游戏 Flink SQL 发展历程 基于模板 jar 的 Str ...

  3. 京东:Flink SQL 优化实战

    简介:本文着重从 shuffle.join 方式的选择.对象重用.UDF 重用等方面介绍了京东在 Flink SQL 任务方面做的优化措施. 本文作者为京东算法服务部的张颖和段学浩,并由 Apache ...

  4. Flink SQL 在字节跳动的优化与实践

    简介:Flink 在字节的应用实战 整理 | Aven (Flink 社区志愿者) 摘要:本文由 Apache Flink Committer,字节跳动架构研发工程师李本超分享,以四个章节来介绍 Fl ...

  5. Flink应用实战案例50篇(一)- Flink SQL 在京东的优化实战

    一.背景 目前,京东搜索推荐的数据处理流程如上图所示.可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm. 这就造成了以 ...

  6. Flink SQL 1.11 on Zeppelin 平台化实践

    简介: 鉴于有很多企业都无法配备专门的团队来解决 Flink SQL 平台化的问题,那么到底有没有一个开源的.开箱即用的.功能相对完善的组件呢?答案就是本文的主角--Apache Zeppelin. ...

  7. 实时计算 Flink SQL 核心功能解密

    2019独角兽企业重金招聘Python工程师标准>>> 实时计算 Flink SQL 核心功能解密 Flink SQL 是于2017年7月开始面向集团开放流计算服务的.虽然是一个非常 ...

  8. Flink SQL Checkpoint 学习总结

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到网站:https://www.captainai.net/dongkelun 前言 学习总结Flink SQ ...

  9. flink checkpoint 恢复_Flink断点恢复机制

    作为流式计算,Flink通过checkpoint机制和kafka的可回溯性来保证作业在failover时不丢失状态. 作为生产环境的flink,我们期待做到快速failover.弹性扩缩容和平滑迁移, ...

最新文章

  1. 用Python解锁“吃鸡”正确姿势
  2. P3175 [HAOI2015]按位或(Min - Max容斥,FMT,概率期望,全网最清晰的题解!)
  3. 动画Storyboard基础
  4. 《乐高EV3机器人搭建与编程》——2.2 颜色设计
  5. ubuntu下和开发板下播放音乐
  6. iar 堆栈设置_IAR MSP430设置合理堆栈大小(the stack pointer for stack is outside the stack range)...
  7. mysql学习笔记之mysqlparameter(摘)
  8. 【站点部署】解析二级域名并部署站点
  9. [vue] vue在组件中引入插件的方法有哪些?
  10. 使用引用的方式交换数据的数值
  11. 第一章 Burp Suite 安装和环境配置
  12. 使用“管道”与“应用程序生命周期”重构:可插拔模块
  13. AcWing基础算法课Level-2 第三讲 搜索与图论
  14. BZOJ 4516 后缀数组+ST+set
  15. 管理变量、机密和事实
  16. python量化交易策略实例_Python写一个量化股票提醒系统实例
  17. Python批量修改文件后缀
  18. JavaScript------常用JS方法(utils.js)骨灰级总结
  19. 计算机考研951,清华大学2020,考研专业课951最高分经验分享
  20. 【三网话费接口】源码分享

热门文章

  1. 论文阅读 - Posting Bot Detection on Blockchain-based Social Media Platform using MachineLearning - CCF B
  2. 机器人布里茨哪个皮肤好看_蒸汽机器人皮肤特效,布里茨的皮肤介绍
  3. 西安交大计算机组成原理期末考试题库,计算机组成原理期末试卷及答案西南交大...
  4. c 语言 药房管理系统,C语言药房管理系统-20210417100738.docx-原创力文档
  5. linux常用命令(详解)
  6. CTF解题-Bugku_Web_WriteUp (下)
  7. aspose文件预览,WORD正确,PPT转pdf找不到字体,且中文乱码
  8. 图片编辑软件有哪些?建议收藏这些软件
  9. 即时通讯在线聊天APP开发解决方案
  10. 如何使用cmake生成除.exe之外的可执行文件如RTX的.rtss文件【每日一个小技巧】