kafka-clients

添加依赖

org.apache.kafka

kafka-clients

2.5.0

消费者 Consumer

代码上总体可以分为三部分:消费者的配置消费者的配置在 org.apache.kafka.clients.consumer.ConsumerConfig 类中都有列举包括每个配置项的文档说明

创建消费者实例并订阅topic

消费消息

代码如下:// 1. 配置

Properties properties = new Properties();

//bootstrap.servers kafka集群地址 host1:port1,host2:port2 ....

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

// key.deserializer 消息key序列化方式

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// value.deserializer 消息体序列化方式

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// group.id 消费组id

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");

// enable.auto.commit 设置自动提交offset

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

// auto.offset.reset

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 2. 创建消费者实例并订阅topic

KafkaConsumer consumer = new KafkaConsumer<>(properties);

String[] topics = new String[]{"demo-topic"};

consumer.subscribe(Arrays.asList(topics));

// 3. 消费消息

while (true) {

ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord record : records) {

System.out.println(record);

}

}

生产者 Producer

生产者这块的代码基本上和消费者的结构一样,不同的是,producer 的发消息的部分:生产者的配置在 org.apache.kafka.clients.producer.ProducerConfig 类中也都有列举

创建生产者实例

发送消息到 topic异步发送消息 producer.send(new ProducerRecord<>("demo-topic", data))

同步发送消息 ,使用 Future.get() 阻塞接收

异步发送消息,回调的方式

整体代码如下// 1. 配置

Properties properties = new Properties();

// bootstrap.servers kafka集群地址 host1:port1,host2:port2 ....

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

// key.deserializer 消息key序列化方式

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// value.deserializer 消息体序列化方式

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 2. 创建生产者实例

KafkaProducer producer = new KafkaProducer<>(properties);

// 3. 发送消息

// 0 异步发送消息

for (int i = 0; i < 10; i++) {

String data = "async :" + i;

// 发送消息

producer.send(new ProducerRecord<>("demo-topic", data));

}

// 1 同步发送消息 调用get()阻塞返回结果

for (int i = 0; i < 10; i++) {

String data = "sync : " + i;

try {

// 发送消息

Future send = producer.send(new ProducerRecord<>("demo-topic", data));

RecordMetadata recordMetadata = send.get();

System.out.println(recordMetadata);

} catch (Exception e) {

e.printStackTrace();

}

}

// 2 异步发送消息 回调callback()

for (int i = 0; i < 10; i++) {

String data = "callback : " + i;

// 发送消息

producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

// 发送消息的回调

if (exception != null) {

exception.printStackTrace();

} else {

System.out.println(metadata);

}

}

});

}

producer.close();

整合SpringBoot

添加依赖

org.springframework.boot

spring-boot-starter-parent

2.3.2.RELEASE

....

....

org.springframework.boot

spring-boot-starter

org.springframework.kafka

spring-kafka

org.springframework.boot

spring-boot-starter-test

test

org.springframework.kafka

spring-kafka-test

test

代码# application.yml

spring:

kafka:

bootstrap-servers: 127.0.0.1:9092

producer:

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer:

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

group-id: test-group// 启动类

@SpringBootApplication

public class DemoApplication {

public static void main(String[] args) {

SpringApplication.run(DemoApplication.class, args);

}

}

// 消费者

@Component

public class Consumer {

@KafkaListener(topics = { "test-topic" })

public void receiveMessage(String message) {

System.out.println(message);

}

}

// 生产者

@Component

public class Producer {

@Resource

KafkaTemplate kafkaTemplate;

public void sendMessage(String topic, String message) {

kafkaTemplate.send(topic, message);

}

}

// 测试

@RunWith(SpringRunner.class)

@SpringBootTest

public class DemoApplicationTests {

@Autowired

private Producer producer;

@Test

public void send() {

producer.sendMessage("test-topic", "test-message");

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

总结

整合SpringBoot之后的代码还是非常简洁的,不过还是要熟悉原生API,这样才能在实际项目中遇到问题时游刃有余。

java kafka client_Kafka Java Client基本使用及整合SpringBoot相关推荐

  1. java kafka client_Kafka Client API 基本使用

    之前讲过了[Kafka基本概念及原理][1],这次我们来看看Kafka Client的API.要使用Kafka Client的API,首先需要先部署Kafka集群,部署过程请参见[官网][2].然后在 ...

  2. java kafka api_kafka java API的使用

    Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...

  3. (转)Kafka 消费者 Java 实现

    转自: Kafka 消费者 Java 实现 - 简书应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者(consumer)和消费者组(c ...

  4. Java Kafka 消费积压监控

    Java Kafka 消费积压监控 后端代码: Monitor.java代码: package com.suncreate.kafkaConsumerMonitor.service;import co ...

  5. 2021年大数据Kafka(五):❤️Kafka的java API编写❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...

  6. java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

    转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...

  7. java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器

    Java kafka如何实现自定义分区类和拦截器 2.producer配置文件指定,具体的分区类 // 具体的分区类 props.put(ProducerConfig.PARTITIONER_CLAS ...

  8. kafka 出现Java heap space的解决方法

    问题: 测试环境异常,经过查看server.log日志,发现:java.lang.OutOfMemoryError: Java heap space,具体如下: server.log: [2021-0 ...

  9. java kafka 集群消费_kafka集群简单生产者消费者实例

    项目描述 本项目是个简单的kafka集群简单生产者和消费者实例,生产者能生产消息,消费者能消费消息,这里将消费的消息存入了mysql数据库,适合刚kafka刚入门的朋友借鉴使用,里面的zookeepe ...

最新文章

  1. Python判断两个文件夹中互相不同的文件有哪些、判断一个文件夹相对于另外一个文件夹缺少了哪些文件
  2. SAP CAR 的主要优势
  3. 图形处理相关资源(面部识别、姿态估计、变形、、、)
  4. 开发者进阶之路 |UIBPlayer (视频播放)demo分享
  5. 建立SQL Server警告和给操作员发送email通知
  6. [深度学习]自然语言处理 --- ELMo
  7. 【亲测有效】Centos安装完成docker后启动docker报错docker: unrecognized service的两种解决方案
  8. Apsara Clouder专项技能认证:实现调用API接口(阿里云疫情白给课程系列)
  9. 浪潮之巅-读书笔记一
  10. 手把手教你Windows操作系统添加Virtio驱动
  11. VS关闭vue语法检测
  12. RFID固定资产管理系统中的RFID标签的使用-新导智能
  13. 如何构建超现实元宇宙空间
  14. Android 仿淘宝详情视频图片混合轮播
  15. 配置服务器pytorch/TensorFlow环境+远程连接vscode
  16. Flink中的CEP(一)
  17. JavaScript中let的用法
  18. 【狗狗分类项目】(3)扩展数据集:斯坦福kaggle数据库
  19. archpr 压缩文件暴力破解
  20. 第八篇order订单专题(4)市价单、收盘价单、限价单、止损单

热门文章

  1. modelsim(1):经常使用的测试设计的结构
  2. Form表单标签的Enctype属性的编码格类型
  3. 内置类型存储空间(32位机参考)
  4. 关于linux内核模块的装载过程
  5. java 中Lock的使用
  6. 10 过滤器和监听器
  7. 【面经】蚂蚁金服一二三面的面经总结(内推实习方面)
  8. chrome 看每行代码的运行时间
  9. CC++刚開始学习的人编程教程(9) Windows8.1安装VS2013并捆绑QT与编程助手
  10. 树-当前结点与列表页不符的处理