精确一次处理语义(exactly onece semantic–EOS),Kafka的EOS主要体现在3个方面:

1)幂等producer 保证单个分区的只会发送一次,不会出现重复消息

2)事务(transation):保证原子性的写入多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚

3)流式EOS:流处理本质上可看成是“读取-处理-写入管道”。整个过程的操作是原子性。

幂等producer只能保证单分区上无重复消息;事务可以保证多分区写入消息的完整性;而流处理EOS保证的是端到端(E2E)消息处理的EOS。用户在使用过程中需要根据自己的需求选择不同的EOS。

以下是启用方法:

1)启用幂等producer:在producer程序中设置属性enabled.idempotence=true,但不要设置transational_id.注意是不要设置,而不是设置为空字符串。

2)启用事务支持:在producer程序中设置属性transcational.id为一个指定字符串(你可以认为这是你的额事务名称,故最好七个有意义的名字),同时设置enable.idempotence=true

3)启用流处理EOS:在Kafka Streams程序中设置processing.guarantee=exactly_once

关于幂等producer的一些讨论

所谓幂等producer指producer.send的逻辑是幂等的,即发送相同的Kafka消息,broker端不会重复写入消息。同一条消息Kafka保证底层日志中只会持久化一次,既不会丢失也不会重复。幂等性可以极大地减轻下游consumer系统实现消息去重的工作负担,因此是非常实用的功能。值得注意的是,幂等producer提供的语义保证是有条件的:

单分区幂等性:幂等producer无法实现多分区上的幂等性。如前所述,若要实现多分区上的原子性,需要引入事务。

单会话幂等性:幂等producer无法跨会话实现幂等性。即使同一个producer宕机并重启也无法保证消息的EOS语义

虽然有上面两个限制,幂等producer依然是一个非常实用的新功能。下面我们来讨论下它的设计原理。如果要实现幂等性, 通常都需要花费额外的空间来保存状态以执行消息去重。Kafka的幂等producer整体上也是这样的思想。

首先,producer对象引入了一个新的字段:Producer ID(下称PID),它唯一标识一个producer,当producer启动时Kafka会为每个producer分配一个PID(64位整数),因此PID的生成和分配对用户来说是完全透明的,用户无需考虑PID的事情,甚至都感受不到PID的存在。Kafka重构了消息格式(有兴趣的参见Kafka 0.11消息设计),引入了序列号字段(sequence number,下称seq number)来标识某个PID producer发送的消息。

和consumer端的offset类似,seq number从0开始计数并严格单调增加。同时在broker端会为每个PID(即每个producer)保存该producer发送过来的消息batch的某些元信息,比如PID信息、消息batch的起始seq number及结束seq number等。这样每当该PID发送新的消息batch时,Kafka broker就会对比这些信息,如果发生冲突(比如起始seq number和结束seq number与当前缓存的相同),那么broker就会拒绝这次写入请求。倘若没有冲突,那么broker端就会更新这部分缓存然后再开始写入消息。

这就是Kafka实现幂等producer的设计思路:

  1. 为每个producer设置唯一的PID;

  2. 引入seq number以及broker端seq number缓存更新机制来去重。

kafka怎样保证消息仅被消费一次?

在使用kafka时,大多数场景对于数据少量的不一致(重复或者丢失)并不关注,比如日志,因为不会影响最终的使用或者分析,但是在某些应用场景(比如业务数据),需要对任何一条消息都要做到精确一次的消费,才能保证系统的正确性,kafka并不提供准确一致的消费API,需要我们在实际使用时借用外部的一些手段来保证消费的精确性,下面我们介绍如何实现。

名词说明:

offset:如图所示

重复消费(最少一次消费语义实现):消费数据处理业务“完成后”,再提交offset。因为在提交offset的过程中,可能出现提交失败的情况,已经消费了数据,但是offset没提交。从而导致数据重复消费。

解决办法:

至少成功发送一次+去重操作(幂等性)

a,如何保证至少成功发送一次?

保证不丢失消息:

生产者(ack=all 代表至少成功发送一次)

消费者 (offset手动提交,业务逻辑成功处理后,提交offset)去重问题:消息可以使用唯一id标识

b,保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)。

业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)。

丢失数据(最多一次消费语义实现):在消费数据业务“处理前”进行offset提交。因为在后续数据业务处理过程中,如果出现故障,没有消费到消息,那么将导致数据丢失。为了避免数据丢失,可以设置:

enable.auto.commit=false  关闭自动提交位移,这样,在消息被完整处理之后再手动提交位移。

丢包:指发送方发送的数据未到达接收方。常见的丢包可能发生在发送端,网络,接收端。

解决方案:

对kafka进行限速,平滑流量

启用重试机制,重试间隔时间设置长一些。

Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。

说明:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

解决办法就是,设置 acks = all。

acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。

从kafka的消费机制,我们可以得到是否能够精确的消费关键在消费进度信息的准确性,如果能够保证消费进度的准确性,也就保证了消费数据的准确性。

数据有状态:可以根据数据信息进行确认数据是否重复消费,这时候可以使用手动提交的最少一次消费语义实现,即使消费的数据有重复,可以通过状态进行数据去重,以达到幂等的效果。

存储数据容器具备幂等性:在数据存入的容器具备天然的幂等(比如ElasticSearch的put操作具备幂等性,相同的数据多次执行Put操作和一次执行Put操作的结果是一致的),这样的场景也可以使用手动提交的最少一次消费语义实现,由存储数据端来进行数据去重。

数据无状态,并且存储容器不具备幂等:这种场景需要自行控制offset的准确性,这里数据不具备状态,存储使用关系型数据库,比如MySQL。通过自己管理offset的方式,来确保数据和offset信息是同时变化,通过数据库事务的特性来保证一致性和原子性。

Kafka 的 ISR 机制是什么?

现在我们来看一个 Kafka 的核心机制,就是 ISR 机制。

这个机制简单来说,就是会自动给每个 Partition 维护一个 ISR 列表,这个列表里一定会有 Leader,然后还会包含跟 Leader 保持同步的 Follower。

也就是说,只要 Leader 的某个 Follower 一直跟他保持数据同步,那么就会存在于 ISR 列表里。

但是如果 Follower 因为自身发生一些问题,导致不能及时的从 Leader 同步数据过去,那么这个 Follower 就会被认为是“out-of-sync”,被从 ISR 列表里踢出去。

所以大家先得明白这个 ISR 是什么,说白了,就是 Kafka 自动维护和监控哪些 Follower 及时的跟上了 Leader 的数据同步。

Kafka 写入的数据如何保证不丢失?

所以如果要让写入 Kafka 的数据不丢失,你需要保证如下几点:

* 每个 Partition 都至少得有 1 个 Follower 在 ISR 列表里,跟上了 Leader 的数据同步。

* 每次写入数据的时候,都要求至少写入 Partition Leader 成功,同时还有至少一个 ISR 里的 Follower 也写入成功,才算这个写入是成功了。

* 如果不满足上述两个条件,那就一直写入失败,让生产系统不停的尝试重试,直到满足上述两个条件,然后才能认为写入成功。

* 按照上述思路去配置相应的参数,才能保证写入 Kafka 的数据不会丢失。

好!现在咱们来分析一下上面几点要求。

第一条,必须要求至少一个 Follower 在 ISR 列表里。

那必须的啊,要是 Leader 没有 Follower 了,或者是 Follower 都没法及时同步 Leader 数据,那么这个事儿肯定就没法弄下去了。

第二条,每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功。

这个要求就是保证说,每次写数据,必须是 Leader 和 Follower 都写成功了,才能算是写成功,保证一条数据必须有两个以上的副本。

这个时候万一 Leader 宕机,就可以切换到那个 Follower 上去,那么 Follower 上是有刚写入的数据的,此时数据就不会丢失了。

简单总结一下:

消费端重复消费:很容易解决,建立去重表。

消费端丢失数据:也容易解决,关闭自动提交offset,处理完之后受到移位。

生产端重复发送:这个不重要,消费端消费之前从去重表中判重就可以。

生产端丢失数据:这个是最麻烦的情况。

解决策略:

1.异步方式缓冲区满了,就阻塞在那,等着缓冲区可用,不能清空缓冲区。

2.发送消息之后回调函数,发送成功就发送下一条,发送失败就记在日志中,等着定时脚本来扫描。(发送失败可能并不真的发送失败,只是没收到反馈,定时脚本可能会重发)。

如何保证有序:

如果有一个发送失败了,后面的就不能继续发了,不然重发的那个肯定乱序了。

生产者在收到发送成功的反馈之前,不能发下一条数据,但我感觉生产者是一个流,阻塞生产者感觉业务上不可行,怎么会因为一条消息发出去没收到反馈,就阻塞生产者。

同步发送模式:发出消息后,必须阻塞等待收到通知后,才发送下一条消息。

异步发送模式:一直往缓冲区写,然后一把写到队列中去。

两种都是各有利弊:

同步发送模式虽然吞吐量小,但是发一条收到确认后再发下一条,既能保证不丢失消息,又能保证顺序。

设置 acks = all。

设置 replication.factor >= 3

为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

设置 min.insync.replicas > 1

一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。

但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下加入两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。

比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

当然,如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看。

参考链接:

【kafka怎么保证数据消费一次且仅消费一次?_大数据-CSDN博客_kafka怎么保证消息被消费一次】https://blog.csdn.net/qq_35078688/article/details/86082858

突发宕机,Kafka写入的数据如何保证不丢失?

https://mp.weixin.qq.com/s/UzRcHdN7PBoxiEBZOUadGA

《Kafka如何保证消息不丢失不重复》, 一起来围观吧 https://blog.csdn.net/matrix_google/article/details/79888144?utm_source=app&app_version=4.5.3

Kafka 官方文档: https://kafka.apache.org/documentation/


http://www.taodudu.cc/news/show-3848212.html

相关文章:

  • 3、深潜KafkaProducer —— 核心架构
  • Xcode12.5 新特性介绍
  • Kafka Producer 参数设置详解
  • 18. SAP ABAP OData 服务嵌套创建功能的实现步骤(Create Deep)
  • idea创建类报错:Unable to parse template Class Error message: This template did not produc
  • Linux/Centos 安装oracle报错“调用makefile '/oracle/produc
  • The Apache Tomcat Native library which allows optimal performance in produc
  • 亚洲与大洋洲经济自由度指数(1995-2021年)
  • 赴港IPO,小鹏汽车急需再输血
  • Digix:密码学资产中的黄金标准
  • Note3 港版稳定ROM 刷机
  • 地方门户湘都信息港 网站源码(discuz)
  • 别了,港澳通行证!
  • 澳洲自由行玩法一:悉尼情人港
  • 通证避风港提案 2.0 析要与评论
  • 东南亚港美股交易系统开发
  • 港大CS笔试面试分享
  • 香港自由行之:个人如何赴港旅游
  • “自由数字港”海南自贸区(港)设立区块链试验区
  • 海南自由贸易港高层次人才分类标准 (2020):互联网领域标准
  • 阿里与宁夏打造国际自由数据港
  • 天津港吞吐量情况预测
  • 大数据模型研究报告pdf_大数据模型与决策课程案例分析报告
  • atlas服务器位置,Atlas资源分布图 各区域资源植物及动物分布一览
  • ”走向跨链自由港“,一文读懂EOS的王牌侧链BOSCore
  • 寺田仓库与基地位于新加坡的LE FREEPORT公司签订使用自由港服务相关业务合作合同
  • 《区块链财富指北》序 | 扬帆博士(BOScore)自由港,乘风破浪区块链大航海时代!
  • 数字签名的步骤和原理
  • ECC数字签名过程
  • 数字签名的原理

kafka怎么保证数据消费一次且仅消费一次?使用消息队列如何保证幂等性?相关推荐

  1. 消息队列如何保证消息的幂等性

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到网站. 文章目录 什么是幂等性 什么是消息的幂等性 为什么会出现消息幂等性问题 该如何解决消息幂等性问题 总 ...

  2. 你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你

    我们将消息队列这个组件加入到了我们的商城系统里,并且通过秒杀这个实际的案例进行了实际演练,知道了它对高并发写流量做削峰填谷,对非关键业务逻辑做异步处理,对不同的业务系统做解耦合. 场景: 现在我们的电 ...

  3. Kafka 消息队列如何保证顺序性?

    主要思路:相同key值的消息写入同一个partition(partition内的消息是有序的),一个partition的消息只会被一个消费者消费. 如果一个消费者是多个线程消费,则需要把pull来的消 ...

  4. 分布式消息队列如何保证消息有且仅被消费一次?

    自己总结,仅供参考 一.保证消息生产成功.未丢失 这个要从生产者与MQ的角度去保证 1.生产者 生产者投递消息后,等待mq的消息接收成功ack(同步或者异步形式),成功则代表生产成功,失败则重试: 另 ...

  5. 消息队列如何保证顺序性?

    主要思路有两种:1.单线程消费来保证消息的顺序性:2.对消息进行编号,消费者处理时根据编号判断顺序. 1.rabbitMq 问题分析: 如图,data1 和 data2 是有顺序的,必须 data1 ...

  6. 如何保证mq的有序性_消息队列如何保证顺序性?

    主要思路有两种:1.单线程消费来保证消息的顺序性:2.对消息进行编号,消费者处理时根据编号判断顺序. 1.rabbitMq 问题分析: 如图,data1 和 data2 是有顺序的,必须 data1 ...

  7. 消息队列怎么保证消息有没有重复消费(幂等性)?

    普通业务控制幂等性 1.mysql唯一索引 2.token机制(请求前生成一个token,请求时携带这个token,如果这个token在redis中没有则继续,有则 有请求进行中) 3.mysql悲观 ...

  8. rocketmq怎么保证数据不会重复_阿里架构师亲授:Kafka和RocketMQ的消息复制实现的差异点在哪?...

    众所周知,消息队列在收发两端,主要是依靠业务代码,配合请求确认的机制,来保证消息不会丢失的.而在服务端,一般采用持久化和复制的方式来保证不丢消息. 把消息复制到多个节点上,不仅可以解决丢消息的问题,还 ...

  9. 怎么保证读取最新数据_Kafka怎么保证数据不丢失?

    Kafka怎么保证数据不丢失? 这个问题要从3个方面来保证数据不丢失:生产者.服务端.消费者. 01 producer 生产端是如何保证数据不丢失的 1.ack的配置策略 acks = all (或- ...

最新文章

  1. Android运行报错avd,Android Studio出错:无法在模拟器中启动AVD
  2. 大数据WEB阶段(九)Myeclipse中配置Tomcat并发布项目
  3. dos安装深度linux,U盘用grub4dos引导Deepin v20 Beta iso安装的方法
  4. 初始化_Linux的内存初始化
  5. linux下设置程序后台运行,linux中如何让进程在后台运行
  6. XML与HTML的区别
  7. USACO1.1.1 - PROB Your Ride Is Here
  8. java核心教程_核心Java教程
  9. open*** 跨平台部署
  10. python逢7跳过_python学习笔记(七)break 和continue
  11. ArrayList错误:java.util.ConcurrentModificationException:null
  12. 各位大佬,Spark的重点难点系列暂时更新完毕
  13. 结构体定义LNode,*LinkList和typedef struct
  14. VMware虚拟机安装Win10
  15. Nginx--流量限制(最有用的功能之一)
  16. Gravity:环形二维码扫描识别传感器详细介绍和工作原理
  17. 建服务器数据中心,如何构建一个服务器数据中心
  18. C#读取RFID卡号源码
  19. Android各版本的版本号、版本名、API及发布时间
  20. 已有一个排好序的数组,由键盘输入一个数,要求按原来的排序规律将其插入到数组中.

热门文章

  1. Qt 数据可视化之3D图形
  2. MSP430单片机的端口介绍
  3. 502 VS 504
  4. 【论文阅读】聚集多个启发式信号作为监督用于无监督作文自动评分
  5. 五大“创新”崛起软件城
  6. SharePoint - 如何设置Survey权限控制?
  7. 【读书笔记】关于中医学的一些思考
  8. 运算放大器的功耗计算
  9. 更喜欢卡萨布兰卡花,(怪不得我是单身狗)
  10. 使用PACKET_MMAP + PF_PACKET实现ZERO COPY抓包和发包