一、springboot整合rabbitmq

小说网 m.198200.com

  1. 我们需要新建两个工程,一个作为生产者,另一个作为消费者。在pom.xml中添加amqp依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在application.yml文件中添加rabbitmq的相关信息:
spring:rabbitmq:# 连接地址host: 127.0.0.1# 端口port: 5672# 登录账号username: guest# 登录密码password: guest# 虚拟主机virtual-host: /
  1. 在生产者工程中新建配置项rabbitmqConfig.java,申明名称为”byte-zb“直连交换机和队列,使用”byte-zb“的routing-key将队列和交换机绑定,代码如下:
@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "byte-zb";public static final String EXCHANGE_NAME = "byte-zb";public static final String ROUTING_KEY = "byte-zb";// 队列申明@Beanpublic Queue queue(){return new Queue(QUEUE_NAME);}// 申明交换机@Beanpublic DirectExchange directExchange(){return new DirectExchange(EXCHANGE_NAME);}// 数据绑定申明@Beanpublic Binding directBinding(){return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);}
}
  1. 创建生产者发送一条消息,代码如下:
@RestController
public class Producer {public static final String QUEUE_NAME = "byte-zb";public static final String EXCHANGE_NAME = "byte-zb";@Autowiredprivate AmqpTemplate amqpTemplate;@RequestMapping("/send")public void sendMessage(){JSONObject jsonObject = new JSONObject();jsonObject.put("email","11111111111");jsonObject.put("timestamp",System.currentTimeMillis());String json = jsonObject.toJSONString();System.out.println(json);amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,json);}}
  1. 在消费者工程里创建消费者消费消息,代码如下:
@Component
public class Consumer throws Exception{public static final String QUEUE_NAME = "byte-zb";@RabbitListener(queues = QUEUE_NAME)public void receiveMessage(String message){System.out.println("接收到的消息为"+message);}
}

我们启动生产者,然后请求send接口,然后打开rabbitmq控制台发现多了一个名为”byte-zb“的交换机和队列,并且队列中出现了一个未消费的消息,然后启动消费者,我们会在控制台上发现打印了一条消息,同时rabbitmq控制台中”byte-zb“的队列中消息没有了。

二、自动补偿机制

如果消费者消息消费不成功的话,会出现什么情况呢?我们修改一下消费者代码,然后看看。

@Component
public class Consumer {public static final String QUEUE_NAME = "byte-zb";@RabbitListener(queues = QUEUE_NAME)public void receiveMessage(String message) throws Exception {System.out.println("接收到的消息为"+message);int i = 1 / 0;}
}

我们会看到消费者工程控制台一直在刷新报错,当消费者配出异常,也就是说当消息消费不成功的话,该消息会存放在rabbitmq的服务端,一直进行重试,直到不抛出异常为止。

如果一直抛异常,我们的服务很容易挂掉,那有没有办法控制重试几次不成功就不再重试了呢?答案是有的。我们在消费者application.yml中增加一段配置。

spring:rabbitmq:# 连接地址host: 127.0.0.1# 端口port: 5672# 登录账号username: guest# 登录密码password: guest# 虚拟主机virtual-host: /listener:simple:retry:enabled: true # 开启消费者进行重试max-attempts: 5 # 最大重试次数initial-interval: 3000 # 重试时间间隔

上面配置的意思是消费异常后,重试五次,每次隔3s。继续启动消费者看看效果,我们发现重试五次以后,就不再重试了。

三、结合实际案例来使用消息补偿机制

像上面那种情况出现的异常其实不管怎么重试都不会成功,实际上用到消息补偿的就是调用第三方接口的这种。

案例:生者往队列中扔一条消息,包含邮箱和发送内容。消费者拿到消息后将调用邮件接口发送邮件。有时候可能邮件接口由于网络等原因不通,这时候就需要去重试了。

在调用接口的工具类中,如果出现异常我们直接返回null,工具类具体代码就不贴了,如果返回null之后怎么处理呢?我们只需要抛出异常,rabbitListener捕获到异常后就会自动重试。

我们改造一下消费者代码:

@Component
public class Consumer {public static final String QUEUE_NAME = "byte-zb";@RabbitListener(queues = QUEUE_NAME)public void receiveMessage(String message) throws Exception {System.out.println("接收到的消息为"+message);JSONObject jsonObject = JSONObject.parseObject(message);String email = jsonObject.getString("email");String content = jsonObject.getString("timestamp");String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;// 如果发生异常则返回nullString body = HttpUtils.httpGet(httpUrl, "utf-8");//if(body == null){throw new Exception();}}
}

当然我们可以自定义异常抛出。具体怎么试验呢,第一步启动生产者和消费者,这时候我们发现消费者在重试,第二步我们启动邮件服务,这时候我们会发现邮件发送成功了,消费者不再重试了

四、解决消息幂等性问题

一些刚接触java的同学可能对幂等性不太清楚。幂等性就是重复消费造成结果不一致。为了保证幂等性,因此消费者消费消息只能消费一次消息。我么可以是用全局的消息id来控制幂等性。当消息被消费了之后我们可以选择缓存保存这个消息id,然后当再次消费的时候,我们可以查询缓存,如果存在这个消息id,我们就不错处理直接return即可。先改造生产者代码,在消息中添加消息id:

@RequestMapping("/send")public void sendMessage(){JSONObject jsonObject = new JSONObject();jsonObject.put("email","11111111111");jsonObject.put("timestamp",System.currentTimeMillis());String json = jsonObject.toJSONString();System.out.println(json);Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,message);}

消费者代码改造:

@Component
public class Consumer {public static final String QUEUE_NAME = "byte-zb";@RabbitListener(queues = QUEUE_NAME)public void receiveMessage(Message message) throws Exception {Jedis jedis = new Jedis("localhost", 6379);String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");System.out.println("接收导的消息为:"+msg+"==消息id为:"+messageId);String messageIdRedis = jedis.get("messageId");if(messageId == messageIdRedis){return;}JSONObject jsonObject = JSONObject.parseObject(msg);String email = jsonObject.getString("email");String content = jsonObject.getString("timestamp");String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;// 如果发生异常则返回nullString body = HttpUtils.httpGet(httpUrl, "utf-8");//if(body == null){throw new Exception();}jedis.set("messageId",messageId);}
}

我们在消费者端使用redis存储消息id,只做演示,具体项目请根据实际情况选择相应的工具进行存储。

如果文章对您有帮助,请记得点赞关注哟~
欢迎大家关注我的公众号:字节传说,每日推送技术文章供大家学习参考。

rabbitmq系列(三)消息幂等性处理相关推荐

  1. RabbitMQ入门学习系列(三).消息发送接收

    快速阅读 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失.通过ack的消息确认和持久化进行操作.以及Rabbit中如何用Web面板进行管理队列.消费者如何处理耗时的任务 生产者代码创建链接 ...

  2. RabbitMq系列(九):主题交换Topic Exchange

    系列文章 RabbitMq系列(一):服务器搭建 RabbitMq系列(二):最简单的例子 RabbitMq系列(三):工作队列 RabbitMq系列(四):消息确认和持久性 RabbitMq系列(五 ...

  3. MQ消息的自动应答和手动应答| RabbitMQ系列(三)

    相关文章 RabbitMQ系列汇总:RabbitMQ系列 前言 开始消息应答之前先思考几个问题 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会 ...

  4. RabbitMQ系列(三)RabbitMQ交换器Exchange介绍与实践

    RabbitMQ交换器Exchange介绍与实践 RabbitMQ系列文章 RabbitMQ在Ubuntu上的环境搭建 深入了解RabbitMQ工作原理及简单使用 RabbitMQ交换器Exchang ...

  5. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  6. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  7. RabbitMQ (三)消息重试

    1 RabbitMQ自带的重试机制 1 示例代码 rabbitMQ为自带了消息重试机制:当消费者消费消息失败时,可以选择将消息重新"推送"给消费者,直至消息消费成功为止. 开启自带 ...

  8. RabbitMQ消息幂等性问题

    文章目录 1. 什么是幂等性? 1.1 消息队列的幂等性 1.2 模拟重试机制 1.2.1 生产者代码 1.2.2 消费者代码 1.2.3 消费者 application.yml 配置 2. 如何保证 ...

  9. RabbitMQ(mq) 如何处理高并发、负载均衡、消息幂等性、丢失、消息顺序错乱问题?

    目录 介绍: 1.连接器(connection): 2.信道.通道(channel): 3.交换机(exchange): 4.队列(queue): 以下通过两个例子,让我们先来对rabbitmq 有个 ...

最新文章

  1. CodeChef--EQUAKE
  2. 解决ssm项目表单数据提交到数据库乱码问题
  3. 软考高项之范围管理-攻坚记忆
  4. JZOJ 3808. 【NOIP2014模拟8.25】道路值守
  5. python多态_python 多态
  6. 编写一个判断素数的函数,在主函数输入一个整数时,输出是否素数的信息。...
  7. java版spring cloud+spring boot+redis多租户社交电子商务平台(三)SpringBoot用JdbcTemplates访问Mysql...
  8. 【渝粤教育】广东开放大学 网络市场与预测 形成性考核 (23)
  9. 第 132 章 Example
  10. [cocos2d]格式化获取当前layer的控件名
  11. Articles for objccn.io. objc.io的完整、准确、优雅的中文翻译版本 http://objccn.io/
  12. es客户端工具_超越 Cookie:当今的客户端数据存储
  13. 【转】解密微软的架构师之路
  14. Visual Studio 2022配置GAMP出现 LNK2019无法解析外部符号_imp_timeGetTime@0
  15. 公司老总直接面试 我该如何准备
  16. html网页字体颜色代码大全
  17. 短视频合集怎么做,教你快速合并的技巧
  18. Java 程序员,年薪 40W 需要什么水平?
  19. python 导入的nan怎么解决_如何在Python中使用Lmfit解决NaN值错误
  20. 随机变量-离散-连续-假设检验方法

热门文章

  1. 跟偶一起做:击退眼睛疲劳的五大运动
  2. H5学习笔记(九)高度塌陷问题
  3. 【Linux】Protected multilib versions XXX错误
  4. 一千零一夜的观后感(一)
  5. C++设计模式——观察者模式(高屋建瓴)
  6. duck typing java_Duck typing
  7. JPA二:FindBy和JPQL
  8. crm订单管理系统有哪些?
  9. Python入门基础篇 No.8 —— 时间的表示_unix时间点_毫秒_time模块
  10. Win10电脑桌面壁纸自动变成黑色无法更换怎么解决