kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用
常用命令
启动Zookeeper
./zkServer.sh start-foreground
可选参数:
./zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
启动ZooInspector,可以查看注册到Zookeeper的Kafka broker和topic情况:
java -jar zookeeper-dev-ZooInspector.jar
屏幕快照 2018-12-06 上午10.56.40.png
启动Kafka
./kafka-server-start.sh ../config/server.properties
创建topic,指定3个分区,每个分区1个副本
./kafka-topics.sh --create -topic testtopic -partitions 3 -replication-factor 1 -zookeeper localhost:2181
列出所有topic
./kafka-topics.sh --list -zookeeper localhost:2181
删除topic
./kafka-topics.sh --delete -topic testtopic -zookeeper localhost:2181
使用producer命令行工具
./kafka-console-producer.sh -topic testtopic --broker-list localhost:9092
使用consumer命令行工具
注意:--from-beginning会从初始offset位置开始接收消息;不加该参数从当前offset位置开始。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic testtopic --from-beginning
Java API使用
Producer API
public class SampleProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducer.class);
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("client.id", "DemoProducer");
//序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(properties);
ProducerRecord record = new ProducerRecord<>("testtopic", "hello world");
Future future = producer.send(record);
RecordMetadata recordMetadata = null;
try {
recordMetadata = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(recordMetadata);
}
}
Consumer API
public class SampleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//指定消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
//关闭自动位移提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
//反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(props);
//订阅topic
consumer.subscribe(Arrays.asList("testtopic"));
ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));
records.forEach((ConsumerRecord record) -> {
System.out.println(record.value());
});
//手动提交位移
consumer.commitAsync();
}
}
kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用相关推荐
- Docker 从入门到实践系列三 - Docker 常用命令,java高级开发工程师面试问题
容器命令 镜像运行起来后,称为容器. 下面以 Tomcat 为例 搜索镜像 docker search tomcat 下载镜像 docker pull tomcat 运行容器 运行tomcat doc ...
- Git使用 从入门到入土 收藏吃灰系列(三) Git常用命令
文章目录 一.前言 二.常用的命令 一.前言 参考安装Git 详细安装教程 参考视频B站 Git最新教程通俗易懂,这个有点长,感觉讲的精华不多 参考视频『Git』知道这些就够了_哔哩哔哩_bilibi ...
- SAP PM 入门系列6 - PM常用表
SAP PM 入门系列6 - PM常用表 Equipment设备相关的数据表 EQUI: Equipment master data EQKT: Multilingual equipment shor ...
- Java IO的原理 入门理解,input和output和java程序的关系
Java IO的原理 入门理解,input和output和java程序的关系 1.Java IO的原理 2.input和output的理解 3.IO流的分类 4.IO流体系(蓝色为重点.常用) 5.. ...
- Java视频教程,java全套教程从入门到精通:13天学会Java编程!
java,编程语言排行榜榜首,常年来霸占排行榜,在加上工资高,导致了学习java的人群越来越多,但是好多人又找不到学习java好的教程,不知道该如何入门?今天分享一套java入门教程,让你轻松入门ja ...
- HDFS文件系统(含hdfs常用命令,java对hdfs API的简单操作)
HDFS文件系统 HDFS概念 什么是HDFS HDFS的组成 HDFS文件块大小 HFDS命令行操作 基本语法 常用命令 Java操作hdfs 配置编译环境 获取文件系统 上传文件 文件下载 目录创 ...
- Kafka实践指南:快速掌握部署使用与常用命令
Kafka部署使用 Kafka部署使用 Kafka定义和特性 Kafka架构和组成部分 Kafka工作原理和消息传递过程 Kafka安装与配置 安装Kafka 配置Kafka集群 Kafka的主题和分 ...
- Linux学习:入门,概述,常用命令,环境安装
服务器购买配置 参考:狂神说Linux 基于CentOS7 Linux一切皆文件:读,写 一.入门概述 二.Linux命令 1. 开机登录 可以使用 man [命令]来查看各个命令的使用文档,如 :m ...
- HDFS常用命令和客户端API操作
注意 1)记得快照,快照,快照一下 ECS服务器怎么快照? ECS服务器搭建hadoop伪分布式_CBeann的博客-CSDN博客_宝塔部署hadoop 2)参考资料(视频)的问题 上面网盘的视频, ...
最新文章
- PAT甲级题目翻译+答案 AcWing(模拟)
- 【RK PX30】 瑞芯微四核64位工业级芯片PX30 | 安卓核心板
- Ubuntu16.04安装jdk8
- SAP Spartacus Tag Management System TMS 介绍
- Android—Broadcast原理
- 让电流检测更精确的AMR技术
- 网络游戏中网络模块浅析
- _Linux 系统挂载数据盘
- 马斯克:我是Rust粉丝,但我选择C
- Eclipse 在开发中使用到的快捷键很实用 .
- Python 之 游戏飞机大战项目实现
- android 页面icon拉伸_页面转场: 忽略它可能会使你犯错
- 《计算机网络基础》考试大纲
- xamarin.forms 绑定页面里指定元素的某个属性值
- 解决eclispe SVN 创建资源库报错,无法验证:SVN…… 504 Connection to server timed out
- 如何更改springboot的tomcat版本
- 矿物质电缆的优点与应用场所
- 怎样开通gmail邮箱smtp服务
- 【5G/4G】AT Command 详细教程(基于3GPP Spec 27.007)
- mybatis 3.x 缓存Cache的使用
热门文章
- 利用Socketserver实现简单的文件上传
- cs-Filters
- 关于extern C
- 学习开发webpart
- windows安装Linux卡logo,Dell xps 15 windows ubuntu16.04 UEFI 双系统安装 卡在logo界面 卡***问题解决...
- CSDN上传资源时一直提示 请填写资源tag
- 赛可达实验室发布2015测评认证标准
- 《C程序猿:从校园到职场》出版预告(4):从“散兵游勇”到“正规部队”
- [李景山php]每天TP5-20161225|thinkphp5-Console.php-2
- cookie封装做免输入登录