常用命令

启动Zookeeper

./zkServer.sh start-foreground

可选参数:

./zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}

启动ZooInspector,可以查看注册到Zookeeper的Kafka broker和topic情况:

java -jar zookeeper-dev-ZooInspector.jar

屏幕快照 2018-12-06 上午10.56.40.png

启动Kafka

./kafka-server-start.sh ../config/server.properties

创建topic,指定3个分区,每个分区1个副本

./kafka-topics.sh --create -topic testtopic -partitions 3 -replication-factor 1 -zookeeper localhost:2181

列出所有topic

./kafka-topics.sh --list -zookeeper localhost:2181

删除topic

./kafka-topics.sh --delete -topic testtopic -zookeeper localhost:2181

使用producer命令行工具

./kafka-console-producer.sh -topic testtopic --broker-list localhost:9092

使用consumer命令行工具

注意:--from-beginning会从初始offset位置开始接收消息;不加该参数从当前offset位置开始。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic testtopic --from-beginning

Java API使用

Producer API

public class SampleProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducer.class);

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("bootstrap.servers", "localhost:9092");

properties.put("client.id", "DemoProducer");

//序列化器

properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer producer = new KafkaProducer(properties);

ProducerRecord record = new ProducerRecord<>("testtopic", "hello world");

Future future = producer.send(record);

RecordMetadata recordMetadata = null;

try {

recordMetadata = future.get();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

System.out.println(recordMetadata);

}

}

Consumer API

public class SampleConsumer {

public static void main(String[] args) {

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//指定消费者组

props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");

//关闭自动位移提交

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

//反序列化器

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer(props);

//订阅topic

consumer.subscribe(Arrays.asList("testtopic"));

ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));

records.forEach((ConsumerRecord record) -> {

System.out.println(record.value());

});

//手动提交位移

consumer.commitAsync();

}

}

kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用相关推荐

  1. Docker 从入门到实践系列三 - Docker 常用命令,java高级开发工程师面试问题

    容器命令 镜像运行起来后,称为容器. 下面以 Tomcat 为例 搜索镜像 docker search tomcat 下载镜像 docker pull tomcat 运行容器 运行tomcat doc ...

  2. Git使用 从入门到入土 收藏吃灰系列(三) Git常用命令

    文章目录 一.前言 二.常用的命令 一.前言 参考安装Git 详细安装教程 参考视频B站 Git最新教程通俗易懂,这个有点长,感觉讲的精华不多 参考视频『Git』知道这些就够了_哔哩哔哩_bilibi ...

  3. SAP PM 入门系列6 - PM常用表

    SAP PM 入门系列6 - PM常用表 Equipment设备相关的数据表 EQUI: Equipment master data EQKT: Multilingual equipment shor ...

  4. Java IO的原理 入门理解,input和output和java程序的关系

    Java IO的原理 入门理解,input和output和java程序的关系 1.Java IO的原理 2.input和output的理解 3.IO流的分类 4.IO流体系(蓝色为重点.常用) 5.. ...

  5. Java视频教程,java全套教程从入门到精通:13天学会Java编程!

    java,编程语言排行榜榜首,常年来霸占排行榜,在加上工资高,导致了学习java的人群越来越多,但是好多人又找不到学习java好的教程,不知道该如何入门?今天分享一套java入门教程,让你轻松入门ja ...

  6. HDFS文件系统(含hdfs常用命令,java对hdfs API的简单操作)

    HDFS文件系统 HDFS概念 什么是HDFS HDFS的组成 HDFS文件块大小 HFDS命令行操作 基本语法 常用命令 Java操作hdfs 配置编译环境 获取文件系统 上传文件 文件下载 目录创 ...

  7. Kafka实践指南:快速掌握部署使用与常用命令

    Kafka部署使用 Kafka部署使用 Kafka定义和特性 Kafka架构和组成部分 Kafka工作原理和消息传递过程 Kafka安装与配置 安装Kafka 配置Kafka集群 Kafka的主题和分 ...

  8. Linux学习:入门,概述,常用命令,环境安装

    服务器购买配置 参考:狂神说Linux 基于CentOS7 Linux一切皆文件:读,写 一.入门概述 二.Linux命令 1. 开机登录 可以使用 man [命令]来查看各个命令的使用文档,如 :m ...

  9. HDFS常用命令和客户端API操作

    注意 1)记得快照,快照,快照一下 ECS服务器怎么快照?  ECS服务器搭建hadoop伪分布式_CBeann的博客-CSDN博客_宝塔部署hadoop 2)参考资料(视频)的问题 上面网盘的视频, ...

最新文章

  1. PAT甲级题目翻译+答案 AcWing(模拟)
  2. 【RK PX30】 瑞芯微四核64位工业级芯片PX30 | 安卓核心板
  3. Ubuntu16.04安装jdk8
  4. SAP Spartacus Tag Management System TMS 介绍
  5. Android—Broadcast原理
  6. 让电流检测更精确的AMR技术
  7. 网络游戏中网络模块浅析
  8. _Linux 系统挂载数据盘
  9. 马斯克:我是Rust粉丝,但我选择C
  10. Eclipse 在开发中使用到的快捷键很实用 .
  11. Python 之 游戏飞机大战项目实现
  12. android 页面icon拉伸_页面转场: 忽略它可能会使你犯错
  13. 《计算机网络基础》考试大纲
  14. xamarin.forms 绑定页面里指定元素的某个属性值
  15. 解决eclispe SVN 创建资源库报错,无法验证:SVN…… 504 Connection to server timed out
  16. 如何更改springboot的tomcat版本
  17. 矿物质电缆的优点与应用场所
  18. 怎样开通gmail邮箱smtp服务
  19. 【5G/4G】AT Command 详细教程(基于3GPP Spec 27.007)
  20. mybatis 3.x 缓存Cache的使用

热门文章

  1. 利用Socketserver实现简单的文件上传
  2. cs-Filters
  3. 关于extern C
  4. 学习开发webpart
  5. windows安装Linux卡logo,Dell xps 15 windows ubuntu16.04 UEFI 双系统安装 卡在logo界面 卡***问题解决...
  6. CSDN上传资源时一直提示 请填写资源tag
  7. 赛可达实验室发布2015测评认证标准
  8. 《C程序猿:从校园到职场》出版预告(4):从“散兵游勇”到“正规部队”
  9. [李景山php]每天TP5-20161225|thinkphp5-Console.php-2
  10. cookie封装做免输入登录