使用EMR-Kafka Connect进行数据迁移
1.背景
流式处理中经常会遇到Kafka与其他系统进行数据同步或者Kafka集群间数据迁移的情景。使用EMR Kafka Connect可以方便快速的实现数据同步或者数据迁移。
Kafka Connect是一种可扩展的、可靠的,用于在Kafka和其他系统之间快速地进行流式数据传输的工具。例如可以使用Kafka Connect获取数据库的binglog数据,将数据库的数据迁入Kafka集群,以同步数据库的数据,或者对接下游的流式处理系统。同时,Kafka Connect提供的REST API接口可以方便的进行Kafka Connect的创建和管理。
Kafka Connect分为standalone和distributed两种运行模式。standalone模式下,所有的worker都在一个进程中运行;相比之下,distributed模式更具扩展性和容错性,是最常用的方式,也是生产环境推荐使用的模式。
本文介绍使用EMR Kafka Connect的REST API接口在Kafka集群间进行数据迁移,使用distributed模式。
2.环境准备
创建两个EMR集群,集群类型为Kafka。EMR Kafka Connect安装在task节点上,进行数据迁移的目的Kafka集群需要创建task节点。集群创建好后,task节点上EMR Kafka Connect服务会默认启动,端口号为8083。
注意要保证两个集群的网路互通,详细的创建流程见创建集群。
3.数据迁移
3.1准备工作
EMR Kafka Connect的配置文件路径为/etc/ecm/kafka-conf/connect-distributed.properties。
在源Kafka集群创建需要同步的topic,例如
另外,Kafka Connect会将offsets, configs和任务状态保存在topic中,topic名对应配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三个配置项。默认的,Kafka Connect会自动的使用默认的partition和replication factor创建这三个topic。
3.2创建Kafka Connect
在目的Kafka集群的task节点(例如emr-worker-3节点),使用curl命令通过json数据创建一个Kafka Connect。
json数据中,name字段代表创建的connect的名称,此处为connect-test;config字段需要根据实际情况进行配置,其中的变量说明如下表
3.3查看Kafka Connect
查看所有的Kafka Connect
查看创建的connect-test的状态
查看task的信息
3.4数据同步
在源Kafka集群创建需要同步的数据。
3.5查看同步结果
在目的Kafka集群消费同步的数据。
可以看到,在源Kafka集群发送的100000条数据已经迁移到了目的Kafka集群。
4.小结
本文介绍并演示了使用EMR kafka Connect在Kafka集群间进行数据迁移的方法,关于Kafka Connect更详细的使用请参考Kafka官网资料和REST API使用。
原文链接
本文为云栖社区原创内容,未经允许不得转载。
使用EMR-Kafka Connect进行数据迁移相关推荐
- SQL Server CDC配合Kafka Connect监听数据变化
写在前面 好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备.组建.招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写上三四篇. 进入主题, ...
- HBase数据迁移到Kafka实战
1.概述 在实际的应用场景中,数据存储在HBase集群中,但是由于一些特殊的原因,需要将数据从HBase迁移到Kafka.正常情况下,一般都是源数据到Kafka,再有消费者处理数据,将数据写入HBas ...
- cdc工具 postgresql_SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS-阿里云开发者社区...
背景 SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中. 开启cdc的源表在插入INSERT.更新UPDATE和删除DELETE ...
- Kafka的灵魂伴侣Logi-KafkaManger(4)之运维管控–集群运维(数据迁移和集群在线升级)
推荐一款非常好用的kafka管理平台,kafka的灵魂伴侣 滴滴开源Logi-KafkaManager 一站式Kafka监控与管控平台 技术交流 有想进滴滴LogI开源用户群的加我个人微信: jjdl ...
- kafka connect mysql_Kafka Connect如何实现同步RDS binlog数据?
本文介绍如何在E-MapReduce上使用Kafka Connect实现同步RDS binlog数据 1. 背景 在我们的业务开发中,往往会碰到下面这个场景: 业务更新数据写到数据库中 业务更新数据需 ...
- 【kafka运维】分区副本重分配、数据迁移、副本扩缩容 (附教学视频)
日常运维.问题排查=> 滴滴开源LogiKM一站式Kafka监控与管控平台 (后续的视频会在 公众号[首发].CSDN.B站等各平台同名号[石臻臻的杂货铺]上上传 ) 分区副本重分配+注意事项+ ...
- 使用Apache Hudi + Amazon S3 + Amazon EMR + AWS DMS构建数据湖
1. 引入 数据湖使组织能够在更短的时间内利用多个源的数据,而不同角色用户可以以不同的方式协作和分析数据,从而实现更好.更快的决策.Amazon Simple Storage Service(amaz ...
- 数据迁移测试_自动化数据迁移测试
数据迁移测试 Data migrations are notoriously difficult to test. They take a long time to run on large data ...
- Kafka Connect使用教程
1 kafka connect是什么 根据官方介绍,Kafka Connect是一种用于在Kafka和其他系统之间可扩展的.可靠的流式传输数据的工具.它使得能够快速定义将大量数据集合移入和移出Kafk ...
最新文章
- 让Python代码简洁的实用技巧!
- bzoj 1409 Password 矩阵快速幂+欧拉函数
- js 连接mysql_JS连接数据库
- 【Java】统计字符串中每个字符出现的次数
- selenium3 + python - expected_conditions判断元素
- UI设计干货素材|如何正确使用直观打折数字使画面更饱满更具促销感!
- #CSP 201403-1 相反数(100分)
- 关于信息安全工作方法论的一点猜想
- ​阿里云SAE助力百富旅行实现Serverless+微服务完美结合
- 【io】io等待为什么引发cpu过高?
- QQ、微博、陌陌:社交难逃社交命
- php session fixation,Session Fixation 原理与防御
- 硬件钱包 Ledger使用教程
- 【Java】解决 java:错误:编码GBK的不可映射字符
- 【大数据技术应用实战】【大数据与人工智能视角下数字孪生和元宇宙】二、新一轮大数据与人工智能变革
- 【嵌入式学习-STM32F103-EXTI外部中断】
- 表单设计中标签的布局方式有哪些
- 如何去掉抖音短视频水印----全网最好用的去抖音视频水印方法
- 【JavaSE】多态数组的使用
- 【混合编程jni 】第九篇之Jni总结
热门文章
- Java里的 for (;;) 与 while (true),哪个更快?
- c语言锁屏密码程序,求一个VB锁屏程序的源文件
- python 截取字符串6位_在Python中从字符串获取x个最低有效位
- 【LeetCode笔记】232. 用栈实现队列(Java、栈、队列)
- mysql技术内幕sampdb_MySQL技术内幕汇总
- 如何给mysql表添加百万条数据_给mysql一百万条数据的表添加索引
- java 并发测试main方法_Java并发测试
- java 1%10_Java获取随机数的3种方法
- 2020.2idea创建web_IntelliJ IDEA 2017.3 完整的配置Tomcat运行web项目教程(多图)
- 最后解密的两弹元勋,众帅之帅朱光亚。