原文:http://blog.csdn.net/changong28/article/details/39325079

使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通过API的方式进行设定,无需在之前或之后使用脚本进行操作,所以才了这篇文章。查看源码发现,其实内部所有的实现都是通过TopicCommand的main方法,在此记录两种方式:

1、创建主题(Topic)

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--create",
  3. "--zookeeper",
  4. "zk_host:port/chroot",
  5. "--partitions",
  6. "20",
  7. "--topic",
  8. "my_topic_name",
  9. "--replication-factor",
  10. "3",
  11. "--config",
  12. "x=y"
  13. };
  14. TopicCommand.main(options);

2、查看所有主题

【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181

【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--list",
  3. "--zookeeper",
  4. "localhost:2181"
  5. };
  6. TopicCommand.main(options);

3、查看指定主题:

【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--describe",
  3. "--zookeeper",
  4. "localhost:2181",
  5. "--topic",
  6. "my-replicated-topic",
  7. };
  8. TopicCommand.main(options);

4、修改主题:

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--alter",
  3. "--zookeeper",
  4. "zk_host:port/chroot",
  5. "--topic",
  6. "my_topic_name",
  7. "--deleteConfig",
  8. "x"
  9. };
  10. TopicCommand.main(options);

5、删除出题:

【命令方式】:无

【JAVA API方式】:

  1. String[] options = new String[]{
  2. "--zookeeper",
  3. "zk_host:port/chroot",
  4. "--topic",
  5. "my_topic_name"
  6. };
  7. DeleteTopicCommand.main(options);

另:下文kafka删除topic的方法(出自 “菜光光的博客” 博客,出处http://caiguangguang.blog.51cto.com/1652935/1548069)

0.8的官方文档提供了一个删除topic的命令:

kafka-topics.sh --delete 但是在运行时会报错找不到这个方法。

kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。

在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。

kafka.admin.DeleteTopicCommand

其中删除topic的具体实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
    val topic = options.valueOf(topicOpt)
    val zkConnect = options.valueOf(zkConnectOpt)
    var zkClient: ZkClient = null
    try {
      zkClient = new ZkClient(zkConnect, 3000030000, ZKStringSerializer)
      zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))  //其实最终还是通过删除zk里面对应的路径来实现删除topic的功能
      println("deletion succeeded!")
    }
    catch {
      case e: Throwable =>
        println("delection failed because of " + e.getMessage)
        println(Utils.stackTrace(e))
    }
    finally {
      if (zkClient != null)
        zkClient.close()
    }

因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。

转载于:https://www.cnblogs.com/davidwang456/p/4313784.html

使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...相关推荐

  1. kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...

  2. java邮件中添加excel_使用java api 创建excel内容并发送邮件

    https://github.com/auguszero/javaToolRepsitory 利用java api 创建excel 内容并发送邮件 主要实现功能: 1.通过配置文件设置发送邮件发送方, ...

  3. Mysql基础知识:创建、查看、修改和删除表

    Mysql 创建.查看.修改和删除表 1. 创建表 创建表的语法形式: CREATE TABLE 表名 ( 属性名 数据类型 约束条件,属性名 数据类型 约束条件,...) ENGINE=存储引擎名 ...

  4. java hdfs创建文件_使用HDFS java api 创建文件出错。

    //创建文件核心代码 public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOEx ...

  5. HBase Java API 创建表时一直卡住

    场景 HBase在CentOS上分布集群安装: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119511593 在上面搭建起来H ...

  6. Topic 相关操作(创建,查看,修改,删除)

    创建 topic [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --create ...

  7. 6、存储函数详解,创建,查看,修改,删除

    存储函数和存储过程一样,都是在数据库中定义一些 SQL 语句的集合.存储函数可以通过 return 语句返回函数值,主要用于计算并返回一个值.而存储过程没有直接返回值,主要用于执行操作. 在 MySQ ...

  8. SQL 基础(一)创建、查看、修改、删除数据库

    SQL(Structured Query Language),结构化查询语言 基础理论 T-SQL 和 SQL 的区别: T-SQL 是 SQL 语言的一种版本,且只能在 SQL SERVER 上使用 ...

  9. MySQL数据库之DDL语言:库和表的创建CREATE、修改ALTER、删除DROP

    文章目录 @[TOC] MySQL数据库之DDL语言:库和表的创建.修改.删除 1.DDL 数据定义语言 2.库的管理 2.1 库的创建 2.2 库的修改(能修改的不多,如修改字符集) 2.3 库的删 ...

最新文章

  1. idea中使用docker插件部署项目
  2. Day 20: 斯坦福CoreNLP —— 用Java给Twitter进行情感分析
  3. Netty 框架学习(二):Netty粘包和拆包
  4. 相册权限_手机相册太乱?1分钟教你快速管理自己的照片,非常好用!
  5. 1. 根据输出的数据,对各个阶维度的反推+2.tf中生成根据指定的shape,tensor的各个阶的维度判断
  6. 力扣914.卡牌分组
  7. 撤消git pull,如何将repos带到旧状态
  8. db2 license过期
  9. SQL Server 2016 bak文件还原
  10. OpenGL超级宝典(第7版)之VS2019使用sb7框架搭建自己的程序
  11. VMware ESXI7.0的安装与配置(全过程超详细含中英文对照,附应知必会的理论基础和常见故障解决方案)
  12. Python collections.Counter()用法
  13. IntelliJ idea2017 安装破解
  14. 计蒜客 蒜头君的训练室
  15. 春夜宴诸从弟桃李园序 李白
  16. HTTP协议状态及报文组成 - 一文通读
  17. python中excel处理及生成图表
  18. 【ChatGPT初体验与Android的集成使用】
  19. Springboot项目打包发布找不到项目中使用的文件
  20. Android x86 手动安装houdini

热门文章

  1. git stage 暂存_什么是Git?下载和安装Git
  2. 表格某一列不固定其余全固定_如何利用Python一键拆分表格并进行邮件发送~
  3. elasticjob2.x 获取上次执行时间_深圳会务公司-会议活动策划与执行注意事项
  4. 阿里云物模型层初始化代码实现
  5. 变参模板、完美转发和emplace
  6. 软件详细设计说明书_校导周绪龙|软件测试第五篇——软件测试的底层思维
  7. html视频资源加载出错处理,如何处理前端异常
  8. 服务器给站点读写权限,IIS7目录权限设置的问题详解Windows服务器操作系统 -电脑资料...
  9. 温铁军、林毅夫、陈平,从学术、现实等多方面来分析,谁的价值高?
  10. C++函数模板的重载