rabbitmq

(1)生产者弄丢了数据

生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能。

此时可以选择用rabbitmq提供的事务功能,就是生产者发送数据之前开启rabbitmq事务(channel.txSelect),然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,rabbitmq事务机制一搞,基本上吞吐量会下来,因为太耗性能。

所以一般来说,如果你要确保说写rabbitmq的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用confirm机制的。

(2)rabbitmq弄丢了数据

就是rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。

设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。

而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。

哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。

(3)消费端弄丢了数据

rabbitmq如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq认为你都消费了,这数据就丢了。

这个时候得用rabbitmq提供的ack机制,简单来说,就是你关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的。

kafka

(1)消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

这不是一样么,大家都知道kafka会自动提交offset,那么只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。但是此时确实还是会重复消费,比如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的kafka消费者消费到了数据之后是写到一个内存的queue里先缓冲一下,结果有的时候,你刚把消息写入内存queue,然后消费者会自动提交offset。

然后此时我们重启了系统,就会导致内存queue里还没来得及处理的数据就丢失了

(2)kafka弄丢了数据

这块比较常见的一个场景,就是kafka某个broker宕机,然后重新选举partiton的leader时。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,然后选举某个follower成leader之后,他不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,我们也是,之前kafka的leader机器宕机了,将follower切换为leader之后,就会发现说这个数据就丢了

所以此时一般是要求起码设置如下4个参数:

给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本

在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧

在producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了

在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了

我们生产环境就是按照上述要求配置的,这样配置之后,至少在kafka broker端就可以保证在leader所在broker发生故障,进行leader切换时,数据不会丢失

(3)生产者会不会弄丢数据

如果按照上述的思路设置了ack=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

rabbitmq接收不到消息_分布式消息队列:如何保证消息的可靠性传输相关推荐

  1. rocketmq支持最大消息_分布式消息引擎Apache RocketMQ最佳实践

    1 Producer一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置 只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤 ...

  2. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性

    [重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...

  3. 消息队列如何保证消息的幂等性

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到网站. 文章目录 什么是幂等性 什么是消息的幂等性 为什么会出现消息幂等性问题 该如何解决消息幂等性问题 总 ...

  4. 你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你

    我们将消息队列这个组件加入到了我们的商城系统里,并且通过秒杀这个实际的案例进行了实际演练,知道了它对高并发写流量做削峰填谷,对非关键业务逻辑做异步处理,对不同的业务系统做解耦合. 场景: 现在我们的电 ...

  5. 公众号 接收规则 消息_微信公众平台 发送模板消息(Java接口开发)

    前言:最近一直再弄微信扫码推送图文消息和模板消息发送,感觉学习到了不少东西.今天先总结一下微信公众平台模板消息的发送.因为这个自己弄了很久,开始很多地方不明白,所以今天好好总结一下. 微信公众平台技术 ...

  6. 延时消息_手把手实现一条延时消息

    前言 近期在维护公司的调度平台,其中有个关键功能那就是定时任务:定时任务大家平时肯定接触的不少,比如 JDK 中的 Timer.ScheduledExecutorService.调度框架 Quartz ...

  7. rabbitmq如何保证消息不被重复消费_RabbitMQ保证消息可靠投递与消费的正确使用姿势...

    前言 MQ 是什么?MQ 我们可以理解为消息队列. 队列是什么?队列我们可以理解为管道. 即以管道的方式做消息传递. 场景展示: 1.我们在双11的凌晨大量秒杀和抢购商品,然后去结算的时候,发现界面会 ...

  8. 分布式消息队列如何保证消息有且仅被消费一次?

    自己总结,仅供参考 一.保证消息生产成功.未丢失 这个要从生产者与MQ的角度去保证 1.生产者 生产者投递消息后,等待mq的消息接收成功ack(同步或者异步形式),成功则代表生产成功,失败则重试: 另 ...

  9. 消息队列怎么保证消息有没有重复消费(幂等性)?

    普通业务控制幂等性 1.mysql唯一索引 2.token机制(请求前生成一个token,请求时携带这个token,如果这个token在redis中没有则继续,有则 有请求进行中) 3.mysql悲观 ...

最新文章

  1. 【BZOJ4259】残缺的字符串
  2. linux 子系统 巡检,Linux 系统巡检
  3. python Day6 面向对象学习
  4. 前端学习资料(书籍和视频)
  5. 获取列表中包含的元素数 在C#中
  6. Docker-容器数据卷
  7. php时间转两位数年份,PHP常用时间函数资料整理
  8. 各种水龙头拆卸图解_[各种水龙头拆卸图解]水龙头漏水怎么办
  9. C#基础7:类的定义
  10. Linux操作系统中awk语言常见用法
  11. XILINX FPGA数字信号处理——4、CORDIC算法原理及实现
  12. xcode 可以打开xmind_思维导图,原来Xmind这么强大
  13. 简要罗列通过Allegro绘制的PCB封装步骤
  14. Uubuntu20.04配置openpose
  15. 《又到毕业季》MATLAB GUI 鼠标键盘交互
  16. 西瓜书-机器学习复习<HENU>
  17. vscode免密登录需要更改authorized_keys的权限
  18. 如何在NVivo中使用编码条探索编码?
  19. Docker的安装及加速器配置
  20. 好用的Chrome插件大全网站:插件小屋

热门文章

  1. 通过掌握谷歌成为更好的程序员
  2. 并注册烧写钩子 获取启动介质类型_鸿蒙OS开源代码精要解读之—— 系统服务框架子系统(服务启动)...
  3. android自定义键盘小数点键,android实现 自定义键盘 keydemo
  4. 力扣(LeetCode)46
  5. Selenium学习(11) 网页截图
  6. kill 的常用信号
  7. 英特尔与Verizon合力推动5G技术 新网络传输革命即将来临
  8. oracle导入导出表
  9. 路由器访问控制列表基础知识
  10. OpenCV-绘制标记符cv::drawMarker