使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...
原文:http://blog.csdn.net/changong28/article/details/39325079
使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通过API的方式进行设定,无需在之前或之后使用脚本进行操作,所以才了这篇文章。查看源码发现,其实内部所有的实现都是通过TopicCommand的main方法,在此记录两种方式:
1、创建主题(Topic)
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
【JAVA API方式】:
- String[] options = new String[]{
- "--create",
- "--zookeeper",
- "zk_host:port/chroot",
- "--partitions",
- "20",
- "--topic",
- "my_topic_name",
- "--replication-factor",
- "3",
- "--config",
- "x=y"
- };
- TopicCommand.main(options);
2、查看所有主题
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181
【JAVA API方式】:
- String[] options = new String[]{
- "--list",
- "--zookeeper",
- "localhost:2181"
- };
- TopicCommand.main(options);
3、查看指定主题:
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
【JAVA API方式】:
- String[] options = new String[]{
- "--describe",
- "--zookeeper",
- "localhost:2181",
- "--topic",
- "my-replicated-topic",
- };
- TopicCommand.main(options);
4、修改主题:
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
- String[] options = new String[]{
- "--alter",
- "--zookeeper",
- "zk_host:port/chroot",
- "--topic",
- "my_topic_name",
- "--deleteConfig",
- "x"
- };
- TopicCommand.main(options);
5、删除出题:
【命令方式】:无
【JAVA API方式】:
- String[] options = new String[]{
- "--zookeeper",
- "zk_host:port/chroot",
- "--topic",
- "my_topic_name"
- };
- DeleteTopicCommand.main(options);
另:下文kafka删除topic的方法(出自 “菜光光的博客” 博客,出处http://caiguangguang.blog.51cto.com/1652935/1548069)
0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete 但是在运行时会报错找不到这个方法。
kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。
在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。
kafka.admin.DeleteTopicCommand
其中删除topic的具体实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000 , 30000 , ZKStringSerializer)
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) //其实最终还是通过删除zk里面对应的路径来实现删除topic的功能
println( "deletion succeeded!" )
}
catch {
case e: Throwable =>
println( "delection failed because of " + e.getMessage)
println(Utils.stackTrace(e))
}
finally {
if (zkClient != null )
zkClient.close()
}
|
因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。
转载于:https://www.cnblogs.com/davidwang456/p/4313784.html
使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...相关推荐
- kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...
使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...
- java邮件中添加excel_使用java api 创建excel内容并发送邮件
https://github.com/auguszero/javaToolRepsitory 利用java api 创建excel 内容并发送邮件 主要实现功能: 1.通过配置文件设置发送邮件发送方, ...
- Mysql基础知识:创建、查看、修改和删除表
Mysql 创建.查看.修改和删除表 1. 创建表 创建表的语法形式: CREATE TABLE 表名 ( 属性名 数据类型 约束条件,属性名 数据类型 约束条件,...) ENGINE=存储引擎名 ...
- java hdfs创建文件_使用HDFS java api 创建文件出错。
//创建文件核心代码 public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOEx ...
- HBase Java API 创建表时一直卡住
场景 HBase在CentOS上分布集群安装: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119511593 在上面搭建起来H ...
- Topic 相关操作(创建,查看,修改,删除)
创建 topic [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --create ...
- 6、存储函数详解,创建,查看,修改,删除
存储函数和存储过程一样,都是在数据库中定义一些 SQL 语句的集合.存储函数可以通过 return 语句返回函数值,主要用于计算并返回一个值.而存储过程没有直接返回值,主要用于执行操作. 在 MySQ ...
- SQL 基础(一)创建、查看、修改、删除数据库
SQL(Structured Query Language),结构化查询语言 基础理论 T-SQL 和 SQL 的区别: T-SQL 是 SQL 语言的一种版本,且只能在 SQL SERVER 上使用 ...
- MySQL数据库之DDL语言:库和表的创建CREATE、修改ALTER、删除DROP
文章目录 @[TOC] MySQL数据库之DDL语言:库和表的创建.修改.删除 1.DDL 数据定义语言 2.库的管理 2.1 库的创建 2.2 库的修改(能修改的不多,如修改字符集) 2.3 库的删 ...
最新文章
- idea中使用docker插件部署项目
- Day 20: 斯坦福CoreNLP —— 用Java给Twitter进行情感分析
- Netty 框架学习(二):Netty粘包和拆包
- 相册权限_手机相册太乱?1分钟教你快速管理自己的照片,非常好用!
- 1. 根据输出的数据,对各个阶维度的反推+2.tf中生成根据指定的shape,tensor的各个阶的维度判断
- 力扣914.卡牌分组
- 撤消git pull,如何将repos带到旧状态
- db2 license过期
- SQL Server 2016 bak文件还原
- OpenGL超级宝典(第7版)之VS2019使用sb7框架搭建自己的程序
- VMware ESXI7.0的安装与配置(全过程超详细含中英文对照,附应知必会的理论基础和常见故障解决方案)
- Python collections.Counter()用法
- IntelliJ idea2017 安装破解
- 计蒜客 蒜头君的训练室
- 春夜宴诸从弟桃李园序 李白
- HTTP协议状态及报文组成 - 一文通读
- python中excel处理及生成图表
- 【ChatGPT初体验与Android的集成使用】
- Springboot项目打包发布找不到项目中使用的文件
- Android x86 手动安装houdini
热门文章
- git stage 暂存_什么是Git?下载和安装Git
- 表格某一列不固定其余全固定_如何利用Python一键拆分表格并进行邮件发送~
- elasticjob2.x 获取上次执行时间_深圳会务公司-会议活动策划与执行注意事项
- 阿里云物模型层初始化代码实现
- 变参模板、完美转发和emplace
- 软件详细设计说明书_校导周绪龙|软件测试第五篇——软件测试的底层思维
- html视频资源加载出错处理,如何处理前端异常
- 服务器给站点读写权限,IIS7目录权限设置的问题详解Windows服务器操作系统 -电脑资料...
- 温铁军、林毅夫、陈平,从学术、现实等多方面来分析,谁的价值高?
- C++函数模板的重载