根据配置文件一步步去查看:

spring-consumer.xml

DefaultKafkaConsumerFactory就是根据参数配置生产consumer,略过。

监听器类,实现了监听类一些列接口的某一个。我这里使用了手动提交的AcknowledgingMessageListener 接口,以此为例。

首先他又继承了一个接口

public interface AcknowledgingMessageListener extends GenericAcknowledgingMessageListener> {

}

继承的接口

public interface GenericAcknowledgingMessageListener extends KafkaDataListener {

void onMessage(T var1, Acknowledgment var2);

}

我的实现类,重写发放就好,略。。。继续下一步

public class KafkaConsumerListener implements AcknowledgingMessageListener {

@Override

public void onMessage(ConsumerRecord stringStringConsumerRecord, Acknowledgment acknowledgment) {

System.out.printf("offset= %d, key= %s, value= %s,topic= %s,partition= %s\n",

stringStringConsumerRecord.offset(),

stringStringConsumerRecord.key(),

stringStringConsumerRecord.value(),

stringStringConsumerRecord.topic(),

stringStringConsumerRecord.partition());

acknowledgment.acknowledge();

}

}

topic1

topic2

org.springframework.kafka.listener.config.ContainerProperties

关注的第一个构造方法,我可以传入多个topic,也就是说我可以通过传入一个list来同时订阅多个主题的消息

public ContainerProperties(String... topics) {

this.ackMode = AckMode.BATCH;

this.pollTimeout = 1000L;

this.shutdownTimeout = 10000L;

this.syncCommits = true;

this.ackOnError = true;

Assert.notEmpty(topics, "An array of topicPartitions must be provided");

this.topics = (String[])Arrays.asList(topics).toArray(new String[topics.length]);

this.topicPattern = null;

this.topicPartitions = null;

}

关注的第二个构造方法,我可以传入一个正则表达式对象来匹配主题

public ContainerProperties(Pattern topicPattern) {

this.ackMode = AckMode.BATCH;

this.pollTimeout = 1000L;

this.shutdownTimeout = 10000L;

this.syncCommits = true;

this.ackOnError = true;

this.topics = null;

this.topicPattern = topicPattern;

this.topicPartitions = null;

}

关注的第三个构造方法,我可以传入一个或多个set,对应的键值对为主题和分区,关注特定主题特定分区的消息

public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {

this.ackMode = AckMode.BATCH;

this.pollTimeout = 1000L;

this.shutdownTimeout = 10000L;

this.syncCommits = true;

this.ackOnError = true;

this.topics = null;

this.topicPattern = null;

Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");

this.topicPartitions = (TopicPartitionInitialOffset[])(new LinkedHashSet(Arrays.asList(topicPartitions))).toArray(new TopicPartitionInitialOffset[topicPartitions.length]);

}

关注点私有属性

/**

* 确认模式(自动确认属性为false时使用)

*

*

1.RECORD逐条确认: 每条消息被发送给监听者后确认

*

2.BATCH批量确认: 当批量消息记录被消费者接收到并传送给监听器时确认

*

3.TIME超时确认:当超过设置的超时时间毫秒数时确认(should be greater than

* {@code #setPollTimeout(long) pollTimeout}.

*

4.COUNT计数确认: 当接收到指定数量之后确认

*

5.MANUAL手动确认:由监听器负责确认(AcknowledgingMessageListener)

*/

private AckMode ackMode;

//手动提交次数

private int ackCount;

//手动提交时间

private long ackTime;

//消息监听器,必须是 MessageListener或者AcknowledgingMessageListener两者中的一个

private Object messageListener;

//阻止消费者等待记录的最长时间。

private volatile long pollTimeout;

//线程执行器:轮询消费者

private AsyncListenableTaskExecutor consumerTaskExecutor;

//错误回调,当监听器抛出异常时

private GenericErrorHandler> errorHandler;

//停止容器超时时间

private long shutdownTimeout;

//用户定义的消费者再平衡监听器实现类

private ConsumerRebalanceListener consumerRebalanceListener;

//提交回调,默认记录日志。

private OffsetCommitCallback commitCallback;

private boolean syncCommits;

private boolean ackOnError;

private Long idleEventInterval;

private String groupId;

private PlatformTransactionManager transactionManager;

org.springframework.kafka.listener.KafkaMessageListenerContainer

关注点,构造函数

public KafkaMessageListenerContainer(ConsumerFactory consumerFactory, ContainerProperties containerProperties) {

this(consumerFactory, containerProperties, (TopicPartitionInitialOffset[])null);

}

略。。。。看不懂了。。

kafka c语言实现源码,Spring-Kafka源代码解析(消费者)相关推荐

  1. c 语言闹钟源码,小程序源代码:给自己设计一个番茄闹钟,文末有代码

    原标题:小程序源代码:给自己设计一个番茄闹钟,文末有代码 大家好,我是杨数Tos! 这篇文章主要分享:给自己设计一个非常简单的工作闹钟,提醒自己注意劳逸结合,除了工作也要定时休息. 整个闹钟项目的代码 ...

  2. linux sendto 源码,Linux内核源代码解析——用户发送数据包的起源之sendto

    Jack:我想知道用户如何把数据发送到内核空间的? 我:你觉得哪里比较难理解呢? Jack:一般程序员会在程序里通过socket变量获得一个文件描述符,然后通过write把定义好的字符串写入到该描述符 ...

  3. Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析

    文章目录 概述 Spring-kafka生产者源码流程 Spring-kafka消费者源码流程(`@EnableKafka`和`@KafkaListener` ) Flow KafkaListener ...

  4. 超大气APP下载页双语多国语言网站源码

    介绍: 超大气APP下载页双语多国语言网站源码,自带4国语言源码,界面高端大气适合做软件下载页面! 解压下载修改下载链接,上传根目录即可使用! 网盘下载地址: http://kekewl.cc/NaX ...

  5. 扫雷c语言完整源代码,C语言扫雷源码

    C语言扫雷源码 #include #include #include #include /*鼠标信息宏定义*/ #define WAITING 0xff00 #define LEFTPRESS 0xf ...

  6. C语言项目源码,C语言源码大全

    一.C语言源码大全 C语言是一门通用计算机编程语言,广泛应用于底层开发.C语言的设计目标是提供一种能以简易的方式编译.处理低级存储器.产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言.尽管 ...

  7. 易语言mysql修改语句_易语言-MySQL-update源码例程

    易语言中MySQL的update可以用两种方式来对字段进行更新,下面就这两种方式贴出源码.涉及的数据库在文章易语言-MySQL-select源码例程..版本 2 .支持库 MySQL .程序集 窗口程 ...

  8. 【栖梧-源码-spring】@Bean从解析到注册到beanDefinitionMap

    [栖梧-源码-spring]@Bean从解析到注册到beanDefinitionMap 序幕 源码阅读技巧 本文说明 类 ConfigurationClassParser#doProcessConfi ...

  9. C语言MD5源码及实例

    C语言 MD5源码 md5c.h: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 /* POINTER defines ...

最新文章

  1. s-seq 生成序列化数字
  2. 手握173篇论文的学术新星被指造假!后续:博士论文被召回
  3. 【JS基础】Array数组的创建与操作方法
  4. HMM:Hidden Markov Model 代码讲解
  5. java之正则表达式
  6. 广度优先遍历_LeetCode | 广度优先遍历
  7. 【操作系统】分页内存管理
  8. (分治)7617:输出前k大的数
  9. 严加监管是否能解决超载问题
  10. 51单片机——红外解码
  11. 大学一年级计算机组成语结构试题,一年级下册期末复习习题
  12. TextBox显示提示信息
  13. 记2011 IOCCC Best one liner
  14. PCD与STL格式及其内涵
  15. 论文笔记-LIO-SAM: Tightly-coupled Lidar Inertial Odometry via Smoothing and Mapping
  16. 使用简单好用的WiFi摄像头,远程监控不再难
  17. 如何查看服务器登录日志文件,服务器登录日志查看
  18. 健康数据的获取 Iphone
  19. 快速排序随机选取主元的重要性
  20. 在腾讯实习一年,我学到了什么

热门文章

  1. 【Flink】Flink 1.11深度解析 【视频笔记】
  2. Java多线程学习十一:你知道哪几种锁?各有什么特点
  3. Window上修改了mysql的配置文件my.ini后重启服务报错:本地计算机上的MySQL服务启动后停止。某些服务在未由其他服务或程序使用时将自动停止
  4. 虚拟主机 webdav php,ubuntu 搭建 webdav 文件服务器 及客户端配置 详解
  5. java parseint(12.0),Java中parseInt()和valueOf(),toString()的区别
  6. JAVA继承类phone_【Java基础】类-----继承
  7. vue 组件 全局组件和局部组件component
  8. 2018-2019-2 20175230 实验三《Java面向对象程序设计》实验报告
  9. .Net Core项目 Encoding不全问题
  10. oracle之三手工不完全恢复