我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象。

示例场景

示例场景是一个简单的场景,我有一个系统,该系统生成一条消息,另一个系统对其进行处理

使用Raw Kafka Producer / Consumer API的实施

首先,我使用原始的Kafka Producer和Consumer API来实现此方案。 如果您想看一下代码,可以在我的github仓库中找到它 。

制片人

以下设置了一个KafkaProducer实例,该实例用于向Kafka主题发送消息:

KafkaProducer<String, WorkUnit> producer = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

我使用了KafkaProducer构造函数的一种变体,该构造函数采用一个自定义的Serializer将域对象转换为json表示形式。

一旦有KafkaProducer实例可用,就可以将其用于向Kafka集群发送消息,这里我使用了同步版本的发送器,它等待响应返回。

ProducerRecord<String, WorkUnit> record = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();

消费者

在消费者方面,我们创建了一个KafkaConsumer,其中包含构造函数的一种变体,其中包含一个反序列化器 ,该解串器知道如何读取json消息并将其转换为域实例:

KafkaConsumer<String, WorkUnit> consumer
= new KafkaConsumer<>(props, stringKeyDeserializer()
, workUnitJsonValueDeserializer());

一旦KafkaConsumer实例可用,就可以建立一个监听器循环,以读取一批记录,对其进行处理,并等待更多记录通过:

consumer.subscribe("workunits);try {while (true) {ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);for (ConsumerRecord<String, WorkUnit> record : records) {log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}
} finally {this.consumer.close();
}

使用Spring Kafka的实现

我在github repo中有使用Spring-kafka的实现。

制片人

Spring-Kafka提供了一个KafkaTemplate类,作为KafkaProducer上的包装器,用于将消息发送到Kafka主题:

@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());kafkaTemplate.setDefaultTopic("workunits");return kafkaTemplate;
}

需要注意的一件事是,尽管我之前实现了一个自定义的Serializer / Deserializer,以将域类型作为json发送,然后将其转换回去,但是Spring-Kafka开箱即用地为json提供了Seralizer / Deserializer。

并使用KafkaTemplate发送消息:

SendResult<String, WorkUnit> sendResult = workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();RecordMetadata recordMetadata = sendResult.getRecordMetadata();LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

消费者

使用者部分使用侦听器模式实现,对于已为RabbitMQ / ActiveMQ实现侦听器的任何人,应该熟悉该模式。 首先是设置侦听器容器的配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConcurrency(1);factory.setConsumerFactory(consumerFactory());return factory;
}@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}

以及响应容器读取的消息的服务:

@Service
public class WorkUnitsConsumer {private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);@KafkaListener(topics = "workunits")public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",topic, partition, offset, workUnit);}
}

这样就避免了像设置原始使用者一样设置侦听器循环的所有复杂性,并且很好地被侦听器容器隐藏了。

结论

我已经遍历了设置批处理大小,确认的变化以及不同的API签名的许多内部信息。 我的目的只是演示使用原始Kafka API的常见用例,并展示Spring-Kafka包装器如何简化它。

如果您有兴趣进一步探索, 可以在这里找到原始生产者消费者样本,在这里可以找到 Spring Kafka 。

翻译自: https://www.javacodegeeks.com/2016/11/spring-kafka-producerconsumer-sample.html

Spring Kafka生产者/消费者样本相关推荐

  1. Kafka生产者消费者模型

    一.Kafka回顾 1.AMQP协议 消息队列中消息交互规范,多数分布式消息中间件基于该协议进行消息传输 2.Broker 对于kafka,将生产者发送的消息,动态的添加到磁盘,一个Broker等同于 ...

  2. java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

    转自:http://chengjianxiaoxue.iteye.com/blog/2190488 1 kafka集群搭建 1.zookeeper集群 搭建在110, 111,112 2.kafka使 ...

  3. java生产者消费者代码_Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键 ...

  4. Kafka生产者与消费者详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  5. SpringBoot使用Kafka生产者、消费者

    文章目录 依赖 配置文件 生产者 消费者 依赖 <!--kafka--> <dependency><groupId>org.springframework.kafk ...

  6. kafka生产者和消费者端的数据不一致

    撸了今年阿里.头条和美团的面试,我有一个重要发现.......>>> kafka生产者生产30条数据,而消费者却不一定消费了30条数据,经过探索发现了main线程执行完成了而kafk ...

  7. kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者

    一.Kafka安装与使用 ( kafka介绍     ) 1. 下载Kafka 2. 安装 Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka ...

  8. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  9. 深入分析Kafka生产者和消费者

    深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...

最新文章

  1. 不区分大小写比较Java_java-如何使字符串比较不区分大小写?
  2. void和void*详解 v2
  3. windbg 查看结构体_用WinDbg进行调试
  4. 经典C语言程序100例之十七
  5. Spring AOP功能和目标
  6. html按钮返回上一步操作,用js实现返回上一步操作
  7. 在小程序中使用腾讯视频插件播放教程视频
  8. java利用poi导出数据到excel
  9. C++与STL简单介绍( C/C++机试)
  10. excel表格分割线一分为二_高效秘技!用EXCEL制作导航页和日志表管理日常工作...
  11. easyexcel写动态表头(横向扩展)
  12. U-Net - Convolutional Networks for Biomedical Image Segmentation论文翻译——中英文对照
  13. 卸载 Notepad++ !事实已证明,它更牛逼……
  14. 计算单词的长度C++
  15. python:实现newton_forward_interpolation牛顿前插算法(附完整源码)
  16. Python实现对比两个Excel数据内容并标出不同
  17. iPhone iOS升级完美指南
  18. linux命令行的软件推荐
  19. java treeview使用详解_Javafx Treeview项目操作事件
  20. php验证码类(分享)

热门文章

  1. 设计模式之静态代理模式实战
  2. C++描述杭电OJ 2015.偶数求和 ||
  3. Sring类型数组赋值
  4. 联通 培训 c班还 20190814
  5. java速学_5分钟快速入门Java,不看真的可惜了
  6. 图像sobel梯度详细计算过程_数字图像处理(第十章)
  7. 转:常用的几种加密算法以及java实现
  8. Java 截取反斜杠--java使用split拆分特殊字符
  9. 设置 JDK环境变量(Windows)
  10. selenium 4_Selenium4 Alpha –期望什么?