文章目录

  • Kafka API # Consumer ACK
  • Spring Kafka # Consumer ACK
  • Spring Kafka Transaction
  • KafkaMessageListenerContainer和ListenerConsumer,KafkaListenerContainerFactory
    • 给KafkaMessageListenerContainer配置一个KafkaAwareTransactionManger的实现
    • 给KafkaMessageListenerContainer配置一个非KafkaAwareTransactionManger实现;
    • 给KafkaMessageListenerContainer配置一个ChainedKafkaTransactionManager
  • 单纯的Kafka Producer 事务
    • 给KafkaTemplate配置一个KafkaAwareTransactionManager
    • 给KafkaTemplate配置一个非KafkaAwareTransactionManager
      • Kafka Template 和 其他事务管理器同步
        • Kafka Producer开启事务的时间点
        • Kafka Producer提交事务的时间点
    • 不管给KafkaTemplate配置的啥TransactionManager
  • 参考
  1. plain kafka client api transaction个三种情况(1. comsumer ack ,2 comsumer + producer commit,3 comsumer +producer@send 一块commit)
    comsumer ack模式适用于poll and simple processor
  2. KafkaTransactionManager详解,模板化了上述的哪些操作
  3. KafkaMessageListenerContainer和ListenerConsumer,KafkaListenerContainerFactory
  4. @KafkaListener注解的处理过程
  5. @Transactional + producer详解 Spring-Kafka(五)—— 使用Kafka事务的两种方式
  6. KafkaTemplate + 事务详解,KafkaTemplate只是用来发送消息的
  7. KafkaTransactionManager 和 DataSourceTransactionmanager协同

Kafka API # Consumer ACK

Spring Kafka # Consumer ACK

@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {...ack.acknowledge();
}

Spring Kafka Transaction

kafka的事务牵扯到kafka的读写消息;
spring kafka 使用KafkaMessageListenerContainer封装读kafka,使用KafkaTemplate封装写kafka;
事务相关的实现也是围绕着KafkaMessageListenerContainer和KafkaTemplate展开的;

KafkaMessageListenerContainer和ListenerConsumer,KafkaListenerContainerFactory

给KafkaMessageListenerContainer配置一个KafkaAwareTransactionManger的实现

如果给KafkaMessageListenerContainer配置了一个KafkaAwareTransactionManger的实现,KafkaMessageListenerContainer就会初始化一个TransactionTemplate对象,通过将KafkaMessageListenerContainer持有的MessageListener或者BatchMessageListener的实现的调用方法TransactionTemplate#execute方法中,实现在listener调用前后执行事务的开启,提交和回滚操作;

如果我们在listener中使用KafkaTemplate#send方法发送数据,则这些调用会自动加入到调用listener之前开启的事务中;

listner中的操作结束执行完成之后,,在调用KafkaAwareTransactionManger的commit提交事务之前,KafkaMessageListenerContainer会调用Producer#sendOffsetsToTransaction()方法,将offset的提交也加入到这个事务中;

private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,@SuppressWarnings(RAW_TYPES) Producer producer) throws InterruptedException {if (this.wantsFullRecords) {this.batchListener.onMessage(records,this.isAnyManualAck? new ConsumerBatchAcknowledgment(records): null, this.consumer);}else {doInvokeBatchOnMessage(records, recordList);}if (!this.isAnyManualAck && !this.autoCommit) {for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {this.acks.put(record);}if (producer != null) {sendOffsetsToTransaction(producer);}}
}private void invokeOnMessage(final ConsumerRecord<K, V> record,@SuppressWarnings(RAWTYPES) @Nullable Producer producer) {if (record.value() instanceof DeserializationException) {throw (DeserializationException) record.value();}if (record.key() instanceof DeserializationException) {throw (DeserializationException) record.key();}if (record.value() == null && this.checkNullValueForExceptions) {checkDeser(record, ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);}if (record.key() == null && this.checkNullKeyForExceptions) {checkDeser(record, ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER);}doInvokeOnMessage(record);ackCurrent(record, producer);
}public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings(RAW_TYPES)@Nullable Producer producer) {if (this.isRecordAck) {Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));if (producer == null) {this.commitLogger.log(() -> "Committing: " + offsetsToCommit);if (this.containerProperties.isSyncCommits()) {this.consumer.commitSync(offsetsToCommit);}else {this.consumer.commitAsync(offsetsToCommit, this.commitCallback);}}else {this.acks.add(record);}}else if (!this.isAnyManualAck && !this.autoCommit) {this.acks.add(record);}if (producer != null) {try {sendOffsetsToTransaction(producer);}catch (Exception e) {this.logger.error("Send offsets to transaction failed", e);}}
}@SuppressWarnings({ "unchecked", RAW_TYPES })
private void sendOffsetsToTransaction(Producer producer) {handleAcks();Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
}

给KafkaMessageListenerContainer配置一个非KafkaAwareTransactionManger实现;

这种配置方式提供了一个Kafka Transaction与其他的事务同步的能力;

这里以DataSourceTransactionManager为例,如果给KafkaMessageListenerContainer配置了一个DataSourceTransactionManager,那么KafkaMessageListenerContainer在调用其持有的listener之前,就会在TransactionTempalte的能力下通过DataSourceTransactionmanager开启一个JDBC事务。listener调用完成之后,就会根据情况调用DataSourceTransactionmanager的提交或者回滚;
那么poll的ack如何返回给kafka server呢;

@Bean
KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,final KafkaTemplate template) {ContainerProperties props = new ContainerProperties("foo");props.setGroupId("group");props.setTransactionManager(new SomeOtherTransactionManager());...props.setMessageListener((MessageListener<String, String>) m -> {template.send("foo", "bar");template.send("baz", "qux");template.sendOffsetsToTransaction(//这里是重点,自己发送Offsets给kafka serverCollections.singletonMap(new TopicPartition(m.topic(), m.partition()),new OffsetAndMetadata(m.offset() + 1)));});return new KafkaMessageListenerContainer<>(cf, props);
}

这样配置一个持有DataSourceTransactionmanager对象的KafkaMessageListenerContainer<K, V>,就可以实现如下的模式。从数据的读取,然后操作数据库,数据库处理成功后,offset才会提交给kafka server;如果数据库操作失败,则offset不回提交给kafka server,在超过指定次数之前,还能再次拉取到此次的数据;

@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaWithDataSourceTransactionmanagerListenerContainerFactory")
public void listen(String data) {// insert data into database
}

给KafkaMessageListenerContainer配置一个ChainedKafkaTransactionManager

上一种方式只能让kafka的事务与另外的一个事务进行同步,ChainedKafkaTransactionManager提供了一个kafka事务与任意多个其他非Kafka事务进行同步的能力;

如果既想控制Kafka的事务,又想于其他事务管理器同步的话,使用ChainedKafkaTransactionManager是个更时髦的选择;

ChainedKafkaTransactionManager实现了KafkaAwareTransactionManager接口,并继承自ChainedTransactionManager;

实现了KafkaAwareTransactionManager接口,意味着ChainedKafkaTransactionManager可以对外提供ProducerFactory的引用;

也就是说将ChainedKafkaTransactionManager配置给KafkaMessageListenerContainer,KafkaMessageListenerContainer就拥有了第一种模式所描述的那样,container来发送offsets,而不用开发者在listener的实现里发送offsets了;

将ChainedKafkaTransactionManager配置到ContainerProperties来构建一个KafkaMessageListenerContainer就能实现这样的能力;

单纯的Kafka Producer 事务

给KafkaTemplate配置一个KafkaAwareTransactionManager

给KafkaTemplate配置一个非KafkaAwareTransactionManager

Kafka Template 和 其他事务管理器同步

这里以DataSourceTransactionmanager为例;
TransactionSynchronization

Kafka API加入事务的概念是,获取之前操作使用的已经调用了initTransaction和beginTransaction方法的Producer对象,直接调用对应的操作,就叫将当前操作加入到这个Producer所在的事务中;

如果KafkaTemplate持有的ProducerFactory<K, V>接口的实现支持事务,即ProducerFactory#transactionCapable返回true;

KafkaTemplate是用来发送数据的模板,Kafka的发送数据相关的api支持事务操作;

只要构建KafkaTemplate时传入的producerFactory.transactionCapable()是支持事务的,那么KafkaTemplate就是支持事务的;KafkaTemplate在使用producerFactory获取一个Producer时,就会同时beginTranaction();

Kafka Producer开启事务的时间点

KafkaTemplate持有一个producers字段,其定义如下:

private final ThreadLocal<Producer<K, V>> producers = new ThreadLocal<>();

使用KafkaTemplate的一系列send方法发送数据时(详细信息请查看KafkaTemplate#doSend方法),首先通过KafkaTemplate#getTheProducer方法获取一个Producer接口的实现,在getTheProducer方法中,首先尝试从producers获取当前线程的已经创建好的Producer,如果没有获取到,则会创建一个,并放到KafkaTemplate#producers中,重点就是这个创建的逻辑:

 public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(final ProducerFactory<K, V> producerFactory) {Assert.notNull(producerFactory, "ProducerFactory must not be null");@SuppressWarnings("unchecked")KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager.getResource(producerFactory);if (resourceHolder == null) {Producer<K, V> producer = producerFactory.createProducer();try {producer.beginTransaction();}catch (RuntimeException e) {producer.close();throw e;}resourceHolder = new KafkaResourceHolder<K, V>(producer);bindResourceToTransaction(resourceHolder, producerFactory);}return resourceHolder;}private static <K, V> void bindResourceToTransaction(KafkaResourceHolder<K, V> resourceHolder,ProducerFactory<K, V> producerFactory) {TransactionSynchronizationManager.bindResource(producerFactory, resourceHolder);resourceHolder.setSynchronizedWithTransaction(true);if (TransactionSynchronizationManager.isSynchronizationActive()) {TransactionSynchronizationManager.registerSynchronization(new KafkaResourceSynchronization<K, V>(resourceHolder, producerFactory));}}

可以看到getTransactionalResourceHolder方法中,创建了一个Producer并且调用了producer.beginTransaction()开启事务,也就是说如果我们有如下的代码,KafkaTemplate配置支持事务:

kafkaTemplate.sendDefault("foo")
kafkaTemplate.sendDefault("bar")

发送foo字符串时,KafkaTemplate首先获取一个Producer对象,并开启事务,发送bar字符串时,KafkaTemplate通过ProducerFactoryUtils#getTransactionalResourceHolder方法,调用TransactionSynchronizationManager获取绑定到当前事务的Producer对象,直接返回;也就是说,当发送bar字符串的时候,就自动加入到了发送foo字符串时开启的事务中;

综上所述,Kafka Producer开启事务的时间点为当前事务所在线程的Producer第一次被创建的时候。

Kafka Producer提交事务的时间点

如果给KafkaTemplate配置了其他支持同步的事务管理器,比如DataSourceTransactionManager,而不是KafkaAwareTransactionManager的实现,KafkaTemplate#getTheProducer方法里的逻辑通过间接调用ProducerFactoryUtils#bindResourceToTransaction方法,会创建一个KafkaResourceSynchronization<K, V>对象,绑定到Producer当前所操作的事务;

KafkaResourceSynchronization是TransactionSynchronization接口的实现,相关内容请查看以往博客;

private static final class KafkaResourceSynchronization<K, V> extendsResourceHolderSynchronization<KafkaResourceHolder<K, V>, Object> {private final KafkaResourceHolder<K, V> resourceHolder;KafkaResourceSynchronization(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {super(resourceHolder, resourceKey);this.resourceHolder = resourceHolder;}@Overrideprotected boolean shouldReleaseBeforeCompletion() {return false;}@Overridepublic void afterCompletion(int status) {try {if (status == TransactionSynchronization.STATUS_COMMITTED) {this.resourceHolder.commit();}else {this.resourceHolder.rollback();}}finally {super.afterCompletion(status);}}@Overrideprotected void releaseResource(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {ProducerFactoryUtils.releaseResources(resourceHolder);}}

可以看到其在afterCompletion回调方法中会判断绑定给KafkaTemplate的非KafkaAwareTransactionManager的事务管理器,比如DataSourceTransactionManager,在commit之后的状态,是成功提交了,TransactionSynchronization.STATUS_COMMITTED,还是TransactionSynchronization.STATUS_ROLLED_BACK了,还是未知状态TransactionSynchronization.STATUS_UNKNOWN;这里以DataSourceTransactionManager为例,如果DataSourceTransactionManager提交了事务,并且成功了,KafkaResourceSynchronization里的逻辑就会间接调用Kafka Producer#commitTransaction(),否则就会间接调用Kafka Producer.abortTransaction()中断事务,最后调用super.afterCompletion(status),释放当前线程中之前的逻辑绑定的各个资源;

不管给KafkaTemplate配置的啥TransactionManager

KafkaTemplate.executeInTransaction()不使用事务管理器,而是直接调用了Kafka Producer的事务API;

@Override
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {Assert.notNull(callback, "'callback' cannot be null");Assert.state(this.transactional, "Producer factory does not support transactions");Producer<K, V> producer = this.producers.get();// 在此可以看到,executeInTransaction只能单独调用,不能参与的其他的事务中,但是其他的事务操作可以参与到executeInTransaction已经开启的事务中,比如KafkaTempalte.send系列方法Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");String transactionIdSuffix;if (this.producerFactory.isProducerPerConsumerPartition()) {transactionIdSuffix = TransactionSupport.getTransactionIdSuffix();TransactionSupport.clearTransactionIdSuffix();}else {transactionIdSuffix = null;}producer = this.producerFactory.createProducer();try {producer.beginTransaction();}catch (Exception e) {closeProducer(producer, false);throw e;}this.producers.set(producer);try {T result = callback.doInOperations(this);try {producer.commitTransaction();}catch (Exception e) {throw new SkipAbortException(e);}return result;}catch (SkipAbortException e) { // NOSONAR - exception flow controlthrow ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace}catch (Exception e) {producer.abortTransaction();throw e;}finally {if (transactionIdSuffix != null) {TransactionSupport.setTransactionIdSuffix(transactionIdSuffix);}this.producers.remove();closeProducer(producer, false);}
}
boolean result = template.executeInTransaction(t -> {t.sendDefault("thing1", "thing2");t.sendDefault("cat", "hat");return true;
});

sendDefault有能力开启事务,但是也可以参与到executeInTransaction开启的事务中;

t.sendDefault("thing1", "thing2");
boolean result = template.executeInTransaction(t -> {t.sendDefault("cat", "hat");return true;
});

如果是上面这种形式,那就不行了, t.sendDefault(“thing1”, “thing2”);已经在当前线程中创建了Producer,就调用beginTransaction 开启了事务,再执行executeInTransaction就要报错了;

参考

Kafka client 消息接收的三种模式

Spring Kafka Transaction相关推荐

  1. 自定义spring kafka consumer 线程池

    序 本文讲述一下如何自定义spring kafka的consumer线程池 KafkaMessageListenerContainer spring-kafka-1.2.3.RELEASE-sourc ...

  2. Spring Kafka生产者/消费者样本

    我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象. 示例场景 示例场景是一个简单的场景,我有 ...

  3. Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

    目录 1. 单记录消费listener.type=single 1.1 单记录消费 - 自动确认 1.2 单记录消费 - 手动确认 2. 批量消费listener.type=batch 2.1 批量消 ...

  4. spring事务(Transaction)的七种事务传播行为及五种隔离级别

    1. 首先,说说什么事务(Transaction) 事务,就是一组操作数据库的动作集合.事务是现代数据库理论中的核心概念之一. 如果一组处理步骤或者全部发生或者一步也不执行,我们称该组处理步骤为一个事 ...

  5. Spring事务 Transaction rolled back because it has been marked as rollback-only

    前言 使用Spring事务时,出现异常:org.springframework.transaction.UnexpectedRollbackException: Transaction rolled ...

  6. Spring Boot Transaction 源码解析(二)

    目录 DataSourceTransactionManager doGetTransaction isExistingTransaction doBegin doCommit doSuspend do ...

  7. Spring Boot Transaction 源码解析(一)

    目录 PlatformTransactionManager TransactionStatus DefaultTransactionStatus AbstractPlatformTransaction ...

  8. Spring : Spring kafka 入门Demo

    1.美图 2.概述 kafka直通车请看我的kafka专栏 项目结构 配置类 package com.spring.boot.kafka.demo.config;import org

  9. kafka Transaction coordinator

    本文转发自技术世界,原文链接 Kafka设计解析(八)- Exactly Once语义与事务机制原理 本文所有Kafka原理性的描述除特殊说明外均基于Kafka 1.0.0版本. 为什么要提供事务机制 ...

  10. Spring Kafka的Offset提交时机

    1.前提 项目中使用了spring-kafka1.3版本,也用了2.5版本.但是对于offset的提交时机是模糊的,这次通过源码分析和资料进一步明确. 2.认识KafkaConsumer的偏移量 Ka ...

最新文章

  1. (C语言)一种简易记法:生成[a,b]范围内的随机整数
  2. CVPR2021 | 基于transformer的视频实例分割网络VisTR
  3. NBT:扩增子测序革命—用16S及18S rRNA全长进行微生物多样性研究
  4. WINCE下实现USB转RS232
  5. AT3957-[AGC023F]01 on Tree【贪心,堆】
  6. 车联网领域,传统TSP企业做错了什么 ?
  7. [区块链] 拜占庭将军问题 [BFT]
  8. 两个有序数组求中位数log(m+n)复杂度
  9. 二维数组的空间复杂度_剑指 offer 面试题精选图解 04 . 二维数组中的查找
  10. ARCHICAD 25 for Mac(cad绘图软件)
  11. 一款好看的pycharm主题Atom One Dark
  12. 股票历史数据-股票前复权数据下载
  13. Saving Tang Monk II(bfs+优先队列)
  14. 计算机应用基础在线题库,计算机应用基础练习试题库完整.doc
  15. 如何免费创建三级域名?
  16. i春秋——“百度杯”CTF比赛 十月场——Vld(Vulcan Logic Dumper 、php opcode、sql 报错注入)...
  17. 设计模式六大原则——单一职责原则(SRP)
  18. 《软技能-代码之外的生存能力》第四篇——生产力
  19. 清除 柯美367打印机 转印辊组件、碳粉过滤器和臭氧过滤器报警
  20. jdk7和8的一些新特性介绍

热门文章

  1. 网易云课堂-缓存介绍
  2. 什么叫工作波长,截止波长和波导波长
  3. (实测)天猫商城抢购茅台脚本
  4. 网传固态硬盘因为TRIM指令一经删除无法恢复是以讹传讹
  5. Skype 登陆地址或登陆凭据有问题的解决方法
  6. 2020,网络安全领域有什么新趋势
  7. SVG排版教程 | SVG排版入门基础知识汇总
  8. LaTeX排版学习资源汇总
  9. mfc9340扫描到文件服务器,兄弟MFC7340打印机怎么扫描文件?
  10. 浏览器渲染原理及web前端分析