kafka java_Java操作Kafka
java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用。下面我贴出代码。
pom.xml
org.apache.kafka
kafka-clients
0.10.2.0
生产者:
packagecn.duanjt;importjava.util.Properties;importjava.util.Random;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;public classProducer {public static String topic = "duanjt_test";//定义主题
public static void main(String[] args) throwsInterruptedException {
Properties p= newProperties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.23.76:9092,192.168.23.77:9092");//kafka地址,多个地址用逗号分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer kafkaProducer = new KafkaProducer<>(p);try{while (true) {
String msg= "Hello," + new Random().nextInt(100);
ProducerRecord record = new ProducerRecord(topic, msg);
kafkaProducer.send(record);
System.out.println("消息发送成功:" +msg);
Thread.sleep(500);
}
}finally{
kafkaProducer.close();
}
}
}
注意:
1.kafka如果是集群,多个地址用逗号分割(,)
2.Properties的put方法,第一个参数可以是字符串,如:p.put("bootstrap.servers","192.168.23.76:9092")
3.kafkaProducer.send(record)可以通过返回的Future来判断是否已经发送到kafka,增强消息的可靠性。同时也可以使用send的第二个参数来回调,通过回调判断是否发送成功。
4.p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);设置序列化类,可以写类的全路径
消费者:
packagecn.duanjt;importjava.util.Collections;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.serialization.StringDeserializer;public classConsumer {public static voidmain(String[] args) {
Properties p= newProperties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.23.76:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG,"duanjt_test");
KafkaConsumer kafkaConsumer = new KafkaConsumer(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));//订阅消息
while (true) {
ConsumerRecords records = kafkaConsumer.poll(100);for (ConsumerRecordrecord : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //record.topic(), record.offset(), record.value()));
}
}
}
}
注意:
1.订阅消息可以订阅多个主题
2.ConsumerConfig.GROUP_ID_CONFIG表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询。
3.主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区。
4.注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。
kafka java_Java操作Kafka相关推荐
- kafka学习--使用kafka conect操作kafka connector
1. 配置kafka connectors kafka connectors配置是简单的键值映射.对于独立模式,这些在属性文件中定义,并传递到命令行上的kafka Connect进程.在分布式模式下, ...
- Java操作Kafka执行不成功
使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢! 环境及依赖 <dependency><groupId>org ...
- Python 操作 Kafka --- kafka-python
kafka-python:https://github.com/dpkp/kafka-python kafka-python 文档:https://kafka-python.readthedocs.i ...
- kafka实战教程(python操作kafka),kafka配置文件详解
全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...
- c#连接kafka_c#操作kafka(上)搭建kafka环境
小伙伴们大家好,今天没有概念,也没有理论,仅仅和大家一起快速的在centos上搭建一下kafka的测试环境,测试环境嘛,不涉及集群什么的,仅仅是单节点的kafka,日后可以在这个基础上,进行集群的相关 ...
- go 操作 kafka 实现发送和订阅
kafka 消息队列 kafka架构 安装kafka kafak依赖zookeeper 需要先启动zk(集群) zookeeper 启动 单节点启动kafka kafka配置文件 config/ser ...
- ACL+SASL的认证配置后的Kafka命令操作(Windows版)
ACL+SASL的认证配置后的Kafka命令操作 Windows环境 背景 版本 操作 配置文件准备 Zookeeper配置文件 Clients配置文件 Kafka Server配置文件 JAAS配置 ...
- Kafka原理+操作+实战
Kafka原理+操作+实战 前面我和大家交流了kafka的部署安装.对于部署安装这都是小意思,不值得太多的提及.重点还是需要知道kafka原理.熟练掌握kafka命令以及灵活用于kafka场景. 干货 ...
- Doris系列之导入Kafka数据操作
Doris系列 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Sp ...
最新文章
- 【算法设计】虎溪校园导游系统
- 计算机实训课教案模板,CorelDRAW实训课教案(7周)
- 字符串系统函数strstr strrchr [5.3有版本不同]
- 【ArcGIS微课1000例】0009:ArcGIS影像拼接(镶嵌、镶嵌至新栅格)
- 插件一:JAVA微信砍价活动源码分享[商品帮砍到0元,免费领取奖品]
- 最优化设置mysql的max_connections
- 关于三个概念:ActiveX、OLE和COM
- JS -- http、https地址自动检测并添加为链接
- Centos-6.3-x86_64 minimal 迷你版安装笔记 - Java篇
- 编译ROS-Academy-for-Beginners
- R-FCN算法的Caffe实现
- Android SDK下载失败解决
- 【计算机网络微课堂】3.3 差错检测
- 关于对比学习在医学图像理解中两篇Paper的思考
- 2018年数学建模国赛B题 智能RGV的动态调度策略
- Visio 去掉页边距和空白页的方法
- 随机梯度下降算法 入门介绍(最通俗易懂)
- 预训练模型微调 | 一文带你了解Adapter Tuning
- 巴西龟饲养日志----野外捉鱼
- Java基本语法和规范
热门文章
- Java基础学习总结(176)——JDK 16 正式发布,一次性发布 17 个新特性
- Linux学习总结(19)——Linux中文本编辑器vim特殊使用方法
- 技术中台构建思路及进展_半年中台实践思考:落地中台,贵在其神,活用其形...
- Java中的String.hashCode()方法可能有问题?
- 简练软考知识点整理-互联网+
- iOS开发之UIApplication
- 《编程之美》1.9:高效率的安排见面会的一个解法
- 关于SOA您该知道却不愿知道的十件事
- luogu P1586 四方定理(背包)
- 多线程和socket练习