java kafka client_Kafka Java Client基本使用及整合SpringBoot
kafka-clients
添加依赖
org.apache.kafka
kafka-clients
2.5.0
消费者 Consumer
代码上总体可以分为三部分:消费者的配置消费者的配置在 org.apache.kafka.clients.consumer.ConsumerConfig 类中都有列举包括每个配置项的文档说明
创建消费者实例并订阅topic
消费消息
代码如下:// 1. 配置
Properties properties = new Properties();
//bootstrap.servers kafka集群地址 host1:port1,host2:port2 ....
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.deserializer 消息key序列化方式
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer 消息体序列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// group.id 消费组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
// enable.auto.commit 设置自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// auto.offset.reset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2. 创建消费者实例并订阅topic
KafkaConsumer consumer = new KafkaConsumer<>(properties);
String[] topics = new String[]{"demo-topic"};
consumer.subscribe(Arrays.asList(topics));
// 3. 消费消息
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.println(record);
}
}
生产者 Producer
生产者这块的代码基本上和消费者的结构一样,不同的是,producer 的发消息的部分:生产者的配置在 org.apache.kafka.clients.producer.ProducerConfig 类中也都有列举
创建生产者实例
发送消息到 topic异步发送消息 producer.send(new ProducerRecord<>("demo-topic", data))
同步发送消息 ,使用 Future.get() 阻塞接收
异步发送消息,回调的方式
整体代码如下// 1. 配置
Properties properties = new Properties();
// bootstrap.servers kafka集群地址 host1:port1,host2:port2 ....
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.deserializer 消息key序列化方式
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.deserializer 消息体序列化方式
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2. 创建生产者实例
KafkaProducer producer = new KafkaProducer<>(properties);
// 3. 发送消息
// 0 异步发送消息
for (int i = 0; i < 10; i++) {
String data = "async :" + i;
// 发送消息
producer.send(new ProducerRecord<>("demo-topic", data));
}
// 1 同步发送消息 调用get()阻塞返回结果
for (int i = 0; i < 10; i++) {
String data = "sync : " + i;
try {
// 发送消息
Future send = producer.send(new ProducerRecord<>("demo-topic", data));
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);
} catch (Exception e) {
e.printStackTrace();
}
}
// 2 异步发送消息 回调callback()
for (int i = 0; i < 10; i++) {
String data = "callback : " + i;
// 发送消息
producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
// 发送消息的回调
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata);
}
}
});
}
producer.close();
整合SpringBoot
添加依赖
org.springframework.boot
spring-boot-starter-parent
2.3.2.RELEASE
....
....
org.springframework.boot
spring-boot-starter
org.springframework.kafka
spring-kafka
org.springframework.boot
spring-boot-starter-test
test
org.springframework.kafka
spring-kafka-test
test
代码# application.yml
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-group// 启动类
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
// 消费者
@Component
public class Consumer {
@KafkaListener(topics = { "test-topic" })
public void receiveMessage(String message) {
System.out.println(message);
}
}
// 生产者
@Component
public class Producer {
@Resource
KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
// 测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
@Autowired
private Producer producer;
@Test
public void send() {
producer.sendMessage("test-topic", "test-message");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总结
整合SpringBoot之后的代码还是非常简洁的,不过还是要熟悉原生API,这样才能在实际项目中遇到问题时游刃有余。
java kafka client_Kafka Java Client基本使用及整合SpringBoot相关推荐
- java kafka client_Kafka Client API 基本使用
之前讲过了[Kafka基本概念及原理][1],这次我们来看看Kafka Client的API.要使用Kafka Client的API,首先需要先部署Kafka集群,部署过程请参见[官网][2].然后在 ...
- java kafka api_kafka java API的使用
Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...
- (转)Kafka 消费者 Java 实现
转自: Kafka 消费者 Java 实现 - 简书应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者(consumer)和消费者组(c ...
- Java Kafka 消费积压监控
Java Kafka 消费积压监控 后端代码: Monitor.java代码: package com.suncreate.kafkaConsumerMonitor.service;import co ...
- 2021年大数据Kafka(五):❤️Kafka的java API编写❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...
- java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者
转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...
- java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器
Java kafka如何实现自定义分区类和拦截器 2.producer配置文件指定,具体的分区类 // 具体的分区类 props.put(ProducerConfig.PARTITIONER_CLAS ...
- kafka 出现Java heap space的解决方法
问题: 测试环境异常,经过查看server.log日志,发现:java.lang.OutOfMemoryError: Java heap space,具体如下: server.log: [2021-0 ...
- java kafka 集群消费_kafka集群简单生产者消费者实例
项目描述 本项目是个简单的kafka集群简单生产者和消费者实例,生产者能生产消息,消费者能消费消息,这里将消费的消息存入了mysql数据库,适合刚kafka刚入门的朋友借鉴使用,里面的zookeepe ...
最新文章
- Python判断两个文件夹中互相不同的文件有哪些、判断一个文件夹相对于另外一个文件夹缺少了哪些文件
- SAP CAR 的主要优势
- 图形处理相关资源(面部识别、姿态估计、变形、、、)
- 开发者进阶之路 |UIBPlayer (视频播放)demo分享
- 建立SQL Server警告和给操作员发送email通知
- [深度学习]自然语言处理 --- ELMo
- 【亲测有效】Centos安装完成docker后启动docker报错docker: unrecognized service的两种解决方案
- Apsara Clouder专项技能认证:实现调用API接口(阿里云疫情白给课程系列)
- 浪潮之巅-读书笔记一
- 手把手教你Windows操作系统添加Virtio驱动
- VS关闭vue语法检测
- RFID固定资产管理系统中的RFID标签的使用-新导智能
- 如何构建超现实元宇宙空间
- Android 仿淘宝详情视频图片混合轮播
- 配置服务器pytorch/TensorFlow环境+远程连接vscode
- Flink中的CEP(一)
- JavaScript中let的用法
- 【狗狗分类项目】(3)扩展数据集:斯坦福kaggle数据库
- archpr 压缩文件暴力破解
- 第八篇order订单专题(4)市价单、收盘价单、限价单、止损单