该方案解决Kafka跨集群同步、创建Kafka集群镜像等相关问题,主要使用Kafka内置的MirrorMaker工具实现。

Kafka镜像即已有Kafka集群的副本。下图展示如何使用MirrorMaker工具创建从源Kafka集群(source cluster)到目标Kafka集群(target cluster)的镜像。该工具通过Kafka consumer从源Kafka集群消费数据,然后通过一个内置的Kafka producer将数据重新推送到目标Kafka集群。

一、如何创建镜像

使用MirrorMaker创建镜像是比较简单的,搭建好目标Kafka集群后,只需要启动mirror-maker程序即可。其中,一个或多个consumer配置文件、一个producer配置文件是必须的,whitelist、blacklist是可选的。在consumer的配置中指定源Kafka集群的Zookeeper,在producer的配置中指定目标集群的Zookeeper(或者broker.list)。

kafka-run-class.sh kafka.tools.MirrorMaker –consumer.config sourceCluster1Consumer.config –consumer.config sourceCluster2Consumer.config –num.streams 2 –producer.config targetClusterProducer.config –whitelist=“.*”

例如,你需要创建S集群的镜像,目标集群T已经搭建好,简单的做法如下:

1. 创建consumer配置文件:sourceClusterConsumer.config

zk.connect=szk0:2181,szk1:2181,szk2:2181

groupid=test-mirror-consumer-group

2. 创建producer配置文件:targetClusterProducer.config

zk.connect=tzk0:2181,tzk1:2181

3. 创建启动脚本:start.sh

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker –consumer.config sourceClusterConsumer.config –num.streams 2 –producer.config targetClusterProducer.config –whitelist=“.*”

4. 执行脚本

执行start.sh通过日志信息查看运行状况,到目标Kafka集群的log.dir中即可看到同步过来的数据。

二、MirrorMaker的参数说明

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker –help

执行上面的命令就可以看到各个参数的说明:

1. 白名单(whitelist) 黑名单(blacklist)

mirror-maker接受精确指定同步topic的白名单和黑名单。使用java标准的正则表达式,为了方便,逗号(‘,’)被编译为java正则中的(‘|’)。

2. Producer timeout

为了支持高吞吐量,你最好使用异步的内置producer,并将内置producer设置为阻塞模式(queue.enqueueTimeout.ms=-1)。这样可以保证数据(messages)不会丢失。否则,异步producer默认的 enqueueTimeout是0,如果producer内部的队列满了,数据(messages)会被丢弃,并抛出QueueFullExceptions异常。而对于阻塞模式的producer,如果内部队列满了就会一直等待,从而有效的节制内置consumer的消费速度。你可以打开producer的的trace logging,随时查看内部队列剩余的量。如果producer的内部队列长时间处于满的状态,这说明对于mirror-maker来说,将消息重新推到目标Kafka集群或者将消息写入磁盘是瓶颈。

对于kafka的producer同步异步的详细配置请参考$KAFKA_HOME/config/producer.properties文件。关注其中的producer.type和queue.enqueueTimeout.ms这两个字段。

3. Producer 重试次数(retries)

如果你在producer的配置中使用broker.list,你可以设置当发布数据失败时候的重试次数。retry参数只在使用broker.list的时候使用,因为在重试的时候会重新选择broker。

4. Producer 数量

通过设置—num.producers参数,可以使用一个producer池来提高mirror maker的吞吐量。在接受数据(messages)的broker上的producer是只使用单个线程来处理的。就算你有多个消费流,吞吐量也会在producer处理请求的时候被限制。

5. 消费流(consumption streams)数量

使用—num.streams可以指定consumer的线程数。请注意,如果你启动多个mirror maker进程,你可能需要看看其在源Kafka集群partitions的分布情况。如果在每个mirror maker进程上的消费流(consumption streams)数量太多,某些消费进程如果不拥有任何分区的消费权限会被置于空闲状态,主要原因在于consumer的负载均衡算法。

6. 浅迭代(Shallow iteration)与producer压缩

我们建议在mirror maker的consumer中开启浅迭代(shallow iteration)。意思就是mirror maker的consumer不对已经压缩的消息集(message-sets)进行解压,只是直接将获取到的消息集数据同步到producer中。

如果你开启浅迭代(shallow iteration),那么你必须关闭mirror maker中producer的压缩功能,否则消息集(message-sets)会被重复压缩。

7. Consumer 和 源Kafka集群(source cluster)的 socket buffer sizes

镜像经常用在跨集群场景中,你可能希望通过一些配置选项来优化内部集群的通信延迟和特定硬件性能瓶颈。一般来说,你应该对mirror-maker中consumer的socket.buffersize 和源集群broker的socket.send.buffer设定一个高的值。此外,mirror-maker中消费者(consumer)的fetch.size应该设定比socket.buffersize更高的值。注意,套接字缓冲区大小(socket buffer size)是操作系统网络层的参数。如果你启用trace级别的日志,你可以检查实际接收的缓冲区大小(buffer size),以确定是否调整操作系统的网络层。

三、如何检验MirrorMaker运行状况

Consumer offset checker工具可以用来检查镜像对源集群的消费进度。例如:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –group KafkaMirror –zkconnect localhost:2181 –topic test-topic

KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)

Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0

Consumer offset = 561154288

= 561,154,288 (0.52G)

Log size = 2231392259

= 2,231,392,259 (2.08G)

Consumer lag = 1670237971

= 1,670,237,971 (1.56G)

BROKER INFO

0 -> 127.0.0.1:9092

注意,–zkconnect参数需要指定到源集群的Zookeeper。另外,如果指定topic没有指定,则打印当前消费者group下所有topic的信息。

欢迎工作一到五年的Java工程师朋友们加入Java架构开发:855835163

本群提供免费的学习指导 架构资料 以及免费的解答

不懂得问题都可以在本群提出来 之后还会有职业生涯规划以及面试指导

同时大家可以多多关注一下小编 大家一起学习进步

java集群如何同步_Kafka 跨集群同步方案相关推荐

  1. Elastic:集群相关知识点总结(一)数据流 Data Stream、索引生命周期 ILM、可搜索快照 searchable snapshots、跨集群搜索 CCS、跨集群复制 CCR

    0.引言 集群管理是ES的核心重点,因此相关的知识点至关重要,本期主要针对数据流.索引生命周期.可搜索快照.跨集群搜索.跨集群复制进行讲解 1.数据流 Data Stream 官方文档:Data st ...

  2. mysql集群不同步_mysql数据库集群出现1236错误导致主库与从库无法同步的

    mysql数据库集群出现1236错误导致主库与从库无法同步的 发布时间:2020-02-28 02:50:14 来源:51CTO 阅读:153 作者:o凤舞九天o /etc/my.cnf中sync_b ...

  3. Elasticsearch(CCR)主从同步之跨集群复制

    注: 部分概念介绍来源于网络 1.跨集群复制Cross-cluster replication (CCR) 跨集群复制最早发布版本Elasticsearch 6.7 版本 跨集群复制 (CCR) 功能 ...

  4. RabbitMQ镜像集群与ShovelFederation跨集群数据同步

    部署环境与版本 安装包下载地址: Erlang语言依赖下载链接:http://erlang.org/download/otp_src_21.1.tar.gz RabbitMQ 3.7.8版本下载链接: ...

  5. es集群搭建_滴滴Elasticsearch 集群跨版本升级与平台重构之路

    前不久,滴滴ES团队将维护的30多个ES集群,3500多个ES节点,8PB的数据,从2.3.3跨大版本无缝升级到6.6.1.在对用户查询写入基本零影响和改动的前提下,解决了ES跨大版本协议不兼容.文件 ...

  6. Kafka集群间同步数据方案-Flume

    Apache Flume 是一个分布式.高可靠.高可用的用来收集.聚合.转移不同来源的大量日志数据到中央数据仓库的工具. 系统要求 Java运行环境 - Java 1.8或更高版本 体系结构 Even ...

  7. java 集群 文件共享_使用Artifactory集群作为文件共享中心

    一.背景和痛点 大企业内部,跨团队,跨地域,导致文件共享困难 如果不使用Artifactory,如何实现跨数据中心的文件共享呢? 挂载NFS文件系统,开通跨数据中心的rsync/sftp协议 自研解决 ...

  8. RabbitMQ 镜像集群之同步策略_专栏讲解

    文章目录 一. 基础知识汲取 1.1. 镜像集群简述 1.2. 策略参数说明 1.3. 策略案例 二.HA mode 同步方式 2.1. 参数说明 2.2. 案例 2.3. 命令终端形式 2.4. M ...

  9. mongodb集群数据同步及故障演练

    在上一篇我们简单搭建了mongodb的副本集模式的集群,这一篇我们来模拟一下数如何在集群实现同步的,并简单演示一下集群故障及自动恢复的场景, 1.启动集群,三个节点做同样的操作 2.启动成功后,进入某 ...

  10. java集群技术_什么是集群?集群?java集群技术面试的一些知识准备

    你是否正在寻找关于集群技术的内容?让我把最完整的东西奉献给你: java集群技术面试的一些知识准备 一个集群系统是一群松散结合的服务器组,形成一个虚拟的服务器,为客户端用户提供统一的服务.对于这个客户 ...

最新文章

  1. 冷热分离和直接使用大数据库_中台有“数”:大数据技术为苏宁818保驾护航
  2. .net 怎么在控制器action中返回一个试图_一个view事件分发,面试官6连问直击灵魂,我被虐的体无完肤...
  3. 一场关于动态化开发实践的技术探讨
  4. java double 的精度_Java Double的精度问题
  5. js 金额转为大写
  6. Oracle实战笔记(第二天)
  7. 马斯克2021五大预测:重返月球并比赛遥控汽车,全面实现自动驾驶,你pick哪一个?...
  8. Java中hashCode和equals方法的正确使用
  9. shell脚本备份MySQL
  10. mysql like in 组合_mysql like in 组合 黄小柔junior分手原因
  11. 【AD封装】 Type C 封装库 6Pin 24Pin分享下载(带3D视图)
  12. 计算机在机械工程中的应用英语作文,机械英语论文范文
  13. Java开发基础(四)——dbutils的使用
  14. 基于spring cloud + nacos + gateway + ssm+的学生管理系统
  15. 科学管理之父——泰勒的故事
  16. 《逆向工程核心原理》相关说明
  17. 安科瑞无线测温装置ARTM的功能特点有哪些
  18. C#环境下GDAL / OGR环境配置与入门
  19. 魔兽争霸在win7下的分辨率解决方法
  20. **关于交流电机、直流电机、永磁同步电机、步进电机的分类与控制**

热门文章

  1. 学习OO,实现的小跟堆代码
  2. 内核中断,异常,抢占总结篇
  3. 深入解读Linux内存管理系列(5)——lowmem和highmem
  4. 卡特兰数(JAVA大数)Buy the Ticket
  5. 面向对象19:内部类
  6. Cannot load module file xxx.iml Intellij
  7. 【动态规划】数位DP入门题:不要62
  8. 线段树详解 一(单点更新 区间查询)
  9. php startup memcache,centos php 安装memcache模块
  10. 提交响应后无法调用sendredirect_DDD 指导应用垂直拆分后事务问题