添加 amqp 依赖

配置文件 application.properties

生产者

消费者

可以直接使用@RabbitListener注解,声明Queue和Exchange以及Binding关系。消费端接收的消息是Message对象,结构为:

我们可以直接通过注解@Payload 获取我们传输的数据,通过注解@Headers 获取消息请求头。这里我们增加了消息限流的功能,防止生产过多,导致消费者消费吃力的情况:channel.basicQos(0, 1, false):0表示对消息的大小无限制,1表示每次只允许消费一条,false表示该限制不作用于channel。同时,我们这里采用手工ACK的方式,因为我们配置文件配置了spring.rabbitmq.listener.simple.acknowledge-mode=manual:

channel.basicAck(deliveryTag, false):deliveryTag表示处理的消息条数(一般为1),从heaers中取,false表示不批量ack。

DLX(死信队列)DLX定义DLX为Dead Letter Exchange,死信队列。当一个消息在一个队列中变成死信(dead message)之后,它能重新publish到另一个Exchange,那么这个让消息变为死信的Exchange就是DLX(死信队列)。

消息变成死信的几种情况1、消息被拒绝,ack为false,并且 requeue=false;2、消息TTL(Time To Live)过期,指消息达到了过期时间;3、队列达到最大长度。

死信队列代码演示:1、声明一个死信Exchange、Queue以及Binding

2、声明一个转发队列,来接收死信消息

3、对发送的消息设置TTL,模拟DLX场景

这里设置了10s的过期时间,消息一开始是在DL_QUEUE队列上,如果10s之内还没被消费,则会进入FORWARD_QUEUE(转发)队列。以上的代码示例,通过DLX实现了延迟队列的功能,即将消息发送至DL_QUEUE队列上,10s之后才会被FORWARD_QUEUE消费(消费者监听该队列即可),从而起到了延迟消费的功能。

可靠性投递解决方案

可靠性投递,即保证消息的100%被消费。目前,互联网大厂主要的解决方案,有两种:1、消息落库,对消息状态进行打标2、消息的延迟投递,做二次确认,回调检查消息落库

延迟投递

延迟投递方案相比消息落库方案,优势是在于把msg db剥离了核心业务,在大业务量的场景中,会减少核心业务的数据库压力(少了一次msg db的数据插入)。

消息幂等性解决方案

幂等性指的是,使用相同参数对同一资源重复调用某个接口的结果与调用一次的结果相同。可能导致消息出现非幂等的原因:

消息的幂等性

主要的原因是第三个,当消费端抛出异常,并且requeue=true(默认为true),消息会一直重新入队进行消费,这样就导致了重复消费。

幂等性解决方案

首先,给每条消息生成一个全局唯一的ID标识(messageId),然后在到达消费端时,先将消息(messageId作为主键)插入MSG DB数据库,然后再执行业务。这样如果消费端中途抛出了异常,重新入队消费时,由于还是同一个消息,则meesageId还是不变的,当插入MSG DB数据库时,会因为主键冲突而抛异常,此时我们捕捉该异常,并且拒绝该消息(channel.basicReject)即可。

Spring Cloud中RabbitMQ配置属性表属性名说明默认值spring.rabbitmq.address客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合

spring.rabbitmq.cache.channel.checkout-timeout当缓存已满时,获取Channel的等待时间,单位为毫秒

spring.rabbitmq.cache.channel.size缓存中保持的Channel数量

spring.rabbitmq.cache.connection.mode连接缓存的模式CHANEL

spring.rabbitmq.cache.connection.size缓存的连接数

spring.rabbitmq.connnection-timeout连接超时参数单位为毫秒:设置为“0”代表无穷大

spring.rabbitmq.dynamic默认创建一个AmqpAdmin的Beantrue

spring.rabbitmq.hostRabbitMQ的主机地址localhost

spring.rabbitmq.listener.acknowledge-mode容器的acknowledge模式

spring.rabbitmq.listener.auto-startup启动时自动启动容器true

spring.rabbitmq.listener.concurrency消费者的最小数量

spring.rabbitmq.listener.default-requeue-rejected投递失败时是否重新排队true

spring.rabbitmq.listener.max-concurrency消费者的最大数量

spring.rabbitmq.listener.prefetch在单个请求中处理的消息个数,他应该大于等于事务数量

spring.rabbitmq.listener.retry.enabled不论是不是重试的发布false

spring.rabbitmq.listener.retry.initial-interval第一次与第二次投递尝试的时间间隔1000

spring.rabbitmq.listener.retry.max-attempts尝试投递消息的最大数量3

spring.rabbitmq.listener.retry.max-interval两次尝试的最大时间间隔10000

spring.rabbitmq.listener.retry.multiplier上一次尝试时间间隔的乘数1.0

spring.rabbitmq.listener.retry.stateless不论重试是有状态的还是无状态的true

spring.rabbitmq.listener.transaction-size在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值

spring.rabbitmq.password登录到RabbitMQ的密码

spring.rabbitmq.portRabbitMQ的端口号5672

spring.rabbitmq.publisher-confirms开启Publisher Confirm机制false

spring.rabbitmq.publisher-returns开启publisher Return机制false

spring.rabbitmq.requested-heartbeat请求心跳超时时间,单位为秒

spring.rabbitmq.ssl.enabled启用SSL支持false

spring.rabbitmq.ssl.key-store保存SSL证书的地址

spring.rabbitmq.ssl.key-store-password访问SSL证书的地址使用的密码

spring.rabbitmq.ssl.trust-storeSSL的可信地址

spring.rabbitmq.ssl.trust-store-password访问SSL的可信地址的密码

spring.rabbitmq.ssl.algorithmSSL算法,默认使用Rabbit的客户端算法库

spring.rabbitmq.template.mandatory启用强制信息false

spring.rabbitmq.template.receive-timeoutreceive()方法的超时时间0

spring.rabbitmq.template.reply-timeoutsendAndReceive()方法的超时时间5000

spring.rabbitmq.template.retry.enabled设置为true的时候RabbitTemplate能够实现重试false

spring.rabbitmq.template.retry.initial-interval第一次与第二次发布消息的时间间隔1000

spring.rabbitmq.template.retry.max-attempts尝试发布消息的最大数量3

spring.rabbitmq.template.retry.max-interval尝试发布消息的最大时间间隔10000

spring.rabbitmq.template.retry.multiplier上一次尝试时间间隔的乘数1.0

spring.rabbitmq.username登录到RabbitMQ的用户名

spring.rabbitmq.virtual-host连接到RabbitMQ的虚拟主机

rabbitmq可靠性投递_RabbitMQ可靠性相关推荐

  1. rabbitmq可靠性投递_RabbitMQ 可靠投递

    背景 - confirmCallback 确认模式 - returnCallback 未投递到 queue 退回模式 - shovel-plugin 跨机房可靠投递 背景 在使用 RabbitMQ 的 ...

  2. RabbitMQ消息可靠性投递及分布式事务最终一致性实现

    RabbitMQ消息可靠性投递就是保证消息生产者能够将消息百分百投递到RabbitMQ服务器,并在传递过程中不丢失.然而在生产环境中由于网络中断.网络不稳定等原因导致消息在投递过程中丢失,这或许会造成 ...

  3. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  4. rabbitmq怎样确认是否已经消费了消息_阿里Java研发二面:了解RabbitMQ?说说RabbitMQ可靠性投递...

    上期写到高并发下RabbitMq消息中间件你应该介么玩今天给小伙伴说说!有自己看法的也可以在评论区留言探讨,也可以转发关注下我以后会长期分享! 目录: 确保消息发送到RabbitMQ服务器 确保消息被 ...

  5. rabbitmq可靠性投递_阿里Java研发二面:了解RabbitMQ?说说RabbitMQ可靠性投递

    上期写到高并发下RabbitMq消息中间件你应该介么玩今天给小伙伴说说!有自己看法的也可以在评论区留言探讨,也可以转发关注下我以后会长期分享! 目录:确保消息发送到RabbitMQ服务器 确保消息被正 ...

  6. Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失

    文章目录 1. 消息可能出现丢失的情况 2. 生产者如何保证消息的可靠性投递 2.1 消息落库打标 + confirm机制 2.2 消息幂等性如何保证? 2.3 延时消息确认 3. rabbitMQ服 ...

  7. 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

    目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...

  8. RabbitMQ可靠性投递与高可用架构

    一,可靠性投递 使用 RabbitMQ 实现异步通信的时候,消息丢了怎么办,消息重复消费怎么办? 在 RabbitMQ 里面提供了很多保证消息可靠投递的机制,这个也是 RabbitMQ 的一个特性. ...

  9. RabbitMQ消息中间件(二) RabbitMQ如何保证消息的可靠性投递

    RabbitMQ如何保证消息投递的准确性? 生产端的可靠性投递: 1.保证消息成功发送 2.保证MQ节点成功接收 3.发送端收到MQ节点(Broker)确认应答 4.完善的消息补偿机制 BAT等大厂解 ...

最新文章

  1. win 复制linux文件命令行,windows与Linux间远程拷贝文件(pscp命令)
  2. C++程序的内存布局
  3. 手机版网页开发_华为低调发布鸿蒙OS手机开发者Beta版,明年覆盖1亿台设备
  4. Chrome和Firefox浏览器长截图
  5. 使用HttpUnit进行集成测试
  6. 库克跟乔布斯差几代iPhone? 解读iPhone十年变与不变
  7. Hangfire在ASP.NET CORE中的简单实现方法
  8. 虚拟机安装win10专业版
  9. 龙果支付 mysql_龙果学院 基于电商业务的企业级大中台从设计到实现(第一阶段) 百度云 百度网盘...
  10. 扒一扒那些叫欧拉的定理们(二)——简单多面体欧拉定理的证明
  11. mysql临时表插入数据
  12. css文字闪光特效,利用js css3实现文字闪光滑过动画特效
  13. python---之scipy.ndimage.measurements.label
  14. 搭配Online:腾讯吃鸡手游《PUBG Mobile》及《和平精英》(前《刺激战场》)全球收入超15亿美元!
  15. Python免杀脚本生成.exe(过火绒过联想没过360)
  16. 在Ubuntu 16.04中安装Google拼音
  17. 创建索引-资源正忙的解决方案及原理
  18. ROS入门七 机器人建模——URDF
  19. 文思海辉应届生java面试_2019文思海辉面试经验(java程序员,项目经理助理等)
  20. HCNP学习笔记之HCNP学习的几种境界

热门文章

  1. java代码建立删除vpn_Java类集综合练习——信息管理(增、删、改、查)
  2. 抽屉效果_越来越多人家装了餐边柜,为什么不多加一排抽屉?很多家庭没想到...
  3. 修改mongodb最大查询数_mongodb数据库如何查询某个字段的最大值?
  4. 过分的谜题 模拟+思维题
  5. 【TensorFlow-windows】(四) CNN(卷积神经网络)进行手写数字识别(mnist)
  6. 机器学习入门笔记(一):模型性能评价与选择
  7. Tomcat启动乱码及IDEA中tomcat信息乱码解决方法
  8. govendor用法
  9. stm32的串口DMA空闲中断接收不等长数据,stm32F4的usart2-DMA-IDLE收发
  10. LVS小型系统架构搭建笔记