文章目录

  • CHAPTER 9 Administering Kafka 管理kafka
    • Topic Operations 主题操作
      • Creating a New Topic 创建新的topic
        • Specifying Topic Configurations 指定Topic的配置
      • Adding Partitions 添加分区
      • 删除topic
      • Listing All Topics in a Cluster 列出集群所有的topic
      • Describing Topic Details 描述topic的细节
    • Consumer Groups 消费者组
      • 列出组详情
      • Delete Group 删除消费者组
      • Offset Management offset管理
        • Export Offsets 导出offsets
        • Import Offsets 导入offsets
    • Dynamic Configuration Changes 配置动态修改
      • Overriding Topic Configuration Defaults 覆盖topic配置的默认值
      • Overriding Client Configuration Defaults 重写客户端的缺省配置
      • Describing Configuration Overrides 配置覆盖说明
      • Removing Configuration Overrides 删除覆盖的配置
    • Partition Management 分区管理
      • Preferred Replica Election
      • Changing a Partition’s Replicas 更改分区的副本数
      • Changing Replication Factor 改变副本因子
      • Dumping Log Segments
      • Replica Verification 副本验证
    • Consuming and Producing 生产和消费
      • Console Consumer 控制台消费者
        • Message formatter options消息格式化器选项
        • Consuming the offsets topics 指定offset消费topic
      • Console Producer 控制台生产者
        • Line-Reader Options 行读取选项
    • Client ACLs 客户端acls
    • Unsafe Operations 不安全操作
      • Moving the Cluster Controller 移动集群控制器
      • Killing a Partition Move 终止分区移动
      • Removing Topics to Be Deleted 删除topic
      • Deleting Topics Manually 手动删除
    • Summary 总结

CHAPTER 9 Administering Kafka 管理kafka

Kafka提供了几个命令行接口实用程序,他们对于kafka集群的配置管理非常有用。这些工具是通过java来实现的,并提供了一组脚本来调用这些类。这些工具提供了基本的功能,但是对于更复杂的操作,你可能会发现他们还是有些力不从心。本章将描述做为Apache Kafka开源项目的一部分的工具。在Apache桑可以找到关于社区中开发的高级工具的更多信息。详见kakfa官网。

  • 授权管理操作:
    虽然Apache Kafka实现了身份验证和授权来控制Topic的操作,但是大多数集群操作还不支持。这意味着这些命令行工具可以在不需要任何身份验证的情况下使用。这将允许在不进行安全检查或者审计的情况下执行诸如topic更改之类的操作。该功能正在开发中,应该很快就会添加。

Topic Operations 主题操作

kafka-topics.sh工具提供了快速创建、修改、删除和列出集群中的topic信息。配置管理已经移到kafka-configs.sh中。使用这些命令行工具,需要使用–zookeeper参数,如:zoo1.example.com:2181/kafka-cluster。

  • 版本检查Check the Version 许多命令行工具直接将元数据存储在zookeeper中,而不是broker。因此,务必确保你的工具版本与集群broker的版本匹配。最安全的办法是使用kafka broker服务端程序自带的工具。

Creating a New Topic 创建新的topic

在集群中创建topic的时候需要三个参数。这三个参数是必须的。尽管一些参数在broker上有默认值。

  • Topic Name : 你需要创建Topic的名称
  • Replication Factor:副本因子,在集群中维护topic的每个分区的副本数。
  • Partitions: 分区数,topic由这些分区组成。
Specifying Topic Configurations 指定Topic的配置

还可以在创建的时候显示设置topic的副本,或者设置配置参数对topic的配置进行覆盖。这些操作不在此讨论。配置覆盖可以在本章后面找到,他们可以提供给kafka-topics.sh 通过 --config 命令行参数使用。分区的配置也将在后续内容中介绍。

Topic的名称可以包含字母、数字、字符及下划线、破折号和点号。

  • topic的命名:允许但是不建议以两个下划线开头的topic名称。这种形式的topic呗视为集群内部的topic。如消费者组offset存储的topic是__consumer_offsets。也不建议在单个集群中同时使用句号和下划线,因为在内部统计topic的时候,句号被改为了下划线,如:topic.1在统计中将变成topic_1。

kafka-topics.sh 示例如下:

kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string>
--replication-factor <integer> --partitions <integer>

该命令将导致集群创建具有指定名称和分区数量的topic。对于每个分区,集群将选择适当的副本数。这意味着,如果集群设置为支持机架的副本分配,那么每个分区的副本将位于单独的机架中,如果不需要机架支持,那么可以通过–disable-rack-aware 关闭。
如下创建名为 my-topic的topic,包含8个分区,每个分区有2个副本:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create
--topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".
#
  • 跳过存在错误:当用上述脚本进行自动化操作的时候,你可以使用–if-not-exists参数,如果Topic已经存在,则不会返回一个错误。

Adding Partitions 添加分区

有时需要增加topic的分区数量,分区是在集群中扩展和复制TOPIC的方式。增加分区技术的最常见的原因是为了进一步扩展topic,或者降低单个分区的吞吐量。如果消费者需要扩展在单个组中运行更多的副本,则Topic也可以增加,因为分区只能由组中的单个成员使用。

  • 调整topic的key 在消费者角度来看,使用key来控制的topic很难添加分区。这是因为key到分区的映射将随着分区数量的改变而改变。由于这个原因,建议在创建topic的时候设置一次包含key控制消息的topic的分区数量,并避免调整topic的大小。
  • 跳过不存在topic的错误。虽然为–alter命令提供了一个–if-exists参数,但是不建议使用它。如果正在修改的topic不存在,使用此参数将导致命令不返回错误。这可能会掩盖本应该创建topic的topic不存在问题。
    例如,将my-topic的分区增加到16个:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
#
  • 减少分区的数量。不可能减少topic分区的数量,不支持此操作的原因是,从topic中删除的分区将导致该topic的部分数据也被删除。从消费者的角度来看,这是不一致的。此外,尝试将数据重新分发到剩余的分区也会很困难。并导致无序的消息。如果需要减少分区的数量,则需要删除topic并重新创建它。

删除topic

即使没有消息的topic也会使用集群的资源,包括磁盘空间,打开的文件句柄和内存。如果topic不再需要,可以删除它并释放这些资源。为了执行此操作,集群中的broker必须配置了delete.topic.enable选项为true。如果这个选项设置为false,则这个操作将被忽略。

  • 删除数据之前需要注意:删除一个topic也会删除它的全部消息。这是不可逆的操作,所以一定要小心执行。

例如,删除my-topic的topic:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--delete --topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
#

Listing All Topics in a Cluster 列出集群所有的topic

topic工具可以列出集群所有的topic,列表格式为每行一个topic,没有特定的顺序。
样例如下:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--list
my-topic - marked for deletion
other-topic
#

Describing Topic Details 描述topic的细节

还可以获得关于集群中的一个或者多个topic的详细信息,输出包括分区计数,topic配置覆盖以及每个分区及其副本分配的清单。通过向命令行提供一个topic参数,可以将此限制为单个topic主题。
样例如下:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs:
Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr: 0,1
#

describe命令还有几个用于过滤输出的选项。这有助于诊断集群问题。对于其中每一个,不要指定–topic参数。因为目的是查找集群中匹配条件的所有topic和分区。因此这些选项不适用于list命令。
为了找到所有具有覆盖的topic,请使用 --topics-withoverrides 参数,这将只描述配置与缺省值不同的topic。这将只描述配置与集群缺省的不同的topic。
有两个过滤器用于查找所有问题的分区,–underreplicated-partitions 将显示一个或者多个副本与leader不同步的所有分区。–unavailable-partitions显示没有leader的所有分区。这是一种更严重的情况,意味着该分区目前处于脱机状态,不能用于客户端生产或者使用。
样例如下:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --under-replicated-partitions
Topic: other-topic Partition: 2 Leader: 0 Replicas: 1,0
Isr: 0
Topic: other-topic Partition: 4 Leader: 0 Replicas: 1,0
Isr: 0
#

Consumer Groups 消费者组

kafka中的消费者组分为两个不同的地方维护,对于较老的消费者,信息存储在zookeeper中。而对于版本比较新的消费者,信息存储在kafka中的特定topic中。kafka-consumer-groups.sh可以列出这两种类型的消费者组,它还可以用于删除消费者组的offset。但仅仅用在旧的消费者组下运行的组中,在zookeeper中维护的消费者组。在与较老的消费者组一起工作时,你将访问–zookeeper参数指定的kafka集群。对于新版本的消费者组,你需要通过–bootstrap-server指定kafka broker的IP和端口。

列出组详情

旧的消费者详情:

# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
#

新的消费者:

# kafka-consumer-groups.sh --new-consumer --bootstrap-server
kafka1.example.com:9092/kafka-cluster --list
kafka-python-test
my-new-consumer
#

如果要查看一个消费者组的情况,你可以用–describe替换–list之后加上–group参数。这将列出当前指定的消费者组正在使用的topic以及每个topic的offset。
如下对testgroup进行查看:

# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --group testgroup
GROUP TOPIC PARTITION
CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
myconsumer my-topic 0
1688 1688 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 1
1418 1418 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 2
1314 1315 1
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 3
2012 2012 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 4
1089 1089 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 5
1429 1432 3
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 6
1634 1634 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 7
2261 2261 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
#

输出字段的解释如下表:

字段 描述
GROUP 消费者组的名称
TOPIC 被消费的topic的名称
PARTITION 被消费的分区ID
CURRENT-OFFSET 消费者组为这个topic分区提交的最后一个offset,这是消费者在分区中的位置。
LOG-END-OFFSET Topic的当前高水位线的offset,这是生产者提交到消费者集群被确认的最后一条消息的offset
LAG 此Topic分区的消费者当前的offset和broker中水位线的差异
OWNER 当前使用此topic的分区的消费者的组成员,这是消费者组成员提供的任意ID,不一定包括消费者的主机名

Delete Group 删除消费者组

只支持对旧的消费者客户端删除消费者组。这将从zookeeper中删除整个组。包括该组正在使用的所有topic的所有被存储的offset。应该关闭组中的消费者,如果没有首先关闭所有消费者,则可能带来消费者的未定义行为,因为组的zookeeper元数据将在消费者使用的时候被删除。
删除示例如下:

# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
#

还可以使用相同的命令删除正在使用的当个topic的offset,而不是删除整个组。同样,江一在执行操作之前停止使用消费者组。或配置为不适用要删除的topic。
从名为testgroup的消费者组中删除my-topic的offset:

# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
--topic my-topic
Deleted consumer group information for group testgroup topic
my-topic in zookeeper.
#

Offset Management offset管理

除了使用旧的消费者客户端显示和删除消费者组的offset之外,还可以检索offset并以批处理的方式存储新的offset。这对于在出现需要重新读取消息的问题时为使用重置offset非常有用。或者对于在消费者有问题之后的消息推进offset(入果存在消费者无法处理的格式化错误的消息)。

  • 管理offset的commit 对于将offset提交给kafka的消费者客户端,目前还米有可用的工具来管理offset,此功能仅对向zookeeper提交offset的用户可用。以管理承诺的组的offset,你必须使用客户端中可用的api来提交组的offset。
Export Offsets 导出offsets

没有命名脚本来导出offset,但是我们可以使用kafka-run-class.sh 在适当的时候通过其底层的java类来执行该工具。导出offset并将其生成一个文件。该文件以导入工具可以读取的以定义的格式包含的组的每个topic的分区及offset。创建的文件每行有一个topic分区,格式如下:

/consumers/GROUPNAME/offsets/topic/TOPICNAME/PARTITIONID-0:OFFSET.

将名为testgroup的消费者组的offset导出到名为offsets的文件中:

# kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
#
Import Offsets 导入offsets

导入offset的工具与导出相反,它获取通过导出上一节中的offset生成的文件,并使用该文件设置消费者组的当前offset。一种常见的做法时导出消费者组的当前offset,对文件进行复制,并编辑该副本。以offset替换为所需要的值。注意,对于import命令,没有使用–group选项。这是因为消费者组名称要嵌入到要导入的文件中。

  • 注意,首先要关闭消费者。在执行此步骤之前,必须停止消费者组中的所有消费者。如果在消费者组处于活动状态时写入新的offset,则不会读取这些offset。消费者会将这些写入的offset覆盖。
    从一个名为offsets的文件中导入名为testgroup的消费者组的offset。
# kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
#

Dynamic Configuration Changes 配置动态修改

在集群运行的过程中,可以对topic和客户端的配置进行覆盖。kafka的开发者打算在未来添加更多的动态配置,这就是为什么这些更改被放在一个单独的命令行工具kafka-config.sh中。这运行你为特定的topic和客户端id设置配置。一旦设置好,这些配置对于集群就是永远生效的。他们存储在zookeeper中,并在启动的死后由每个broker读取。在工具和文档中,像这样为每个topic或者客户端动态配置被称为重写。
与前面的工具一样,需要使用–zookeeper参数为集群提供zookeeper的连接字符串。在下面的示例中,假设zookeeper的连接字符串为zoo1.example.com:2181/kafka-cluster。

Overriding Topic Configuration Defaults 覆盖topic配置的默认值

有许多应用于topic的配置,可以针对单个topic更改这些配置。以适应当个集群中的不同的用例。大多数配置都是在broker配置中指定的缺省的值。除非设置了覆盖,否则将使用缺省值。
更改topic的样例如下:

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]

有效配置见下表:

key 说明
cleanup.policy 如果设置为compact,则topic中 的消息将被丢弃,仅保留具有给定key的最新消息(日志压缩)。
compression.type broker将消息写入磁盘时使用的压缩类型,可以用gzip、snappy和lz4.
delete.retention.ms 删除墓碑,将为这个topic保留多长时间。仅仅对日志压缩的topic有效。
file.delete.delay.ms 从磁盘中删除此topic的日志端和索引之前需要等待的多长时间
flush.messages 在强制将此topic的消息刷到磁盘之前接收的消息数
flush.ms 在强制将此topic的消息刷到磁盘之前需要的时间,单位是ms
index.interval.bytes 日志段索引中的条目之间可以产生多少字节的消息
max.message.bytes 此topic中当个消息的大小
message.format.version broker将消息写入磁盘时使用的消息格式版本,必须是一个有效的版本号,如0.10.0
message.timestamp.difference.max.ms 接收消息时,消息时间戳和broker时间戳之间允许的最大差异。只有当message.timestamp.type 是CreateTime时生效
message.timestamp.type 将消息写入磁盘时使用的时间戳,当前值为createTime用于客户端指定的时间戳。LogAppendtime用于broker将消息写入分区的时间。
min.cleanable.dirty.ratio 日志压缩器尝试为这个topic压缩分区的频率,表示未压缩的日志段数与日志段总数的比率,仅对日志压缩topic有效
min.insync.replicas topic的一个分区必须同步的最小副本才能被认为是可用的
preallocate 如果设置为true,则应该在滚动新段的时候预先分配此topic的日志段
retention.bytes 为topic保留的消息量的总字节数
retention.ms topic中消息保留的最长时间
segment.bytes 写入分区中的单个日志段的消息大小
segment.index.bytes 按日志段索引的最大大小,以字节为单位
segment.jitter.ms 随机化添加到段的最大毫秒数,之后将生成新的段
segment.ms 旋转每个分区的日志段的频率,以毫秒为单位
unclean.leader.election.enable 如果设置为false,则不洁的领导人选举将不被允许

如将my-topic的topic的用户留存时间设置为1小时,3600万ms:

# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name my-topic --add-config
retention.ms=3600000
Updated config for topic: "my-topic".
#

Overriding Client Configuration Defaults 重写客户端的缺省配置

对于kafka客户端唯一可以配置的是生产者和消费者的配额,他们都是一个字节/秒的速率。允许具有指定客户端ID的所有客户端在每个broker的基础上生成或者使用。这意味着,如果集群中有5个broker,并且为一个客户端指定10M/s的生产者配额,那么该客户端将被允许在broker上同时生产10MB/s的总量为50MB/s。

  • 客户端ID与消费者组 客户端ID不一定与消费者组的名称相同,消费者可以设置他们自己的客户端ID,而且你可能有许多位于不同组的消费者,他们指定的相同的客户端ID,最佳的方法是将每个消费者组的客户端ID设置为标识该组的唯一值/这运行单个消费者组共享配额,并且更容易的在日志中确定哪个组负责请求。

更改客户端配置的格式如下:

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]

客户端支持的配置参数如下表:

key 说明
producer_bytes_rate 允许单个客户端ID在一秒内生成给单个broker的消息量。以字节为单位
consumer_bytes_rate 允许单个消费者ID在一秒内单个broker中消费的消息量,以字节为单位

Describing Configuration Overrides 配置覆盖说明

可以使用命令行工具列出所有的配置,这将允许你检查topic的特定配置,与其他工具类似,通过–describe命令即可:

# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
#
  • topic 覆盖: 配置describe 值显示覆盖,它不包括集群默认的配置。目前,无论是通过zookeeper还是kafka存储的新旧版本,都无法动态地发现broker本身的配置,这意味着,当使用此工具在自动会发现topic或者客户端的设置时,该工具必须具有集群默认的配置的独立知识。

Removing Configuration Overrides 删除覆盖的配置

可以完全删除动态配置,这将导致集群恢复到默认值,要删除配置覆盖,情使用alter命令和delete-config命令。
如下是一个重写retention.ms :

# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name my-topic
--delete-config retention.ms
Updated config for topic: "my-topic".
#

Partition Management 分区管理

kafka工具包含两个用于分区管理的脚本,一个允许副本选举,另外一个用于为broker分配分区的低级实用程序。这些工具一起可以帮助在kafka broker集群中实现消息流量的适当平衡。

Preferred Replica Election

如第六章所述,为了可靠性,分区可以有多个副本,但是,这些副本中只有一个可以做为分区的leader,并且所有生成和消费操作都发生在这个broker上,kafka internals将其定位副本列表中的第一个同步副本,但是当broerk停止并重写启动的时候,他不会自动恢复任何分区的领导权。

  • Automatic Leader Rebalancing 自动reblance
    一个用于自动leader重平衡的broker配置,不建议用于生产环节。自动reblance会对性能造成重大影响,对于较大的集群,他会导致客户端流量长时间暂停。
    让broker恢复领导抵为的一种方式是触发一种首选的副本选举机制。这告诉集群控制器为分区选择理想的leader,该操作通常不会产生影响,因为客户可以自动跟踪leader的变更,这可以使kafka-preferred-replicaelection.sh手动进行。
    为集群中所有的topic启动一个首选副本选择:
# kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster
Successfully started preferred replica election for partitions
Set([my-topic,5], [my-topic,0], [my-topic,7], [my-topic,4],
[my-topic,6], [my-topic,2], [my-topic,3], [my-topic,1])
#

对于有大量分区的集群,可能无法运行单一首选副本选择,请求必须写入集群元数据中的zookeeper的znode,如果请求大于znode的大小默认为1MB,则请求将失败。在本例中,需要创建一个包含j’son对象的文件,该对象列出要选择的分区,并将请求分解为多个步骤,json的格式为:

{
"partitions": [
{
"partition": 1,
"topic": "foo"
},
{
"partition": 2,
"topic": "foobar"
}
]
}

例如,在一个名为partitions.json的文件中指定分区列表,开始一个首选副本选择:

# kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --path-to-json-file
partitions.json
Successfully started preferred replica election for partitions
Set([my-topic,1], [my-topic,2], [my-topic,3])
#

Changing a Partition’s Replicas 更改分区的副本数

有时,可能需要更改分区的副本配置,需要有这样的例子:

  • 如果主题的分区在集群中不平衡,导致broker上的负载不均匀。
  • 如果broker脱机,且分区复制不足
  • 如果添加了一个新的borker,并且需要接收集群的负载的共享

kafka-reassign-partitions.sh能够用来执行这个操作。它必须包含如下两个步骤:第一步使用broker列表和topic列表来生成一组移动。第二本执行生成的移动。还有一个可选的第三步,它使用生成的列表来验证分区重写分配的进度或完成。
要生成一组分区移动,必须创建一个包含列出topic的JSON对象的文件。JSON对象的格式如下:

{
"topics": [
{
"topic": "foo"
},
{
"topic": "foo1"
}
],
"version": 1
}

例如,生成一组分区移动以移动文件中列出的topic.json发送给id为0和1的broker:

# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --generate
--topics-to-move-json-file topics.json --broker-list 0,1
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},
{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic","
partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli
cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic","
partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},
{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"mytopic","
partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli
cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic","
partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":
13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},
{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},
{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic","
partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli
cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic","
partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":
15,"replicas":[0,1]},{"topic":"my-topic","partition":0,"replicas":[1,0]},
{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"mytopic","
partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli
cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic","
partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":
2,"replicas":[1,0]},{"topic":"my-topic","partition":14,"replicas":[1,0]},
{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
#

broker列出在命令行工具中以都好分隔的broker ID列表的形式提供在标准的输出中。该工具将输出两个JSON对象。描述topic当前分区分配和建议的分区分配。JSON格式为:

{"partitions": [{"topic": "mytopic",
"partition": 0, "replicas": [1,2] }], "version":_1_}.

可以保持的第一个JSON对象,以备重写分配时需要恢复时使用。第二个JSON对象,应该保持到一个新的文件中,然后,这个文件被提供给kafka-reassign-partitions.sh工具在第二步使用。
如;从reassign.json执行分区建议分配:

# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --execute
--reassignment-json-file reassign.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},
{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic","
partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli
cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic","
partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":
3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},
{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"mytopic","
partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli
cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic","
partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},
{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Save this to use as the --reassignment-json-file option during
rollback
Successfully started reassignment of partitions {"version":1,"partitions":
[{"topic":"my-topic","partition":5,"replicas":[0,1]},{"topic":"mytopic","
partition":0,"replicas":[1,0]},{"topic":"my-topic","partition":7,"repli
cas":[0,1]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"mytopic","
partition":4,"replicas":[1,0]},{"topic":"my-topic","partition":
12,"replicas":[1,0]},{"topic":"my-topic","partition":6,"replicas":[1,0]},
{"topic":"my-topic","partition":11,"replicas":[0,1]},{"topic":"mytopic","
partition":10,"replicas":[1,0]},{"topic":"my-topic","partition":9,"repli
cas":[0,1]},{"topic":"my-topic","partition":2,"replicas":[1,0]},{"topic":"mytopic","
partition":14,"replicas":[1,0]},{"topic":"my-topic","partition":
3,"replicas":[0,1]},{"topic":"my-topic","partition":1,"replicas":[0,1]},
{"topic":"my-topic","partition":15,"replicas":[0,1]},{"topic":"mytopic","
partition":8,"replicas":[1,0]}]}
#

这将启动将指定的分区副本重写分配到新的broker,集群控制器通过将新的副本添加到每个分区副本列表,增加副本因子,来执行此操作。然后,新的副本将从当前leader复制每个分区的所有现有消息。根据磁盘上分区的大小,在通过网络将数据复制到新的副本时,这可能会花费大量的时间。复制完成之后,控制器将从复制列表中删除旧的副本,将复制因子减少到原始的大小。

  • 改善在重新分配副本的时候的网络利用率:当从当个broker删除许多分区的时候,比如从集群中删除该broker,最佳的实践时在启动重写分配之前关闭并重写启动broker。这把特定的broker上分区的领导权转移到集群中其他broker上,这可以显著提高重写分配的时候的性能,并减少对集群的影响,因为复制流量将分配到许多的broker。
    当重写分配运行时,再它完成后,kafka-reassignpartitions.sh可以用来验证重写分配的状态。这将显示哪些重写分配正在进行中,什么重写分配已经完成,如果出现错误,什么重写分配已经失败。为此,你必须拥有在此执行步骤中使用的带有JSON对象的文件。
    例如,从reassign.json校验正在运行的重写分配:
# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --verify
--reassignment-json-file reassign.json
Status of partition reassignment:
Reassignment of partition [my-topic,5] completed successfully
Reassignment of partition [my-topic,0] completed successfully
Reassignment of partition [my-topic,7] completed successfully
Reassignment of partition [my-topic,13] completed successfully
Reassignment of partition [my-topic,4] completed successfully
Reassignment of partition [my-topic,12] completed successfully
Reassignment of partition [my-topic,6] completed successfully
Reassignment of partition [my-topic,11] completed successfully
Reassignment of partition [my-topic,10] completed successfully
Reassignment of partition [my-topic,9] completed successfully
Reassignment of partition [my-topic,2] completed successfully
Reassignment of partition [my-topic,14] completed successfully
Reassignment of partition [my-topic,3] completed successfully
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [my-topic,15] completed successfully
Reassignment of partition [my-topic,8] completed successfully
#
  • Batching Reassignments批次分配
    分区重新分配对集群的性能有很大的影响,因为他们会导致内存页缓存的一致性发生变化。并使用网络和磁盘IO,将重新分配分解为许多小步骤时一个保持这种最小化的好主意。

Changing Replication Factor 改变副本因子

分区重新分配工具中有一个未在文档中说明的特性,它允许你增加或者简述分区的副本因子。在使用错误的副本因子创建分区的情况下,这可能是必须的,假如在创建topic的时候没有足够的broker可用。这可以通过创建一个json对象来完成,该json对象的格式在分区重新分配的执行步骤中使用,该步骤条件或者删除副本以正确设置副本因子。集群将完成重新分配,并将复制因子保持在新的大小。
例如,考试一个名为my-topic的tipic的当前分配,他有一个分区,副本因子为1:

{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [
1
]
}
],
"version": 1
}

在重新分配分区的执行步骤中提供以下json对象将导致副本因子增加到2:

{
"partitions": [
{
"partition": 0,
"replicas": [
1,
2
],
"topic": "my-topic"
}
],
"version": 1
}

类似的,通过提供具有更小副本列表的json对象,可以减少分区的副本呢因子。

Dumping Log Segments

如果你必须寻找消息的特定内容,可能是因为你的topic中出现了消费者无法处理的毒丸消息,那么你有一个辅助工具可以为分区解码日志段,这将允许你查看单独的消息,而不需要使用和解码他们。该工具以逗号分隔的日志段文件列表做为参数,并可以打印消息摘要信息或者详细的消息数据。
例如,解吗名为00000000000052368601.log的日志段文件:

# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359
...

解码名为00000000000052368601.log的日志段文件:

# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log --print-data-log
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321 payload: test message 1
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641 payload: test message 2
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431 payload: test message 3
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359 payload: test message 4
...

还可以使用此工具验证日志段附带的索引文件,索引用于查找日志段中的消息,如果消息被破坏,将导致错误的使用。只要broker在buclean状态启动,就会执行验证,但也可以手动执行。有两种检查索引的选项,这取决于你想要进行多少检查,选项–index-sanity-check将检查索引是否处于可用状态。–verify-index-only将检查索引中是否存在不匹配,而不会打印出所有的索引项。
如:验证00000000000052368601.log是否损坏:

# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.index,00000000000052368601.log
--index-sanity-check
Dumping 00000000000052368601.index
00000000000052368601.index passed sanity check.
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
...

Replica Verification 副本验证

分区复制的工作原理类似于普通的kafka客户端,follower的broker在最老的offset开始复制,并定期检查磁盘当前的offset。当复制停止并重新启动时,它从最后要给检查点获取数据,以前的复制的日志段可以从broker中删除,在这种情况下,follower 不会填补空白。
为了验证topic分区的副本在集群中是否相同,可以使用kafka-replica-verification.sh进行验证,次攻击从给懂的topic分区集的所有副本中获取消息。并检查所有副本上是否存在所有的消息,必须为该工具提供一个正则表达式。以匹配希望验证的topic,如果没有提供,则验证所有的topic。还必须提供要连接的broker的显式列表。

  • Caution: Cluster Impact Ahead 集群碰撞
    副本验证工具对集群的影响类似于重新分配分区,因为它必须从旧的offset读取所有的消息,以验证副本。此外,它并行的读取一个分区的所有副本,因此应该谨慎使用。
    例如,验证broker1和broker2上以my-开头的topic副本:
# kafka-replica-verification.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
2016-11-23 18:42:08,838: verification process is started.
2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7]
at offset 53827844 among 10 partitions
2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7]
at offset 53827878 among 10 partitions

Consuming and Producing 生产和消费

在使用ApacheKafka的时候,你经常会发现需要手动使用消费或者生产一些示例消息,以验证应用程序的运行情况。提供了两个实用程序。kafka-console-consumer.sh 和 kafka-console-producer.sh。这是围绕java客户端端的包装器,允许你与kafka的topic交互,而无须编写整个应用程序。

  • Piping Output to Another Application 将输出管道传输到另外一个程序
    虽然可以编写围绕控制台消费者或者生产者的应用程序,例如,使用消息并将其传输到另外一个应用程序进行处理,但是这种类型的应用程序相当脆弱,应该避免这么做。很难以不丢消息的方式与控制台的使用者进行交互。同样,控制台的生成器也不允许使用所有的特性,正确的发送字节也需要技巧,最好是直接使用java客户端库,或者直接使用kafka协议的其他语言的第三方客户端库。

Console Consumer 控制台消费者

kafka-console-consumer.sh 提供了一种使用来自kafka集群中的一个或者多个topic消息的方法,消息以标准输出的方式打印,然后分隔。默认情况下,它不使用格式输出消息中的原始字节,以下各段描述了所需的选项。

  • Checking Tool Versions 检查工具版本
    使用与kafka集群相同的版本的消费者非常重要,较老的控制台用户可能会通过不正确的方式与zookeeper交互而损坏集群。

第一个选项是指定是否使用新的消费者,并让配置指向kafka集群本身,在使用较老的消费者的时候,唯一要的参数是–zookeeper选项。后面是集群的连接字符串。从上面的例子来看,这可能是–zookeeper zoo1.example.com:2181/kafka-cluster。 如果使用新的消费者,旧必须指定–new-consumer的标识和–broker-list选项。后面是逗号分隔的broker列表。如:–broker-list
kafka1.example.com:9092,kafka2.example.com:9092。
接下来必须指定要使用的topic,为此提供了三种选择。–topic, --whitelist, 和 --blacklist。只能使用其中一种。–topic选项指定要使用的单个topic。–whitelist与–blacklist每个选项后面都跟一个正则表达式,记住要对正则表达式转义
这样它就不会被shell命令修改。白名单将使用与正则表达式匹配的所有topic,而黑名单将使用除正则表达式匹配的topic之外的所有topic。
样例如下:

# kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --topic my-topic
sample message 1
sample message 2
^CProcessed a total of 2 messages
#

除了基本的命令行选项之外,还恶意将任何普通的用户配置选项传递给控制台用户,这可以通过两种方式完成,具体取决于你需要传递的选项的数量以及你喜欢的方式,第一个是通过指定提供消费者的配置文件,–consumer.config CONFIGFILE需要包含配置文件的完整路径。另外一种方法是使用表单的一个或者多个参数在命令行上指定选项,消费者属性KEY=VALUE,其中key是配置选项名。VALUE是设置它的值,者对于消费者选项如设置消费者组ID非常有用。

  • Confusing Command-Line Options 命令行参数混淆
    控制台的生产者和消费者都有一个–property选项。不要将其与–consumer-property和–producer-property混淆。–property仅仅用于将配置传递给消息格式化程序,而不是客户端本身。

控制台消费者还有一些其他的参数,你应该知道:

  • –formatter CLASSNAME 指定用于解码消息的消息格式化程序类。默认设置为:kafka.tools.DefaultFormatter.
  • –from-beginning 使用从旧的offset中读取topic特定消息。否则,消费者从最近开始读取。
  • –max-messages NUM
    消费者在退出之前消费最多的num个消息。
  • –partition NUM
    只使用ID NUM标识分区。
Message formatter options消息格式化器选项

除了默认的格式化器外,还有三种供选择:

  • kafka.tools.LoggingMessageFormatter 使用日志记录器输出消息,而不是标准的输出,消息在信息级别上打印,包括时间戳,key和value
  • kafka.tools.ChecksumMessageFormatter
    仅仅打印校验值
  • kafka.tools.NoOpMessageFormatter
    消耗但是不输出消息。
    kafka.tools.DefaultMessageFormatter 也能通过–property传递选项:
  • print.timestamp 设置为true则显示每个消息的时间戳。
  • print.key 设置为true,除显示value之外还显示key。
  • key.separator 指定打印消息key和value之间的分隔符。
  • line.separator 指定消息之间的分隔符。
  • key.deserializer 提供一个类名,打印前对key进行反序列化。
  • value.deserializer 提供一个类名,用于打印前反序列化消息value。
    反序列化器必须实现org.apache.kafka.common.serialization.Deserializer接口。控制台消费者将对他们调用tostring方法以获得要显示的输出。通常,你可以将这些反序列化器实现为java类,通过在执行kafka_console_consumer.sh之前设置的classpath环境变量,将其插入到控制台消费者的类路径中。
Consuming the offsets topics 指定offset消费topic

再某些时候,查看集群的消费者组提交了哪些offset是非常关键的。你可能想知道某个特定的组是否正在提交offset,或者offset提交的频率是多少。这可以通过使用控制台消费者对__consumer_offsets这个特殊的内部topic进行消费来实现。为了解码这个topic总的消息,必须使用格式化程序kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter。
样例如下:

# kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --topic __consumer_offsets
--formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessage
Formatter' --max-messages 1
[my-group-name,my-topic,0]::[OffsetMetadata[481690879,NO_METADATA]
,CommitTime 1479708539051,ExpirationTime 1480313339051]
Processed a total of 1 messages
#

Console Producer 控制台生产者

与控制台消费者程序类似,kakfa-console-producer.sh工具可以用于将消息写入该集群的kafka的topic中,默认情况下,每行读取一条消息,用tab分隔key和value。如果没有tab,则key为null。

  • Changing Line-Reading Behavior 你可以提供自己的类来按行读取,以便进行自定义操作。你创建的类必须继承kafka.common.MessageReader并将负责创建ProducerRecord.属性再命令行上指定类,行阅读器选项,并确保包含的类的JAR再类的路径中。

控制台生产者中要求必须提供两个参数,–broker-list 用于指定一个或者多个broker。通过hostname:port格式,进行提供。–topic 指定你需要写入的topic。当你写入完成之后,发送一个文件结束字符EOF来关闭客户端。
样例如下,给my-topic写入信息:

# kafka-console-producer.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic
sample message 1
sample message 2
^D
#

与控制台生产者一样,你也可以将任何普通的生产者配置选项传递给控制台生产者。这可以通过两种方式来完成,具体取决于你需要传递选项的参数以及你喜欢的方式。第一种方法是通过–producer.config CONFIGFILE指定生产者配置文件。其中CONFIGFILE是包含配置选项文件的完整路径。另外一种方法是再命令行上用表单的一个或者多个参数进行指定。–producer-property KEY=VALUE。其中KEY是配置选项名,VALUE是要设置的值。这对于消费者批处理配置等非常有用。如linger.ms 和 batch.size。
控制台生产者提供了很大命令行参数来调整其行为,一些更有用的选项是:

  • –key-serializer CLASSNAME 指定消息key序列化的编码类,默认为kafka.serializer.DefaultEncoder。

  • –value-serializer CLASSNAME 指定消息value序列化编码类,默认为kafka.serializer.DefaultEncoder。

  • –compression-codec STRING
    指定再生成消息时使用的压缩类型,这可以是一个gzip,snappy或者lz4。默认是gzip。

  • –sync
    同步生成消息,再发送下一条消息之前等待每个消息的ack。

  • Creating a Custom Serializer 创建消息的自定义序列化器,必须继承kafka.serializer.Encoder 这可以额从标准输入获取字符串,并将他们转换为适合的topic。如Avro和Protobuf。

Line-Reader Options 行读取选项

kafka.tools.LineMessageReader 负责读取标准输入和创建生产者记录,也有几个有用的选项,可以通过–property传递到控制台生产者:

  • ignore.error 设置为false在解析时抛出异常,key设置为真,且不存在key分隔符,默认值为true。
  • parse.key
    设置为false总是将key设置为空,默认值为true。
    -key.separator
    指定读取时在消息key和消息value之间使用分隔字符,默认为tab。
    在生成消息的时候,LineMessageReader 将在key.separator的第一个实例上进行分隔输入,如果之后没有字符,消息的value将为空,如果行上没有key分隔符,或者parse.key为false的时候,key为空。

Client ACLs 客户端acls

命令行工具kafka-acls.sh提供了kafka客户端的访问控制与交互,Apache kafka网站提供了关于acl的安全性和其他的文档。

Unsafe Operations 不安全操作

有戏管理任务在技术上是可行的,但是除非在最极端的情况下,否则不应该尝试。通常是在诊断问题并没有其他选择的时候,或者是发现了需要临时解决的特定错误。这些任务通常没有文档记录,不受支持。并且会给应用程序带来一定的风险。
这里记录了其中一些较为常见的任务,以便在紧急情况下可以选择恢复。不建议在正常的集群中使用他们,应该在执行之前仔细考虑。

  • Danger: Here Be Dragons 本节的操作涉及直接使用存储在zookeeper中的元的集群数据,这可能是一个非常危险的操作,所以你必须非常消息,不要直接修改zookeeper中的信息,除非有其他的说明。

Moving the Cluster Controller 移动集群控制器

每个kafka集群都有一个控制器,它是一个在broker中运行的thread。控制器负责监督集群的操作,有时候需要强制将控制器移动到另外一个broker。一个这样的例子是当控制器遇到异常或者其他问题,使其无法运行而无法正常工作的时候,这些情况下移动控制器的风险并不高,但是这不是一项正常的任务,不应该定期执行。当前做为控制器的broker使用名为/controller的集群路径的顶层的zookeeper的节点注册。手动删除这个zookeeper节点将导致当前控制器退出,集群将选择一个新的控制器。

Killing a Partition Move 终止分区移动

分区重新分配的正常操作流程为:

  • 1.请求重新分配(创建zookeeper节点)。

  • 2.集群控制器向添加的新的broker添加分区。

  • 3.新的broker开始复制每个分区,知道它同步。

  • 4.集群控制器从分区复制列表中删除旧的broker。
    因为所有的重新分配都是在请求时并行重新启动,所有通常没有理由尝试取消正在进行的重新分配。一个例外是当broker在重新分配过程中失败而不能立即重新启动的时候。这将导致无法完成重新分配,从而排除启动任何额外的重新分配,例如从失败的broker中删除分区并将他们分配给其他的broker,在这种情况下,可能会使集群忘记所有的重新分配。
    要删除正在进行的分区重新分配:

  • 1.从zookeeper节点上kakfa集群的路径中删除/admin/reassign_partitions。

  • 2.强制控制器移动。请参阅前文移动集群控制器的详细过程。

  • Checking Replication Factors 检查副本因子
    在删除正在进行的分区移动的时候,任何尚未完成的分区都不会执行从复制列表中删除旧的broker的步骤。这意味着某些分区的复制因子可能大于预期。broker不允许对具有不一致副本因子的分区,(例如增加分区)的topi进行一些管理操作。建议检查仍在进行中的分区。并在重新分配的另外一个分区时确保它的复制因子是正确的。

Removing Topics to Be Deleted 删除topic

当使用命令行工具删除topic的时候,zookeeper节点请求创建删除操作。在正常情况下,集群会立即执行此操作,但是,命令行工具无法知道集群中是否启用了topic删除操作,因此,无论如何,它都会请求删除topic,如果禁用了删除,则会导致意外的结果,可能集群会将删除操作挂起,以避免这种情况。
通过在zookeeper上的/admin/delete_topic节点创建一个子节点来删除topic。该节点以topic命名。删除这些zookeeper节点,将删除挂起的请求。

Deleting Topics Manually 手动删除

如果你运行的集群禁用了删除topic。或者你发现自己需要删除正常的操作流程之外的一些topic,那么可以从集群中手动删除他们。但是这需要完全关闭集群中的所有borker,并且在集群中的任何broker都在运行时不能这样做。

  • Shut Down Brokers First 关闭首选broker
    当集群在线的时候,在zookeeper中修改集群的元数据是一项非常危险的操作,会使集群处于不稳定状态,当集群在线的时候,不要试图删除或者修改zookeeper的topic的元数据。

从集群中删除topic的过程:

  • 1.关闭集群中的所有broker。
  • 2.删除zookeeper中的/brokers/topics/TOPICNAME节点,注意,删除此节点之前必须首先删除子节点。
  • 3.从每个broker的日志牡蛎中删除分区目录。他们将被命名为TOPICNAME-NUM.其NUM是分区ID。
  • 4.重启所有的broker。

Summary 总结

运行kafka集群是一项艰巨的任务,需要进行大量的配置和维护工作,以保持系统在最高性能下能运行。在本章中,我们讨论了许多日常的任务,比如管理topic和经常需要处理的客户端配置。我们还介绍了调试问题所需要的一些更深的任务,比如检查日志段。最后,我们介绍了一些虽然不安全或者例行的操作,但是可以用于摆脱棘手的情况,总之,这些工具将帮助你管理kafka集群。
当然,如果没有适当的监控,管理集群是不可能的,第十章将讨论监控集群和集群的运行情况的操作方法,这样你可以七二班kafka工作良好,我们还将提供监控客户端的最佳办法,报告生产者和消费者。

09 Confluent_Kafka权威指南 第九章:管理kafka集群相关推荐

  1. 08 Confluent_Kafka权威指南 第八章:跨集群数据镜像

    文章目录 CHAPTER 8 Cross-Cluster Data Mirror 跨集群数据镜像 Use Cases of Cross-Cluster Mirroring 跨集群镜像用例 Multic ...

  2. 02 Confluent_Kafka权威指南 第二章:安装kafka

    文章目录 CHAPTER 2 Installing Kafka kafka的安装配置 First Things First Choosing an Operating System Installin ...

  3. 融云发送自定义消息_数据源管理 | Kafka集群环境搭建,消息存储机制详解

    一.Kafka集群环境 1.环境版本 版本:kafka2.11,zookeeper3.4 注意:这里zookeeper3.4也是基于集群模式部署. 2.解压重命名 tar -zxvf kafka_2. ...

  4. 数据源管理 | Kafka集群环境搭建,消息存储机制详解

    本文源码:GitHub·点这里 || GitEE·点这里 一.Kafka集群环境 1.环境版本 版本:kafka2.11,zookeeper3.4 注意:这里zookeeper3.4也是基于集群模式部 ...

  5. 第10章-管理Hadoop集群-hadoop 安全模式相关知识点

    为什么80%的码农都做不了架构师?>>>    1.namenode启动时,namenode节点都做了哪些动作?fsimage和edits有什么变化? 2.namenode什么时候开 ...

  6. Kafka Without ZooKeeper ---- 不使用zookeeper的kafka集群

    不使用zookeeper的kafka集群 前言 ZooKeeper的缺点 Kakfa Without ZooKeeper简介 Kakfa Without ZooKeeper的优势 总结 参考链接 前言 ...

  7. 10 Kafka集群与运维

    Kafka集群与运维 10.1 集群应用场景 10.1.1 消息传递 Kafka可以很好地替代传统邮件代理.消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等).与大多数邮件系统相比 ...

  8. 4.2.5 Kafka集群与运维(集群的搭建、监控工具 Kafka Eagle)

    Kafka集群与运维 文章目录 Kafka集群与运维 1.集群的搭建 1.1 搭建zookeeper集群 1.1.1 上传JDK到linux,安装并配置JDK 1.1.2. Linux 安装Zooke ...

  9. 4.2.9 Kafka集群与运维, 应用场景, 集群搭建, 集群监控JMX(度量指标, JConsole, 编程获取, Kafka Eagle)

    目录 3.1 集群应用场景 1 消息传递 2 网站活动路由 3 监控指标 4 日志汇总 5 流处理 6 活动采集 7 提交日志 总结 3.2 集群搭建 3.2.1 Zookeeper集群搭建 3.2. ...

最新文章

  1. 前端抱怨 API 响应慢,怎么办?
  2. 阿里,腾讯,拼多多面试必挂:面对千万级、亿级流量怎么处理?
  3. 搭建免费ftp服务,视频演示
  4. JavaScript Bitwise NOT Operator
  5. excel如何快速实现数据区域的框选
  6. 0.IDA-基本的反汇编算法
  7. 在未启动程序情况 点击视图设计器 弹出未将对象引用窗体的解决方案
  8. shell语法 06-Linux文本处理-grep
  9. 数据:以太坊2.0存款合约新增9.4万ETH
  10. 无人驾驶飞机来了!空难后波音的电动飞机你敢乘吗?
  11. kernel编译速度提高
  12. 单片机蜂鸣器的控制程序与驱动电路图
  13. 黎曼猜想能用计算机算吗,关于黎曼猜想的计算机验证
  14. 智能生活管家项目之一-系统简介
  15. excel缩字间距_如何取消字体间距 excel字体间距紧缩
  16. vue中a标签的href属性的写法
  17. 关于input:-webkit-autofill样式问题
  18. 安装VMware的VM Tools
  19. 如何评价「仙剑奇侠传六」使用Unity 3D引擎?
  20. vs 没法f12_键盘快捷键 - F12不再适用于Visual Studio

热门文章

  1. java array缓存_有java数组
  2. 日语学习 第4篇 部屋(へや)に机(つくえ)と椅子(いす)があります
  3. 在HTML中如何加入一个PDF文件,怎么给pdf文件插入页面?
  4. 修正波逆变器的设计要点
  5. 微信小程序里使用weui的正确打开方式
  6. 量子位智库报告:三分钟看懂ChatGPT | 附下载
  7. 十八种让你有用的饲料配方
  8. 人体神经构成结构示意图,神经元结构示意图简易
  9. java iw_java知识回顾 - osc_iwr5mti2的个人空间 - OSCHINA - 中文开源技术交流社区
  10. 公网对讲机与传统对讲机的发展与融合分析