1. 预备知识

1.1 消息传递

首先我们知道消费者是从队列中获取消息的,那么消息是如何到达队列的?

当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing key)将会确定消息投递到那个队列(queue)。

带着这几个关键字:交换器、路由键和队列。

1.2 交换器类型

如之前所说,交换器根据规则决定消息的路由方向。因此,rabbitmq的消息投递分类便是从交换器开始的,不同的交换器实现不同的路由算法便实现了不同的消息投递方式。

direct交换器

direct -> routingKey -> queue,相当一种点对点的消息投递,如果路由键匹配,就直接投递到相应的队列

fanout交换器

fanout交换器相当于实现了一(交换器)对多(队列)的广播投递方式

topic交换器

提供一种模式匹配的投递方式,我们可以根据主题来决定消息投递到哪个队列。

1.3 消息延迟

本文想要实现一个可延迟发送的消息机制。消息如何延迟?

ttl (time to live) 消息存活时间

ttl是指一个消息的存活时间。

Per-Queue Message TTL in Queues

引用官方的一句话:

TTL can be set for a given queue by setting the x-message-ttl argument to queue.declare, or by setting the message-ttl policy. A message that has been in the queue for longer than the configured TTL is said to be dead.
我们可以通过x-message-ttl设置一个队列中消息的过期时间,消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

Per-Message TTL in Publishers

引用官方的一句话:

A TTL can be specified on a per-message basis, by setting the expiration field in the basic AMQP class when sending a basic.publish.

The value of the expiration field describes the TTL period in milliseconds. The same constraints as for x-message-ttl apply. Since the expiration field must be a string, the broker will (only) accept the string representation of the number.

我们可以通过设置每一条消息的属性expiration,指定单条消息有效期。消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

重新路由-死信交换机(Dead Letter Exchanges)
引用官方一句话:

Dead Letter Exchanges

Messages from a queue can be ‘dead-lettered’; that is, republished to
another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with
requeue=false, The TTL for the message expires; or The queue length
limit is exceeded. Dead letter exchanges (DLXs) are normal exchanges.
They can be any of the usual types and are declared as usual.
To set the dead letter exchange for a queue, set the x-dead-letter-exchange argument to the name of the exchange.

我们可以通过设置死信交换器(x-dead-letter-exchange)来重新发送消息到另外一个队列,而这个队列将是最终的消费队列。

  1. 具体实现

rabbitmq配置

属性文件-rabbitmq.properties

交换、路由等配置按照以上策略,其中,添加了prefetch参数来根据服务器能力控制消费数量。

连接用户名

mq.user =sms_user

密码

mq.password =123456

主机

mq.host =192.168.99.100

端口

mq.port =5672

默认virtual-host

mq.vhost =/

the default cache size for channels is 25

mq.channelCacheSize =50

发送消息路由

sms.route.key =sms_route_key

延迟消息队列

sms.delay.queue =sms_delay_queue

延迟消息交换器

sms.delay.exchange =sms_delay_exchange

消息的消费队列

sms.queue =sms_queue

消息交换器

sms.exchange =sms_exchange

每秒消费消息数量

sms.prefetch =30

配置rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><context:property-placeholder location="rabbitmq.properties"/><!--配置connection-factory,指定连接rabbit server参数 --><rabbit:connection-factory id="connectionFactory"username="${mq.user}" password="${mq.password}"host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" /><!--定义rabbit template用于数据的接收和发送 --><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /><!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 --><rabbit:admin connection-factory="connectionFactory" /><!--定义queue --><rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" /><!-- 创建延迟,有消息有效期的队列 --><rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false"><rabbit:queue-arguments><entry key="x-message-ttl"><!-- 队列默认消息过期时间 --><value type="java.lang.Long">3600000</value></entry><!-- 消息过期根据重新路由 --><entry key="x-dead-letter-exchange" value="${sms.exchange}"/></rabbit:queue-arguments></rabbit:queue><!-- 定义direct exchange,sms_queue --><rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false"><rabbit:bindings><rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/></rabbit:bindings></rabbit:direct-exchange><!-- 延迟消息配置,durable=true 持久化生效 --><rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false"><rabbit:bindings><rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/></rabbit:bindings></rabbit:direct-exchange><!-- 消息接收者 --><bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/><!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--><rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}"><rabbit:listener queues="${sms.queue}" ref="messageReceiver"/></rabbit:listener-container>
</beans>

消息发布者

package git.yampery.producer;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/**
* @decription MsgProducer
* <p>生产者</p>
* @author Yampery
* @date 2018/2/11 11:44
*/
@Component
public class MsgProducer {@Resourceprivate AmqpTemplate amqpTemplate;@Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE;@Value("${sms.exchange}") private String SMS_EXCHANGE;@Value("${sms.route.key}") private String SMS_ROUTE_KEY;/*** 延迟消息放入延迟队列中* @param msg* @param expiration*/public void publish(String msg, String expiration) {amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> {// 设置消息属性-过期时间message.getMessageProperties().setExpiration(expiration);return message;});}/*** 非延迟消息放入待消费队列* @param msg*/public void publish(String msg) {amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg);}
}

消费者

package git.yampery.consumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;/**
* @decription MsgConsumer
* <p>消费者</p>
* @author Yampery
* @date 2018/2/11 11:43
*/
public class MsgConsumer implements MessageListener {@Overridepublic void onMessage(Message message) {String msg;try {// 线程每秒消费一次Thread.sleep(1000);msg = new String(message.getBody(), "utf-8");System.out.println(msg);} catch (Exception e) {// 这里并没有对服务异常等失败的消息做处理,直接丢弃了// 防止因业务异常导致消息失败造成unack阻塞再队列里// 可以选择路由到另外一个专门处理消费失败的队列return;}}
}

测试

package git.yampery.mq;//需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码//  一零三八七七四六二六
import com.alibaba.fastjson.JSONObject;
import git.yampery.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;/*** @decription TestMq* <p>测试</p>* @author Yampery* @date 2018/2/11 15:03*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMq {@Resourceprivate MsgProducer producer;@Testpublic void testMq() {JSONObject jObj = new JSONObject();jObj.put("msg", "这是一条短信");producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000));}
}

java B2B2C 仿淘宝电子商城系统-基于Rabbitmq实现延迟消息相关推荐

  1. java B2B2C 仿淘宝电子商城系统-Spring Cloud Feign的文件上传实现

    在Spring Cloud封装的Feign中并不直接支持传文件,但可以通过引入Feign的扩展包来实现,本文就来具体说说如何实现.需要JAVA Spring Cloud大型企业分布式微服务云构建的B2 ...

  2. java B2B2C 仿淘宝电子商城系统-Spring Cloud Eureka参数配置项详解

    Eureka涉及到的参数配置项数量众多,它的很多功能都是通过参数配置来实现的,了解这些参数的含义有助于我们更好的应用Eureka的各种功能,下面对Eureka的配置项做具体介绍,供大家参考. 需要JA ...

  3. java B2B2C 仿淘宝电子商城系统-eureka 基础

    服务发现:Eureka客户端 服务发现是基于微服务架构的关键原则之一.尝试配置每个客户端或某种形式的约定可能非常困难,可以非常脆弱.Netflix服务发现服务器和客户端是Eureka.可以将服务器配置 ...

  4. java B2B2C Springboot仿淘宝电子商城系统-负载均衡之ribbon+feign

    一. feign简介 Feign是一个声明式的伪Http客户端,它使得写Http客户端变得更简单.使用Feign,只需要创建一个接口并注解.它具有可插拔的注解特性,可使用Feign注解和JAX-RS注 ...

  5. java B2B2C Springcloud仿淘宝电子商城系统-spring cloud 框架原理

    我们从整体来看一下Spring Cloud主要的组件,以及它的访问流程 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六 1.外部或者 ...

  6. java B2B2C Springcloud仿淘宝电子商城系统

    Spring Cloud是基于Spring Boot的一整套实现微服务的框架.他提供了微服务开发所需的配置管理.服务发现.断路器.智能路由.微代理.控制总线.全局锁.决策竞选.分布式会话和集群状态管理 ...

  7. (四)java B2B2C Springboot仿淘宝电子商城系统技术解决方案

    鸿鹄云商大型企业分布式互联网电子商务平台,推出PC+微信+APP+云服务的云商平台系统,其中包括B2B.B2C.C2C.O2O.新零售.直播电商等子平台.愿意了解源码的朋友直接求求交流分享技术:二一四 ...

  8. java B2B2C springmvc mybatis仿淘宝电子商城系统(十)用spring Restdocs创建API文档

    这篇文章将带你了解如何用spring官方推荐的restdoc去生成api文档.本文创建一个简单的springboot工程,将http接口通过Api文档暴露出来.只需要通过 JUnit单元测试和Spri ...

  9. java B2B2C Springboot仿淘宝电子商城系统(六)springboot整合mybatis

    引入依赖 在pom文件引入mybatis-spring-boot-starter的依赖: <dependency><groupId>org.mybatis.spring.boo ...

最新文章

  1. 别再傻傻地用这些软件G转P了,修复后不稳定的真相在这里
  2. 你不知道的关于计算机大师 Dijkstra 的事情
  3. .net IL 指令速查
  4. 2021-09-25
  5. boost::range模块reversed相关的测试程序
  6. iOS-AFNetworking参数和多文件同时上传【多文件上传】
  7. 默认方法一种扩展旧代码的方法
  8. abcde依次进入一个队列_数据结构与算法(6):队列
  9. linux安装java_安装和配置Linux上的Java运行环境
  10. WIN10系统和压缩内存占用磁盘过高的解决方案(亲测有效)
  11. [新整理] CAD高级模拟考题
  12. 小马 KMS10激活系统后的浏览器小尾巴分析与清除
  13. STM32L476应用开发之七:流量的PID控制
  14. 人工智能如何自我进化
  15. mysql 解决1062报错
  16. [Poi2003 ][bzoj 2601]MAL猴子捞月
  17. 微信小程序CMS系统开发教程开发初级
  18. libjpeg的安装与使用
  19. 树莓派初次配置C++环境以及进行简单的hcsr04驱动
  20. 内涵TV段子,价值500元的dz内涵笑话商业源码

热门文章

  1. log4j:warn找不到_修复log4j WARN找不到记录器的附加程序,请正确初始化log4j系统
  2. android 崩溃捕获_Android从相机和图库捕获图像
  3. ROS的学习(十二)用C++写一个简单的发布者
  4. ubuntu下安装英汉词典——stardict
  5. 如何利用C++和libCurl使用OCR SDK
  6. C语言基础教程之递归
  7. 点击编辑table变为可编辑状态
  8. 海鲜之战:上半场活鲜炒作已曲终,下半场冷冻海鲜厚积薄发?
  9. ​七周二次课(5月7日)监控io性能、free命令、ps命令、查看网络状态、linux下抓包...
  10. log4net进阶手札(四):保存自定义对象到oracle