1.背景

流式处理中经常会遇到Kafka与其他系统进行数据同步或者Kafka集群间数据迁移的情景。使用EMR Kafka Connect可以方便快速的实现数据同步或者数据迁移。

Kafka Connect是一种可扩展的、可靠的,用于在Kafka和其他系统之间快速地进行流式数据传输的工具。例如可以使用Kafka Connect获取数据库的binglog数据,将数据库的数据迁入Kafka集群,以同步数据库的数据,或者对接下游的流式处理系统。同时,Kafka Connect提供的REST API接口可以方便的进行Kafka Connect的创建和管理。
Kafka Connect分为standalone和distributed两种运行模式。standalone模式下,所有的worker都在一个进程中运行;相比之下,distributed模式更具扩展性和容错性,是最常用的方式,也是生产环境推荐使用的模式。

本文介绍使用EMR Kafka Connect的REST API接口在Kafka集群间进行数据迁移,使用distributed模式。

2.环境准备

创建两个EMR集群,集群类型为Kafka。EMR Kafka Connect安装在task节点上,进行数据迁移的目的Kafka集群需要创建task节点。集群创建好后,task节点上EMR Kafka Connect服务会默认启动,端口号为8083。

注意要保证两个集群的网路互通,详细的创建流程见创建集群。

3.数据迁移


3.1准备工作


EMR Kafka Connect的配置文件路径为/etc/ecm/kafka-conf/connect-distributed.properties。

在源Kafka集群创建需要同步的topic,例如

另外,Kafka Connect会将offsets, configs和任务状态保存在topic中,topic名对应配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三个配置项。默认的,Kafka Connect会自动的使用默认的partition和replication factor创建这三个topic。

3.2创建Kafka Connect


在目的Kafka集群的task节点(例如emr-worker-3节点),使用curl命令通过json数据创建一个Kafka Connect。

json数据中,name字段代表创建的connect的名称,此处为connect-test;config字段需要根据实际情况进行配置,其中的变量说明如下表

3.3查看Kafka Connect


查看所有的Kafka Connect

查看创建的connect-test的状态

查看task的信息

3.4数据同步


在源Kafka集群创建需要同步的数据。

3.5查看同步结果


在目的Kafka集群消费同步的数据。

可以看到,在源Kafka集群发送的100000条数据已经迁移到了目的Kafka集群。

4.小结

本文介绍并演示了使用EMR kafka Connect在Kafka集群间进行数据迁移的方法,关于Kafka Connect更详细的使用请参考Kafka官网资料和REST API使用。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

使用EMR-Kafka Connect进行数据迁移相关推荐

  1. SQL Server CDC配合Kafka Connect监听数据变化

    写在前面 好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备.组建.招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写上三四篇. 进入主题, ...

  2. HBase数据迁移到Kafka实战

    1.概述 在实际的应用场景中,数据存储在HBase集群中,但是由于一些特殊的原因,需要将数据从HBase迁移到Kafka.正常情况下,一般都是源数据到Kafka,再有消费者处理数据,将数据写入HBas ...

  3. cdc工具 postgresql_SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS-阿里云开发者社区...

    背景 SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中. 开启cdc的源表在插入INSERT.更新UPDATE和删除DELETE ...

  4. Kafka的灵魂伴侣Logi-KafkaManger(4)之运维管控–集群运维(数据迁移和集群在线升级)

    推荐一款非常好用的kafka管理平台,kafka的灵魂伴侣 滴滴开源Logi-KafkaManager 一站式Kafka监控与管控平台 技术交流 有想进滴滴LogI开源用户群的加我个人微信: jjdl ...

  5. kafka connect mysql_Kafka Connect如何实现同步RDS binlog数据?

    本文介绍如何在E-MapReduce上使用Kafka Connect实现同步RDS binlog数据 1. 背景 在我们的业务开发中,往往会碰到下面这个场景: 业务更新数据写到数据库中 业务更新数据需 ...

  6. 【kafka运维】分区副本重分配、数据迁移、副本扩缩容 (附教学视频)

    日常运维.问题排查=> 滴滴开源LogiKM一站式Kafka监控与管控平台 (后续的视频会在 公众号[首发].CSDN.B站等各平台同名号[石臻臻的杂货铺]上上传 ) 分区副本重分配+注意事项+ ...

  7. 使用Apache Hudi + Amazon S3 + Amazon EMR + AWS DMS构建数据湖

    1. 引入 数据湖使组织能够在更短的时间内利用多个源的数据,而不同角色用户可以以不同的方式协作和分析数据,从而实现更好.更快的决策.Amazon Simple Storage Service(amaz ...

  8. 数据迁移测试_自动化数据迁移测试

    数据迁移测试 Data migrations are notoriously difficult to test. They take a long time to run on large data ...

  9. Kafka Connect使用教程

    1 kafka connect是什么 根据官方介绍,Kafka Connect是一种用于在Kafka和其他系统之间可扩展的.可靠的流式传输数据的工具.它使得能够快速定义将大量数据集合移入和移出Kafk ...

最新文章

  1. 让Python代码简洁的实用技巧!
  2. bzoj 1409 Password 矩阵快速幂+欧拉函数
  3. js 连接mysql_JS连接数据库
  4. 【Java】统计字符串中每个字符出现的次数
  5. selenium3 + python - expected_conditions判断元素
  6. UI设计干货素材|如何正确使用直观打折数字使画面更饱满更具促销感!
  7. #CSP 201403-1 相反数(100分)
  8. 关于信息安全工作方法论的一点猜想
  9. ​阿里云SAE助力百富旅行实现Serverless+微服务完美结合
  10. 【io】io等待为什么引发cpu过高?
  11. QQ、微博、陌陌:社交难逃社交命
  12. php session fixation,Session Fixation 原理与防御
  13. 硬件钱包 Ledger使用教程
  14. 【Java】解决 java:错误:编码GBK的不可映射字符
  15. 【大数据技术应用实战】【大数据与人工智能视角下数字孪生和元宇宙】二、新一轮大数据与人工智能变革
  16. 【嵌入式学习-STM32F103-EXTI外部中断】
  17. 表单设计中标签的布局方式有哪些
  18. 如何去掉抖音短视频水印----全网最好用的去抖音视频水印方法
  19. 【JavaSE】多态数组的使用
  20. 【混合编程jni 】第九篇之Jni总结

热门文章

  1. Java里的 for (;;) 与 while (true),哪个更快?
  2. c语言锁屏密码程序,求一个VB锁屏程序的源文件
  3. python 截取字符串6位_在Python中从字符串获取x个最低有效位
  4. 【LeetCode笔记】232. 用栈实现队列(Java、栈、队列)
  5. mysql技术内幕sampdb_MySQL技术内幕汇总
  6. 如何给mysql表添加百万条数据_给mysql一百万条数据的表添加索引
  7. java 并发测试main方法_Java并发测试
  8. java 1%10_Java获取随机数的3种方法
  9. 2020.2idea创建web_IntelliJ IDEA 2017.3 完整的配置Tomcat运行web项目教程(多图)
  10. 最后解密的两弹元勋,众帅之帅朱光亚。