RabbitMq在我们日常开发中不可或缺,作为主流消息中间件,可以用于项目中的应用解耦、流量削峰、异步处理(非主流任务交由队列下发处理)等,本文着重介绍运用于项目中流量峰值时,依据服务器的消费能力进行削峰,最大限度保障服务器不宕机。


前期准备:安装rabbitMq、新建一个springboot项目

  略…

第一步:pom文件中导入amqp依赖

     <!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步:yml中配置

此处生产者与消费者放到一个项目中,可以依据项目的需求调整生产者和消费者进行拆分。
除了通用配置外,还有两点说明:
  1.生产者消息确认配置项,确认消息发送到交换机和队列
  2.消费者配置手动确认配置项,默认消息是自动确认的,正常业务都需要手动确认(不手动确认,消息一直在)
生产者与消费者的详细完整配置如下:

spring:#配置rabbitMQrabbitmq:host: 192.168.144.133  #rabbitmq服务地址port: 5672username: adminpassword: 123456#消费者 手动确认配置项listener:type: simplesimple:acknowledge-mode: MANUAL #消息确认方式 MANUAL手动确认 NONE不确认 AUTO自动确认retry:enabled: true  #开启重试max-attempts: 3 #最大重试次数initial-interval: 5000ms #重试间隔时间#生产者 消息确认配置项#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

第三步:生产者配置

  配置主题型交换机、队列(交换机是消息队列传输的载体),并且将队列和交换机绑定,并且设置绑定路由键

/*** 主题型交换机、队列配置*/
@Configuration
public class TopicRabbitConfig {@Beanpublic Queue geoQueue() {return new Queue("geo.inout.queue");}@Beanpublic TopicExchange geoExchange() {return new TopicExchange("geo-exchange");}/*** 将队列和交换机绑定,而且绑定的键值为geo.key.inout* 这样只要是消息携带的路由键是geo.key.inout,才会分发到该队列*/@BeanBinding bindingGeoExchange() {return BindingBuilder.bind(geoQueue()).to(geoExchange()).with("geo.key.inout");}
}

  消息确认回调函数配置(确认消息正常发送到RabbitMq上)

/*** 消息确认回调函数配置*/
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("ConfirmCallback:     "+"相关数据:"+correlationData);log.info("ConfirmCallback:     "+"确认情况:"+ack);log.info("ConfirmCallback:     "+"原因:"+cause);}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallback:     "+"消息:"+returnedMessage.getMessage());log.info("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());log.info("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());log.info("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());log.info("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());}});return rabbitTemplate;}
}

  发送实体JSON序列化配置(防止消息乱码)

/*** 发送实体JSON序列化配置*/
@Configuration
public class RabbitProviderConfig implements InitializingBean {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void afterPropertiesSet() throws Exception {//使用JSON序列化rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());}
}

第四步:消费者配置

消费者消息接收监听类,说明:
  1.调用channel.basicAck()方法为消费端执行消息手动确认,即消息被消费
  2.因为RabbitMq要保持有序性,只有前面消费完了,后面才能消费,有可能出现消息消费慢的问题(接口反应处理慢)。这就需要在消费端开启多线程监听队列,
具体设置concurrency为开启多线程监听队列,concurrency = “5-8”:表示开启5个线程监听队列,最大为8个线程

/*** 主题型-消息接收监听类*/
@Slf4j
@Component
public class GeoMQReceiver {/*** 默认是单线程监听队列,消息消费会慢* 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程*/@RabbitHandler@RabbitListener(queues = "geo.inout.queue", concurrency = "5-8")public void process(Map<String,Object> map, Message message, Channel channel) throws IOException {try {// TODO 你的业务处理// 手动确认消息// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 确认失败 将消息重新放回队列,让别人消费// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}
}

接收实体JSON序列化配置(防止消息乱码)

/*** 接收实体JSON序列化配置*/
@Configuration
public class RabbitMQConfig {@Beanpublic MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {return new Jackson2JsonMessageConverter(objectMapper);}
}

第五步:编写测试进行测试

  提供两个方法进行模拟,方法一为正常调用接口,方法二为通过消息中间件调用接口

@RestController
@RequestMapping("/geo")
public class GeoController {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 正常调用接口*/@PostMapping("/test1")public void test1(@RequestBody Map<String, Object> params){business(params);}/*** 通过消息中间件调用接口*/@PostMapping("/test2")public void test2(@RequestBody Map<String, Object> params){rabbitTemplate.convertAndSend("geo-exchange", "geo.key.inout", params);}/*** 业务处理方法*/public void business(Map<String, Object> params){//你的实际业务代码块try {String id = (String)params.get("id");log.info("获取到的参数:"+id);Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}
}

  消息接收监听类中处理业务方法

/*** 主题型-消息接收监听类*/
@Slf4j
@Component
public class GeoMQReceiver {@Autowiredprivate GeoController geoController;/*** 默认是单线程监听队列,消息消费会慢* 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程*/@RabbitHandler@RabbitListener(queues = "geo.nb.inout.queue", concurrency = "5-8")public void process(Map<String,Object> map, Message message, Channel channel) throws IOException {try {geoController.business(map);// 手动确认消息// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 确认失败 将消息重新放回队列,让别人消费// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}
}

第六步:使用JMeter进行压力测试

  略…


  总结:当项目并发压力上来的时候,可以通过增加服务器性能、服务集群、数据缓存批量处理、数据库分库等多种方案进行提升服务并发性能,基于消息中间件削峰只是其中的一个技术方案,可以依据自己的项目需求选择合理的并发削峰方案。建议做成配置性选择走常规路由或消息中间件,后期只需要根据业务数据请求量在配置中心改配置就可以实现了,保障服务不会宕机。

基于RabbitMq的削峰实例相关推荐

  1. RabbitMQ流量削峰应用

    RabbitMQ流量削峰 配置 生产者 消费者 测试 配置 主要是配置队列长度 /** 削峰限流 */@Beanpublic Queue peakQueue() {Map<String, Obj ...

  2. kafka 削峰_Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么优点和缺点?

    面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区别,以及适合哪些场景? 面试官心理分析 其实面试官主要是想看看: ...

  3. springboot集成rabbitmq商品秒杀业务实战(流量削峰)

    消息队列如何实现流量削峰? 要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送 ...

  4. RabbitMQ 限流 流量削峰

    以下是基于spring cloud 2.1.4+rabbitmq-server-3.8.22的实现代码 设置: spring:rabbitmq:port: 5672host: localhostuse ...

  5. SpringBoot+RabbitMQ削峰入门

    前言 当大量的客户访问请求打到后端,去访问数据库等,瞬间会爆炸的. 经过前端或者其他的方案进行限流外. 还是有大量的请求,这个时候需要削峰了. 简单的削峰例子 先设置小一点,然后循环往队列里面放消息, ...

  6. java 最少使用(lru)置换算法_一篇文章学会如何基于LRU-K算法设计本地缓存实现流量削峰...

    专注于Java领域优质技术号,欢迎关注 作者:一个Java菜鸟 1.背景介绍 1.1.现象 QPS突然增长2倍以上(45w~60w每分钟) 将产生下面一些问题: 1)响应接口响应时长增加了5倍(qps ...

  7. 基于LRU-K算法设计本地缓存实现流量削峰

    1.背景介绍 1.1.现象 QPS突然增长2倍以上(45w~60w每分钟) 将产生下面一些问题: 1)响应接口响应时长增加了5倍(qps增加了2倍): 2)机房局域网交换机带宽报警(1kM带宽使用了9 ...

  8. RabbitMQ快速入门及实例演示

    RabbitMQ 先做起来,再去想其他. 1.MQ 消息队列概念 MQ(message queue)消息队列,FIFO先入先出.对服务器的请求先加入到消息队列中,再由消息队列来进行请求的分发. 还是一 ...

  9. kafka 削峰_从面试角度一文学完 Kafka

    Kafka 是一个优秀的分布式消息中间件,许多系统中都会使用到 Kafka 来做消息通信.对分布式消息系统的了解和使用几乎成为一个后台开发人员必备的技能.今天就从常见的 Kafka 面试题入手,和大家 ...

最新文章

  1. CentOS配置网易163 yum源和EPEL yum源
  2. git 代码回退_「Vue 入门系列」第三期,适合新手入门的 Git 使用教程
  3. 基于SCVMM对虚拟化服务器与虚拟机管理权限分配用户角色
  4. 数据结构单链表SingleLinkedList,Java实现单链表增删改查
  5. 真格量化-主力跟买策略
  6. php 调用openoffice,PHP调用OpenOffice实现word转PDF的方法
  7. 机房设备服务器维护细则,机房安全管理方面的管理制度
  8. python文件生成电脑exe文件
  9. weblogic安装错误BEA-090870解决方案
  10. 巴士电台开放接口API
  11. 论文公式自动编号及引用(自动更新)
  12. php微信转发无法显示标题图片,完美解决:微信分享为什么不显示图片呢? - 老牛博客...
  13. win10自带微软拼音输入法卡死卡顿解决方法汇总
  14. win10安装Oracle官方精简版客户端
  15. 镶锆石、侧边指纹、双屏翻盖机,三星的这款2万块手机,只有土豪能懂
  16. 关于入门小白对java这种面向对象编程语言的封装、继承、多态的理解
  17. 【CUDA 基础】3.4 避免分支分化
  18. ❤echarts 南丁格尔玫瑰图的使用以及南丁格尔玫瑰图详细配置
  19. 神经网络学习4【误差传递与权重更新】
  20. python 1104: 求因子和(函数专题)

热门文章

  1. php错误1064,求助,phpmyadmin导入sql文件提示1064错误
  2. 梦幻成仙,诛灭外挂——《梦幻诛仙手游》的阻击外挂之旅
  3. Vue环境搭建及第一个hello world
  4. 鱼塘钓鱼 贪心算法
  5. 取消选中单选框radio的三种方式
  6. 【RSA-Tool 2 by tE】的使用
  7. 程序员就业和发展前景?
  8. ARM 立即数范围以及合法立即数
  9. 本科毕设论文如何写(1)-- 快速下手
  10. 最新华为鸿蒙系统升级名单,鸿蒙系统首批升级机型名单_华为鸿蒙系统升级机型名单时间表...