为了保证消息不丢失,从生产者两种模式(确认模式和返回模式)到消费者手动确认的一个大整合

生产者

1.maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --><!-- 用于rabbitmq发送json消息 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.7.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>

2.配置文件yml

server:port: 8001spring:rabbitmq:host: 192.168.56.101port: 5672virtual-host: /emsusername: adminpassword: admin# 开启确认模式publisher-confirms: true# 开启回退模式publisher-returns: true

3.编写rabbitmq配置类RabbitmqConfig,包含交换机、队列、绑定队列与交换机、设置确认模式和返回模式以及消息转换器

/*** @author xiaozikang* @date 2020/10/10 11:02* @Email:xiaozikangwy@163.com*/@Configuration
public class RabbitmqConfig {public static final String CONFIRM_EXCAHNGE = "confirmExchange";public static final String CONFIRM_QUEUE = "confirmQueue";// 交换机@Bean("confirmExchange")public Exchange confirmExchange(){return ExchangeBuilder.directExchange(CONFIRM_EXCAHNGE).durable(true).build();}// 队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE).build();}// 绑定队列与交换机@Beanpublic Binding binding(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") Exchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm").noargs();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 定义消息转换器Jackson2JsonMessageConverter,发送json数据rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());// 设置开启Mandatory,如果消息没路由到队列,则返回给消息发送方(如果不开启,默认丢弃消息)rabbitTemplate.setMandatory(true);// 定义确认模式rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {/***  correlationData 相关配置信息*  ack exchange交换机是否成功收到了消息,true成功*  cause 失败原因*/if (ack){System.out.println("exchange成功收到消息");}else {System.out.print("未收到消息:" + cause);// 可以做些处理,再次发送}}});// 定义返回模式rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {/***  message 消息对象*  replyCode 错误码*  replyText 错误信息*  exchange 交换机*  routingKey 路由键*/System.out.println("回调函数执行了");}});return rabbitTemplate;}@Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter(){return new Jackson2JsonMessageConverter();}}

4.实体类(主要用于测试发送json消息)

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {private String name;private Integer age;
}

5.发送消息

@Testpublic void testf01(){User user = new User("小明",10);for (int i = 0; i < 5; i++) {rabbitTemplate.convertAndSend(RabbitmqConfig.CONFIRM_EXCAHNGE,"confirm",user);}}

消费者

1.maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.7.5</version></dependency>

2.配置文件yml

server:port: 9020spring:rabbitmq:host: 192.168.56.101port: 5672virtual-host: /emsusername: adminpassword: adminlistener:simple:# 消费手动确认acknowledge-mode: manual# 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发prefetch: 2# 指定最小的消费者数量# concurrency: 1# 指定最大的消费者数量# max-concurrency: 2

3.编写配置类(这边主要用于接受json类型消息)

/*** @author xiaozikang* @date 2020/10/12 17:50* @Email:xiaozikangwy@163.com*/
@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {@Overridepublic void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());}@BeanMessageHandlerMethodFactory messageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());return messageHandlerMethodFactory;}@Beanpublic MappingJackson2MessageConverter consumerJackson2MessageConverter() {return new MappingJackson2MessageConverter();}
}

4.消费消息

@Component
public class Consumer {@RabbitListener(queues = "confirmQueue")public void process(User user, Channel channel, Message message) throws IOException {// 1.接受转换消息long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 2.处理业务逻辑System.out.println("处理业务逻辑。。。。。。");// 3.手动签收// channel.basicAck(deliveryTag,true);} catch (Exception e) {// e.printStackTrace();// 4.出现异常,拒绝签收/*** requeue 重回队列* multiple false只确认当前consumer一个消息收到,true确认所有consumer获得的消息*/channel.basicNack(deliveryTag,true,true);}}}

rabbitmq高级特性(消息手动确认)相关推荐

  1. RabbitMQ 高级特性(吐血猝死整理篇)

    文章目录 RabbitMQ 高级特性 消息可靠性投递(可靠性发送) 事务机制 代码实现 发送方确认机制 为什么比事务性能好 示例代码 测试一下QPS 持久化存储 TTL 队列 死信队列(DLX) 延迟 ...

  2. RabbitMQ(二):RabbitMQ高级特性

    RabbitMQ(二):RabbitMQ高级特性 RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用.作为一名合格的开发者,有必要了解一下相关知识,RabbitM ...

  3. RabbitMQ高级特性

    文章目录 1. 简述 2. 特性示例: 2.1 消息可靠性投递 2.2 Consumer Ack 2.3 消费端限流 2.4 TTL 2.5 死信队列 2.6 延迟队列 1. 简述 在rabbitMQ ...

  4. 3 RabbitMQ高级特性 3

    主要为大家讲解RabbitMQ的高级特性和实际场景应用, 包括消息如何保障 100% 的投递成功 ? 幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题? Confirm确认消息. ...

  5. 【消息中间件】RabbitMQ 高级特性与应用问题

    消息的可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 r ...

  6. 1-8 (4). RabbitMQ高级特性-消费端ACK

    Consumer ACK 指Acknowledge,确认 有三种方式: (1)自动确认:acknowledge="none"(默认) (2)手动确认:acknowledge=&qu ...

  7. RabbitMQ高级特性——死信队列DLX以及代码测试

    大伙可以到我的RabbitMQ专栏获取更多信息 demo示例这里拿 概述 死信队列,缩写DLX(dead letter exchange 死信交换机),当消息称为dead message之后,会被重新 ...

  8. RabbitMQ高级特性(五):RabbitMQ之死信队列DLX

    一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...

  9. RabbitMQ(13)RabbitMQ高级特性:TTL

    TTL全称Time To Live (存活时间/过期时间). 当消息到达存活时间后,还没有被消费,会被自动清除.. RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间. ...

最新文章

  1. .net System.Web.Caching.Cache缓存类使用详解(转载)
  2. php成绩管理前段模板,php学生成绩管理系统(模板).doc
  3. android Button背景高度被拉伸问题--解决方案
  4. yfan.qiu linux硬链接与软链接
  5. Nginx+PHP实时生成不同尺寸图片
  6. Map集合使用get方法返回null抛出空指针异常问题
  7. 如何自学并且系统学习计算机网络?(知乎问答)
  8. ORACLE清空数据库中所有表中的数据
  9. 安装ubuntu教程
  10. 如何区分杠精和批判性思维
  11. 大数据与机器学习算法相关的电子书分享
  12. 设计模式之禅【适配器模式】
  13. 谁说程序员年龄大了,就没出路了?
  14. 3G上网:按时长计费是运营商的“最佳选择”
  15. 微机原理与接口技术的基础知识
  16. Linux ALSA 之六:ALSA ASoc 架构
  17. opensips3.0之新工具opensips-cli
  18. python量化选股策略_牛刀小试-小市值选股策略
  19. BP神经网络隐藏层单元数的选择--(1)
  20. JSON文件读写操作详解

热门文章

  1. 凹凸手游服务器维护中是什么意思,凹凸世界手游测试炸服补偿公告介绍 开服进不去有什么补偿[多图]...
  2. 空气颗粒度PM2.5的检测设计与实现
  3. echarts设置坐标轴标题的样式
  4. swagger在VS2019 .net core2.2中的使用,及报错undefined /swagger/v1/swagger.json的解决办法
  5. 在Mac系统环境下如何制作win 10 启动盘
  6. 判断当前时间是否在股票开盘时间,不考虑周六周日和节假日
  7. CometOJ国庆欢乐赛 C两排房子 二分 D1 二分贪心 E贪心特判
  8. 兽耳怎么画?怎样才能画好兽耳?
  9. 1147 简单评委打分
  10. Todesk远程连接时一直显示密码错误