Spring Kafka Transaction
文章目录
- Kafka API # Consumer ACK
- Spring Kafka # Consumer ACK
- Spring Kafka Transaction
- KafkaMessageListenerContainer和ListenerConsumer,KafkaListenerContainerFactory
- 给KafkaMessageListenerContainer配置一个KafkaAwareTransactionManger的实现
- 给KafkaMessageListenerContainer配置一个非KafkaAwareTransactionManger实现;
- 给KafkaMessageListenerContainer配置一个ChainedKafkaTransactionManager
- 单纯的Kafka Producer 事务
- 给KafkaTemplate配置一个KafkaAwareTransactionManager
- 给KafkaTemplate配置一个非KafkaAwareTransactionManager
- Kafka Template 和 其他事务管理器同步
- Kafka Producer开启事务的时间点
- Kafka Producer提交事务的时间点
- 不管给KafkaTemplate配置的啥TransactionManager
- 参考
- plain kafka client api transaction个三种情况(1. comsumer ack ,2 comsumer + producer commit,3 comsumer +producer@send 一块commit)
comsumer ack模式适用于poll and simple processor - KafkaTransactionManager详解,模板化了上述的哪些操作
- KafkaMessageListenerContainer和ListenerConsumer,KafkaListenerContainerFactory
- @KafkaListener注解的处理过程
- @Transactional + producer详解 Spring-Kafka(五)—— 使用Kafka事务的两种方式
- KafkaTemplate + 事务详解,KafkaTemplate只是用来发送消息的
- KafkaTransactionManager 和 DataSourceTransactionmanager协同
Kafka API # Consumer ACK
Spring Kafka # Consumer ACK
@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {...ack.acknowledge();
}
Spring Kafka Transaction
kafka的事务牵扯到kafka的读写消息;
spring kafka 使用KafkaMessageListenerContainer封装读kafka,使用KafkaTemplate封装写kafka;
事务相关的实现也是围绕着KafkaMessageListenerContainer和KafkaTemplate展开的;
KafkaMessageListenerContainer和ListenerConsumer,KafkaListenerContainerFactory
给KafkaMessageListenerContainer配置一个KafkaAwareTransactionManger的实现
如果给KafkaMessageListenerContainer配置了一个KafkaAwareTransactionManger的实现,KafkaMessageListenerContainer就会初始化一个TransactionTemplate对象,通过将KafkaMessageListenerContainer持有的MessageListener或者BatchMessageListener的实现的调用方法TransactionTemplate#execute方法中,实现在listener调用前后执行事务的开启,提交和回滚操作;
如果我们在listener中使用KafkaTemplate#send方法发送数据,则这些调用会自动加入到调用listener之前开启的事务中;
listner中的操作结束执行完成之后,,在调用KafkaAwareTransactionManger的commit提交事务之前,KafkaMessageListenerContainer会调用Producer#sendOffsetsToTransaction()方法,将offset的提交也加入到这个事务中;
private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,@SuppressWarnings(RAW_TYPES) Producer producer) throws InterruptedException {if (this.wantsFullRecords) {this.batchListener.onMessage(records,this.isAnyManualAck? new ConsumerBatchAcknowledgment(records): null, this.consumer);}else {doInvokeBatchOnMessage(records, recordList);}if (!this.isAnyManualAck && !this.autoCommit) {for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {this.acks.put(record);}if (producer != null) {sendOffsetsToTransaction(producer);}}
}private void invokeOnMessage(final ConsumerRecord<K, V> record,@SuppressWarnings(RAWTYPES) @Nullable Producer producer) {if (record.value() instanceof DeserializationException) {throw (DeserializationException) record.value();}if (record.key() instanceof DeserializationException) {throw (DeserializationException) record.key();}if (record.value() == null && this.checkNullValueForExceptions) {checkDeser(record, ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);}if (record.key() == null && this.checkNullKeyForExceptions) {checkDeser(record, ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER);}doInvokeOnMessage(record);ackCurrent(record, producer);
}public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings(RAW_TYPES)@Nullable Producer producer) {if (this.isRecordAck) {Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));if (producer == null) {this.commitLogger.log(() -> "Committing: " + offsetsToCommit);if (this.containerProperties.isSyncCommits()) {this.consumer.commitSync(offsetsToCommit);}else {this.consumer.commitAsync(offsetsToCommit, this.commitCallback);}}else {this.acks.add(record);}}else if (!this.isAnyManualAck && !this.autoCommit) {this.acks.add(record);}if (producer != null) {try {sendOffsetsToTransaction(producer);}catch (Exception e) {this.logger.error("Send offsets to transaction failed", e);}}
}@SuppressWarnings({ "unchecked", RAW_TYPES })
private void sendOffsetsToTransaction(Producer producer) {handleAcks();Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
}
给KafkaMessageListenerContainer配置一个非KafkaAwareTransactionManger实现;
这种配置方式提供了一个Kafka Transaction与其他的事务同步的能力;
这里以DataSourceTransactionManager为例,如果给KafkaMessageListenerContainer配置了一个DataSourceTransactionManager,那么KafkaMessageListenerContainer在调用其持有的listener之前,就会在TransactionTempalte的能力下通过DataSourceTransactionmanager开启一个JDBC事务。listener调用完成之后,就会根据情况调用DataSourceTransactionmanager的提交或者回滚;
那么poll的ack如何返回给kafka server呢;
@Bean
KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,final KafkaTemplate template) {ContainerProperties props = new ContainerProperties("foo");props.setGroupId("group");props.setTransactionManager(new SomeOtherTransactionManager());...props.setMessageListener((MessageListener<String, String>) m -> {template.send("foo", "bar");template.send("baz", "qux");template.sendOffsetsToTransaction(//这里是重点,自己发送Offsets给kafka serverCollections.singletonMap(new TopicPartition(m.topic(), m.partition()),new OffsetAndMetadata(m.offset() + 1)));});return new KafkaMessageListenerContainer<>(cf, props);
}
这样配置一个持有DataSourceTransactionmanager对象的KafkaMessageListenerContainer<K, V>,就可以实现如下的模式。从数据的读取,然后操作数据库,数据库处理成功后,offset才会提交给kafka server;如果数据库操作失败,则offset不回提交给kafka server,在超过指定次数之前,还能再次拉取到此次的数据;
@KafkaListener(id = "cat", topics = "myTopic",containerFactory = "kafkaWithDataSourceTransactionmanagerListenerContainerFactory")
public void listen(String data) {// insert data into database
}
给KafkaMessageListenerContainer配置一个ChainedKafkaTransactionManager
上一种方式只能让kafka的事务与另外的一个事务进行同步,ChainedKafkaTransactionManager提供了一个kafka事务与任意多个其他非Kafka事务进行同步的能力;
如果既想控制Kafka的事务,又想于其他事务管理器同步的话,使用ChainedKafkaTransactionManager是个更时髦的选择;
ChainedKafkaTransactionManager实现了KafkaAwareTransactionManager接口,并继承自ChainedTransactionManager;
实现了KafkaAwareTransactionManager接口,意味着ChainedKafkaTransactionManager可以对外提供ProducerFactory的引用;
也就是说将ChainedKafkaTransactionManager配置给KafkaMessageListenerContainer,KafkaMessageListenerContainer就拥有了第一种模式所描述的那样,container来发送offsets,而不用开发者在listener的实现里发送offsets了;
将ChainedKafkaTransactionManager配置到ContainerProperties来构建一个KafkaMessageListenerContainer就能实现这样的能力;
单纯的Kafka Producer 事务
给KafkaTemplate配置一个KafkaAwareTransactionManager
给KafkaTemplate配置一个非KafkaAwareTransactionManager
Kafka Template 和 其他事务管理器同步
这里以DataSourceTransactionmanager为例;
TransactionSynchronization
Kafka API加入事务的概念是,获取之前操作使用的已经调用了initTransaction和beginTransaction方法的Producer对象,直接调用对应的操作,就叫将当前操作加入到这个Producer所在的事务中;
如果KafkaTemplate持有的ProducerFactory<K, V>接口的实现支持事务,即ProducerFactory#transactionCapable返回true;
KafkaTemplate是用来发送数据的模板,Kafka的发送数据相关的api支持事务操作;
只要构建KafkaTemplate时传入的producerFactory.transactionCapable()是支持事务的,那么KafkaTemplate就是支持事务的;KafkaTemplate在使用producerFactory获取一个Producer时,就会同时beginTranaction();
Kafka Producer开启事务的时间点
KafkaTemplate持有一个producers字段,其定义如下:
private final ThreadLocal<Producer<K, V>> producers = new ThreadLocal<>();
使用KafkaTemplate的一系列send方法发送数据时(详细信息请查看KafkaTemplate#doSend方法),首先通过KafkaTemplate#getTheProducer方法获取一个Producer接口的实现,在getTheProducer方法中,首先尝试从producers获取当前线程的已经创建好的Producer,如果没有获取到,则会创建一个,并放到KafkaTemplate#producers中,重点就是这个创建的逻辑:
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(final ProducerFactory<K, V> producerFactory) {Assert.notNull(producerFactory, "ProducerFactory must not be null");@SuppressWarnings("unchecked")KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager.getResource(producerFactory);if (resourceHolder == null) {Producer<K, V> producer = producerFactory.createProducer();try {producer.beginTransaction();}catch (RuntimeException e) {producer.close();throw e;}resourceHolder = new KafkaResourceHolder<K, V>(producer);bindResourceToTransaction(resourceHolder, producerFactory);}return resourceHolder;}private static <K, V> void bindResourceToTransaction(KafkaResourceHolder<K, V> resourceHolder,ProducerFactory<K, V> producerFactory) {TransactionSynchronizationManager.bindResource(producerFactory, resourceHolder);resourceHolder.setSynchronizedWithTransaction(true);if (TransactionSynchronizationManager.isSynchronizationActive()) {TransactionSynchronizationManager.registerSynchronization(new KafkaResourceSynchronization<K, V>(resourceHolder, producerFactory));}}
可以看到getTransactionalResourceHolder方法中,创建了一个Producer并且调用了producer.beginTransaction()开启事务,也就是说如果我们有如下的代码,KafkaTemplate配置支持事务:
kafkaTemplate.sendDefault("foo")
kafkaTemplate.sendDefault("bar")
发送foo字符串时,KafkaTemplate首先获取一个Producer对象,并开启事务,发送bar字符串时,KafkaTemplate通过ProducerFactoryUtils#getTransactionalResourceHolder方法,调用TransactionSynchronizationManager获取绑定到当前事务的Producer对象,直接返回;也就是说,当发送bar字符串的时候,就自动加入到了发送foo字符串时开启的事务中;
综上所述,Kafka Producer开启事务的时间点为当前事务所在线程的Producer第一次被创建的时候。
Kafka Producer提交事务的时间点
如果给KafkaTemplate配置了其他支持同步的事务管理器,比如DataSourceTransactionManager,而不是KafkaAwareTransactionManager的实现,KafkaTemplate#getTheProducer方法里的逻辑通过间接调用ProducerFactoryUtils#bindResourceToTransaction方法,会创建一个KafkaResourceSynchronization<K, V>对象,绑定到Producer当前所操作的事务;
KafkaResourceSynchronization是TransactionSynchronization接口的实现,相关内容请查看以往博客;
private static final class KafkaResourceSynchronization<K, V> extendsResourceHolderSynchronization<KafkaResourceHolder<K, V>, Object> {private final KafkaResourceHolder<K, V> resourceHolder;KafkaResourceSynchronization(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {super(resourceHolder, resourceKey);this.resourceHolder = resourceHolder;}@Overrideprotected boolean shouldReleaseBeforeCompletion() {return false;}@Overridepublic void afterCompletion(int status) {try {if (status == TransactionSynchronization.STATUS_COMMITTED) {this.resourceHolder.commit();}else {this.resourceHolder.rollback();}}finally {super.afterCompletion(status);}}@Overrideprotected void releaseResource(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {ProducerFactoryUtils.releaseResources(resourceHolder);}}
可以看到其在afterCompletion回调方法中会判断绑定给KafkaTemplate的非KafkaAwareTransactionManager的事务管理器,比如DataSourceTransactionManager,在commit之后的状态,是成功提交了,TransactionSynchronization.STATUS_COMMITTED,还是TransactionSynchronization.STATUS_ROLLED_BACK了,还是未知状态TransactionSynchronization.STATUS_UNKNOWN;这里以DataSourceTransactionManager为例,如果DataSourceTransactionManager提交了事务,并且成功了,KafkaResourceSynchronization里的逻辑就会间接调用Kafka Producer#commitTransaction(),否则就会间接调用Kafka Producer.abortTransaction()中断事务,最后调用super.afterCompletion(status),释放当前线程中之前的逻辑绑定的各个资源;
不管给KafkaTemplate配置的啥TransactionManager
KafkaTemplate.executeInTransaction()不使用事务管理器,而是直接调用了Kafka Producer的事务API;
@Override
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {Assert.notNull(callback, "'callback' cannot be null");Assert.state(this.transactional, "Producer factory does not support transactions");Producer<K, V> producer = this.producers.get();// 在此可以看到,executeInTransaction只能单独调用,不能参与的其他的事务中,但是其他的事务操作可以参与到executeInTransaction已经开启的事务中,比如KafkaTempalte.send系列方法Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");String transactionIdSuffix;if (this.producerFactory.isProducerPerConsumerPartition()) {transactionIdSuffix = TransactionSupport.getTransactionIdSuffix();TransactionSupport.clearTransactionIdSuffix();}else {transactionIdSuffix = null;}producer = this.producerFactory.createProducer();try {producer.beginTransaction();}catch (Exception e) {closeProducer(producer, false);throw e;}this.producers.set(producer);try {T result = callback.doInOperations(this);try {producer.commitTransaction();}catch (Exception e) {throw new SkipAbortException(e);}return result;}catch (SkipAbortException e) { // NOSONAR - exception flow controlthrow ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace}catch (Exception e) {producer.abortTransaction();throw e;}finally {if (transactionIdSuffix != null) {TransactionSupport.setTransactionIdSuffix(transactionIdSuffix);}this.producers.remove();closeProducer(producer, false);}
}
boolean result = template.executeInTransaction(t -> {t.sendDefault("thing1", "thing2");t.sendDefault("cat", "hat");return true;
});
sendDefault有能力开启事务,但是也可以参与到executeInTransaction开启的事务中;
t.sendDefault("thing1", "thing2");
boolean result = template.executeInTransaction(t -> {t.sendDefault("cat", "hat");return true;
});
如果是上面这种形式,那就不行了, t.sendDefault(“thing1”, “thing2”);已经在当前线程中创建了Producer,就调用beginTransaction 开启了事务,再执行executeInTransaction就要报错了;
参考
Kafka client 消息接收的三种模式
Spring Kafka Transaction相关推荐
- 自定义spring kafka consumer 线程池
序 本文讲述一下如何自定义spring kafka的consumer线程池 KafkaMessageListenerContainer spring-kafka-1.2.3.RELEASE-sourc ...
- Spring Kafka生产者/消费者样本
我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象. 示例场景 示例场景是一个简单的场景,我有 ...
- Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例
目录 1. 单记录消费listener.type=single 1.1 单记录消费 - 自动确认 1.2 单记录消费 - 手动确认 2. 批量消费listener.type=batch 2.1 批量消 ...
- spring事务(Transaction)的七种事务传播行为及五种隔离级别
1. 首先,说说什么事务(Transaction) 事务,就是一组操作数据库的动作集合.事务是现代数据库理论中的核心概念之一. 如果一组处理步骤或者全部发生或者一步也不执行,我们称该组处理步骤为一个事 ...
- Spring事务 Transaction rolled back because it has been marked as rollback-only
前言 使用Spring事务时,出现异常:org.springframework.transaction.UnexpectedRollbackException: Transaction rolled ...
- Spring Boot Transaction 源码解析(二)
目录 DataSourceTransactionManager doGetTransaction isExistingTransaction doBegin doCommit doSuspend do ...
- Spring Boot Transaction 源码解析(一)
目录 PlatformTransactionManager TransactionStatus DefaultTransactionStatus AbstractPlatformTransaction ...
- Spring : Spring kafka 入门Demo
1.美图 2.概述 kafka直通车请看我的kafka专栏 项目结构 配置类 package com.spring.boot.kafka.demo.config;import org
- kafka Transaction coordinator
本文转发自技术世界,原文链接 Kafka设计解析(八)- Exactly Once语义与事务机制原理 本文所有Kafka原理性的描述除特殊说明外均基于Kafka 1.0.0版本. 为什么要提供事务机制 ...
- Spring Kafka的Offset提交时机
1.前提 项目中使用了spring-kafka1.3版本,也用了2.5版本.但是对于offset的提交时机是模糊的,这次通过源码分析和资料进一步明确. 2.认识KafkaConsumer的偏移量 Ka ...
最新文章
- (C语言)一种简易记法:生成[a,b]范围内的随机整数
- CVPR2021 | 基于transformer的视频实例分割网络VisTR
- NBT:扩增子测序革命—用16S及18S rRNA全长进行微生物多样性研究
- WINCE下实现USB转RS232
- AT3957-[AGC023F]01 on Tree【贪心,堆】
- 车联网领域,传统TSP企业做错了什么 ?
- [区块链] 拜占庭将军问题 [BFT]
- 两个有序数组求中位数log(m+n)复杂度
- 二维数组的空间复杂度_剑指 offer 面试题精选图解 04 . 二维数组中的查找
- ARCHICAD 25 for Mac(cad绘图软件)
- 一款好看的pycharm主题Atom One Dark
- 股票历史数据-股票前复权数据下载
- Saving Tang Monk II(bfs+优先队列)
- 计算机应用基础在线题库,计算机应用基础练习试题库完整.doc
- 如何免费创建三级域名?
- i春秋——“百度杯”CTF比赛 十月场——Vld(Vulcan Logic Dumper 、php opcode、sql 报错注入)...
- 设计模式六大原则——单一职责原则(SRP)
- 《软技能-代码之外的生存能力》第四篇——生产力
- 清除 柯美367打印机 转印辊组件、碳粉过滤器和臭氧过滤器报警
- jdk7和8的一些新特性介绍