参考:

http://blog.csdn.net/u014308482/article/details/53036770

http://blog.csdn.net/i_vic/article/details/72742277

里面的例子参考自这两篇博客,记录下使用过程。

为什么要延迟加载:

制定一项任务,在某个时间之后去执行,这种场景比较适合使用延迟加载的模式。

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

原理:

Time To Live(TTL)
  RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
B: 对消息进行单独设置,每条消息TTL可以不同。
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

Dead Letter Exchanges(DLX)
  RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:指定routing-key发送

队列出现dead letter的情况有:

消息或者队列的TTL过期
  队列达到最大长度
  消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

利用这两个特性,设置消息的过期时间,当生产的消息没有消费者去接收(这样的队列称为死信队列),消息在

rabbitServer的到达设置的过期时间时,就会将死信队列中的过期消息发送到DLX中设置的Exchange中,这样

就实现了延迟加载。

上图:

集成过程如下:

生产者端配置

1.引入依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.41</version></dependency></dependencies>

2.配置application.properties

server.port=10001
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

3.配置AMQP

@Configuration
public class AmqpConfig {@BeanRabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);return template;}
}

4.声明队列、交换机

@Configuration
public class ExchangeConfig {/******************************************死信队列***************************************************///exchange name  public static final String DEFAULT_EXCHANGE = "KSHOP";       //DLX QUEUE  public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "kshop.dead.letter.queue";       //DLX repeat QUEUE 死信转发队列  public static final String DEFAULT_REPEAT_TRADE_QUEUE_NAME = "kshop.repeat.trade.queue";     //信道配置  @Bean  public DirectExchange defaultExchange() {  return new DirectExchange(DEFAULT_EXCHANGE, true, false);  }  @Bean public Queue repeatTradeQueue() {  Queue queue = new Queue(DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);  return queue;   }  @Bean  public Binding  drepeatTradeBinding() {  return BindingBuilder.bind(repeatTradeQueue()).to(defaultExchange()).with(DEFAULT_REPEAT_TRADE_QUEUE_NAME);  }  @Bean public Queue deadLetterQueue() {  Map<String, Object> arguments = new HashMap<>();  arguments.put("x-dead-letter-exchange", DEFAULT_EXCHANGE);  arguments.put("x-dead-letter-routing-key", DEFAULT_REPEAT_TRADE_QUEUE_NAME);  Queue queue = new Queue(DEFAULT_DEAD_LETTER_QUEUE_NAME,true,false,false,arguments);  System.out.println("arguments :" + queue.getArguments());  return queue;   }  @Bean  public Binding  deadLetterBinding() {  return BindingBuilder.bind(deadLetterQueue()).to(defaultExchange()).with(DEFAULT_DEAD_LETTER_QUEUE_NAME);  }  }

这里指定了声明了死信队列失效之后的发送的交换机和routing-key,其实这里可以指定两个交换机,一个是死信队列的交换机1绑定死信队列,一个是失效之后到达的交换机2绑定延迟队列,死信队列的交换机没有消费者去监听,而交换机2绑定的队列就是真正的延迟队列了,消费者去监听这个队列。

5.定义业务service

@Service
public class DeadLetterService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(LogCarrier logCarrier) {MessagePostProcessor processor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(30000 + "");return message;}};rabbitTemplate.convertAndSend(ExchangeConfig.DEFAULT_EXCHANGE, ExchangeConfig.DEFAULT_DEAD_LETTER_QUEUE_NAME,JSON.toJSONString(logCarrier), processor);}}

这里指定了超时时间为30秒

6.创建controller去调用

@RestController
public class DeadLetterController { @Autowired
    private DeadLetterService deadLetterService;    @GetMapping("deadLetter")
    public void direct() throws InterruptedException {
        long i = 0;
        while(i<10) {
            LogCarrier contract = new LogCarrier();
            contract.setId(i++);
            contract.setType("direct");
            deadLetterService.send(contract);
        }
        System.out.println("消息发送时间:"+new Date());
    }}

这样,生产者端的基本都配置完成。

消费者端配置,消费者端的配置很简单:

1.依赖

<dependencies><!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.41</version></dependency></dependencies>

2.application.properties

server.port=0
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

3.AMQP配置

@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {@Beanpublic DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();factory.setMessageConverter(new MappingJackson2MessageConverter());return factory;}@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(1);//设置预读取数,可以进行有效的负载均衡。factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//自动askreturn factory;}@Overridepublic void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());}
}

4.监听service

/*** 死信队列* * @author cfyj 2017年11月24日 下午3:11:05**/
@Service
public class CustomService4 {private static int num = 0;@RabbitListener(queues = "kshop.repeat.trade.queue")  @RabbitHandlerpublic void process(String obj) {LogCarrier logCarrier= JSON.parseObject(obj, LogCarrier.class); System.out.println(num+":------消息接收时间"+new Date()+logCarrier);}
}

传输实体:

package com.cfyj.demo.domain;public class LogCarrier {private Long id;private String type;public String getType() {return type;}public void setType(String type) {this.type = type;}public Long getId() {return id;}public void setId(Long id) {this.id = id;}@Overridepublic String toString() {return "LogCarrier [id=" + id + ", type=" + type + "]";}
}

这样生产者和消费者都配置完成了。注意 生产者和消费者端传输的对象实体类信息必须一致。

这样就开始测试吧,测试之前我们带着几个问题去测试:

只启动生产者,然后向死信队列发送信息,消息失效后会怎么样?

如果指定交换机的类型为fanout,没有消费者监听是否会将信息直接丢弃呢?

1.测试,启动生产者和消费者(先启动生产者来声明交换机和队列)

发送消息的时间

死信队列中的消息数

延迟队列的消息,这时因为过期时间还没到,所以死信队列中的信息还没有到达延迟队列中

消费者收到延迟队列的时间

接收-发送的时间正好为过期时间30s,这样就实现了消息的延迟消费,在到达过期时间后,死信队列的消息会发送到指定x-dead-letter-exchange的交换机中,由交换机发送到设置的延迟队列。

2.当我们只启动生产者时,发送消息,消息会怎么样?(topic类型和direct类型的测试结果一致)

发送请求时间:

死信队列中的消息:

当达到过期时间后,延迟队列的消息(注意两个队列收到消息的时间):

当只启动生产者服务然后发送消息到死信队列时,消息会先堆积到死信队列,然后到达过期时间后重发到延迟队列中。

3.如果指定交换机的类型为fanout,没有消费者监听是否会将信息直接丢弃呢?

发送信息后,消息会先进入死信队列中,并没有直接丢弃消息

死信队列,注意接收消息的时间:

死信队列中的消息到达过期时间后:

延迟队列:

测试结果与direct类型相同。

springboot与rabbitMQ实现延迟加载相关推荐

  1. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  2. 九、springboot整合rabbitMQ

    springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...

  3. SpringBoot使用RabbitMQ消息队列

    RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的 ...

  4. springboot 集成rabbitmq 实例

    springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...

  5. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  6. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  7. RabbitMq(九) SpringBoot整合RabbitMQ消费者示例代码

    概述 在上一篇我们介绍了SpringBoot整合RabbitMQ生产者代码,本章我们介绍SpringBoot整合RabbitMQ,实现消费者工程的代码实现.与生产者集成相比,集成消费者不需要进行添加配 ...

  8. RabbitMq(八) SpringBoot整合RabbitMQ 生产者代码实现

    在本章中我们将创建RabbitMQ的生产者工程,并实现生产者端代码实现. springboot整合RabbitMQ生产者工程步骤如下: 创建maven工程 引入springboot及RabbitMQ依 ...

  9. springboot之rabbitmq

    一.RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件).RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的.所有 ...

最新文章

  1. Bitcoin ABC近半数节点已经完成了版本更迭
  2. WSAGetLastError:10004 一个封锁操作被对 WSACancelBlockingCall的调用中断 的解决
  3. linux pip 找不到python
  4. Selenium3自动化测试——11. 下拉框处理
  5. hdu1978(递推dp)
  6. SECRET SHARING STEP BY STEP
  7. Go语言的多态(Polymorphism)
  8. MyBatis ResultMap(2)
  9. 创建oracle 数据库表空间,角色,用户的sql语句
  10. java序列化错在哪里_Spark序列化错误:java.io.NotSerializableException
  11. 领域驱动 开源项目_在开源领域建立职业的建议
  12. android分享到新浪微博,认证+发送微博,
  13. keil+proteus 制作计算器_设计费 | 工程设计费计算器使用指南
  14. php李炎恢笔记,一步步学习php笔记 李炎恢瓢城web俱乐部
  15. 使用Foxit Reader实现批量打印以及一页多版设置技巧
  16. R的农场 chebnear
  17. 刀刀漫画合集(共享PDF,RAR,UMD版本和语录)
  18. 【安装PyTorch报错】InvalidArchiveError(‘Error with archive D:\\anaconda\\pkgs\\pytorch-1.2.0-py3.6····
  19. 支持图灵架构和安培架构的TensorFlow Python库
  20. Linux宝塔控制面板如何实现多个二级域名301重定向跳转

热门文章

  1. android wear听音乐,用户反映Android Wear影响蓝牙耳机的音质
  2. 同花顺去年实现净利润6.3亿元 同比减少12.64%
  3. 入门级: WinForm 下的 ComboBox,ListBox 的使用 (三) 选择控件
  4. 学计算机笔记本8gb够用吗,计算机专业的学生适合什么样的笔记本电脑?商务本够用么?...
  5. 使用 State Hook
  6. 一个好看的表格,拿走不谢
  7. 百度云 职称计算机cad,全国职称计算机考试cad命令集.pdf
  8. 国内有什么好用的、能落地的AR远程协助解决方案?
  9. 【计算机视觉】Mip-nerf 论文精读记录
  10. 聚观早报 | Meta 考虑推出 Twitter 竞品;硅谷银行,真的倒闭了