在流式数据处理过程中,E-MapReduce经常需要在Kafka与其他系统间进行数据同步或者在Kafka集群间进行数据迁移。本节向您介绍如何在E-MapReduce上通过Kafka Connect快速的实现Kafka集群间的数据同步或者数据迁移。

前提条件

  • 已注册云账号,详情请参见注册云账号。
  • 已开通E-MapReduce服务。
  • 已完成云账号的授权,详情请参见角色授权。

背景信息

Kafka Connect是一种可扩展的、可靠的,用于在Kafka和其他系统之间快速的进行流式数据传输的工具。例如,Kafka Connect可以获取数据库的binlog数据,将数据库数据同步至Kafka集群,从而达到迁移数据库数据的目的。由于Kafka集群可对接流式处理系统,所以还可以间接实现数据库对接下游流式处理系统的目的。同时,Kafka Connect还提供了REST API接口,方便您创建和管理Kafka Connect。

kafka Connect分为standalone和distributed两种运行模式。在standalone模式下,所有的worker都在一个进程中运行。相比于standalone模式,distributed模式更具扩展性和容错性,是最常用的方式,也是生产环境推荐使用的模式。

本文介绍如何在E-MapReduce上使用Kafka Connect的REST API接口在Kafka集群间进行数据迁移,kafka Connect使用distributed模式。

步骤一 创建Kafka集群

在EMR上创建源Kafka集群和目的Kafka集群。Kafka Connect安装在Task节点上,所以目的Kafka集群必须创建Task节点。集群创建好后,Task节点上Kafka Connect服务会默认启动,端口号为8083。

推荐您将源Kafka集群和目的kafka集群创建在同一个安全组下。如果源Kafka集群和目的kafka集群不在同一个安全组下,则两者的网络默认是不互通的,您需要对两者的安全组分别进行相关配置,以使两者的网络互通。

  1. 登录阿里云 E-MapReduce 控制台。
  2. 创建源Kafka集群和目的Kafka集群,详情请参见创建集群。

说明 创建目的Kafka集群时,必须开启Task实例,即创建Task节点。

步骤二 准备待迁移数据Topic

在源Kafka集群上创建一个名称为connect的Topic。

  1. 以SSH方式登录到源Kafka集群的header节点(本例为emr-header-1)。
  2. 以root用户运行如下命令创建一个名称为connect的Topic。
kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect

说明 完成上述操作后,请保留该登录窗口,后续仍将使用。

步骤三 创建Kafka Connect的connector

在目的Kafka集群的Task节点上,使用curl命令通过JSON数据创建一个Kafka Connect的connector。

  1. 以SSH方式登录到目的Kafka集群的Task节点(本节为emr-worker-3)。
  2. 可选: 自定义Kafka Connect配置。
    进入目的Kafka集群Kafka服务的配置页面,在connect-distributed.properties中自定义offset.storage.topic、config.storage.topic和status.storage.topic三个配置项,详情请参见组件参数配置。

Kafka Connect会将offsets、configs和任务状态保存在Topic中,Topic名对应offset.storage.topic、config.storage.topic和status.storage.topic三个配置项。Kafka Connect会自动使用默认的partition和replication factor创建这三个Topic,其中partition和repication factor配置项保存在/etc/ecm/kafka-conf/connect-distributed.properties文件中。

  1. 以root用户运行如下命令创建一个Kafka Connect。
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors

在JSON数据中,name字段代表创建的Kafka Connect的名称,本例为connect-test;config字段需要根据实际情况进行配置,关键变量的说明如下:

说明 完成上述操作后,请保留该登录窗口,后续仍将使用。

步骤四 查看Kafka Connect和Task节点状态

查看Kafka Connect和Task节点信息,确保两者的状态正常。

  1. 返回到目的Kafka集群的Task节点(本节为emr-worker-3)的登录窗口。
  2. 以root用户运行如下命令查看所有的Kafka Connect。
curl emr-worker-3:8083/connectors
  1. 以root用户运行如下命令查看本例创建的Kafka Connect(本例为connect-test)的状态。
curl emr-worker-3:8083/connectors/connect-test/status

确保Kafka Connect(本例为connect-test)的状态为RUNNING。

  1. 以root用户运行如下命令查看Task节点信息。
curl emr-worker-3:8083/connectors/connect-test/tasks

确保Task节点的返回信息中无错误信息。

步骤五 生成待迁移数据

通过命令向源集群中的connect Topic发送待迁移的数据。

  1. 返回到源Kafka集群的header节点(本例为emr-header-1)的登录窗口。
  2. 以root用户运行如下命令向connect Topic发送数据。
kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092

步骤六 查看数据迁移结果

生成待迁移数据后,Kafka Connect会自动将这些数据迁移到目的集群的相应文件(本例为connect.replica)中。

  1. 返回到目的Kafka集群的Task节点(本节为emr-worker-3)的登录窗口。
  2. 以root用户运行如下命令查看数据迁移是否成功。
kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000

从上述返回结果可以看出,在源Kafka集群发送的100000条数据已经迁移到了目的kafka集群。

小结

本文介绍并演示了使用Kafka Connect在Kafka集群间进行数据迁移的方法。如果需要了解Kafka Connect更详细的使用方法,请参见Kafka官网资料和REST API。

作者:开源大数据EMR

kafka 查看待消费数据_通过Kafka Connect进行数据迁移相关推荐

  1. kafka 查看待消费数据_kafka查看消费数据

    一.如何查看 在老版本中,使用kafka-run-class.sh 脚本进行查看.但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-co ...

  2. kafka异步发送数据_在Kafka上异步发送数据

    kafka异步发送数据 对于一个项目,我试图记录用户的基本交易,例如添加和删除一个项目以及多种类型的项目,并为每笔交易向kafka发送一条消息. 日志机制的准确性不是至关重要的,在kafka服务器停机 ...

  3. kafka控制台模拟消费_Flink初试——对接Kafka

    本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...

  4. 分布式从mysql查数据_技术分享 | 从库数据的查找和参数 slave_rows_search_algorithms...

    作者:高鹏 文章末尾有他著作的<深入理解MySQL主从原理 32讲>,深入透彻理解MySQL主从,GTID相关技术知识. 本文节选自<深入理解MySQL主从原理>第24节 注意 ...

  5. 数据错误循环冗余检查是什么意思_为什么数据库会丢失数据?今天我就来跟你掰扯掰扯

    这份分布式一致性算法文档,足够你解决分布式系统 80% 核心问题​zhuanlan.zhihu.com 从远程办公到简历被拒,再到斩获阿里offer,这份PDF功不可没​zhuanlan.zhihu. ...

  6. 数据错误循环冗余检查是什么意思_数据库为什么会丢失数据

    数据库管理系统在今天已经是软件的重要组成部分,开源的 MySQL.PostgreSQL 以及商业化的 Oracle 等数据库已经随处可见,几乎所有的服务都需要依赖数据库管理系统存储数据. 图 1 - ...

  7. 自拍会不会被大数据_不会搭建大数据平台,我被老板优化了...

    [51CTO.com原创稿件]随着业务的飞速发展,信息化作为业务的支撑,各个企业都建立了自己的信息化系统. 图片来自 Pexels 在业务增涨过程中,每个企业不知不觉积累积累了一些数据.无论数据是多是 ...

  8. 爬虫goodreads数据_使用Python从Goodreads数据中预测好书

    爬虫goodreads数据 Photo of old books by Ed Robertson on Unsplash 埃德·罗伯森 ( Ed Robertson)的旧书照片,内容为Unsplash ...

  9. 科学价值 社交关系 大数据_服务的价值:数据科学和用户体验研究美好生活

    科学价值 社交关系 大数据 A crucial part of building a product is understanding exactly how it provides your cus ...

  10. 微软大数据_我对Microsoft的数据科学采访

    微软大数据 Microsoft was one of the software companies that come to hire interns at my university for 202 ...

最新文章

  1. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 )
  2. mysql 评论回复表设计_【数据库】评论回复表设计
  3. SQL server触发器中 update insert delete 分别给写个例子被。
  4. 选择题_一级造价师选择题的分值是多少
  5. 数字图像处理,图像锐化算法的C++实现
  6. 影响宝宝脾胃健康的3个“真凶”,难怪孩子脾胃总是调不好!
  7. CSS实现3D菜单效果【每日一题】
  8. JS,统计图表大全--十一、甘特图
  9. PCB设计笔记-AD(一)-如何从立创EDA中将元器件导入AD中
  10. 海德汉 LSV2 协议采集 2
  11. 解决方案:ppt打不开,显示发现文件中的内容有问题。可尝试修复此演示文稿
  12. ajax将监听器值赋值servlet,Java面试问题
  13. 企业经营发展战略的选择
  14. 苦练SOC“基本功”启明星辰九年磨一剑
  15. 浏览器Goole Chrome调试工具
  16. 百度之星 2015资格赛 列变位法解密【字符串】
  17. 都是学 AI,为什么别人薪资比你高?
  18. 李想创办的车和家发布智能电动车品牌“理想智造”
  19. 5、玩转树莓派音频——打造便携式合成器
  20. 人机同行:明略数据产品理念之二 | 简单好用

热门文章

  1. java 队列总结queue v3 svv.docxjava 队列总结queue v3 svv.docx atitit. java queue 队列体系总结o7t 1. 队列概念 1 1.1. 队列
  2. Atitit 计算机系统结构 计算机系统结构 Cpu 存储 cache 指令系统 目录 Line 56: 第2章指令系统设计 指令格式 寻址方式 1 Line 64: 第3章CPU及其实现
  3. Atitit 工具选型的因素与方法 attilax总结
  4. Atitit.各种 数据类型 ( 树形结构,表形数据 ) 的结构与存储数据库 attilax 总结
  5. paip.python错误解决21
  6. paip.提升效率----几款任务栏软件vc59
  7. 当前不会命中断点 还没有为该文档加载任何符号
  8. 学习笔记 | 读完《公开募集证券投资基金销售机构监督管理办法》的几点感受
  9. 查理·芒格的合伙人李录:价值投资在中国到底适不适用?
  10. 【优化算法】混沌博弈优化算法(CGO)【含Matlab源码 1803期】