kafka-topics.sh的使用方式

  • 一、kafka的基本操作
    • 1.1、创建topic
    • 1.2、查看topic
    • 1.3、查看topic属性
    • 1.4、发送消息
    • 1.5、消费消息
  • 二、kafka-topics.sh 使用方式
    • 2.1、查看帮助
    • 2.2、副本数量规则
    • 2.3、创建主题
    • 2.4、查看broker上所有的主题
    • 2.5、查看指定主题 topic 的详细信息
    • 2.6、修改主题信息之增加主题分区数量
    • 2.7、删除主题
  • 后言

一、kafka的基本操作

1.1、创建topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

参数说明:

  1. –create 是创建主题的的动作指令。
  2. –zookeeper 指定kafka所连接的zookeeper服务地址。
  3. –replicator-factor 指定了副本因子(即副本数量); 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份Partitions分区数。
  4. –partitions 指定分区个数;多通道,类似车道。
  5. –topic 指定所要创建主题的名称,比如test。

成功则显示:

Created topic "test".

1.2、查看topic

sh kafka-topics.sh --list --zookeeper localhost:2181

显示:

test

1.3、查看topic属性

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

显示:

Topic:test   PartitionCount:1    ReplicationFactor:1 Configs:Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

1.4、发送消息

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

发送端输入:

>hello
>where are you
>let’s go

1.5、消费消息

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
--from-beginning

消费端显示:

hello
where are you
let’s go
^CProcessed a total of 3 messages

二、kafka-topics.sh 使用方式

创建、修改、删除以及查看等功能。

2.1、查看帮助

/bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使用 --help 参数来打印列出其所需的参数信息。

$ sh kafka-topics.sh --help
Command must include exactly one action: --list, --describe, --create, --alter or --delete
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,        replica assignment, and/or           configuration for the topic.
--config <String: name=value>            A topic configuration override for the topic being created or altered.The   following is a list of valid         configurations:                      cleanup.policy                        compression.type                      delete.retention.ms                   file.delete.delay.ms                  flush.messages                        flush.ms                              follower.replication.throttled.       replicas                             index.interval.bytes                  leader.replication.throttled.replicas max.message.bytes                     message.downconversion.enable         message.format.version                message.timestamp.difference.max.ms   message.timestamp.type                min.cleanable.dirty.ratio             min.compaction.lag.ms                 min.insync.replicas                   preallocate                           retention.bytes                       retention.ms                          segment.bytes                         segment.index.bytes                   segment.jitter.ms                     segment.ms                            unclean.leader.election.enable        See the Kafka documentation for full   details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be   removed for an existing topic (see   the list of configurations under the --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting       topics, the action will only execute if the topic exists
--if-not-exists                          if set when creating topics, the       action will only execute if the      topic does not already exist
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING:   If partitions are increased for a    topic that has a key, the partition  logic or ordering of the messages    will be affected
--replica-assignment <String:            A list of manual partition-to-broker   broker_id_for_part1_replica1 :           assignments for the topic being      broker_id_for_part1_replica2 ,           created or altered.                  broker_id_for_part2_replica1 :                                                broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each        replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or       describe. Can also accept a regular  expression except for --create option
--topics-with-overrides                  if set when describing topics, only    show topics that have overridden     configs
--unavailable-partitions                 if set when describing topics, only    show partitions whose leader is not  available
--under-replicated-partitions            if set when describing topics, only    show under replicated partitions
--zookeeper <String: hosts>              REQUIRED: The connection string for    the zookeeper connection in the form host:port. Multiple hosts can be     given to allow fail-over.            

2.2、副本数量规则

副本数量不能大于broker的数量。
kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1

报错:

Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.(kafka.admin.TopicCommand$)

注意:副本数量和分区数量的区别。

2.3、创建主题

创建主题时候,有3个参数是必填的:

  1. –partitions(分区数量)、
  2. –topic(主题名) 、
  3. –replication-factor(复制系数),

同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

返回显示:

Created topic "test1".

另外在创建主题的时候,还可以附加以下两个选项:–if-not-exists 和 --if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。

2.4、查看broker上所有的主题

–list。

 sh kafka-topics.sh --list --zookeeper localhost:2181

结果显示:

__consumer_offsets
test
test1

2.5、查看指定主题 topic 的详细信息

–describe。

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

结果显示:

Topic:test1  PartitionCount:1    ReplicationFactor:1 Configs:Topic: test1    Partition: 0    Leader: 0   Replicas: 0 Isr: 0

2.6、修改主题信息之增加主题分区数量

–alter。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2

结果显示:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

查看主题信息:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

可以看到已经成功的将主题的分区数量从1修改为了2。

Topic:test1  PartitionCount:2    ReplicationFactor:1 Configs:Topic: test1    Partition: 0    Leader: 0   Replicas: 0 Isr: 0Topic: test1  Partition: 1    Leader: 0   Replicas: 0 Isr: 0

当去修改一个不存在的topic信息时(比如修改主题 test2,当前这主题是不存在的)。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2

会报错:

Error while executing topic command : Topic test2 does not exist on ZK path localhost:2181
[2022-11-24 14:21:33,564] ERROR java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)at kafka.admin.TopicCommand.main(TopicCommand.scala)(kafka.admin.TopicCommand$)

注意:不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题 topic, 然后重新创建。

2.7、删除主题

–delete。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将不会有任何作用。

Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

可以测试一些:

# 一个终端启动生产者:
sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1# 另一个终端启动消费者:
sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning

发现此时还是可以发送消息和接收消息。如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。

然后就重启kafka:

# 停止:
sh kafka-server-stop.sh -daemon ../config/server.properties
# 启动:
sh kafka-server-start.sh -daemon ../config/server.properties

再次删除就可以了。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

后言

本专栏知识点是通过<零声教育>的系统学习,进行梳理总结写下文章,对c/c++linux系统提升感兴趣的读者,可以点击链接,详细查看详细的服务:C/C++服务器课程

Kafka之kafka-topics.sh的使用方式相关推荐

  1. flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic

    flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...

  2. 【kafka】kafka 脚本 kafka-run-class.sh 使用介绍 jmx监控 查看jmx信息

    文章目录 1.概述 2.Consumer Offset Checker 3.Dump Log Segment 4.导出Zookeeper中Group相关的偏移量 5.通过JMX获取metrics信息 ...

  3. discard connection丢失数据_python kafka 生产者发送数据的三种方式

    python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...

  4. 【kafka】kafka kerberos KeeperErrorCode = InvalidACL for /config/topics

    文章目录 1.概述 1.概述 环境搭建参考:[kafka]kafka 消费 带有 kerberos认证的服务器 在这个kerberos认证的环境下,我想测试一下,构建zk的客户端是否需要认证,然后代码 ...

  5. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  6. 【檀越剑指大厂—kafka】kafka高阶篇

    一.认识 kafka 1.kafka 的定义? Kafka 传统定义:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域.发布/订阅:消 ...

  7. Kafka:Kafka核心概念

    1 消息系统简介 1.1 为什么要用消息系统 ? 解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在: 冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险: 灵活性 ...

  8. SpringBoot整合kafka之kafka分区实战

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...

  9. Kafka系列 —— Kafka监控

    Kafka系列文章: Kafka系列 -- 入门及应用场景 & 部署 & 简单测试 Kafka系列 -- Kafka核心概念 Kafka系列 -- Kafka常用命令 常见Kafka监 ...

最新文章

  1. 看完就忘!看完就忘!我感觉我要废了
  2. 2019.03.28 bzoj3594: [Scoi2014]方伯伯的玉米田(二维bit优化dp)
  3. A Scala Tutorial for Java Programmers
  4. 操作系统(一)操作系统的概念、功能
  5. linux路由表命令
  6. 一文带你了解华为云DevCloud为何能全面领跑中国DevOps云服务市场
  7. python 中缩进—— tab 还是空格是不一样的,一般不能混用,除非设置Tab自动替换成空格
  8. IPv4和IPv6比特转发率和包转发率的关系
  9. 跟华为悦盒V9U机顶盒拼了
  10. 国内外最顶级的十大敏捷项目管理软件【2022】
  11. 什么是实例?什么是引用?
  12. [轉貼]奋斗5年从月薪3500到700万!
  13. 带你玩转Jetson Xavier NX系列教程 | Xavier NX刷机教程
  14. 白杨SEO:流量红利消失,现在都在各渠道做推广,我们还有必要做官方网站吗?怎么做呢?
  15. Kraljic采购定位模型
  16. Gson:GitHub 标星 18K 的 JSON 解析器,Google 出品的 Java JSON 解析器,强烈推荐!
  17. Electron桌面应用打包流程详情
  18. RISC-V IDE MRS使用笔记(七) :常用开发技巧汇总
  19. 抖音私信规则分析丨抖音企业号私信规则解读
  20. OpenGL总结9-万向锁

热门文章

  1. windows内核开发学习笔记十八:IRP 处理的标准模式
  2. 《期权、期货及其他衍生产品》读书笔记(第九章:价值调节量)
  3. 对话Nodebrick创始人:区块链游戏需要更多玩家,韩国开发者偏保守
  4. Python爬虫入门一(爬虫基础)
  5. SAP权限管理的基本概念
  6. jlink修复固件教程
  7. 这就是裸金属服务器?
  8. SDL游戏开发之四-卡马克卷轴
  9. 真肝,整理了一周的Spring面试大全【含答案】,吊打Java面试官
  10. 技能提升--1枚程序员的普通话练习