转载:https://3gods.com/bigdata/Kafka-Message-Delivery-Semantics.html

介绍

kafka支持3种消息投递语义:

  • At most once——最多一次,消息可能会丢失,但不会重复
  • At least once——最少一次,消息不会丢失,可能会重复
  • Exactly once——只且一次,消息不丢失不重复,只且消费一次。

但是整体的消息投递语义需要Producer端和Consumer端两者来保证。

Producer 消息生产者端

一个场景例子:
当producer向broker发送一条消息,这时网络出错了,producer无法得知broker是否接受到了这条消息。
网络出错可能是发生在消息传递的过程中,也可能发生在broker已经接受到了消息,并返回ack给producer的过程中。

这时,producer只能进行重发,消息可能会重复,但是保证了at least once。

0.11.0的版本通过给每个producer一个唯一ID,并且在每条消息中生成一个sequence num,
这样就能对消息去重,达到producer端的exactly once。

这里还涉及到producer端的acks设置和broker端的副本数量,以及min.insync.replicas的设置。
比如producer端的acks设置如下:
acks=0 //消息发了就发了,不等任何响应就认为消息发送成功
acks=1 //leader分片写消息成功就返回响应给producer
acks=all(-1) //当acks=all, min.insync.replicas=2,就要求INSRNC列表中必须要有2个副本都写成功,才返回响应给producer,
如果INSRNC中已同步副本数量不足2,就会报异常,如果没有2个副本写成功,也会报异常,消息就会认为没有写成功。

Broker 消息接收端

上文说过acks=1,表示当leader分片副本写消息成功就返回响应给producer,此时认为消息发送成功。
如果leader写成功单马上挂了,还没有将这个写成功的消息同步给其他的分片副本,那么这个分片此时的ISR列表为空,
如果unclean.leader.election.enable=true,就会发生log truncation(日志截取),同样会发生消息丢失。
如果unclean.leader.election.enable=false,那么这个分片上的服务就不可用了,producer向这个分片发消息就会抛异常。

所以我们设置min.insync.replicas=2,unclean.leader.election.enable=false,producer端的acks=all,这样发送成功的消息就绝不会丢失。

Consumer 消息消费者端

所有分片的副本都有自己的log文件(保存消息)和相同的offset值。当consumer没挂的时候,offset直接保存在内存中,
如果挂了,就会发生负载均衡,需要consumer group中另外的consumer来接管并继续消费。

consumer消费消息的方式有以下2种;

  1. consumer读取消息,保存offset,然后处理消息。
    现在假设一个场景:保存offset成功,但是消息处理失败,consumer又挂了,这时来接管的consumer
    就只能从上次保存的offset继续消费,这种情况下就有可能丢消息,但是保证了at most once语义。

  2. consumer读取消息,处理消息,处理成功,保存offset。
    如果消息处理成功,但是在保存offset时,consumer挂了,这时来接管的consumer也只能
    从上一次保存的offset开始消费,这时消息就会被重复消费,也就是保证了at least once语义。

以上这些机制的保证都不是直接一个配置可以解决的,而是你的consumer代码来完成的,只是一个处理顺序先后问题。
第一种对应的代码:

List<String> messages = consumer.poll();
consumer.commitOffset();
processMsg(messages);

第二种对应的代码:

List<String> messages = consumer.poll();
processMsg(messages);
consumer.commitOffset();

Exactly Once实现原理

下面详细说说exactly once的实现原理。

Producer端的消息幂等性保证

每个Producer在初始化的时候都会被分配一个唯一的PID,
Producer向指定的Topic的特定Partition发送的消息都携带一个sequence number(简称seqNum),从零开始的单调递增的。

Broker会将Topic-Partition对应的seqNum在内存中维护,每次接受到Producer的消息都会进行校验;
只有seqNum比上次提交的seqNum刚好大一,才被认为是合法的。比它大的,说明消息有丢失;比它小的,说明消息重复发送了。

以上说的这个只是针对单个Producer在一个session内的情况,假设Producer挂了,又重新启动一个Producer被而且分配了另外一个PID,
这样就不能达到防重的目的了,所以kafka又引进了Transactional Guarantees(事务性保证)。

Transactional Guarantees 事务性保证

kafka的事务性保证说的是:同时向多个TopicPartitions发送消息,要么都成功,要么都失败。

为什么搞这么个东西出来?我想了下有可能是这种例子:
用户定了一张机票,付款成功之后,订单的状态改了,飞机座位也被占了,这样相当于是
2条消息,那么保证这个事务性就是:向订单状态的Topic和飞机座位的Topic分别发送一条消息,
这样就需要kafka的这种事务性保证。

这种功能可以使得consumer offset的提交(也是向broker产生消息)和producer的发送消息绑定在一起。
用户需要提供一个唯一的全局性TransactionalId,这样就能将PID和TransactionalId映射起来,就能解决
producer挂掉后跨session的问题,应该是将之前PID的TransactionalId赋值给新的producer。

Consumer端

以上的事务性保证只是针对的producer端,对consumer端无法保证,有以下原因:

  1. 压实类型的topics,有些事务消息可能被新版本的producer重写
  2. 事务可能跨坐2个log segments,这时旧的segments可能被删除,就会丢消息
  3. 消费者可能寻址到事务中任意一点,也会丢失一些初始化的消息
  4. 消费者可能不会同时从所有的参与事务的TopicPartitions分片中消费消息

如果是消费kafka中的topic,并且将结果写回到kafka中另外的topic,
可以将消息处理后结果的保存和offset的保存绑定为一个事务,这时就能保证
消息的处理和offset的提交要么都成功,要么都失败。

如果是将处理消息后的结果保存到外部系统,这时就要用到两阶段提交(tow-phase commit),
但是这样做很麻烦,较好的方式是offset自己管理,将它和消息的结果保存到同一个地方,整体上进行绑定,
可以参考Kafka Connect中HDFS的例子。

Kafka消息投递语义-消息不丢失,不重复,不丢不重相关推荐

  1. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  2. 详解分布式系统与消息投递

    作者:Draveness https://draveness.me/message-delivery 消息是一个非常有趣的概念,它是由来源发出一个离散的通信单元,被发送给一个或者一群接受者,无论是单体 ...

  3. 分布式系统与消息投递

    戳蓝字"CSDN云计算"关注我们哦! 原文:https://draveness.me/message-delivery 消息是一个非常有趣的概念,它是由来源发出一个离散的通信单元, ...

  4. php定时发送生日模块消息_RabbitMQ之消息的可靠性投递

    生产端的可靠性投递: 1.保障消息成功发送出去 2.保障mq节点成功接收消息 3.消息发送端需要收到mq服务的确认应答 4.完善的消息补偿机制(百分百成功成功,需要该步骤) 消息落库方案 订单服务调用 ...

  5. Rabbitmq消息中心_消息中心总体方案

    消息中心方案 一.消息中心简介 为了将各个应用系统之间进行业务解耦,对业务的透明化处理及技术架构的统一管理,方便对各应用的整体把控,保证系统的稳定性,也方便各应用的消息中间件的快速搭建,因此搭建消息中 ...

  6. kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...

    作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...

  7. kafka数据不丢失不重复_如何配置 KAFKA 使其消息不会丢失

    不可靠的KAFKA 这里的不可靠是指代KAFKA其设计之初就为高性能而设计,其是允许消息丢失的,但经过多个版本的升级之后,通过KAFKA的相关配置,我们可以将其作为可靠的队列(不丢消息的队列). 在本 ...

  8. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性

    [重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...

  9. kafka的消息丢失和重复消费解决办法

    1.消息发送 Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置.Kafka通过配置request.required.ac ...

最新文章

  1. 单链表-插入一个元素为x的节点后,使链表仍然有序
  2. 复制数据表的两种情况。
  3. python链表的创建_《大话数据结构》配套源码:链表(Python版)
  4. 网络安全人才的发展情况是怎么样的呢?快上车,带你了解
  5. linux下的二进制文件的编辑和查看 -
  6. c获取文件的名字和运行到程序的第几行功能
  7. Arrays类中的binarysearch
  8. 天才编程女孩敲开支付宝大门!马云:有梦想的孩子了不起!
  9. c fun函数求n个整数的平均值_c语言题目(求阶乘)
  10. JAVA网络编程个人笔记 第五章 URL和URI
  11. 动态规则最佳入门(转)
  12. 框架学习(1)——service层,dao层和service实现类进行数据库操作
  13. 为什么iPhone 12 中国版不支持5G毫米波?
  14. Linux中断管理 (3)workqueue工作队列
  15. 闵帆老师《论文写作》心得体会
  16. 研究数字基带信号功率谱的意义
  17. SAP PP 笔记(一) 概述
  18. STM32驱动VL6180X测距
  19. Hibernate入门5持久化对象关系和批量处理技术
  20. 创建简单demo通用步骤

热门文章

  1. java 把URL中的中文转换成utf-8编码
  2. linux 安装 memcached
  3. [转]application.properties详解 --springBoot配置文件
  4. Maven的个性化定制
  5. Android开发(五)——计时器
  6. Oracle 查看 对象 持有锁的情况
  7. 使用MONGODB 集群的OPLOG 日志进行数据恢复
  8. android打包apk时混淆遇到的问题
  9. android volatile的使用
  10. Android 置Activity全屏和无标题