java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用。下面我贴出代码。

pom.xml

org.apache.kafka

kafka-clients

0.10.2.0

生产者:

packagecn.duanjt;importjava.util.Properties;importjava.util.Random;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;public classProducer {public static String topic = "duanjt_test";//定义主题

public static void main(String[] args) throwsInterruptedException {

Properties p= newProperties();

p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.23.76:9092,192.168.23.77:9092");//kafka地址,多个地址用逗号分割

p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer kafkaProducer = new KafkaProducer<>(p);try{while (true) {

String msg= "Hello," + new Random().nextInt(100);

ProducerRecord record = new ProducerRecord(topic, msg);

kafkaProducer.send(record);

System.out.println("消息发送成功:" +msg);

Thread.sleep(500);

}

}finally{

kafkaProducer.close();

}

}

}

注意:

1.kafka如果是集群,多个地址用逗号分割(,)

2.Properties的put方法,第一个参数可以是字符串,如:p.put("bootstrap.servers","192.168.23.76:9092")

3.kafkaProducer.send(record)可以通过返回的Future来判断是否已经发送到kafka,增强消息的可靠性。同时也可以使用send的第二个参数来回调,通过回调判断是否发送成功。

4.p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);设置序列化类,可以写类的全路径

消费者:

packagecn.duanjt;importjava.util.Collections;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.serialization.StringDeserializer;public classConsumer {public static voidmain(String[] args) {

Properties p= newProperties();

p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.23.76:9092");

p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

p.put(ConsumerConfig.GROUP_ID_CONFIG,"duanjt_test");

KafkaConsumer kafkaConsumer = new KafkaConsumer(p);

kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));//订阅消息

while (true) {

ConsumerRecords records = kafkaConsumer.poll(100);for (ConsumerRecordrecord : records) {

System.out.println(String.format("topic:%s,offset:%d,消息:%s", //record.topic(), record.offset(), record.value()));

}

}

}

}

注意:

1.订阅消息可以订阅多个主题

2.ConsumerConfig.GROUP_ID_CONFIG表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询。

3.主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区。

4.注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。

kafka java_Java操作Kafka相关推荐

  1. kafka学习--使用kafka conect操作kafka connector

    1. 配置kafka connectors kafka connectors配置是简单的键值映射.对于独立模式,这些在属性文件中定义,并传递到命令行上的kafka Connect进程.在分布式模式下, ...

  2. Java操作Kafka执行不成功

    使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢! 环境及依赖 <dependency><groupId>org ...

  3. Python 操作 Kafka --- kafka-python

    kafka-python:https://github.com/dpkp/kafka-python kafka-python 文档:https://kafka-python.readthedocs.i ...

  4. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  5. c#连接kafka_c#操作kafka(上)搭建kafka环境

    小伙伴们大家好,今天没有概念,也没有理论,仅仅和大家一起快速的在centos上搭建一下kafka的测试环境,测试环境嘛,不涉及集群什么的,仅仅是单节点的kafka,日后可以在这个基础上,进行集群的相关 ...

  6. go 操作 kafka 实现发送和订阅

    kafka 消息队列 kafka架构 安装kafka kafak依赖zookeeper 需要先启动zk(集群) zookeeper 启动 单节点启动kafka kafka配置文件 config/ser ...

  7. ACL+SASL的认证配置后的Kafka命令操作(Windows版)

    ACL+SASL的认证配置后的Kafka命令操作 Windows环境 背景 版本 操作 配置文件准备 Zookeeper配置文件 Clients配置文件 Kafka Server配置文件 JAAS配置 ...

  8. Kafka原理+操作+实战

    Kafka原理+操作+实战 前面我和大家交流了kafka的部署安装.对于部署安装这都是小意思,不值得太多的提及.重点还是需要知道kafka原理.熟练掌握kafka命令以及灵活用于kafka场景. 干货 ...

  9. Doris系列之导入Kafka数据操作

    Doris系列 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Sp ...

最新文章

  1. 【算法设计】虎溪校园导游系统
  2. 计算机实训课教案模板,CorelDRAW实训课教案(7周)
  3. 字符串系统函数strstr strrchr [5.3有版本不同]
  4. 【ArcGIS微课1000例】0009:ArcGIS影像拼接(镶嵌、镶嵌至新栅格)
  5. 插件一:JAVA微信砍价活动源码分享[商品帮砍到0元,免费领取奖品]
  6. 最优化设置mysql的max_connections
  7. 关于三个概念:ActiveX、OLE和COM
  8. JS -- http、https地址自动检测并添加为链接
  9. Centos-6.3-x86_64 minimal 迷你版安装笔记 - Java篇
  10. 编译ROS-Academy-for-Beginners
  11. R-FCN算法的Caffe实现
  12. Android SDK下载失败解决
  13. 【计算机网络微课堂】3.3 差错检测
  14. 关于对比学习在医学图像理解中两篇Paper的思考
  15. 2018年数学建模国赛B题 智能RGV的动态调度策略
  16. Visio 去掉页边距和空白页的方法
  17. 随机梯度下降算法 入门介绍(最通俗易懂)
  18. 预训练模型微调 | 一文带你了解Adapter Tuning
  19. 巴西龟饲养日志----野外捉鱼
  20. Java基本语法和规范

热门文章

  1. Java基础学习总结(176)——JDK 16 正式发布,一次性发布 17 个新特性
  2. Linux学习总结(19)——Linux中文本编辑器vim特殊使用方法
  3. 技术中台构建思路及进展_半年中台实践思考:落地中台,贵在其神,活用其形...
  4. Java中的String.hashCode()方法可能有问题?
  5. 简练软考知识点整理-互联网+
  6. iOS开发之UIApplication
  7. 《编程之美》1.9:高效率的安排见面会的一个解法
  8. 关于SOA您该知道却不愿知道的十件事
  9. luogu P1586 四方定理(背包)
  10. 多线程和socket练习