本文口味:鱼香肉丝   预计阅读:10分钟

0|1一、说明

在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列。相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读。

这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获:

  1. 什么是延时队列
  2. 延时队列使用场景
  3. RabbitMQ中的TTL
  4. 如何利用RabbitMQ来实现延时队列

0|1二、本文大纲

以下是本文大纲:

本文阅读前,需要对RabbitMQ以及死信队列有一个简单的了解。

0|1三、什么是延时队列

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

0|1四、延时队列使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;发生店铺创建事件,十天后检查该店铺上新商品数,然后通知上新数为0的商户;发生账单生成事件,检查账单支付状态,然后自动结算未支付的账单;发生新用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户;发生退款事件,在三天之后检查该订单是否已被处理,如仍未被处理,则发送消息给相关运营人员;发生预定会议事件,判断离会议开始是否只有十分钟了,如果是,则通知各个与会人员。

看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

更重要的一点是,不!优!雅!

没错,作为一名有追求的程序员,始终应该追求更优雅的架构和更优雅的代码风格,写代码要像写诗一样优美。【滑稽】

这时候,延时队列就可以闪亮登场了,以上场景,正是延时队列的用武之地。

既然延时队列可以解决很多特定场景下,带时间属性的任务需求,那么如何构造一个延时队列呢?接下来,本文将介绍如何用RabbitMQ来实现延时队列。

0|1五、RabbitMQ中的TTL

在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)

TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”(至于什么是死信,请翻看上一篇)。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:


Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。

另一种方式便是针对每条消息设置TTL,代码如下:


AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

这样这条消息的过期时间也被设置成了6s。

但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

0|1六、如何利用RabbitMQ实现延时队列

前一篇里介绍了如果设置死信队列,前文中又介绍了TTL,至此,利用RabbitMQ实现延时队列的两大要素已经集齐,接下来只需要将它们进行调和,再加入一点点调味料,延时队列就可以新鲜出炉了。

想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。

从下图可以大致看出消息的流向:

生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

下面来看代码:

先声明交换机、队列以及他们的绑定关系:


@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea"; public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb"; public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey"; public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea"; public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb"; // 声明延时Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明延时队列A 延时10s // 并绑定到对应的死信交换机 @Bean("delayQueueA") public Queue delayQueueA(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); // x-message-ttl 声明队列的TTL args.put("x-message-ttl", 6000); return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build(); } // 声明延时队列B 延时 60s // 并绑定到对应的死信交换机 @Bean("delayQueueB") public Queue delayQueueB(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); // x-message-ttl 声明队列的TTL args.put("x-message-ttl", 60000); return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build(); } // 声明死信队列A 用于接收延时10s处理的消息 @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 声明死信队列B 用于接收延时60s处理的消息 @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 声明延时队列A绑定关系 @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY); } // 声明业务队列B绑定关系 @Bean public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY); } // 声明死信队列A绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } }

接下来,创建两个消费者,分别对两个死信队列的消息进行消费:


@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }

然后是消息的生产者:


@Component public class DelayMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg, DelayTypeEnum type){ switch (type){ case DELAY_10s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg); break; case DELAY_60s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg); break; } } }

接下来,我们暴露一个web接口来生产消息:


@Slf4j @RequestMapping("rabbitmq") @RestController public class RabbitMQMsgController { @Autowired private DelayMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg, Integer delayType){ log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayType); sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType))); } }

准备就绪,启动!

打开rabbitMQ的管理后台,可以看到我们刚才创建的交换机和队列信息:

接下来,我们来发送几条消息,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1 http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2

日志如下:


2019-07-28 16:02:19.813 INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 16:02:19 CST 2019,收到请求,msg:testMsg1,delayType:1 2019-07-28 16:02:19.815 INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started 2019-07-28 16:02:25.829 INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 16:02:25 CST 2019,死信队列A收到消息:testMsg1 2019-07-28 16:02:41.326 INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 16:02:41 CST 2019,收到请求,msg:testMsg2,delayType:2 2019-07-28 16:03:41.329 INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 16:03:41 CST 2019,死信队列B收到消息:testMsg2

第一条消息在6s后变成了死信消息,然后被消费者消费掉,第二条消息在60s之后变成了死信消息,然后被消费掉,这样,一个还算ok的延时队列就打造完成了。

不过,等等,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求??

嗯,仔细想想,事情并不简单。

0|1七、RabbitMQ延时队列优化

显然,需要一种更通用的方案才能满足需求,那么就只能将TTL设置在消息属性里了。我们来试一试。

增加一个延时队列,用于接收设置为任意延时时长的消息,增加一个相应的死信队列和routingkey:


@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec"; public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey"; public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec"; // 声明延时Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明延时队列C 不设置TTL // 并绑定到对应的死信交换机 @Bean("delayQueueC") public Queue delayQueueC(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build(); } // 声明死信队列C 用于接收延时任意时长处理的消息 @Bean("deadLetterQueueC") public Queue deadLetterQueueC(){ return new Queue(DEAD_LETTER_QUEUEC_NAME); } // 声明延时列C绑定关系 @Bean public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY); } // 声明死信队列C绑定关系 @Bean public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY); } }

增加一个死信队列C的消费者:


@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME) public void receiveC(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列C收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }

再次启动!然后访问:http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000 来生产消息,注意这里的单位是毫秒。


2019-07-28 16:45:07.033 INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 16:45:07 CST 2019,收到请求,msg:testMsg1,delayTime:5000 2019-07-28 16:45:11.694 INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 16:45:11 CST 2019,收到请求,msg:testMsg2,delayTime:5000 2019-07-28 16:45:12.048 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 16:45:12 CST 2019,死信队列C收到消息:testMsg1 2019-07-28 16:45:16.709 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 16:45:16 CST 2019,死信队列C收到消息:testMsg2

看起来似乎没什么问题,但不要高兴的太早,在最开始的时候,就介绍过,如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。

实验一下:


2019-07-28 16:49:02.957 INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 16:49:02 CST 2019,收到请求,msg:longDelayedMsg,delayTime:20000 2019-07-28 16:49:10.671 INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 16:49:10 CST 2019,收到请求,msg:shortDelayedMsg,delayTime:2000 2019-07-28 16:49:22.969 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 16:49:22 CST 2019,死信队列C收到消息:longDelayedMsg 2019-07-28 16:49:22.970 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 16:49:22 CST 2019,死信队列C收到消息:shortDelayedMsg

我们先发了一个延时时长为20s的消息,然后发了一个延时时长为2s的消息,结果显示,第二个消息会在等第一个消息成为死信后才会“死亡“。

0|1八、利用RabbitMQ插件实现延迟队列

上文中提到的问题,确实是一个硬伤,如果不能实现在消息粒度上添加TTL,并使其在设置的TTL时间及时死亡,就无法设计成一个通用的延时队列。

那如何解决这个问题呢?不要慌,安装一个插件即可:Community Plugins — RabbitMQ ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。

接下来,进入RabbitMQ的安装目录下的sbin目录,执行下面命令让该插件生效,然后重启RabbitMQ。


rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后,我们再声明几个Bean:


@Configuration public class DelayedRabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } }

controller层再添加一个入口:


@RequestMapping("delayMsg2") public void delayMsg2(String msg, Integer delayTime) { log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime); sender.sendDelayMsg(msg, delayTime); }

消息生产者的代码也需要修改:


public void sendDelayMsg(String msg, Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{ a.getMessageProperties().setDelay(delayTime); return a; }); }

最后,再创建一个消费者:


@RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},延时队列收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }

一切准备就绪,启动!然后分别访问以下链接:


http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000 http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000

日志如下:


2019-07-28 17:28:13.729 INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 17:28:13 CST 2019,收到请求,msg:msg1,delayTime:20000 2019-07-28 17:28:20.607 INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 当前时间:Sun Jul 28 17:28:20 CST 2019,收到请求,msg:msg2,delayTime:2000 2019-07-28 17:28:22.624 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 17:28:22 CST 2019,延时队列收到消息:msg2 2019-07-28 17:28:33.751 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 当前时间:Sun Jul 28 17:28:33 CST 2019,延时队列收到消息:msg1

第二个消息被先消费掉了,符合预期。至此,RabbitMQ实现延时队列的部分就完结了。

0|1九、总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,但就像炉石传说一般,这些知识就好比手里的卡牌,知道的越多,可以用的卡牌也就越多,遇到问题便能游刃有余,所以需要大量的知识储备和经验积累才能打造出更出色的卡牌组合,让自己解决问题的能力得到更好的提升。

但另一方面,随着时间的流逝和阅历的增长,越来越感觉到自己的能力有限,无法独自面对纷繁复杂且多变的业务需求,在很多方面需要其他人的协助才能很好的完成任务。也知道闻道有先后,术业有专攻,不会再狂妄自大,觉得自己能把所有事情都搞定,也将重心慢慢转移到研究如何有效的进行团队合作上来,我相信一个高度协调的团队永远比一个人战斗要更有价值。

花了一个周末的时间完成了这篇文章,文中所有的代码都上传到了github,https://github.com/MFrank2016/delayed-queue-demo如有需要可以自行查阅,希望能对你有帮助,如果有错误的地方,欢迎指正,也欢迎关注我的公众号进行留言交流。

【RabbitMQ】一文带你搞定RabbitMQ延迟队列相关推荐

  1. 干货 | 一文带你搞定Python 数据可视化

    2019独角兽企业重金招聘Python工程师标准>>> 01前言 在之前的一篇文章<Python 数据可视化利器>中,我写了 Bokeh.pyecharts 的用法,但是 ...

  2. 阿里、京东、字节跳动春招,Java岗offer不好拿?一文带你搞定

    前言 以下内容均为二月面试真题整理,面试内容均来自阿里.京东.腾讯.字节跳动等一线大厂,由网友集合反馈整理! 如有雷同,请在评论区提醒,全部内容GitHub可查阅. 由于篇幅原因,内容会比较杂乱,程序 ...

  3. 一文带你搞定svg-icon的使用

    前置 有的时候,我们经常在业务中会需要使用字体图标的场景,比如同一个图标在不同的地方显示不同的颜色,这个时候我们使用字体图标就非常的合适, 这篇文章我们主要讲的是在Vue中显示的字体图标`svg-ic ...

  4. C语言学习:编程、源文件、源代码是什么?一文带你搞定它!

    编程:人通过某种方式命令计算机做一些动作,来得到人想要的结果,就叫编程. 比如开关灯,把灯看做计算机,按下按钮,灯就开了,松开按钮灯就关了,这样也就达到了人向计算机下达指令的需求. 在早期,计算机全是 ...

  5. 一文带你搞定抖音最近最火的情侣微信早报信息推送

    文章目录 一.引言 二.公众号配置 2.1 注册号申请 2.2 填写appID和appsecret 2.3 配置推送模板 2.4 扫码关注 三.API配置 3.1 账号申请 3.2 创建应用 3.3 ...

  6. 一文带你搞定线程池原理

    1.使用线程池的意义何在? 项目开发中,为了统一管理线程,并有效精准地进行排错,我们经常要求项目人员统一使用线程池去创建线程.因为我们是在受不了有些人动不动就去创建一个线程,使用的多了以后,一旦报错就 ...

  7. DDD进阶_一文带你搞定什么是前台、中台、后台

    DDD从入门到精通,系列文章传送地址,请点击本链接. 目录 一.中台和平台的关系 二.什么是中台? 三.数字化转型中台应该共享什么? 四.如何实现前中后台的协同? 1. 前台 2. 中台 3. 后台 ...

  8. 一文带你搞定JDBC

    前言:"只有自己强大,才不会被别人践踏." 你好,我是梦阳辰,让我们轻松玩编程,一起走进JDBC的世界吧!文章较长建议收藏再看! 文章目录 1.JDBC概述 2.JDBC编程六步( ...

  9. Go 实战 | 一文带你搞懂从单队列到优先级队列的实现

    大家好,我是「Go学堂」的渔夫子,今天跟大家聊聊在我们项目中的优先级队列的实现及应用场景. 优先级队列概述 队列,是数据结构中实现先进先出策略的一种数据结构.而优先队列则是带有优先级的队列,即先按优先 ...

最新文章

  1. 使用Nmap获取目标服务器开放的服务以及操作系统信息
  2. 如何查询高考成绩2021年的成绩排位,2021年四川高考个人排名怎么查询,四川高考成绩排名查询方法...
  3. tkinter回调异常_Python tkinter文本修改后的回调
  4. OPENCV已知内参求外参
  5. django中的缓存 单页面缓存,局部缓存,全站缓存 跨域问题的解决
  6. 仿BlogEngine.NET的cnBlog主题
  7. 单片机独立式按键c语言程序,(原创)51单片机C语言程序设计--速学教程实例(入门篇)之独立按键(查询)...
  8. 训练日志 2019.3.7
  9. 不到 1000 元,你的所有隐私竟然都能随便查!!!
  10. 记一次渗透学习||钓鱼网站渗透
  11. 微信小程序登录界面 服务器,微信小程序之登录页-------实例
  12. java不解压获取压缩包(zip,rar)文件列表或文本文件内容
  13. 开发版速达扩展功能-提供便捷的界面布局功能
  14. Android实例精讲——通过ListView构造微信聊天界面视图
  15. 世界上都有哪些常用的聊天软件?
  16. Visual Studio NuGet程序包找不到源
  17. 知道创宇云防御平台通过2021上半年可信云安全运营中心能力评估
  18. 大屏可视化色彩设计基本知识
  19. Invalid bound statement (not found): com.xx.dao.TypeMapper.selectAll
  20. 九宫图-图片轮播-兼容IE8和Chrome浏览器

热门文章

  1. QT的QAudioInput类的使用
  2. QML基础类型之rect
  3. python考试报名官网安徽_今年优选:芜湖python
  4. 13.8.可视化虚拟机工具--Jconsole内存监控、13.9.可视化虚拟机工具--Jconsole线程监控、13.10.死锁原理以及可视化虚拟机工具--Jconsole线程
  5. 07_clickhouse、自定义分区及底层存储合并机制、自定义分区键、分区目录的命名规则、分区目录的合并过程、分区目录的合并过程、分区表达式指定、分区案例
  6. 从websphere6.1迁移到weblogic10.3的问题总结
  7. mysql max_allowed_packet 设置过小导致记录写入失败
  8. oracle ha节点,oracle linux ha配置
  9. Qt网络编程之UDP编程练习(20200219)
  10. Qt学习笔记之QChar