Kafka在0.11版本中除了引入了Exactly Once语义,还引入了事务特性。Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败。

1. Kafka事务的使用

Kafka中的事务特性主要用于以下两种场景:

  • 生产者发送多条消息可以封装在一个事务中,形成一个原子操作。多条消息要么都发送成功,要么都发送失败。
  • read-process-write模式:将消息消费和生产封装在一个事务中,形成一个原子操作。在一个流式处理的应用中,常常一个服务需要从上游接收消息,然后经过处理后送达到下游,这就对应着消息的消费和生成。

当事务中仅仅存在Consumer消费消息的操作时,它和Consumer手动提交Offset并没有区别。因此单纯的消费消息并不是Kafka引入事务机制的原因,单纯的消费消息也没有必要存在于一个事务中。

Kafka producer API提供了以下接口用于事务操作:

    /*** 初始化事务*/public void initTransactions();/*** 开启事务*/public void beginTransaction() throws ProducerFencedException ;/*** 在事务内提交已经消费的偏移量*/public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;/*** 提交事务*/public void commitTransaction() throws ProducerFencedException;/*** 丢弃事务*/public void abortTransaction() throws ProducerFencedException ;

下面是使用Kafka事务特性的例子,这段代码Producer开启了一个事务,然后在这个事务中发送了两条消息。这两条消息要么都发送成功,要么都失败。

KafkaProducer producer = createKafkaProducer("bootstrap.servers", "localhost:9092","transactional.id”, “my-transactional-id");producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();

下面这段代码即为read-process-write模式,在一个Kafka事务中,同时涉及到了生产消息和消费消息。

KafkaProducer producer = createKafkaProducer("bootstrap.servers", "localhost:9092","transactional.id", "my-transactional-id");KafkaConsumer consumer = createKafkaConsumer("bootstrap.servers", "localhost:9092","group.id", "my-group-id","isolation.level", "read_committed");consumer.subscribe(singleton("inputTopic"));producer.initTransactions();while (true) {ConsumerRecords records = consumer.poll(Long.MAX_VALUE);producer.beginTransaction();for (ConsumerRecord record : records)producer.send(producerRecord(“outputTopic”, record));producer.sendOffsetsToTransaction(currentOffsets(consumer), group);  producer.commitTransaction();
}

注意:在理解消息的事务时,一直处于一个错误理解是,把操作db的业务逻辑跟操作消息当成是一个事务,如下所示:

void  kakfa_in_tranction(){// 1.kafa的操作:读取消息或生产消息kafkaOperation();// 2.db操作dbOperation();
}

其实这个是有问题的。操作DB数据库的数据源是DB,消息数据源是kfaka,这是完全不同两个数据。一种数据源(如mysql,kafka)对应一个事务,所以它们是两个独立的事务。kafka事务指kafka一系列 生产、消费消息等操作组成一个原子操作,db事务是指操作数据库的一系列增删改操作组成一个原子操作。

2. Kafka事务配置

  • 对于Producer,需要设置transactional.id属性,这个属性的作用下文会提到。设置了transactional.id属性后,enable.idempotence属性会自动设置为true。
  • 对于Consumer,需要设置isolation.level = read_committed,这样Consumer只会读取已经提交了事务的消息。另外,需要设置enable.auto.commit = false来关闭自动提交Offset功能。

更多关于配置的信息请参考我的文章:Kafka消息送达语义详解

3. Kafka事务特性

Kafka的事务特性本质上代表了三个功能:原子写操作拒绝僵尸实例(Zombie fencing)和读事务消息

3.1 原子写

Kafka的事务特性本质上是支持了Kafka跨分区和Topic的原子写操作。在同一个事务中的消息要么同时写入成功,要么同时写入失败。我们知道,Kafka中的Offset信息存储在一个名为_consumed_offsets的Topic中,因此read-process-write模式,除了向目标Topic写入消息,还会向_consumed_offsets中写入已经消费的Offsets数据。因此read-process-write本质上就是跨分区和Topic的原子写操作。Kafka的事务特性就是要确保跨分区的多个写操作的原子性。

3.2 拒绝僵尸实例(Zombie fencing)

在分布式系统中,一个instance的宕机或失联,集群往往会自动启动一个新的实例来代替它的工作。此时若原实例恢复了,那么集群中就产生了两个具有相同职责的实例,此时前一个instance就被称为“僵尸实例(Zombie Instance)”。在Kafka中,两个相同的producer同时处理消息并生产出重复的消息(read-process-write模式),这样就严重违反了Exactly Once Processing的语义。这就是僵尸实例问题。

Kafka事务特性通过transaction-id属性来解决僵尸实例问题。所有具有相同transaction-id的Producer都会被分配相同的pid,同时每一个Producer还会被分配一个递增的epoch。Kafka收到事务提交请求时,如果检查当前事务提交者的epoch不是最新的,那么就会拒绝该Producer的请求。从而达成拒绝僵尸实例的目标。

3.3 读事务消息

为了保证事务特性,Consumer如果设置了isolation.level = read_committed,那么它只会读取已经提交了的消息。在Producer成功提交事务后,Kafka会将所有该事务中的消息的Transaction Markeruncommitted标记为committed状态,从而所有的Consumer都能够消费。

4. Kafka事务原理

Kafka为了支持事务特性,引入一个新的组件:Transaction Coordinator。主要负责分配pid,记录事务状态等操作。下面时Kafka开启一个事务到提交一个事务的流程图:

KafkaTransaction.png

主要分为以下步骤:

1. 查找Tranaction Corordinator

Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址。

2. 初始化事务 initTransaction

Producer发送InitpidRequest给Transaction Coordinator,获取pid。Transaction Coordinator在Transaciton Log中记录这<TransactionId,pid>的映射关系。另外,它还会做两件事:

  • 恢复(Commit或Abort)之前的Producer未完成的事务
  • 对PID对应的epoch进行递增,这样可以保证同一个app的不同实例对应的PID是一样,而epoch是不同的。

只要开启了幂等特性即必须执行InitpidRequest,而无须考虑该Producer是否开启了事务特性。

3. 开始事务beginTransaction

执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。这个操作并没有通知Transaction Coordinator,因为Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。

4. read-process-write流程

一旦Producer开始发送消息,Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。

在注册<Transaction, Topic, Partition>到Transaction Log后,生产者发送数据,虽然没有还没有执行commit或者abort,但是此时消息已经保存到Broker上了。即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。

5. 事务提交或终结 commitTransaction/abortTransaction

在Producer执行commitTransaction/abortTransaction时,Transaction Coordinator会执行一个两阶段提交:

  • 第一阶段,将Transaction Log内的该事务状态设置为PREPARE_COMMITPREPARE_ABORT
  • 第二阶段,将Transaction Marker写入该事务涉及到的所有消息(即将消息标记为committedaborted)。这一步骤Transaction Coordinator会发送给当前事务涉及到的每个<Topic, Partition>的Leader,Broker收到该请求后,会将对应的Transaction Marker控制信息写入日志。

一旦Transaction Marker写入完成,Transaction Coordinator会将最终的COMPLETE_COMMITCOMPLETE_ABORT状态写入Transaction Log中以标明该事务结束。

5. 总结

  • Transaction Marker与PID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性。
  • Offset的更新标记了消息是否被读取,从而将对读操作的事务处理转换成了对写(Offset)操作的事务处理。
  • Kafka事务的本质是,将一组写操作(如果有)对应的消息与一组读操作(如果有)对应的Offset的更新进行同样的标记(Transaction Marker)来实现事务中涉及的所有读写操作同时对外可见或同时对外不可见。
  • Kafka只提供对Kafka本身的读写操作的事务性,不提供包含外部系统的事务性。

Kafka事务特性详解相关推荐

  1. java11 新特性 详解

    为什么80%的码农都做不了架构师?>>>    引言: 点击-->java10 新特性 详解 点击-->java9 新特性 详解 点击-->java8 新特性 详解 ...

  2. Flink常见的关键技术与特性详解

    转载:http://bigdata.51cto.com/art/201702/531036.htm Flink常见的关键技术与特性详解 Flink项目是大数据处理领域最近冉冉升起的一颗新星,其不同于其 ...

  3. 关于事务管理的理解和Spring事务管理详解

    转载于:http://www.mamicode.com/info-detail-1248286.html 1 初步理解 理解事务之前,先讲一个你日常生活中最常干的事:取钱. 比如你去ATM机取1000 ...

  4. Oracle 18c新特性详解-多租户专题

    Oracle 18c,传说中全球第一款自动驾驶数据库,正式到来.18c不仅仅是数据库,更是一种云服务,包括着Oracle数据库18c,Oracle云基础架构和Oracle云工具,机器学习,能够实现自治 ...

  5. 还在用JDK6的同学,来看看JDK13新特性详解吧

    点击上方"搜云库技术团队"关注,选择"设为星标" 回复"面试题"或"1024"获取 4T 学习资料 在 JDK 版本的世 ...

  6. 26.SpringBoot事务注解详解

    转自:https://www.cnblogs.com/kesimin/p/9546225.html @Transactional spring 事务注解 1.简单开启事务管理 @EnableTrans ...

  7. 4.6 W 字总结!Java 11—Java 17特性详解

    作者 | 民工哥技术之路 来源 | https://mp.weixin.qq.com/s/SVleHYFQeePNT7q67UoL4Q Java 11 特性详解 基于嵌套的访问控制 与 Java 语言 ...

  8. 【干货】PMcaff干货课程学习精彩分享:Apple Watch 技术特性详解

    昨天PMcaff给大家推荐了Apple Watch的发布会,好多小伙伴们在后台留言,所以PMcaff小咖今天给大家找来一篇干货,看完感觉还不错,在这里分享给大家. 作为苹果主推的智能穿戴产品,Appl ...

  9. 《Android群英传》读书笔记 (5) 第十一章 搭建云端服务器 + 第十二章 Android 5.X新特性详解 + 第十三章 Android实例提高...

    第十一章 搭建云端服务器 该章主要介绍了移动后端服务的概念以及Bmob的使用,比较简单,所以略过不总结. 第十三章 Android实例提高 该章主要介绍了拼图游戏和2048的小项目实例,主要是代码,所 ...

最新文章

  1. 三维等值面提取算法(Dual Contouring)
  2. 【BZOJ4259】残缺的字符串
  3. Exception: This is not supported, use MenuItemCompat.getActionProvider()的处理
  4. ResourceID(frameworks/base/libs/utils/README)
  5. 2018清华计算机类专业录取分数线,清华大学2018-2019年各省各专业录取分数线
  6. spring 4.0下集成webservice
  7. python编程语言-Python有望超越Java排第二?风变编程解析编程语言新趋势
  8. 智慧城市发展路径应分级分类
  9. PS 怎么去掉图片上的文字
  10. 中国历史上5个谣言,单是第1条就骗了不少人!
  11. react监听回车事件
  12. Android系统驱动介绍
  13. 2020年下半年会议时间表
  14. K8S多节点二进制部署
  15. Python卸载旧版本并安装新版本
  16. js 按拼音 首字母 排序 并分组
  17. 中国城市电话区号对照表中国移动短信中心号查询及命名规则
  18. 网络编程培训之一 编程实现IP/TCP/UDP报文
  19. 台式计算机m4350,比超极本便携 评联想M4350q小型台式机
  20. iPhone5 iOS6.1系统完美越狱教程

热门文章

  1. [How TO]-windows安装wget工具
  2. python的protected和private
  3. 给你汇报Struts2 S2-016漏洞修复的总结
  4. Git push error: Unable to unlink old (Permission denied)
  5. 51nod 1256 乘法逆元(扩展欧几里得)
  6. ACM入门之【分块】
  7. 129. 火车进栈【栈】
  8. 第五讲 树状数组与线段树 【未完结】
  9. 【PAT乙级】1053 住房空置率 (20 分)
  10. Volatile可见性