摘要:CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

本文分享自华为云社区《华为FusionInsight MRS CDL使用指南》,作者:晋红轻。

说明

CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

CDL服务包含了两个重要的角色:CDLConnector和CDLService。CDLConnector是具体执行数据抓取任务的实例,CDLService是负责管理和创建任务的实例。

本此实践介绍以mysql作为数据源进行数据抓取

前提条件

  • MRS集群已安装CDL服务。
  • MySQL数据库需要开启mysql的bin log功能(默认情况下是开启的)。

查看MySQL是否开启bin log:

使用工具或者命令行连接MySQL数据库(本示例使用navicat工具连接),执行show variables like 'log_%'命令查看。

例如在navicat工具选择"File > New Query"新建查询,输入如下SQL命令,单击"Run"在结果中"log_bin"显示为"ON"则表示开启成功。

show variables like 'log_%'

工具准备

现在cdl只能使用rest api的方式进行命令提交,所以需要提前安装工具进行调试。本文使用VSCode工具。

完成之后安装rest client插件:

完成之后创建一个cdl.http的文件进行编辑:

创建CDL任务

CDL任务创建的流程图如下所示:

说明:需要先创建一个MySQL link, 在创建一个Kafka link, 然后再创建一个CDL同步任务并启动。

MySQL link部分rest请求代码

@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"### get links
get https://{{host}}/api/v1/cdl/link### mysql link validatepost https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json{
"name": "MySQL_link", //link名,全局唯一,不能重复
"description":"MySQL connection", //link描述
"link-type":"mysql", //link的类型
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip{ "name": "port", "value": {{mysql_port}} },//数据库监听的端口{ "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名{ "name": "user", "value": {{mysql_user}} }, //用户{ "name": "password","value": {{mysql_password}} } ,//密码{ "name":"schema", "value": {{mysql_database}}}//同数据库名]}
}### mysql link createpost https://{{host}}/api/v1/cdl/link
content-type: application/json{
"name": "MySQL_link", //link名,全局唯一,不能重复
"description":"MySQL connection", //link描述
"link-type":"mysql", //link的类型
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip{ "name": "port", "value": {{mysql_port}} },//数据库监听的端口{ "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名{ "name": "user", "value": {{mysql_user}} }, //用户{ "name": "password","value": {{mysql_password}} } ,//密码{ "name":"schema", "value": {{mysql_database}}}//同数据库名]}
}### mysql link updateput https://{{host}}/api/v1/cdl/link/MySQL_link
content-type: application/json{
"name": "MySQL_link", //link名,全局唯一,不能重复
"description":"MySQL connection", //link描述
"link-type":"mysql", //link的类型
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip{ "name": "port", "value": {{mysql_port}} },//数据库监听的端口{ "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名{ "name": "user", "value": {{mysql_user}} }, //用户{ "name": "password","value": {{mysql_password}} } ,//密码{ "name":"schema", "value": {{mysql_database}}}//同数据库名]}
}

Kafka link部分rest请求代码

### get links
get https://{{host}}/api/v1/cdl/link### kafka link validatepost https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007" },{ "name": "sasl.kerberos.service.name", "value": "kafka" },{ "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]}
}### kafka link createpost https://{{host}}/api/v1/cdl/link
content-type: application/json{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007" },{ "name": "sasl.kerberos.service.name", "value": "kafka" },{ "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]}
}### kafka link updateput https://{{host}}/api/v1/cdl/link/kafka_link
content-type: application/json{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007" },{ "name": "sasl.kerberos.service.name", "value": "kafka" },{ "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]}
}

CDL任务命令部分rest请求代码

@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"### create job
post https://{{host}}/api/v1/cdl/job
content-type: application/json{"job_type": "CDL_JOB", //job类型,目前只支持CDL_JOB这一种"name": "mysql_to_kafka", //job名称"description":"mysql_to_kafka", //job描述"from-link-name": "MySQL_link",  //数据源Link"to-link-name": "kafka_link", //目标源Link"from-config-values": {"inputs": [{"name" : "connector.class", "value" : "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},{"name" : "schema", "value" : "hudi"},{"name" : "db.name.alias", "value" : "hudi"},{"name" : "whitelist", "value" : "hudisource"},{"name" : "tables", "value" : "hudisource"},{"name" : "tasks.max", "value" : "10"},{"name" : "mode", "value" : "insert,update,delete"},{"name" : "parse.dml.data", "value" : "true"},{"name" : "schema.auto.creation", "value" : "false"},{"name" : "errors.tolerance", "value" : "all"},{"name" : "multiple.topic.partitions.enable", "value" : "false"},{"name" : "topic.table.mapping", "value" : "[{\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"}]"},{"name" : "producer.override.security.protocol", "value" : "SASL_PLAINTEXT"},//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT{"name" : "consumer.override.security.protocol", "value" : "SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]},"to-config-values": {"inputs": []},"job-config-values": {"inputs": [{"name" : "global.topic", "value" : "demo"}]}
}### get all job
get https://{{host}}/api/v1/cdl/job
### submit job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start
### get job status
get https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka
### stop job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop
### delete job
DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka

场景验证

生产库MySQL原始数据如下:

提交CDL任务之后

增加操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);

对应kafka消息体:

更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie333’ WHERE uid=11;

对应kafka消息体:

删除操作:delete from hudi.hudisource where uid=11;

对应kafka消息体:

点击关注,第一时间了解华为云新鲜技术~

一文讲清楚FusionInsight MRS CDL如何使用相关推荐

  1. 详解MRS CDL整体架构设计

    摘要:MRS CDL是FusionInsight MRS推出的一种数据实时同步服务,旨在将传统OLTP数据库中的事件信息捕捉并实时推送到大数据产品中去,本文档会详细为大家介绍CDL的整体架构以及关键技 ...

  2. FusionInsight MRS:你的大数据“管家”

    本文分享自华为云社区<HDC.Cloud2021 一文回顾华为云FusionInsight MRS云原生数据湖亮点>,原文作者:沙漏. 2021年4月26日,HDC.Cloud2021(华 ...

  3. MRS CDL架构设计与实现

    1 前言 MRS CDL是FusionInsight MRS推出的一种数据实时同步服务,旨在将传统OLTP数据库中的事件信息捕捉并实时推送到大数据产品中去,本文档会详细为大家介绍CDL的整体架构以及关 ...

  4. HDFS 细粒度锁优化,FusionInsight MRS有妙招

    本文分享自华为云社区<FusionInsight MRS HDFS 细粒度锁优化实践>,作者:pippo. 背景 HDFS依赖NameNode作为其元数据服务.NameNode将整个命名空 ...

  5. 使用MRS CDL实现实时数据同步的极致性能

    摘要:MRS CDL旨在实现最大的数据复制吞吐量和低复制延迟. 本文分享自华为云社区<使用MRS CDL实现实时数据同步的极致性能>,作者:大数据修行者 . MRS CDL提供从多个RDB ...

  6. Superior Scheduler:带你了解FusionInsight MRS的超级调度器

    摘要:Superior Scheduler是一个专门为Hadoop YARN分布式资源管理系统设计的调度引擎,是针对企业客户融合资源池,多租户的业务诉求而设计的高性能企业级调度器. 本文分享自华为云社 ...

  7. 华为云FusionInsight MRS:千余节点滚动升级业务无中断

    本文分享自华为云社区<华为云FusionInsight MRS如何实现千余节点滚动升级无业务中断升级>,原文作者:沙漏. 华为开发者大会2021(Cloud)大会期间,由华为技术专家天团打 ...

  8. 华为云FusionInsight MRS在金融行业存算分离的实践

    摘要:华为云FusionInsight MRS的大数据存算分离解决方案,实现资源价值最大化,存储与计算资源全面云化.灵活配置.弹性伸缩,降本增效. 在大数据.云计算.5G.AI等技术日新月异,数字经济 ...

  9. 华为云FusionInsight MRS:助力企业构建“一企一湖,一城一湖”

    摘要:华为云FusionInsight MRS新一代的数据湖,让大数据越用越快.越用越易.越用越稳.越用越省!让数据价值近在眼前! 10月30日,以"携手共赢·数创未来"为主题的第 ...

最新文章

  1. 实现自己的连接池(一)
  2. u-boot分析之makefile分析(二)
  3. android classloader的功能和工作模式,Android中ClassLoader和java中ClassLoader有什么关系和不同...
  4. java中用byte[]数组实现的队列和用Byte[]实现的队列实际占用空间对比
  5. 从头开始敲代码之《从BaseApplication/Activity开始(二)》
  6. 图论--LCA--Tarjan(离线)
  7. redis下载+php,php+redis实现消息队列
  8. 【EMV L2】数据元格式 对齐方式
  9. Markdown接口文档模板
  10. MATLAB图像分割系统设计
  11. 模拟按钮控件BN_CLICKED消息事件
  12. dipg(dip歌词翻译中文)
  13. ARM汇编语言编程-Keil环境搭建及STM32程序的编写
  14. echarts地图的常见用法:基本使用、区域颜色分级、水波动画、区域轮播、给地图添加背景图片和图标、3d地图
  15. SQL 错误 [40000] [42000]: Error while compiling statement: FAILED
  16. build tools
  17. 太棒了,Python和算法简直是绝配
  18. Mysql偶尔连接失败的问题
  19. win10 完全卸载 小黑笔记本
  20. 拍拍抢拍精灵 --腾讯拍拍秒杀器--截图

热门文章

  1. VSCode自定义代码片段9——JS中的面向对象编程
  2. 某位程序猿柬埔寨开发offer到手,薪资翻倍,去吗?网友:面向阎王编程...
  3. 真美 | 你破坏Java代码的样子,真美!
  4. window对象(一) 计时器 定位导航 url解析 浏览历史 对话框 消息推送
  5. php字符串中删除字符串函数,PHP 实现删除任意区间内字符串函数方法
  6. 树莓派3B+ Ubuntu mate16.04 开启热点
  7. 猎豹浏览器打飞机_墙内最好浏览器,微软带来完整版谷歌浏览器,扩展、同步无限制!...
  8. 分屏总屏计算机电缆,分屏加总屏电缆DJYVP计算机电缆14x2x0.75
  9. mysql实际应用在哪里_MySQL数据库的实际应用步骤
  10. neo4jcypher基本语句