rabbitmq使用

  • docker 安装 rabbitmq
  • docker 安装 rabbitmq 延时消息插件
  • RabbitMQ 延时队列的实现
    • 什么是延时队列
    • 延时队列使用场景
    • RabbitMQ 中的TTL
    • 结合死信队列实现延时消息的处理
    • 利用插件实现延时消息的处理
  • RabbitMQ 如何进行消息可靠投递
    • 生产者没有成功把消息发送到MQ
    • MQ接收到消息之后丢失了消息
      • 让消息可靠投递到队列
      • 备份交换机

docker 安装 rabbitmq

下载镜像,带有management页面的

docker pull rabbitmq:3.8.2-management

启动(亲测可行)

root@deployment:/# docker run -d --hostname rabbitmq:3.8.3-management --name rabbitmq -p 15672:15672 rabbitmq:3.8.3-management

或者

docker run -d --name rabbitmq-3.8.3-management -p 5672:5672 -p 15672:15672 -v /opt/rabbitmq/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 479479d8e188[镜像Id]
#说明:
-d 后台运行容器;
–name 指定容器名;
-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
-v 映射目录或文件;
–hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
-e 指定环境变量;
RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码

docker 安装 rabbitmq 延时消息插件

首先去 github 上把插件下载下来

然后把插件上传到 linux ,看接下来步骤:

docker ps 看看rabbitmq 是否启动

如果启动了则docker exec -it rabbitmq-3.8.3-management /bin/bash 进入安装目录

root@deployment:~# docker exec -it rabbitmq-3.8.3-management /bin/bash
# 可以看到有 plugins 目录
root@myRabbit:/# ls
bin  boot  dev  etc  home  lib  lib64  media  mnt  opt  plugins  proc  root  run  sbin  srv  sys  tmp  usr  var

可以看到有 plugins 目录,接下来 ctrl+d 退出,把插件拷贝到 rabbitmq 安装目录的plugins 目录下

root@deployment:~# docker cp /usr/local/tmp/rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq-3.8.3-management:/plugins

为了保险你可以进入 plugins 目录看是否拷贝成功

root@deployment:~# docker exec -it rabbitmq-3.8.3-management /bin/bash
root@myRabbit:/# ls
bin  boot  dev  etc  home  lib  lib64  media  mnt  opt  plugins  proc  root  run  sbin  srv  sys  tmp  usr  var
root@myRabbit:/# cd plugins
# 有在呢
root@myRabbit:/plugins# ls -l|grep delay
-rw-r--r-- 1 root     root       43377 Mar 27 08:07 rabbitmq_delayed_message_exchange-3.8.0.ez

接下来,启用插件,并重启 rabbitmq

root@myRabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@myRabbit:
rabbitmq_delayed_message_exchange
The following plugins have been configured:rabbitmq_delayed_message_exchangerabbitmq_managementrabbitmq_management_agentrabbitmq_web_dispatch
Applying plugin configuration to rabbit@myRabbit...
The following plugins have been enabled:rabbitmq_delayed_message_exchangestarted 1 plugins.
root@deployment:~# docker restart rabbitmq-3.8.3-management

最后你可以打开 rabbitmq管理页面,在Type里面看是否出现了x-delayed-message选项,验证是否安装成功

RabbitMQ 延时队列的实现

什么是延时队列

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延时队列使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

  1. 订单在十分钟之内未支付则自动取消。

  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

  3. 账单在一周内未支付,则自动结算。

  4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。

  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。

  6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;发生账单生成事件,检查账单支付状态,然后自动结算未支付的账单;

看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。更重要的一点是,不!优!雅!

没错,作为一名有追求的程序员,始终应该追求更优雅的架构和更优雅的代码风格,写代码要像写诗一样优美。

这时候,延时队列就可以闪亮登场了,以上场景,正是延时队列的用武之地。

RabbitMQ 中的TTL

TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”(至于什么是死信,请翻看上一篇)。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。
另一种方式便是针对每条消息设置TTL,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

这样这条消息的过期时间也被设置成了6s。

但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

结合死信队列实现延时消息的处理


来看看代码实现,先声明交换机、队列以及他们的绑定关系:

@Configuration
public class RabbitMQConfig {//交换机public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";//abc三个队列public static final String DELAY_QUEUE_A = "delay.queue.demo.business.queuea";public static final String DELAY_QUEUE_B = "delay.queue.demo.business.queueb";public static final String DELAY_QUEUE_C = "delay.queue.demo.business.queuec";//abc三个队列的routingkeypublic static final String DELAY_QUEUE_A_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";public static final String DELAY_QUEUE_B_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";public static final String DELAY_QUEUE_C_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";//死信队列交换机public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";//声明abc三个死信队列public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queuea";public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queueb";public static final String DEAD_LETTER_QUEUE_C_NAME = "delay.queue.demo.deadletter.queuec";//死信队列的routingkeypublic static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";public static final String DEAD_LETTER_QUEUE_C_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";// 声明延时队列和死信队列的 Exchange@Bean("delayExchange")public DirectExchange delayExchange(){return new DirectExchange(DELAY_EXCHANGE_NAME);}@Bean("deadLetterExchange")public DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 声明延时队列A 延时10s// 并绑定到对应的死信交换机@Bean("delayQueueA")public Queue delayQueueA(){Map<String, Object> args = new HashMap<>(3);// x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);// x-message-ttl  声明队列的TTLargs.put("x-message-ttl", 6000);return QueueBuilder.durable(DELAY_QUEUE_A).withArguments(args).build();}// 声明延时队列B 延时 60s// 并绑定到对应的死信交换机@Bean("delayQueueB")public Queue delayQueueB(){Map<String, Object> args = new HashMap<>(3);// x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);// x-message-ttl  声明队列的TTLargs.put("x-message-ttl", 60000);return QueueBuilder.durable(DELAY_QUEUE_B).withArguments(args).build();}// 声明延时队列C 不设置TTL// 并绑定到对应的死信交换机@Bean("delayQueueC")public Queue delayQueueC(){Map<String, Object> args = new HashMap<>(3);// x-dead-letter-exchange    这里声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_C_ROUTING_KEY);return QueueBuilder.durable(DELAY_QUEUE_C).withArguments(args).build();}// 声明死信队列A 用于接收延时10s处理的消息@Bean("deadLetterQueueA")public Queue deadLetterQueueA(){return new Queue(DEAD_LETTER_QUEUE_A_NAME);}// 声明死信队列B 用于接收延时60s处理的消息@Bean("deadLetterQueueB")public Queue deadLetterQueueB(){return new Queue(DEAD_LETTER_QUEUE_B_NAME);}// 声明死信队列C 用于接收延时任意时长处理的消息@Bean("deadLetterQueueC")public Queue deadLetterQueueC(){return new Queue(DEAD_LETTER_QUEUE_C_NAME);}// 声明延时队列A绑定关系@Beanpublic Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,@Qualifier("delayExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_A_ROUTING_KEY);}// 声明延时队列B绑定关系@Beanpublic Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_B_ROUTING_KEY);}// 声明延时队列C绑定关系@Beanpublic Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,@Qualifier("delayExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_C_ROUTING_KEY);}// 声明死信队列A绑定关系@Beanpublic Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);}// 声明死信队列B绑定关系@Beanpublic Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);}// 声明死信队列C绑定关系@Beanpublic Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_C_ROUTING_KEY);}
}

定义一个消息发送者

@Component
public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg, DelayTypeEnum type){switch (type){case DELAY_10s:rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_A_ROUTING_KEY, msg);break;case DELAY_60s:rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_B_ROUTING_KEY, msg);break;}}public void sendMsg(String msg, Integer delayTime) {rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_C_ROUTING_KEY, msg, a ->{a.getMessageProperties().setExpiration(String.valueOf(delayTime));return a;});}
}

定义一个消息监听,进行消费:

@Slf4j
@Component
public class DeadLetterQueueConsumer {@RabbitListener(queues = DEAD_LETTER_QUEUE_A_NAME)public void receiveA(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = DEAD_LETTER_QUEUE_B_NAME)public void receiveB(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = DEAD_LETTER_QUEUE_C_NAME)public void receiveC(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},死信队列C收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

再来一个controller,模拟请求:

@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {@Resourceprivate DelayMessageSender sender;/*** http://localhost:8080/rabbitmq/sendmsg?msg=HelloWorld&delayType=2* 第一条消息在6s后变成了死信消息,然后被消费者消费掉,* 第二条消息在30s之后变成了死信消息,然后被消费掉,这样,一个还算ok的延时队列就打造完成了。** 问题来了,假如我要它60s之后也变成死信消息,按照这个逻辑,岂不是又要增加一个队列?* 如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,* 这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,* 如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求??*/@RequestMapping("sendmsg")public void sendMsg(String msg, Integer delayType){log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayType);sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType)));}/*** 基于上面的问题,我们进行优化*http://localhost:8080/rabbitmq/delayMsg?msg=操蛋&delayTime=10000 单位ms* delayTime 这里可以随意更改,用的都是同一个队列和key,** 看起来似乎没什么问题,但不要高兴的太早,在最开始的时候,就介绍过,* 如果使用在消息属性上设置TTL的方式,消息可能并不会按时死亡,* 因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,* 索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。* 就像下面这样:20秒的消息没有得到优先执行* 当前时间:Fri Mar 27 17:17:15 CST 2020,收到请求,msg:60秒的消息,delayTime:60000* 当前时间:Fri Mar 27 17:17:26 CST 2020,收到请求,msg:20秒的消息,delayTime:20000* 当前时间:Fri Mar 27 17:18:15 CST 2020,死信队列C收到消息:60秒的消息* 当前时间:Fri Mar 27 17:18:15 CST 2020,死信队列C收到消息:20秒的消息** 在设置的TTL时间及时死亡,却无法及时得到消费,就无法设计成一个通用的延时队列。** 那如何解决这个问题呢?不要慌,安装一个插件*/@RequestMapping("delayMsg")public void delayMsg(String msg, Integer delayTime) {log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime);sender.sendMsg(msg, delayTime);}
}

DelayTypeEnum 枚举类

public enum DelayTypeEnum {DELAY_10s(1, "延时10s"),DELAY_60s(2, "延时30s");private Integer code;private String desc;DelayTypeEnum(Integer code, String desc) {this.code = code;this.desc = desc;}public Integer getCode() {return code;}public String getDesc() {return desc;}public static DelayTypeEnum getDelayTypeEnumByValue(Integer code){for (DelayTypeEnum value : DelayTypeEnum.values()) {if (value.getCode().equals(code)){return value;}}return null;}
}

application.yml 配置

spring:rabbitmq:host: 192.168.239.132password: adminusername: adminlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual

利用插件实现延时消息的处理

队列和交换机的声明绑定

@Configuration
public class RabbitMQConfig {//下面是用 rabbitmq 插件做的一个延时消息队列,放在一起///public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";@Beanpublic Queue immediateQueue() {return new Queue(DELAYED_QUEUE_NAME);}/*** CustomExchange*/@Beanpublic CustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,@Qualifier("customExchange") CustomExchange customExchange) {return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

controller 模拟请求

@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {@Resourceprivate DelayMessageSender sender;/*** 这个就是使用插件的方式设计延时队列,可以看到,第二个消息被先消费掉了,符合预期*当前时间:Fri Mar 27 17:17:15 CST 2020,收到请求,msg:60秒的消息,delayTime:60000* 当前时间:Fri Mar 27 17:17:26 CST 2020,收到请求,msg:20秒的消息,delayTime:20000* 当前时间:Fri Mar 27 17:18:15 CST 2020,死信队列C收到消息:20秒的消息* 当前时间:Fri Mar 27 17:18:15 CST 2020,死信队列C收到消息:60秒的消息*/@RequestMapping("delayMsg2")public void delayMsg2(String msg, Integer delayTime) {log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime);sender.sendDelayMsg(msg, delayTime);}
}

消息发送者

@Component
public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMsg(String msg, Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{a.getMessageProperties().setDelay(delayTime);return a;});}
}

消息消费

@Slf4j
@Component
public class DeadLetterQueueConsumer {@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("当前时间:{},延时队列收到消息:{}", new Date().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

pom 文件和启动类

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.example.springboot</groupId><artifactId>spring-boot-samples</artifactId><version>0.0.1-SNAPSHOT</version></parent><groupId>com.example.rabbitmq</groupId><artifactId>springboot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><properties><skipTests>true</skipTests><java.version>1.8</java.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><mainClass>com.example.rabbitmq.RabbitmqApplication</mainClass></configuration></plugin></plugins></build>
</project>
@SpringBootApplication
public class RabbitmqApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqApplication.class, args);}
}

RabbitMQ 如何进行消息可靠投递

生产者没有成功把消息发送到MQ

  • 事务机制

    • yml配置,不变
    • 队列和交换机的声明绑定
    • 消息发送
    • 消息消费
    • controller 模拟请求
  • confirm机制

    • yml配置

      spring:rabbitmq:host: 192.168.239.132password: adminusername: adminpublisher-confirm-type: correlatedlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual
      
    • 队列和交换机的声明绑定

      @Configuration
      public class RabbitMQConfig {//定义一个交换机已及 routingkey,用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE = "normal.demo.exchange";public static final String NORMAL_ROUTING_KEY = "normal.demo.routingkey";public static final String NORMAL_QUEUE = "normal.demo.queue";@Beanpublic DirectExchange normalExchange(){return new DirectExchange(NORMAL_EXCHANGE);}@Beanpublic Queue normalQueue(){return new Queue(NORMAL_QUEUE);}@Beanpublic Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY);}
      }
      
    • 消息发送,实现 ConfirmCallback 接口,重写 confirm 回调方法

      @Slf4j
      @Component
      public class MsgWithConfirmProducer implements RabbitTemplate.ConfirmCallback {@Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructprivate void init() {System.out.println("bbb");rabbitTemplate.setConfirmCallback(this);}public void sendExceptMsg(String msg) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());log.info("消息id:{}, msg:{}", correlationData.getId(), msg);rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, msg, correlationData);if (msg != null && msg.contains("exception")) {int i = 1 / 0;}}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "";log.info("b的值:" + b);if (b) {log.info("消息确认成功, id:{},result:{}", id, s);} else {log.error("消息未成功投递, id:{}, cause:{}", id, s);}}
      }
      
    • 消息消费

      @Slf4j
      @Component
      public class DeadLetterQueueConsumer {@RabbitListener(queues = NORMAL_QUEUE)public void receiveMsg(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("当前时间:{},业务消费消息:{}", new Date().toString(), str);
      //        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);}
      }
      
    • controller 模拟请求

      @Slf4j
      @RequestMapping("rabbitmq")
      @RestController
      public class RabbitMQMsgController {/*** 测试消息的可靠传递:confirm机制解决* http://localhost:8080/rabbitmq/exceptMsg?msg=呵呵** 生产者确认机制跟事务是不能一起工作的,是事务的轻量级替代方案。* 因为事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,* 不能对同一个信道同时使用两种模式。*/@Resourceprivate MsgWithConfirmProducer confirmProducer;@RequestMapping("exceptMsg")public void exceptMsg( String msg){log.info("当前时间:{},收到章程请求,msg:{}", new Date(), msg);confirmProducer.sendExceptMsg(msg);}
      }
      
    • 效果

      执行结果会发现,哪怕是出现了异常,消息也能响应成功,confirm 方法 b 也为 true,除非 MsgWithConfirmProducer 类的 发送消息rabbitTemplate.convertAndSend(交换机, 路由key, msg, correlationData); 配置了不存在的 交换机或者 key,导致 MQ 收不到消息,这个时候b 为 false

MQ接收到消息之后丢失了消息

让消息可靠投递到队列

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时,生产者是不知道消息被丢弃这个事件的。

  • yml配置

    spring:rabbitmq:host: 192.168.239.132password: adminusername: adminpublisher-confirm-type: correlatedlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual
    
  • 队列和交换机的声明绑定

    @Configuration
    public class RabbitMQConfig {public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.confirm.simple.business.exchange";public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";public static final String BUSINESS_ROUTING_KEY = "rabbitmq.routing.key";// 声明业务 Exchange@Bean("businessExchange")public DirectExchange businessExchange(){return new DirectExchange(BUSINESS_EXCHANGE_NAME);}// 声明业务队列@Bean("businessQueue")public Queue businessQueue(){return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();}// 声明业务队列绑定关系@Beanpublic Binding businessBinding(@Qualifier("businessQueue") Queue queue,@Qualifier("businessExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);}
    }
  • 消息发送

    @Slf4j
    @Component
    public class MsgWithConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructprivate void init() {rabbitTemplate.setConfirmCallback(this);//当把 mandotory 参数设置为 true 时,// 如果交换机无法将消息进行路由时,会将该消息返回给生产者,// 而如果该参数设置为false,如果发现消息无法进行路由,则直接丢弃。rabbitTemplate.setMandatory(true);//如果设置这句,returnedMessage 方法不会执行rabbitTemplate.setReturnCallback(this);}public void sendExceptMsg(String msg) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());log.info("消息id:{}, msg:{}", correlationData.getId(), msg);rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, msg, correlationData);if (msg != null && msg.contains("exception")) {int i = 1 / 0;}}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "";if (b) {log.info("交换机收到成功, id:{}", id);} else {log.error("消息未成功投递, id:{}, cause:{}", id, s);}}public void sendCustomMsg(String exchange, String msg) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());log.info("消息id:{}, msg:{}", correlationData.getId(), msg);rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);correlationData = new CorrelationData(UUID.randomUUID().toString());log.info("消息id:{}, msg:{}", correlationData.getId(), msg);//投递到不存在的路由 keyrabbitTemplate.convertAndSend(exchange, "key2sdfsfsf", msg, correlationData);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",new String(message.getBody()), replyCode, replyText, exchange, routingKey);}
    }
  • 消息消费

    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {@RabbitListener(queues = BUSINESS_QUEUEA_NAME)public void receiveMsg(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("业务消费消息:{}", str);channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);}
    }
    
  • controller 模拟请求

    @Slf4j
    @RequestMapping("rabbitmq")
    @RestController
    public class RabbitMQMsgController {/*** 测试消息的可靠传递:confirm机制解决* http://localhost:8080/rabbitmq/exceptMsg?msg=呵呵** 生产者确认机制跟事务是不能一起工作的,是事务的轻量级替代方案。* 因为事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,* 不能对同一个信道同时使用两种模式。*/@Resourceprivate MsgWithConfirmProducer confirmProducer;@RequestMapping("exceptMsg")public void exceptMsg( String msg){log.info("当前时间:{},收到章程请求,msg:{}", new Date(), msg);confirmProducer.sendCustomMsg(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,msg);}
    }
    
  • 浏览器测试 http://localhost:8080/rabbitmq/exceptMsg?msg=hok 结果

    2020-03-28 14:06:44.824  INFO 16080 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
    2020-03-28 14:06:44.824  INFO 16080 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
    2020-03-28 14:06:44.830  INFO 16080 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 6 ms
    2020-03-28 14:06:44.857  INFO 16080 --- [nio-8080-exec-1] c.e.s.controller.RabbitMQMsgController   : 当前时间:Sat Mar 28 14:06:44 CST 2020,收到章程请求,msg:hok
    2020-03-28 14:06:44.859  INFO 16080 --- [nio-8080-exec-1] c.e.s.mq.MsgWithConfirmProducer          : 消息id:149884c4-a2f9-4ad0-a37c-d6be2d370908, msg:hok
    2020-03-28 14:06:44.871  INFO 16080 --- [nio-8080-exec-1] c.e.s.mq.MsgWithConfirmProducer          : 消息id:0b0a7c3a-91ab-4faf-9f6c-632acbc8b2d8, msg:hok
    2020-03-28 14:06:44.881  INFO 16080 --- [nectionFactory1] c.e.s.mq.MsgWithConfirmProducer          : 交换机收到成功, id:149884c4-a2f9-4ad0-a37c-d6be2d370908
    2020-03-28 14:06:44.887  INFO 16080 --- [nectionFactory1] c.e.s.mq.MsgWithConfirmProducer          : 消息被服务器退回。msg:hok, replyCode:312. replyText:NO_ROUTE, exchange:rabbitmq.confirm.simple.business.exchange, routingKey :key2sdfsfsf
    2020-03-28 14:06:44.887  INFO 16080 --- [ntContainer#0-1] c.e.s.mq.DeadLetterQueueConsumer         : 业务消费消息:hok
    2020-03-28 14:06:44.888  INFO 16080 --- [nectionFactory2] c.e.s.mq.MsgWithConfirmProducer          : 交换机收到成功, id:0b0a7c3a-91ab-4faf-9f6c-632acbc8b2d8

    可以看到,我们接收到了被退回的消息,并带上了消息被退回的原因:NO_ROUTE。但是要注意的是, mandatory 参数仅仅是在当消息无法被路由的时候,让生产者可以感知到这一点,只要开启了生产者确认机制,无论是否设置了 mandatory 参数,都会在交换机接收到消息时进行消息确认回调;当设置 mandatory 参数后,如果消息无法被路由,则会返回给生产者,是通过回调的方式进行的,所以,生产者需要设置相应的回调函数才能接受该消息。

备份交换机

有了 mandatory 参数,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

不要慌,在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

  • yml配置,不变

  • 队列和交换机的声明绑定

    @Configuration
    public class RabbitMQConfig {//业务public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange";public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue";public static final String BUSINESS_ROUTING_KEY = "BUSINESS_ROUTING_KEY";//备份队列、交换机public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange";//这个队列用作人工处理public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue";//这个队列用作报警public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue";// 声明业务 Exchange@Bean("businessExchange")public DirectExchange businessExchange() {//这里我们使用 ExchangeBuilder 来创建交换机,并为其设置备份交换机:ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);return (DirectExchange) exchangeBuilder.build();}// 声明业务队列@Bean("businessQueue")public Queue businessQueue() {return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build();}// 声明业务队列绑定关系@Beanpublic Binding businessBinding(@Qualifier("businessQueue") Queue queue,@Qualifier("businessExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);}// 声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange() {ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME).durable(true);return (FanoutExchange) exchangeBuilder.build();}// 声明备份队列@Bean("backupQueue")public Queue backupQueue() {return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backupQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}// 声明报警队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build();}// 声明备份报警队列绑定关系@Beanpublic Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange exchange){return BindingBuilder.bind(queue).to(exchange);}
    }
    
  • 消息发送,注意现在是用备份交换机处理没被路由到队列的消息,所以不用退回给生产者,rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(this);注释

    @Slf4j
    @Component
    public class MsgWithConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructprivate void init() {//        rabbitTemplate.setMandatory(true);
    //        rabbitTemplate.setReturnCallback(this);rabbitTemplate.setConfirmCallback(this);}public void sendExceptMsg(String msg) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());log.info("消息id:{}, msg:{}", correlationData.getId(), msg);rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, msg, correlationData);if (msg != null && msg.contains("exception")) {int i = 1 / 0;}}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "";if (b) {log.info("交换机收到成功, id:{}", id);} else {log.error("消息未成功投递, id:{}, cause:{}", id, s);}}public void sendCustomMsg(String exchange, String msg) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());log.info("消息id:{}, msg:{}", correlationData.getId(), msg);//投递到存在的路由 keyrabbitTemplate.convertAndSend(exchange, RabbitMQConfig.BUSINESS_ROUTING_KEY, msg, correlationData);correlationData = new CorrelationData(UUID.randomUUID().toString());log.info("消息id:{}, msg:{}", correlationData.getId(), msg);//投递到不存在的路由 keyrabbitTemplate.convertAndSend(exchange, "key2sdfsfsf", msg, correlationData);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",new String(message.getBody()), replyCode, replyText, exchange, routingKey);}
    }
    
  • 消息消费

    @Slf4j
    @Component
    public class BusinessWaringConsumer {//没有路由到队列的消息,进入备份交换,报警或人工处理@RabbitListener(queues = BUSINESS_BACKUP_WARNING_QUEUE_NAME)public void receiveMsg(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.error("发现不可路由消息报警:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = BUSINESS_BACKUP_QUEUE_NAME)public void handlerMsg(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.error("人工处理这个消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
    }
    
    @Slf4j
    @Component
    public class BussinessMsgConsumer {//正常路由到队列的消息,业务消费即可@RabbitListener(queues = BUSINESS_QUEUE_NAME)public void receiveMsg(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("业务消费消息:{}", str);
    //        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);}
    }
    
  • controller 模拟请求

    @Slf4j
    @RequestMapping("rabbitmq")
    @RestController
    public class RabbitMQMsgController {   /*** 测试消息的可靠传递:confirm机制解决* http://localhost:8080/rabbitmq/exceptMsg?msg=呵呵** 生产者确认机制跟事务是不能一起工作的,是事务的轻量级替代方案。* 因为事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,* 不能对同一个信道同时使用两种模式。*/@Resourceprivate MsgWithConfirmProducer confirmProducer;@RequestMapping("exceptMsg")public void exceptMsg( String msg){log.info("当前时间:{},收到章程请求,msg:{}", new Date(), msg);confirmProducer.sendCustomMsg(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,msg);}
    }
    
  • 浏览器测试 http://localhost:8080/rabbitmq/exceptMsg?msg=hok 结果

    2020-03-28 14:42:56.815  INFO 6996 --- [nio-8080-exec-1] c.e.s.controller.RabbitMQMsgController   : 当前时间:Sat Mar 28 14:42:56 CST 2020,收到章程请求,msg:森
    2020-03-28 14:42:56.818  INFO 6996 --- [nio-8080-exec-1] c.e.s.mq.MsgWithConfirmProducer          : 消息id:a90fde7d-70f1-4e66-8179-5291f246b84e, msg:森
    2020-03-28 14:42:56.826  INFO 6996 --- [nio-8080-exec-1] c.e.s.mq.MsgWithConfirmProducer          : 消息id:6090d406-9461-4222-b3b2-872b36ffac0a, msg:森
    2020-03-28 14:42:56.835  INFO 6996 --- [ntContainer#2-1] c.e.springboot.mq.BussinessMsgConsumer   : 业务消费消息:森
    2020-03-28 14:42:56.837  INFO 6996 --- [nectionFactory1] c.e.s.mq.MsgWithConfirmProducer          : 交换机收到成功, id:a90fde7d-70f1-4e66-8179-5291f246b84e
    2020-03-28 14:42:56.846 ERROR 6996 --- [ntContainer#0-1] c.e.s.mq.BusinessWaringConsumer          : 人工处理这个消息:森
    2020-03-28 14:42:56.846 ERROR 6996 --- [ntContainer#1-1] c.e.s.mq.BusinessWaringConsumer          : 发现不可路由消息:森
    2020-03-28 14:42:56.855  INFO 6996 --- [nectionFactory1] c.e.s.mq.MsgWithConfirmProducer          : 交换机收到成功, id:6090d406-9461-4222-b3b2-872b36ffac0a
  • 那么问题来了,mandatory 参数与备份交换机可以一起使用吗?

    设置 mandatory 参数会让交换机将不可路由消息退回给生产者

    而备份交换机会让交换机将不可路由消息转发给它,那么如果两者同时开启,消息究竟何去何从??

    rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(this);注释放开,

    可以看到,两条消息都可以收到确认成功回调,但是不可路由消息不会被回退给生产者,而是直接转发给备份交换机。可见备份交换机的处理优先级更高。

RabbitMQ 延迟队列和消息可靠传递相关推荐

  1. RabbitMQ(九):RabbitMQ 延迟队列,消息延迟推送(Spring boot 版)

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

  2. RabbitMQ 延迟队列,消息延迟推送

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

  3. 谷粒商城笔记+踩坑(22)——库存自动解锁。RabbitMQ延迟队列

    导航: 谷粒商城笔记+踩坑汇总篇 目录 1 业务流程,订单失败后自动回滚解锁库存 可靠消息+最终一致性方案 2[仓库服务]RabbitMQ环境准备 2.1 导入依赖 2.2 yml配置RabbitMQ ...

  4. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  5. rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  6. Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  7. RabbitMQ 延迟队列-对于入门来说可以快速上手

    RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ​ ...

  8. 【RabbitMQ】一文带你搞定RabbitMQ延迟队列

    本文口味:鱼香肉丝   预计阅读:10分钟 0|1一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经 ...

  9. rabbitmq导出队列_消息队列BCMQ在大云运维管理平台BCDeepWatch中的应用

    友情提示:全文约2600字,预计阅读时间12分钟 摘要 消息队列作为重要的中间件,广泛用于分布式系统中各子系统间的异步解耦:本文主要介绍了大云消息队列中间件BC-MQ在BC-DeepWatch中的应用 ...

最新文章

  1. [转]DPM2012系列之四:配置邮件报警功能
  2. 一图解码数据中心数字化运维管理之道
  3. .Net(c#)加密解密之Aes和Des
  4. 工业视觉镜头NAVITAR
  5. 关于width: 100%的一些看法
  6. 为什么要在Java的Serializable类中使用SerialVersionUID
  7. 在桌面应用中使用JAVA DB[组图]
  8. rtsp服务器如何低延时linux,web实现RTSP无插件低延迟播放方案整理
  9. Linux:写一个简单的服务器
  10. 【路径生成--绘制的方法】矢量地图巡线式路径探索
  11. 【分形理论、分形维数、多重分形、Matlab程序等整理】
  12. 计算机等级考试数据库三级模拟题7
  13. 推荐给大家12款好用的电脑软件
  14. C#操作三菱FX系列PLC数据
  15. 开源按键组件Multi_Button的使用,含测试工程
  16. 呼叫中心座席人员如何把控时间
  17. 阿里云URL转发类问题排查
  18. 艾宾浩斯记忆法和遗忘曲线
  19. UML类图以及类与类之间的关系
  20. html5这么盒子页面居中,实现盒子居中

热门文章

  1. 计算机方向kade期刊,计算机辅助导航技术在上颌骨肿瘤切除及缺损重建中的应用...
  2. 服务器网页连不上网络怎么回事啊,网络连接正常网页打不开怎么办? dns服务器问题解决[多图]...
  3. 交换机配置第七讲(不同vlan相互通信2)
  4. 解决go语言热部署组件fresh安装问题
  5. 安卓java代码ping网关_Android代码中使用Ping命令
  6. 东京迪斯尼海洋乐园攻略_迪士尼乐园-软件工程师的观点
  7. css多媒体竖屏,CSS3 手机横竖屏切换效果模拟动画
  8. 组装k39小钢炮(ubuntu16.04),了解一下!
  9. android的word默认字体大小设置,更改Microsoft Word文档的默认字体大小和样式 | MOS86...
  10. 教资报名网站显示无法访问此页面