常用命令

Kafka内部提供了许多管理脚本,这些脚本都放在$KAFKA_HOME/bin目录下,而这些类的实现都是放在源码的kafka/core/src/main/scala/kafka/tools/路径下。

topic相关

kafka-topics.sh

kafka-topics.sh用于维护topic。包括create, delete, describe, change

#创建topic
kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 2 --partitions 6 --topic topic.real.mes.huoju
kafka-topics.sh  --list --bootstrap-server kafka1:9092
#删除topic。有更多其他配合,不一定删除得了。
kafka-topics.sh  --delete --bootstrap-server kafka1:9092  --topic topic.real.mes

options:

Option Description
–create 创建topic.
–describe 获取topic信息.
–list List all available topics.
–alter 修改topic:Alter the number of partitions,replica assignment, and/or configuration for the topic.
–bootstrap-server 必需。Kafka server列表
–command-config 配置文件
–config <String: name=value> 配置信息。
–delete-config <String: name> 不支持 --bootstrap-server option.
–disable-rack-aware Disable rack aware replica assignment
–exclude-internal 排除内部topic
–force Suppress console prompts
–if-exists 仅topic存在才执行,不支持 --bootstrap-server.
–if-not-exists 仅topic不存在才执行,不支持 --bootstrap-server.
–partitions <Integer: # of partitions> 创建或修改的partition 列表
–replica-assignment 副本与broker id赋值。
–replication-factor <Integer:replication factor> 未设置,使用cluster默认值.
–topic <String: topic> topic名称. 除了–create,其他都支持正则表达式,用双引号包括。\用于转义
–topics-with-overrides describe topic时,仅显示have overridden configs
–unavailable-partitions describe topic时,仅显示leader is not available
–under-min-isr-partitions describe topic时,仅显示比指定数字小的isr。不支持 --zookeeper
–at-min-isr-partitions describe topic时,仅显示等于指定数字的isr
–under-replicated-partitions describe topic时,仅显示指定分区
–version Display Kafka version.
–zookeeper <String: hosts> zookeeper列表。(废弃).

生产者相关

kafka-console-producer.sh

发送数据到topic

kafka-console-producer.sh --broker-list kafka1:9092 --topic topic.real.mes

options:

Option Description
–batch-size <Integer: size> 单个批处理中发送的消息数(default: 200)
–bootstrap-server
–broker-list <String: broker-list> 废弃。
–compression-codec [String:compression-codec] 压缩编解码器: either ‘none’, ‘gzip’, ‘snappy’, ‘lz4’, or ‘zstd’. 默认值:gzip
–line-reader <String: reader_class> 从标准输入读取信息的class类名称。 (default: kafka.tools.ConsoleProducer$LineMessageReader)
–max-block-ms <Long: ms> 在发送请求期间,生产者将阻止的最长时间。(default: 60000)
–max-memory-bytes <Long: bytes> 内存(default: 33554432,23M)
–max-partition-memory-bytes <Long: bytes> 为分区分配的缓冲区大小(default: 16384,16K)
–message-send-max-retries 最大的重试发送次数(default: 3)
–metadata-expiry-ms <Long: > 强制更新元数据的时间阈值(ms)(default: 300000)
–producer-property <String:producer_prop> 将自定义属性传递给生成器的机制。形如:key=value
–producer.config <String: config file> 生产者配置属性文件。[–producer-property]优先于此配置
–property <String: prop> 自定义消息读取器。
–request-required-acks String: 生产者请求的确认方式。0、1(默认值)、all
–request-timeout-ms <Integer: ms> 生产者请求的确认超时时间 默认值:1500
–retry-backoff-ms 生产者重试前,刷新元数据的等待时间阈值(default: 100)
–socket-buffer-size <Integer: size> TCP接收缓冲大小. (default: 102400)
–sync 同步发送消息
–timeout <Integer: timeout_ms> 异步发送模式,超时时间。默认值:1000
–topic
–version

消费者相关

kafka-console-consumer.sh

kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic topic.real.mes

options:

参数 值类型 说明 有效值
–topic string 被消费的topic
–whitelist string 正则表达式,指定要包含以供使用的主题的白名单
–partition integer 指定分区 除非指定’–offset’,否则从分区结束(latest)开始消费
–offset string 执行消费的起始offset位置 默认值:latest latest earliest
–consumer-property string 将用户定义的属性以key=value的形式传递给使用者
–consumer.config string 消费者配置属性文件 请注意,[consumer-property]优先于此配置
–formatter string 用于格式化kafka消息以供显示的类的名称 默认值:kafka.tools.DefaultMessageFormatter kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter
–property string 初始化消息格式化程序的属性 print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer>
–from-beginning 从存在的最早消息开始,而不是从最新消息开始
–max-messages integer 消费的最大数据量,若不指定,则持续消费下去
–timeout-ms integer 在指定时间间隔内没有消息可用时退出
–skip-message-on-error 如果处理消息时出错,请跳过它而不是暂停
–bootstrap-server string 必需(除非使用旧版本的消费者),要连接的服务器
–key-deserializer string
–value-deserializer string
–enable-systest-events 除记录消费的消息外,还记录消费者的生命周期 (用于系统测试)
–isolation-level string 设置为read_committed以过滤掉未提交的事务性消息 设置为read_uncommitted以读取所有消息 默认值:read_uncommitted
–group string 指定消费者所属组的ID
–blacklist string 要从消费中排除的主题黑名单
–csv-reporter-enabled 如果设置,将启用csv metrics报告器
–delete-consumer-offsets 如果指定,则启动时删除zookeeper中的消费者信息
–metrics-dir string 输出csv度量值 需与[csv-reporter-enable]配合使用
–zookeeper string 必需(仅当使用旧的使用者时)连接zookeeper的字符串。 可以给出多个URL以允许故障转移

kafka-consumer-groups.sh

用于查询,维护消费组。

 #显示消费情况kafka-consumer-groups.sh --group consumer.group.realme123 --describe --bootstrap-server kafka1:9092
#设置到最晚offset
kafka-consumer-groups.sh --group consumer.group.realme123  --bootstrap-server kafka1:9092   --topic topic.real.mes --reset-offsets  --to-latest   --execute
#设置到指定offset
kafka-consumer-groups.sh --group consumer.group.realme123  --bootstrap-server kafka1:9092   --topic topic.real.mes --reset-offsets  --to-offset 80000000   --execute

options:

Option Description
–all-groups 应用所有消费组.
–all-topics 一个组消费的所有topic。用于reset-offsets.
–bootstrap-server
–by-duration <String: duration> 设置offset(离当前时间duration的位置). Format: ‘PnDTnHnMnS’
–command-config <String: config property file>
–delete 删除指定群组中的topic partition offsets and ownership。
–delete-offsets 删除offsets ,一次1个group,多个topic。
–describe
–from-file <String: path to CSV file> Reset offsets to values defined in CSV file.
–group <String: consumer group> The consumer group we wish to act on.
–help Print usage information.
–list List all consumer groups.
–members Describe members of the group. 仅支持 ‘–describe’、 ‘–bootstrap-server’ options
–offsets 描述group和topic。 仅支持 ‘–describe’、 ‘–bootstrap-server’ options
–state 描述状态。仅支持 ‘–describe’、 ‘–bootstrap-server’ options
–reset-offsets 重置offset。offset支持: --to-datetime,–by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. 操作支持:–dry-run(默认), --execute,-- export
–dry-run 仅显示结果,不真正生效。
–execute 修改生效。
–export 导出操作 to a CSV file. Supported operations: reset-offsets.
–shift-by <Long: number-of-offsets> 指定离当前的偏移量 ‘n’,可以是正负值。
–to-current 当前offset.
–to-datetime <String: datetime> 指定时间。Format: ‘YYYY-MM-DDTHH:mm:SS.sss’
–to-earliest 最早.
–to-latest 最晚.
–to-offset <Long: offset> 指定offset.
–topic <String: topic> 指定topic。reset-offsets可以指定partition格式:topic1:0,1,2
–verbose 提供辅助信息。
–timeout <Long: timeout (ms)> (default: 5000)
–version

有些操作不能在topic被消费时执行,不然会提示:

​ Assignments can only be reset if the group ’ is inactive, but the current state is Stable

通用命令

kafka-run-class.sh

运行一个class,调用kafka的tools的部分功能。

kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]

GetOffsetShell

kafka-run-class.sh kafka.tools.GetOffsetShell /?
#获取offset
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic topic.real.mes --time -1

options:

Option Description
–broker-list
–max-wait-ms <Integer: ms> 废弃。(default: 1000)
–offsets <Integer: count> 废弃。(default: 1)
–partitions <String: partition ids> partition id列表
–time <Long: > 时间戳。返回指定时间戳之前的offset。 timestamp/-1(latest,默认值)/-2(earliest)。如果时间戳大于当前时刻,无offset返回。
–topic <String: topic>

ConsumerOffsetChecker

主要是运行kafka.tools.ConsumerOffsetChecker类,对应的脚本是kafka-consumer-offset-checker.sh,会显示出Consumer的Group、Topic、分区ID、分区对应已经消费的Offset、logSize大小,Lag以及Owner等信息。

DumpLogSegments

验证日志索引是否正确,或者从log文件中直接打印消息。

ExportZkOffsets

导出Zookeeper中Group相关的偏移量。

JmxTool

打印出Kafka相关的metrics信息

KafkaMigrationTool

将Kafka 0.7上面的数据迁移到Kafka 0.8

MirrorMaker

同步两个Kafka集群的数据

服务管理

#启动kafka服务
kafka-server-start.sh
#停止kafka服务
kafka-server-stop.sh

问题

1、kafka 异常 WARN Error while fetching metadata with correlation id xxx

原因:从zookeeper获取到的kafka的信息,需要有外部监听,注意配置kafka的listeners。

修改config下的 server.properties 文件
将 listeners=PLAINTEXT://:9092
修改成listeners=PLAINTEXT://ip:9092

不同kafka 镜像的配置可能不相同:

      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181ALLOW_PLAINTEXT_LISTENER: 'yes'KAFKA_INTER_BROKER_LISTENER_NAME: INTERNALKAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXTKAFKA_CFG_LISTENERS: INTERNAL://:9092,EXTERNAL://:${MS_KAFKA_EXT_PORT}KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://${MS_KAFKA_EXT_HOST}:${MS_KAFKA_EXT_PORT}KAFKA_CFG_LOG_RETENTION_HOURS: 64
#或者KAFKA_ADVERTISED_HOST_NAME: 192.168.1.63KAFKA_ADVERTISED_PORT: 9192KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181DELETE_TOPIC_ENBLE: "true"KAFKA_BROKER_ID: 1

2、彻底删除topic

彻底删除Kafka中的topic

1)、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

2)、Kafka 删除topic的命令是:

 ./bin/kafka-topics  --delete --zookeeper 【zookeeper server】  --topic 【topic name】#如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion#查看所有topic:./bin/kafka-topics --zookeeper 【zookeeper server】 --list
 此时你若想真正删除它,可以如下操作:(1)登录zookeeper客户端:命令:./bin/zookeeper-client(2)找到topic所在的目录:ls /brokers/topics(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,
如果你删除了此处的topic,那么marked for deletion 标记消失
zookeeper 的config中也有有关topic的信息: ls /config/topics/【topic name】暂时不知道有什么用

总结:

彻底删除topic:

1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉broker下的topic即可。

3 、查看某个group消费topic的offset,并重置。

使用 kafka-consumer-groups.sh 命令,见前面。

4、查看kafka版本

没有对应命令,进入kafka安装目录libs下。查看kafka_* 开头的jar包,

-rw-r--r-- 1 kafka kafka     821 Feb 14  2017 kafka_2.11-0.10.2.0-test-sources.jar.asc
-rw-r--r-- 1 kafka kafka 3452117 Feb 14  2017 kafka_2.11-0.10.2.0-test.jar
-rw-r--r-- 1 kafka kafka     821 Feb 14  2017 kafka_2.11-0.10.2.0-test.jar.asc
-rw-r--r-- 1 kafka kafka 5641281 Feb 14  2017 kafka_2.11-0.10.2.0.jar
-rw-r--r-- 1 kafka kafka     821 Feb 14  2017 kafka_2.11-0.10.2.0.jar.asc

2.11是 scala版本,0.10.2.0是 kafka版本。

参考

kafka的bin目录下的其他工具

connect-distributed.sh
connect-mirror-maker.sh
connect-standalone.sh
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-configs.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
kafka-dump-log.sh
kafka-leader-election.sh
kafka-log-dirs.sh
kafka-mirror-maker.sh
kafka-preferred-replica-election.sh
kafka-producer-perf-test.sh
kafka-reassign-partitions.sh
kafka-replica-verification.sh
kafka-streams-application-reset.sh
kafka-verifiable-consumer.sh
kafka-verifiable-producer.sh
trogdor.sh
zookeeper-security-migration.sh
zookeeper-server-start.sh
zookeeper-server-stop.sh
zookeeper-shell.sh

kafka.tools下有什么类

git:https://github.com/apache/kafka/tree/2.5

路径:core/src/main/scala/kafka/tools。

ConsoleConsumer
ConsoleProducer
ConsumerPerformance
DumpLogSegments
EndToEndLatency
GetOffsetShell
JmxTool
MirrorMaker
PerfConfig
ReplicaVerificationTool
StateChangeLogMerger
StreamsResetter

文档

官网:http://kafka.apache.org/documentation/#gettingStarted

中文文档:https://kafka.apachecn.org/

kafka常用命令及问题解决相关推荐

  1. Kafka学习之四 Kafka常用命令

    2019独角兽企业重金招聘Python工程师标准>>> Kafka学习之四 Kafka常用命令 Kafka常用命令 以下是kafka常用命令行总结: 1.查看topic的详细信息 . ...

  2. Kafka常用命令(1):kafka-topics

    Kafka常用命令之:kafka-topics 概述 1. 创建Topic: --create 2. 查看Topic详细信息: --describe 3. 查看Topic列表: --list 4.修改 ...

  3. Kafka常用命令行命令

    文章目录 Kafka常用命令 kafka的基本操作(命令行操作) 1.启动集群: 2.查看当前服务器中的所有topic(在kafka目录下) 3.创建主题topic(在kafka目录下) 4.删除to ...

  4. kafka创建topic_一网打尽Kafka常用命令、脚本及配置,宜收藏!

    前言 通过前面 7 篇文章的介绍,小伙伴们应该对 Kafka 运行工作原理有一个相对比较清晰的认识了. Kafka是什么?一起来看看吧! Kafka 安装及简单命令使用 Kafka中消息如何被存储到B ...

  5. kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用

    常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...

  6. Gluster FS 部署复制卷与常用命令 常见问题解决

    在 liunx 下部署Gluster FS分布式文件系统,以及Gluster FS遇到的一些问题解决,常用命令 概述 Gluster FS 是一个开源分布式文件系统,具有强大的横向扩展能力,可支持数P ...

  7. git常用命令和问题解决

    常用命令: 克隆代码: git clone [url] 显示当前的git配置: git config --list 添加指定文件到暂存区: git add xxx 添加当前目录的所有文件到暂存区: g ...

  8. Kafka常用命令收录

    目录 目录 1 1. 前言 2 2. Broker默认端口号 2 3. 安装Kafka 2 4. 启动Kafka 2 5. 创建Topic 2 6. 列出所有Topic 3 7. 删除Topic 3 ...

  9. 一网打尽Kafka常用命令、脚本及配置,宜收藏!

    前言 通过前面 7 篇文章的介绍,小伙伴们应该对 Kafka 运行工作原理有一个相对比较清晰的认识了. Kafka是什么?一起来看看吧! Kafka 安装及简单命令使用 Kafka中消息如何被存储到B ...

最新文章

  1. 如何提高企业竞争力,科学的组织架构是第一王牌
  2. 智能车竞赛云端比赛第三天:一场在家具建材广场中的智能车比赛
  3. PyTorch 实现经典模型7:YOLO (v1, v2, v3, v4)
  4. 不同程序用不同网络_微信小程序直播登场,与平台直播有何不同?
  5. 闭包,sync使用细节
  6. android 重新启动应用程序,通过单击应用程序图标打开Android应用程序时重新启动...
  7. WebAPI基本封装
  8. 部署SpringBoot到阿里云
  9. 11.29--mappedBy
  10. 计算机电脑怎么开热点,电脑怎么设置wifi热点共享
  11. 空气质量等级c语言编程,华中科技大学C语言课设空气质量检测信息管理系统技术分析.docx...
  12. 神经网络加速器的兴起
  13. 邮箱接爱服务器端口填什么,你了解多少邮件端口及常用邮箱服务器?
  14. 业务数据分析最佳案例!旅游业数据分析!⛵
  15. centos添加桌面快捷方式
  16. Springboot 阿里云OSS修改下载文件名称
  17. java 判断手机访问_下面java代码判断是手机访问还是PC访问什么地方出错了,手机跳转不到制定页面,等待解答...
  18. 帕克西AR一键换发型,让你秒变潮人!
  19. RACI 责任分配矩阵
  20. 终于找到了一款好用的录屏软件了

热门文章

  1. 0301 - 一个比价的小项目
  2. Flink从入门到放弃之源码解析系列-第1章 Flink组件和逻辑计划
  3. MicroPython开发板:TPYBoard v102 播放音乐实例
  4. AsyncTask的理解
  5. mysql数据库导出模型到powerdesigner,PDM图形窗口中显示数据列的中文注释
  6. DNS子域授权及view(三)
  7. MYSQL的索引类型:PRIMARY, INDEX,UNIQUE,FULLTEXT,SPAIAL 有什么区别?各适用于什么场合?...
  8. 设计模式中类之间的关系
  9. 团队冲刺the second day
  10. PKU A Simple Problem with Integers 3468