文章目录

  • 概述
  • 官方示例
  • Code (原生API)
  • Code (Spring Kafka)
    • POM依赖
    • 配置文件
    • 生产者
    • 注意事项
    • 消费者
    • 单元测试
    • 测试结果
  • 源码地址


概述

Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)

一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。

Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。

官方文档: http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

这个功能比较鸡肋,大家看着用哈 ,它保证不了不同介质的数据一致性。


官方示例

From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer.

原生的API操作,请查看文档,这里我们来看下使用Spring kafka如何实现事务消息。


Code (原生API)

@Test public void testT(){ // 正常的 Properties props = new Properties();props.put("bootstrap.servers", "192.168.126.140:9092");props.put("transactional.id", "my-transactional-id");Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());producer.initTransactions();try {producer.beginTransaction();for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();}producer.close();}


    @Testpublic void testT2(){ // 测试异常情况  Properties props = new Properties();props.put("bootstrap.servers", "192.168.126.140:9092");props.put("transactional.id", "my-transactional-id");Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());producer.initTransactions();try {producer.beginTransaction();for (int i = 0; i < 100; i++){producer.send(new ProducerRecord<>("my-topic2", Integer.toString(i), Integer.toString(i)));if (i == 50) {throw new RuntimeException("MOCK Exception");}}producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();}producer.close();}

看看数据

可以看到 入了一部分,只是这里进入的数据也无法被消费。


Code (Spring Kafka)


POM依赖

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入 Spring-Kafka 依赖 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies>

配置文件

 spring:# Kafka 配置项,对应 KafkaProperties 配置类kafka:bootstrap-servers: 192.168.126.140:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔# Kafka Producer 配置项producer:acks: all # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。retries: 3 # 发送失败时,重试发送的次数key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化transaction-id-prefix: artisan. # 事务编号前缀# Kafka Consumer 配置项consumer:auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring:json:trusted:packages: com.artisan.springkafka.domainisolation-level: read_committed # 读取已提交的消息# Kafka Consumer Listener 监听器配置listener:missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错logging:level:org:springframework:kafka: ERROR # spring-kafkaapache:kafka: ERROR # kafka

  • spring.kafka.producer.acks 配置为all,Kafka 的事务消息需要基于幂等性来实现,必须保证所有节点都写入成功,否则的话启动时会抛出Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence

  • 事务编号前缀属性设置 transaction-id-prefix, 需要保证相同应用配置相同,不同应用配置不同。 How to choose Kafka transaction id for several applications, hosted in Kubernetes?

  • spring.kafka.consumer.properties.isolation.level 设置为 read_committed ,Consumer 仅读取已提交的消息, 否则不生效


生产者

  package com.artisan.springkafka.producer;import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;import java.util.Random;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:25* @mark: show me the code , change the world*/@Component
public class ArtisanProducerMock {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate ;public String testTransaction(Runnable runnable){return kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Object, Object, String>() {@Overridepublic String doInOperations(KafkaOperations<Object, Object> operations)  throws  RuntimeException {for (int i = 1; i <= 10; i++) {// 用于测试  消息是否在同一个事务中if (i   == 7 ) {throw new RuntimeException("MOCK ERROR , TEST Tranasction");}// 模拟发送的消息Integer id = new Random().nextInt(100);MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);SendResult<Object, Object> sendResult = null;try {sendResult = operations.send(TOPIC.TOPIC, messageMock).get();}  catch ( Exception e) {logger.error("Error {}", e);}logger.info( i+ "-[doInOperations][发送数据:[{}] 发送结果:[{}]]", messageMock, sendResult);// 本地业务逻辑...runnable.run();}// 返回结果return "OJ8K";}});}}

我们模拟发送10条消息,第7条的时候抛出异常,观察消费者是否能消费前面已经发送的6条 ,如果能消费,那肯定不符合和预期。 因为Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)。

调用 kafkaTemplate#executeInTransaction(OperationsCallback<K, V, T> callback) 模板方法,实现在 Kafka 事务中,执行自定义 KafkaOperations.OperationsCallback 操作。

  • executeInTransaction(...) 方法中,可以通过 KafkaOperations 来执行发送消息等 Kafka 相关的操作,当然了也可以执行自己的业务逻辑,比如 runnable参数,用于表示本地业务逻辑

  • executeInTransaction(...) 方法的开始,会自动动创建 Kafka 的事务,然后执行KafkaOperations 的逻辑。成功,则提交 Kafka 事务;失败,则回滚 Kafka 事务。


注意事项

如果 Kafka Producer 开启了事务的功能,则所有发送的消息,都必须处于 Kafka 事务之中,否则会抛出 No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

假设业务中,即存在需要事务的情况,也存在不需要事务的情况,那么则需要分别定义两个 KafkaTemplate(Kafka Producer)


消费者

 package com.artisan.springkafka.consumer;import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:33* @mark: show me the code , change the world*/@Component
public class ArtisanCosumerMock {private Logger logger = LoggerFactory.getLogger(getClass());private static final String CONSUMER_GROUP_PREFIX = "MANUAL_ACK_" ;@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)public void onMessage(MessageMock messageMock)  {logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);}}

单元测试

  package com.artisan.springkafka.produceTest;import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @author 小工匠*  * @version 1.0* @description: TODO* @date 2021/2/17 22:40* @mark: show me the code , change the world*/@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate ArtisanProducerMock artisanProducerMock;@AutowiredArtisanProducerMock producerMock;@Testpublic void testAsynSend() throws ExecutionException, InterruptedException {logger.info("开始发送");producerMock.testTransaction(() -> {logger.info(" mock doing bussiness ");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}logger.info("bussiness over  ");});// 阻塞等待,保证消费new CountDownLatch(1).await();}}

测试结果

....
....
.....   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v2.4.1)2021-02-20 01:35:44.452  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : Starting ProduceMockTest using Java 1.8.0_261 on LAPTOP-JF3RBRRJ with PID 12108 (started by artisan in D:\IdeaProjects\boot2\springkafkaTransaction)
2021-02-20 01:35:44.456  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : No active profile set, falling back to default profiles: default
2021-02-20 01:35:45.832  INFO 12108 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-02-20 01:35:46.811  INFO 12108 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2021-02-20 01:35:46.827  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : Started ProduceMockTest in 2.77 seconds (JVM running for 3.55)
2021-02-20 01:35:47.021  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : 开始发送
2021-02-20 01:35:47.298  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 1-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@16]]]
2021-02-20 01:35:47.298  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 01:35:48.302  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 01:35:48.305  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 2-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@17]]]
2021-02-20 01:35:48.305  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 01:35:49.308  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 01:35:49.308  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 3-[doInOperations][发送数据:[MessageMock{id=36, name='messageSendByAsync-36'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=36, name='messageSendByAsync-36'}, timestamp=null), recordMetadata=AC-0@18]]]
2021-02-20 01:35:49.308  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 01:35:50.314  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 01:35:50.314  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 4-[doInOperations][发送数据:[MessageMock{id=19, name='messageSendByAsync-19'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=19, name='messageSendByAsync-19'}, timestamp=null), recordMetadata=AC-0@19]]]
2021-02-20 01:35:50.318  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 01:35:51.321  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 01:35:51.321  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 5-[doInOperations][发送数据:[MessageMock{id=29, name='messageSendByAsync-29'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=AC-0@20]]]
2021-02-20 01:35:51.325  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 01:35:52.326  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 01:35:52.326  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 6-[doInOperations][发送数据:[MessageMock{id=45, name='messageSendByAsync-45'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=45, name='messageSendByAsync-45'}, timestamp=null), recordMetadata=AC-0@21]]]
2021-02-20 01:35:52.326  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 01:35:53.326  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  java.lang.RuntimeException: MOCK ERROR , TEST Tranasctionat com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:42)at com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:34)at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:467)at com.artisan.springkafka.producer.ArtisanProducerMock.testTransaction(ArtisanProducerMock.java:34)at com.artisan.springkafka.produceTest.ProduceMockTest.testAsynSend(ProduceMockTest.java:45)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)at org.junit.runners.ParentRunner.run(ParentRunner.java:413)at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)at org.junit.runner.JUnitCore.run(JUnitCore.java:137)at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)2021-02-20 01:35:53.346  INFO 12108 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2021-02-20 01:35:53.357  INFO 12108 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'Process finished with exit code -1

可以看到,有异常了,消费者未的消费到消息。

那我们来个成功的看看嘛

2021-02-20 10:10:46.103  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : 开始发送
2021-02-20 10:10:46.429  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 1-[doInOperations][发送数据:[MessageMock{id=44, name='messageSendByAsync-44'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=44, name='messageSendByAsync-44'}, timestamp=null), recordMetadata=OOO_TOIPC-0@7]]]
2021-02-20 10:10:46.429  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:47.430  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:47.430  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 2-[doInOperations][发送数据:[MessageMock{id=76, name='messageSendByAsync-76'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=76, name='messageSendByAsync-76'}, timestamp=null), recordMetadata=OOO_TOIPC-0@8]]]
2021-02-20 10:10:47.430  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:48.430  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:48.431  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 3-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=OOO_TOIPC-0@9]]]
2021-02-20 10:10:48.431  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:49.434  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:49.438  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 4-[doInOperations][发送数据:[MessageMock{id=34, name='messageSendByAsync-34'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=34, name='messageSendByAsync-34'}, timestamp=null), recordMetadata=OOO_TOIPC-0@10]]]
2021-02-20 10:10:49.438  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:50.440  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:50.440  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 5-[doInOperations][发送数据:[MessageMock{id=41, name='messageSendByAsync-41'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=41, name='messageSendByAsync-41'}, timestamp=null), recordMetadata=OOO_TOIPC-0@11]]]
2021-02-20 10:10:50.444  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:51.446  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:51.446  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 6-[doInOperations][发送数据:[MessageMock{id=29, name='messageSendByAsync-29'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=OOO_TOIPC-0@12]]]
2021-02-20 10:10:51.446  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:52.447  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:52.447  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 7-[doInOperations][发送数据:[MessageMock{id=49, name='messageSendByAsync-49'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=49, name='messageSendByAsync-49'}, timestamp=null), recordMetadata=OOO_TOIPC-0@13]]]
2021-02-20 10:10:52.447  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:53.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:53.450  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 8-[doInOperations][发送数据:[MessageMock{id=12, name='messageSendByAsync-12'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=12, name='messageSendByAsync-12'}, timestamp=null), recordMetadata=OOO_TOIPC-0@14]]]
2021-02-20 10:10:53.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:54.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:54.450  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 9-[doInOperations][发送数据:[MessageMock{id=15, name='messageSendByAsync-15'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=15, name='messageSendByAsync-15'}, timestamp=null), recordMetadata=OOO_TOIPC-0@15]]]
2021-02-20 10:10:54.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:55.454  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:55.454  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 10-[doInOperations][发送数据:[MessageMock{id=25, name='messageSendByAsync-25'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=25, name='messageSendByAsync-25'}, timestamp=null), recordMetadata=OOO_TOIPC-0@16]]]
2021-02-20 10:10:55.458  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness
2021-02-20 10:10:56.460  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over
2021-02-20 10:10:56.625  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=44, name='messageSendByAsync-44'}]
2021-02-20 10:10:56.737  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=76, name='messageSendByAsync-76'}]
2021-02-20 10:10:56.846  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=2, name='messageSendByAsync-2'}]
2021-02-20 10:10:56.962  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=34, name='messageSendByAsync-34'}]
2021-02-20 10:10:56.970  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=41, name='messageSendByAsync-41'}]
2021-02-20 10:10:57.074  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=29, name='messageSendByAsync-29'}]
2021-02-20 10:10:57.082  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='messageSendByAsync-49'}]
2021-02-20 10:10:57.090  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=12, name='messageSendByAsync-12'}]
2021-02-20 10:10:57.094  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=15, name='messageSendByAsync-15'}]
2021-02-20 10:10:57.101  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]

懂了么,老兄 ~

我们继续看下数据 (新跑的数据,和日志有出入)


源码地址

https://github.com/yangshangwei/boot2/tree/master/springkafkaTransaction

Apache Kafka-事务消息的支持与实现(本地事务)相关推荐

  1. 分布式事务seata只支持MySQL_阿里分布式事务框架Seata原理解析

    Seata框架是一个业务层的XA(两阶段提交)解决方案.在理解Seata分布式事务机制前,我们先回顾一下数据库层面的XA方案. 1. MySQL XA方案 MySQL从5.7开始加入了分布式事务的支持 ...

  2. kafka 发布-订阅模式_使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...

    kafka 发布-订阅模式 发布-订阅消息系统在任何企业体系结构中都起着重要作用,因为它可以实现可靠的集成而无需紧密耦合应用程序. 在解耦的系统之间共享数据的能力并不是一个容易解决的问题. 考虑一个企 ...

  3. 使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...

    发布-订阅消息系统在任何企业体系结构中都起着重要作用,因为它可以实现可靠的集成而无需紧密耦合应用程序. 在解耦的系统之间共享数据的能力并不是一个容易解决的问题. 考虑一个企业,其中具有使用不同语言和平 ...

  4. 第三集 Spring for Apache Kafka 接受消息

    我们可以接受消息通过配置一个MessageListenerContainer 和提供一个消息监听或者通过使用@KafkaListener 注解 3.1 Message Listeners 当我们使用一 ...

  5. 七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)

    分布式事务:在分布式系统中一次操作需要由多个服务协同完成,这种由不同的服务之间通过网络协同完成的事务称为分布式事务 一.2PC: 2PC,两阶段提交,将事务的提交过程分为资源准备和资源提交两个阶段,并 ...

  6. 如何通过事务消息保障抢购业务的分布式一致性?

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取 后台回复"k8s",可领取k8s资料 作者 | 山猎 ...

  7. 还不知道事务消息吗?这篇文章带你全面扫盲

    目录 为什么需要事务消息? 事务消息 事务消息使用注意点 彩蛋 在分布式系统中,为了保证数据一致性是必须使用分布式事务.分布式事务实现方式就很多种,今天主要介绍一下使用 RocketMQ 事务消息,实 ...

  8. rocket mq 实现分布式事务消息 以及示例代码

    分布式事务的来龙去脉 业务场景:用户A转账100元给用户B,这个业务比较简单,具体的步骤: 1.用户A的账户先扣除100元 2.再把用户B的账户加100元 如果在同一个数据库中进行,事务可以保证这两步 ...

  9. 事务消息大揭秘!RocketMQ、Kafka、Pulsar全方位对比

    导语 | 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败.RocketMQ.Kafka和Pulsar都是当今业界应用十分广泛的开源消息队列(MQ)组件,笔者在工作中遇到关于M ...

最新文章

  1. 大学mysql教程_MYSQL教程:新手该看的MYSQL操作
  2. 2013-12-7 在超市给思杨买东西-思杨踢球
  3. 机器学习——梯度下降算法
  4. python所有变量更新_PYTHON:使用python变量更新MULTIPLE COLUMNS
  5. jaxb list集合对象_JAXB –表示空集合和空集合
  6. C语言指针学习(续)
  7. 《Android开发精要》读书笔记——Android应用模型
  8. 读书笔记-大型网站技术架构
  9. Git : SSH 协议服务器
  10. 多频电磁感应仪GEM-2介绍
  11. 激励函数-Activation Funciton
  12. Windows 10 安装 Maven
  13. PCD文件的rgb格式
  14. 挖掘数百万参与的IMVU用户
  15. 美团上线美团直播助手
  16. hc sr501 c语言程序,HC-SR501人体红外感应模块资料汇总(原理图、常见问题、程序等)...
  17. [Linux-网络性能测试] -- netperf测试
  18. Android调用系统自带的文件管理器进行文件选择
  19. 威漫哨兵机器人_漫威中实力最强的五大机器人,哨兵机器人能够团灭变种人!...
  20. php 访问 HTTP 网址

热门文章

  1. Java:高级之泛型概念引入,泛型可以设置多个类型参数,泛型继承和泛型接口实现,限制泛型可用类型,泛型通配的方式,泛型方法,泛型方法限制泛型可用类型
  2. android 之使用多线程中的AsyncTask实现下载网络图片资源
  3. C++标准模板库(STL)的概念
  4. python3是unicode还是utf-8_ASCII、Unicode、UTF-8以及Python3编码问题
  5. mysql 最长字符串_那些年的Mysql
  6. 机器学习笔记(时间序列):不同类型的图示
  7. tableau必知必会之用参数操作实现数据下钻
  8. 模式识别中Fisher分类器的Matlab实现及测试
  9. hadoop学习--基于Hive的Hadoop日志分析
  10. 初始化QChart极坐标图(含曲线、散点)