以下是基于spring cloud 2.1.4+rabbitmq-server-3.8.22的实现代码


设置:

spring:rabbitmq:port: 5672host: localhostusername: guestpassword: guestvirtual-host: /#开启confirms这个模式#springboot2.2.0.RELEASE支持这个#publisher-confirm-type: correlatedpublisher-confirms: true#开启return模式publisher-returns: truelistener:simple:#开启手动签收acknowledge-mode: manual#削峰#设置每一个消费端能够处理未确认消息的最大数,默认是250,一般用默认的就行了,这里是测试我们用1prefetch: 1

消费端

package com.example.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
@RabbitListener(queues = "queue_test6")
public class MyRabbitListener {/**** @param message 消息封装的对象,(包括了消息的序号,消息本身,消费者名称等)* @param channel  链接的通道* @param msg  消息本身*/@RabbitHandler //用于处理具体类型的消息,会自动把消息转换成对应的对象public void receiveMessage(Message message, Channel channel,String msg){//接收消息System.out.println(msg);MessageProperties messageProperties = message.getMessageProperties();try {//模拟业务-100System.out.println("消费后减100元");//模拟出问题//int i = 10/0;//我们测试流量削峰,所以上面出错我就注释了Thread.sleep(2000);//如果正常就签收消息//参数1,消息的序号//参数二,是否批量签收 true是批量签收channel.basicAck(messageProperties.getDeliveryTag(),true);} catch (Exception e) {e.printStackTrace();//不正常就拒收消息(丢弃了)try {//如果该消息重回过队列就不投递了,避免死循环if(messageProperties.getRedelivered()){System.out.println("已经重新投递过一次了");}else{//参数1 消息序号//参数2 是否批量拒绝消息//参数3 是否把消息重新回到队列中channel.basicNack(messageProperties.getDeliveryTag(),true,true);}//不能批量处理拒绝消息。第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝//channel.basicReject();} catch (IOException ioException) {ioException.printStackTrace();}}}//消费者监听队列/*@RabbitHandler //用于处理具体类型的消息,会自动把消息转换成对应的对象public void receiveMessage(String msg){//接收消息System.out.println(msg);//模拟业务-100}*///消费者监听队列@RabbitHandler //用于处理具体类型的消息,会自动把消息转换成对应的对象public void receiveMessage(Map msg){//接收消息System.out.println(msg);//模拟业务-100}
}

生产者

package com.example.controller;import com.example.confirm.MyConfirmCallBack;
import com.example.confirm.MyReturnCallBack;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/testsend")
public class TestSendController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowired//或者用这个//private MyConfirmCallBack myConfirmCallBack;private RabbitTemplate.ConfirmCallback confirmCallback;@Autowired//private MyReturnCallBack myReturnCallBack;//或者用这个private RabbitTemplate.ReturnCallback returnsCallback;@GetMapping ("/send1")public String send1(){//设置confirm回调函数rabbitTemplate.setConfirmCallback(confirmCallback);//设置return回调函数rabbitTemplate.setReturnCallback(returnsCallback);//发送消息//rabbitTemplate.convertAndSend("exchange_test6","test6.insert","消息本身");//测试一个错误,把交换机改正确,routingKey写错//rabbitTemplate.convertAndSend("exchange_test6","test6.insertxxx","消息本身");//测试削峰配置是否生效,把所有东西都改正确rabbitTemplate.convertAndSend("exchange_test6","test6.insert","消息本身");return "ok";}}

开启服务测试,发现是每隔两秒执行一个。

这个时候我们把配置改为最大处理消息数为3,你在测试发现还是一个一个消费。原因是这是单一线程的,我们需要在消费者注解加上concurrency属性并且设置3@RabbitListener(queues = "queue_test6",concurrency = "3")

再次测试你就会发现是3个3个的消费的了。concurrency单词的意思:并发性,在程序中相当于开了多个线程。

package com.example.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
@RabbitListener(queues = "queue_test6",concurrency = "3")
public class MyRabbitListener {/**** @param message 消息封装的对象,(包括了消息的序号,消息本身,消费者名称等)* @param channel  链接的通道* @param msg  消息本身*/@RabbitHandler //用于处理具体类型的消息,会自动把消息转换成对应的对象public void receiveMessage(Message message, Channel channel,String msg){//接收消息System.out.println(msg);MessageProperties messageProperties = message.getMessageProperties();try {//模拟业务-100System.out.println("消费后减100元");//模拟出问题//int i = 10/0;//我们测试流量削峰,所以上面出错我就注释了Thread.sleep(2000);//如果正常就签收消息//参数1,消息的序号//参数二,是否批量签收 true是批量签收channel.basicAck(messageProperties.getDeliveryTag(),true);} catch (Exception e) {e.printStackTrace();//不正常就拒收消息(丢弃了)try {//如果该消息重回过队列就不投递了,避免死循环if(messageProperties.getRedelivered()){System.out.println("已经重新投递过一次了");}else{//参数1 消息序号//参数2 是否批量拒绝消息//参数3 是否把消息重新回到队列中channel.basicNack(messageProperties.getDeliveryTag(),true,true);}//不能批量处理拒绝消息。第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝//channel.basicReject();} catch (IOException ioException) {ioException.printStackTrace();}}}//消费者监听队列/*@RabbitHandler //用于处理具体类型的消息,会自动把消息转换成对应的对象public void receiveMessage(String msg){//接收消息System.out.println(msg);//模拟业务-100}*///消费者监听队列@RabbitHandler //用于处理具体类型的消息,会自动把消息转换成对应的对象public void receiveMessage(Map msg){//接收消息System.out.println(msg);//模拟业务-100}
}

RabbitMQ 限流 流量削峰相关推荐

  1. 如何降低QPS(错峰、限流、削峰)

    如何降低QPS(错峰.限流.削峰) 目录 一.错峰策略 二.限流策略 三.削峰策略 服务端每秒流量处理能力是通过QPS衡量的,最大QPS(峰值)就是对服务端抗压能力的衡量指标,如果来自客户端的流量超过 ...

  2. 服务降级,限流,削峰

    服务降级 举例:淘宝的搜索功能,平时的时候 搜全站.高峰期的时候,限制只能搜店内或者一些品类不给搜. 削峰 把瞬时流量拉长.强行把并行变成串行 举例:12306的春运票,一段时间一段时间的放,一个地方 ...

  3. 【服务治理】服务熔断、服务降级、服务限流、流量削峰、错峰

    文章目录 服务熔断.服务降级和服务限流 服务熔断 服务降级 服务限流 实现 步骤三.Polly在.NetCore项目中封装 流量错峰.流量限流.流量削峰 流量错峰 服务端错峰策略 客户端错峰策略 流量 ...

  4. springboot集成rabbitmq商品秒杀业务实战(流量削峰)

    消息队列如何实现流量削峰? 要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送 ...

  5. RabbitMQ流量削峰应用

    RabbitMQ流量削峰 配置 生产者 消费者 测试 配置 主要是配置队列长度 /** 削峰限流 */@Beanpublic Queue peakQueue() {Map<String, Obj ...

  6. 消息中间件系列(六):什么是流量削峰?如何解决秒杀业务的削峰场景

    流量削峰的由来 主要是还是来自于互联网的业务场景,例如,马上即将开始的春节火车票抢购,大量的用户需要同一时间去抢购:以及大家熟知的阿里双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如: ...

  7. 高并发架构系列:什么是流量削峰?如何解决秒杀业务的削峰场景

    流量削峰的由来 主要是还是来自于互联网的业务场景,例如,马上即将开始的春节火车票抢购,大量的用户需要同一时间去抢购:以及大家熟知的阿里双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如: ...

  8. 怎样来实现流量削峰方案

    流量削峰的由来 主要是还是来自于互联网的业务场景,例如,马上即将开始的春节火车票抢购,大量的用户需要同一时间去抢购:以及大家熟知的阿里双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如: ...

  9. 秒杀系统流量削峰,这事应该怎么做?

    如果你看过秒杀系统的流量监控图的话,你会发现它是一条直线,就在秒杀开始那一秒是一条很 直很直的线,这是因为秒杀请求在时间上高度集中于某一特定的时间点.这样一来,就会导致一 个特别高的流量峰值,它对资源 ...

最新文章

  1. 利用python实现IP扫描
  2. swift_031(Swift 中的枚举/原始值(RawValues))
  3. [剑指offer]面试题21:包含min函数的栈
  4. 【每日一题】7月15日题目精讲—生日快乐
  5. 从理念到实践跳跃式演进! 云的原生“免疫系统”如何有机作战?
  6. vba 指定列后插入列_在不同的列左侧插入指定数量的空白列
  7. Together(AtCoder-3524)
  8. W3C发布DRM作为推荐方案
  9. linux运维视频教程
  10. ebay 后台HTML有尺寸宽度要求吗,eBay牛仔裤成为尺码问题重灾区,卖家上架需注意这几个方面...
  11. mmo游戏开发应在profile下运行,才能保证正式运行不卡
  12. 2022年4月树莓派系统初始用户名密码
  13. 影响力最大化——CELF算法的简介与python实现
  14. 计算机程序员的英文简历,电脑程序员个人英文简历范文
  15. 小米手机4获取ROOT权限的步骤
  16. FFmpeg+SDL纯语音播放器
  17. 热点讨论:IT人,40岁以后能干什么?
  18. android 控件发光_Android自定义控件打造闪闪发光字体
  19. 空气温湿度光照度传感器
  20. 《阿里云服务器教程2》:如何远程连接linux系统阿里云服务器ECS

热门文章

  1. 【SSM框架】【怠惰致错】Invalid bound statement (not found)
  2. a标签与button按钮
  3. mysql经典46_50个经典SQL语句
  4. HDU - 最大报销额(01背包|贪心)
  5. java国内外详情研究动态,国内外研究现状分析及文献综述.doc
  6. LOJ6001 - 「网络流 24 题」太空飞行计划
  7. 16.Hamilton(哈密顿)回路问题
  8. 药品过5关价翻12倍 批发商抢走药品一半利润
  9. 计算机c盘program,Program Files是什么文件夹?Win7 C盘Program Files可以删除吗?
  10. 力扣——883. 三维形体投影面积(Java、JavaScript、C实现)