SpringBoot整合RabbitMq实战(一)
1 Spring AMQP
简介
Spring AMQP
项目是一个引入Spring核心概念用于基于高级消息队列(AMQP
)的解决方案的开发,它提供了一个模板用于发送和接受消息的高级抽象。它对基于消息驱动并带有一个监听容器的pojo
对象提供支持,这个库促进AMQP
资源的管理,同时也促进Spring AMQP
的依赖注入和声明式配置。在所有的案例中,你可以看到类似于JMS
对Spring框架的支持。
整个Spring AMQP
项目包含两部分,即spring-amqp
和spring-rabbit
,前者是RabbitMq
的基础抽象,后者是RabbitMq
的实现。
目前Spring官网发布的最新稳定版本Spring AMQP
是2.2.9版本,它具有以下新特性:
- 支持异步处理入站消息的监听器容器;
RabbitTemplate
模板类用于发送和接收消息;RabbitAdmin
类用于自动声明队列、交换机和绑定
2 引入依赖和声明配置
2.1 引入依赖
在maven构建的spring
项目中可以在pom.xml
文件中通过下面这种引入spring-rabbitmq
的依赖
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.9.RELEASE</version>
</dependency>
而在spring-boot项目中则通过springboot
对应的rabbitmq
起步依赖项引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.9.RELEASE</version></dependency>
2.2 声明配置
- 通过xml的方式配置
applicationContext.xml
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttps://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/beanshttps://www.springframework.org/schema/beans/spring-beans.xsd"><!--rabbit连接工厂--><rabbit:connection-factory id="connectionFactory"/><!--RabbitTemplate--><rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/><!--RabbitAdmnin--><rabbit:admin connection-factory="connectionFactory"/><!--Queue--><rabbit:queue name="myqueue"/></beans>
- 通过
java config
方式配置
@Configuration
public class RabbitConfiguration {@Beanpublic ConnectionFactory connectionFactory() {return new CachingConnectionFactory("localhost");}@Beanpublic AmqpAdmin amqpAdmin() {return new RabbitAdmin(connectionFactory());}@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}@Beanpublic Queue myQueue() {return new Queue("myqueue");}
}
3AMQP
抽象概念
AMQP
抽象概念是使用Spring-AMQP
模块必须要掌握的重要概念,开发过程中也经常要使用到这些接口和类,主要包括Message
、Exchange
、Queue
和Binding
,它们都是org.springframework.amqp.core
包下的接口或类
3.1 Message
Spring AMQP
将Message
类定义为更通用的AMQP
域模型表示的一部分,Message类的目的是将主体和属性封装在单个实例中,从而使API
更简单。Message类的定义如下,发送消息的时候可直接将消息封装成Message类
public class Message implements Serializable{//消息属性类,具体可查看MessageProperties类源码private final MessageProperties messageProperties;//消息的主题部分,类型为byte数组private final byte[] body;public Message(byte[] body, MessageProperties messageProperties) {this.body = body;this.messageProperties = messageProperties;}public byte[] getBody() {return this.body;}public MessageProperties getMessageProperties() {return this.messageProperties;}
}
3.2 Exchange
Exchange
接口代表一个AMQP
交换机,它是消息生产者投送到的地方,代理的虚拟主机中的每个交换器都有一个惟一的名称和一些其他属性。Exchange
接口的源码如下:
public interface Exchange extends Declarable {String getName();String getType();boolean isDurable();boolean isAutoDelete();Map<String, Object> getArguments();boolean isDelayed();boolean isInternal();
}
从上面的源码可以看出Exchange
的实现类中都有一个type属性来决定属于什么类型的交换机,这些类型限制在ExchangeTypes
常量中,主要有direct、topic、fanout和headers
4种,每种类型的交换机都可以在org.springframework.amqp.core
下找到其对应的实现类。
在处理绑定到队列的方式方面,这些交换类型的行为各不相同。
direct
交换只允许队列被固定的路由键(通常是队列的名称)绑定;topic
交换机支持带有路由模式的绑定,这些模式可能分别包含“*”和“#”通配符,用于“确定的一个”和“0或多个”;Fanout exchange
发布到绑定到它的所有队列,而不考虑任何路由密钥
3.3 Queue
Queue类表示消息使用者从其中接收消息的组件。与各种Exchange
类一样,我们的实现是这个核心AMQP
类型的抽象表示。下面的清单显示了Queue类的主体的核心源码:
public class Queue extends AbstractDeclarable {private final String name;private volatile boolean durable;private volatile boolean exclusive;private volatile boolean autoDelete;private volatile Map<String, Object> arguments;/*** 默认的消息队列是持久化, 非独立和非自动删除的.* @param name 消息队列的命名.*/public Queue(String name) {this(name, true, false, false);}// Getters and Setters omitted for brevity}
请注意,构造函数接受队列名称。根据实现的不同,管理模板可能提供用于生成唯一命名队列的方法。这样的队列可以用作“回复”地址或其他临时情况。因此,自动生成队列的exclusive
和autoDelete
属性都将被设置为true
。
3.4 Binding
消息传递连接生产者和消费者至关重要, 在Spring AMQP
中,我们定义了一个Binding
类来表示这些连接;
构造Binding
实例的2种方式:
1) 通过关键字new
构造
Queue directBinding = new Binding(queueName, directExchange, "foo.bar");Queue topicBinding = new Binding(queueName, topicExchange, "foo.*");Queue fanoutBinding = new Binding(queueName, fanoutExchange);
- 通过
BindingBuilder
类构造
Binding binding = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
3 实战演练
由于rabbitmq
消息队列一般用于系统间异步通信,如电商项目中处理高峰期(双十一抢购)的订单时一般先把订单数据投递到消息队列中,之后再通过异步处理减轻服务器DB和IO压力。
3.1 构建聚合项目
本实战中笔者使用IDEA
构建了一个聚合模块spring-boot
项目message-practices
,该项目包含common、message-producer和message-consumer
三个子模块项目,common
项目中放一些公共的通用类;message-producer
项目模拟发送消息;message-consumer
项目用于模拟消费消息
聚合项目的结构如下:
messagepractices
|---common
|---message-consumer
|---message-producer
各个项目的 pom.xml
中引入依赖和坐标
messagepractices
项目pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version><relativePath /></parent><groupId>com.hsf.rabbitmq</groupId><artifactId>message-practices</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>message-producer</module><module>message-consumer</module><module>common</module></modules><properties><java.version>1.8</java.version></properties><dependencies><!--阿里fastjson依赖--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.56</version></dependency></dependencies>
</project>
messagepractices
项目`中无配置项和业务逻辑代码
2)common
项目pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>message-practices</artifactId><groupId>com.hsf.rabbitmq</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>common</artifactId>
</project>
common
项目中src/main/java
目录下新建模拟订单的ProductOrder
实体类
注意:发送消息的实体类和消费消息的实体类必须具有相同的全限定类名,否则消费消息反序列化时会报找不到那个实体类,因此消息的实体类必须是一个公共类
package com.hsf.rabbitmq.common.pojo;import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;public class ProductOrder implements Serializable {private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");//订单ID,默认为UUIDprivate String orderId;//商品IDprivate String productId;//商品名称private String productName;//商品类目IDprivate String categoryId;//商品单价private Double price = 0.0;//商品数量private Integer count = 0;//下单时间戳,日期字符串格式private String timestamp;public String getOrderId() {if(orderId==null || "".equals(orderId)){orderId = UUID.randomUUID().toString();}return orderId;}public String getTimestamp() {if(timestamp==null || "".equals(timestamp)){timestamp = sdf.format(new Date());}return timestamp;}//省略其他setter和getter方法}
common
项目编辑完后需要在common
项目的根目录下通过IDEA的Terminal或者git bash或者cmd
命令窗口执行
mvn install
命令将common
项目以jar包的形式上传到本地maven仓库,方便依赖它的message-producer
和message-consumer
项目引用它,common
项目打包成功并上传到本地仓库路后可以看到在本地Maven仓库中
看到其对应的jar包和pom
文件,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-33Ix885F-1595749932669)(D:\markdown撰写文档\images\common_jar.png)]
3)message-producer
项目pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><artifactId>message-producer</artifactId><parent><groupId>com.hsf.rabbitmq</groupId><artifactId>message-practices</artifactId><version>1.0-SNAPSHOT</version></parent><dependencies><dependency><groupId>com.hsf.rabbitmq</groupId><artifactId>common</artifactId><version>1.0-SNAPSHOT</version></dependency><!--引入spring mvc的起步依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--rabbitmq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.9.RELEASE</version></dependency></dependencies><build><plugins><!--打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.5.RELEASE</version></plugin><!--编译和打包时跳过测试--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><skipTests>true</skipTests></configuration></plugin></plugins></build>
</project>
message-consumer
项目pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><groupId>com.hsf.rabbitmq</groupId><artifactId>message-practices</artifactId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>message-consumer</artifactId><dependencies><dependency><groupId>com.hsf.rabbitmq</groupId><artifactId>common</artifactId><version>1.0-SNAPSHOT</version></dependency><!--引入spring mvc的起步依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--rabbitmq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.9.RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.5.RELEASE</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><skipTests>true</skipTests></configuration></plugin></plugins></build>
</project>
2.1.5版本的spring-boot-maven-plugin
打包插件用的阿里云仓库的,因为笔者将项目改为聚合项目后从maven中央仓库一直拉不下来
注: 笔者的maven仓库配置配置了阿里云的maven仓库镜像地址以及本地仓库
IDEA使用的maven对应的conf/setting.xml
配置镜像和代理仓库
<mirrors><!-- mirror| Specifies a repository mirror site to use instead of a given repository. The repository that| this mirror serves has an ID that matches the mirrorOf element of this mirror. IDs are used| for inheritance and direct lookup purposes, and must be unique across the set of mirrors.| --><mirror><id>aliyunPublic</id><mirrorOf>public</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror><mirror><id>aliyunCentral</id><mirrorOf>central</mirrorOf><name>阿里云中央仓库</name><url>https://maven.aliyun.com/repository/central</url></mirror><mirror><id>aliyunSprinPlugin</id><mirrorOf>spring-plugin</mirrorOf><name>阿里云spring-plugin仓库</name><url>https://maven.aliyun.com/repository/spring-plugin</url></mirror><mirror><id>central</id><name>Maven Repository Switchboard</name><url>https://repo1.maven.org/maven2/</url><mirrorOf>central</mirrorOf></mirror><mirror><id>repo2</id><mirrorOf>central</mirrorOf><name>Human Readable Name for this Mirror.</name><url>https://repo1.maven.org/maven2/maven/</url></mirror></mirrors><repositories><!--本地仓库1--><repository><id>local1</id><url>file:///C:/Users/HP/.m2/repository</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><repository><!--本地仓库2--> <id>local2</id><url>file:///D:/mavenRepository/.m2</url>m<releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><!-阿里云的spring代理仓库--><repository><id>spring</id><url>https://maven.aliyun.com/repository/spring</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories>
配置好后执行拉取命令:
mvn install
3.2 配置启动类、rabbitmq连接、创建交换机和队列
3.2.1 message-producer
项目配置文件与bean
application.yaml
配置和项目启动类
application.yaml
为节约时间起见,这里没有配置不同环境下的application.yaml
server:port: 8081servlet:context-path: /messge-producer
MessageProducerApplication.java
@SpringBootApplication
public class MessageProducerApplication {public static void main(String[] args){SpringApplication.run(MessageProducerApplication.class,args);}
}
2)连接配置与交换机和队列bean实例的配置
package com.hsf.rabbitmq.message.producer.configuration;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConfig {@Bean//注意这个ConnectionFactory类是org.springframework.amqp.rabbit包下的类,而不是com.rabbit.client包下的类public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin123");connectionFactory.setVirtualHost("/");//设置通道缓存最大值connectionFactory.setChannelCacheSize(50);//设置缓存模式connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);//设置最大连接数connectionFactory.setConnectionLimit(50);return connectionFactory;}//配置RabbitTemplate模板操作bean@Beanpublic RabbitTemplate rabbitTemplate(){return new RabbitTemplate(connectionFactory());}//配置消息队列bean@Beanpublic Queue myQueue(){return new Queue("myQueue");}@Beanpublic Queue topicQueue(){return new Queue("topicQueue");}@Beanpublic Queue testQueue(){return new Queue("testQueue");}@Beanpublic Queue fanoutQueue(){return new Queue("fanoutQueue");}//配置direct类型交换机@Beanpublic DirectExchange directExchange(){return new DirectExchange("directExchange",true,false);}//配置topic类型交换机@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topicExchange",true,false);}//配置fanout型交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange",true,false);}//配置direct类型交换机的绑定@Beanpublic Binding directBinding(){return BindingBuilder.bind(myQueue()).to(directExchange()).with("direct.key");}//配置第二个direct类型交换机的绑定,用于测试不按固定啊绑定路由键发送消息时的场景@Beanpublic Binding testBinding(){return BindingBuilder.bind(testQueue()).to(directExchange()).with("test.key");}//配置topic类型交换机的绑定@Beanpublic Binding topicBinding(){return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*");}//配置fanout类型交换机的绑定@Beanpublic Binding fanoutBinding(){return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}}
3.2.2 message-consumer
项目配置文件与bean
application.yaml
配置与启动类:
在消息消费项目中我们无需配置交换机、消息队列,只需要配置rabbitmq
的客户端连接即可
application.yaml
server:port: 8082servlet:context-path: /messge-consumer
spring:rabbitmq:virtual-host: /host: localhostuser: guestpassword: guestport: 5672
spring-boot-starter-amqp
模块中的条件配置类会根据spring.rabbitmq
的前缀自动配置连接工厂和RabbitTemplate
的bean实例
启动类
MessageConsumerApplication.java
@SpringBootApplication
public class MessageConsumerApplication {public static void main(String[] args){SpringApplication.run(MessageConsumerApplication.class,args);}}
3.3 完成生产消息与消费消息业务代码
3.3.1 完成生产消息逻辑
在message-producer
项目下完成使用接口投递消息的逻辑
@RestController
@RequestMapping("/rabbitmq")
public class RabbitController {@Autowiredprivate RabbitTemplate rabbitTemplate;//向direct交换机投递消息@GetMapping("sendDirect")public Map<String,Object> testDirectMessage(@RequestParam("message") String message){rabbitTemplate.convertAndSend("directExchange","direct.key",message);Map<String,Object> resMap = new HashMap<>();resMap.put("status",200);resMap.put("message","ok");resMap.put("data","hello "+message);return resMap;}//向第二个direct交换机投递消息,投送时路由键不与交换机绑定的路由键一致@GetMapping("sendDirect1")public Map<String,Object> testSendDirectMessage1(@RequestParam("message") String message){//测试投递的路由键不是direct型交换机中绑定的路由场景rabbitTemplate.convertAndSend("directExchange","test.queue",message);Map<String,Object> resMap = new HashMap<>();resMap.put("status",200);resMap.put("message","ok");resMap.put("data","hello "+message);return resMap;}//向topic型交换机投递消息@PostMapping("sendTopic")public Map<String,Object> testSendTopicObjectMessage(@RequestBody ProductOrder message){rabbitTemplate.convertAndSend("topicExchange","topic.order",message);Map<String,Object> resMap = new HashMap<>();resMap.put("status",200);resMap.put("message","ok");resMap.put("data",message);return resMap;}//headers交换机使用的不多,这里就不放测试demo了
上面的代码中使用接口的方式模拟生产和投递消息,这里要注意调用RabbitTemplate#convertAndSend
方法时最好使用convertAndSend(String exchange, String routingKey, final Object object)
方法,第一个参数为要投递的交换机名,第二个参数为路由键,第三个参数为任意序列化对象类型的消息;
如果调用的是convertAndSend(String routingKey, final Object object)
方法,很可能会导致消息无法消费,作者亲自踩过坑。
3.3.2 消费消息逻辑
在message-consumer
项目下完成消费消息的逻辑
- 测试消费direct型交换机转发到
myQueue
消息队列中的消息
DirectConsumer.java
@RabbitListener(queues = {"myQueue"})
@Component
public class DirectConsumer {private static Logger logger = LoggerFactory.getLogger(DirectConsumer.class);@RabbitHandlerpublic void consumeDirectMessage(String message)throws Exception{logger.info("myQueue收到消息:"+message);}
}
- 测试消费direct型交换机转发到
testQueue
消息队列中的消息
TestQueueConsumer.java
@Component
@RabbitListener(queues = {"testQueue"})
public class TestQueueConsumer {private static Logger logger = LoggerFactory.getLogger(TestQueueConsumer.class);@RabbitHandlerpublic void consumeMessage(String message)throws Exception{logger.info("testQueue收到消息:"+message);}
}
- 测试消费topic型交换机转发到
topicQueue
TopicConsumer.java
@Component
@RabbitListener(queues = {"topicQueue"})
public class TopicConsumer {private Logger logger = LoggerFactory.getLogger(TopicConsumer.class);@RabbitHandlerpublic void consumeTopicMessage(ProductOrder order)throws Exception{//在这里可以根据订单的信息进行订单数据持久化以及查库存的逻辑处理logger.info("topicQueue接收到消息:order={}", JSON.toJSON(order));}
}
- 测试消费fanout型交换机转发到
fanoutQueue
FanoutQueueConsumer.java
@Component
@RabbitListener(queues = {"fanoutQueue"})
public class FanoutQueueConsumer {Logger logger = LoggerFactory.getLogger(FanoutQueueConsumer.class);@RabbitHandlerpublic void consumeMessage(String message)throws Exception{logger.info("fanoutQueue收到消息:"+message);}
}
4 测试生产消息与消费消息
在IDEA中以debug
模式依次运行message-producer和
message-consumer两个项目下的启动类中的
main`函数
启动两个项目
4.1 测试投递到direct和fanout类型交换机中的String类型信息消费情况
依次在浏览器地址栏中输入
http://localhost:8081/messge-producer/rabbitmq/sendDirect?message=rabbitmq
http://localhost:8081/messge-producer/rabbitmq/sendDirect1?message=rabbitmq
http://localhost:8081/messge-producer/rabbitmq/sendFanout?message=hellow-Fanout
可以看到message-consumer
项目的控制台中输出如下信息
INFO 20360 --- [ntContainer#0-1] c.h.r.m.c.rabbitmq.DirectConsumer : myQueue收到消息:rabbitmq
INFO 24004 --- [ntContainer#1-1] c.h.r.m.c.rabbitmq.FanoutQueueConsumer : fanoutQueue收到消息:hellow-Fanout
调用第二个接口生产消息并投递到direct型交换机中的消息因为与绑定的路由键 不一致,没有投递到testQueue
消息队列中去,因而没有被它对对于的消费者TestQueueConsumer
消费,因而没有输出相应的日志信息
4.2 测试投递到topic类型交换机中的对ProductOrder
类型消息消费情况
在postman
中调用Post类型接口
http://localhost:8081/messge-producer/rabbitmq/sendTopic
//入参请求体,row类型application/json 格式
{"productId": "huawei1001","productName": "华为P30手机","categoryId": "hauweiPhone","price": 2950.0,"count": 1
}
可看到消息消费端控制台输出如下日志信息:
INFO 24004 --- [ntContainer#3-1] c.h.r.m.consumer.rabbitmq.TopicConsumer : topicQueue接收到消息:order={"productId":"huawei1001","orderId":"5fb3190d-ee82-4960-8fa4-d751653f3f9d","price":2950.0,"count":1,"categoryId":"hauweiPhone","productName":"华为P30手机","timestamp":"2020-07-26:11:48:33"}
再到rabbitmq
服务的管理页面查看交换机和消息队列信息
发现自定义的交换机都出现在了exchange
管理页面,其中amq.direct、amq.fanout、amq.headers、amq.match、qmq.rabbit.trace、amq.topic
为系统默认的交换机,右边的type字段表示交换机的类型,如果启动消息生产者项目后发现交换机中没有配置文件中定义的交换机,则需要在rabbitmq
管理页面手动创建,点击下面的Add new exchange
按钮输入交换机的名称和类型即可
消息生产者中自定义的消息队列同意出现在了Queues
管理页面,同样如果启动消息生产者项目后配置文件中定义的消息队列没有出现在该页面,则需要手动创建,点击下面的Add new Queue
按钮输入Name选项值即可完成创建新的消息队列。
5 采坑总结
5.1 客户端不必要配置项在消息消费异常导致的踩坑
在运行本文的测试demo
时作者踩了大半天的坑,其中一个坑就当消费者消费消息异常时,消费端控制台不停抛出ListenerExecutionFailedException
这个异常,说明程序在不停地消费异常消息。异常信息如下:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'no match' threw exceptionat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:198) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
caused by: org.springframework.amqp.AmqpException: No method found for class com.hsf.rabbitmq.message.ProducerOrder
网上查了资料说时下面这个原因:
根据官方文档说明,是 consumer 在消息消费时发生了异常, 默认情况下,该消息会被 reject, 并重新回到队列中, 但如果异常发生在到达用户代码之前的异常,此消息将会一直投递。
最让作者头疼的是最开始ProductOrder
实体类在消息生产者和消息消费者两个项目中均有定义,造成两个项目中的该实体类全限定类名不一致导致消费消息报错,而后即使把这个投递ProductOrder
类型的接口注释,把对于的交换机和队列从rabbitmq
管理页面删除还是会继续报这个异常,后来发现是因为在消息消费者项目的配置中加入了下面两行配置,导致消息消费失败的话即使项目重启还是会一直报消费异常,找不到对应的消息消费方法
spring:rabbitmq:publisher-confirms: truepublisher-returns: true
后面把上面两行配置代码注释,并依次在消息生产者和消息消费者项目的根目录下执行mvn clean install
重新打包编译后重启项目后这一问题才算解决
5.2 消息对象实体类全限定名不一致导致不停的消费异常消息的坑
这个坑要是同时在消息生产者和消息消费者中定义了ProducerOrder
实体类,造成消费消息反序列化时全限定名与投递过来的消息全限定名不一致导致的,解决的办法是把消息实体类抽出到一个公共的模块中,然后再消息生产者和消费者项目的依赖性中引用公共模块的依赖。
5.4 一个交换机绑定多个消息队列的导致的坑
一个交换机绑定多个消息队列后会造成只有第一个绑定该交换机的消息队列能被投递消息,其他绑定的消息队列都不会投递消息,也是也就造成无法从其他消息队列中消费消息的问题。解决的办法是给每个需要绑定的消息队列配置一个单独唯一的Exchange
。
5.5 在消费方法中使用Message类和byte[]接受消息产生的坑
@Component
@RabbitListener(queues = {"testQueue"})
public class TestQueueConsumer {private static Logger logger = LoggerFactory.getLogger(TestQueueConsumer.class);@RabbitHandlerpublic void consumeMessage(Message message)throws Exception{String messageBody = new String(message.getBody(),"UTF-8");logger.info("testQueue收到消息:"+messageBody);}
}
@RabbitHandlerpublic void consumeMessage(byte[] message)throws Exception{String messageBody = new String(message,0,message.length,"UTF-8");logger.info("testQueue收到消息:"+messageBody);}
例如使用上面两种方式中的任何接收消息会导致消息消费端出现下面这种消息消费异常:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'no match' threw exceptionat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:198) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
Caused by: org.springframework.amqp.AmqpException: No method found for class java.lang.String
解决上面这种异常的办法是将接受消息改为发送消息时对应的类,一般是String类型;同时可能还需要再rabbitmq
管理页面同时删掉消费异常对应的Exchange
和Queue
,然后手动新建与原来相同的Exchange和
Queue`,最后再重启项目
上面这些坑一个很重要的原因就是我们才刚入门Spring-AMQP
,没有配置死信队列和配置重复消费次数以及异常消费消息时的处理方法,等涩会给你如学习了Spring-AMQP
项目之后,我们会发现很多问题自然迎刃而解,而且还能弄懂产生消费异常的具体深层原因,从根本是防止消费消息异常的发生。
点个再看,持续关注作者,后面的文章会发布深入学习SpringBoot
整合AMQP
和实战Demo的系列文章
6 小结
本文系统了讲解了rabbitmq
整合springboot
项目,结合图文详细演示了一个集合消息生产者和消息消费者很公共模块的聚合项目的搭建,演示了使用direct、topic和fanout
三种交换机从生产者投递消息到消费端消费消息的详细过程,并结合作者踩过的坑给出了具体的解决办法,让读者在整合AMQP
开发需求时少走很多弯路!
参考资料
[1] Spring AMQP参考文档
[2] 黄朝兵的达人课04整合常用技术框架之 MongoDB 和 RabbitMQ](https://gitbook.cn/gitchat/column/5b4fd439bf8ece6c81e44cfb/topic/5b50254103f7e37c51456ee9/ “[2] 黄朝兵的达人课04整合常用技术框架之 MongoDB 和 RabbitMQ”)
[3] 王松著《SpringBoot
+Vue
全栈开发实战》第12章消息服务
[4]等等!这两个 Spring-RabbitMQ 的坑我们已经替你踩了
原创不易,首次阅读作者文章的读者如果觉得文章对你有帮助欢迎扫描下方二位二维码关注作者的微信公众号,作者的微信公众号将第一时间不断输出技术干货。
你的关注和点赞是作者持续创作的最大动力!
SpringBoot整合RabbitMq实战(一)相关推荐
- RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ
RabbitMQ [黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战] 文章目录 RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ 6.1 Sprin ...
- 九、springboot整合rabbitMQ
springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...
- RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ
什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...
- SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka
1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...
- RabbitMq(九) SpringBoot整合RabbitMQ消费者示例代码
概述 在上一篇我们介绍了SpringBoot整合RabbitMQ生产者代码,本章我们介绍SpringBoot整合RabbitMQ,实现消费者工程的代码实现.与生产者集成相比,集成消费者不需要进行添加配 ...
- RabbitMq(八) SpringBoot整合RabbitMQ 生产者代码实现
在本章中我们将创建RabbitMQ的生产者工程,并实现生产者端代码实现. springboot整合RabbitMQ生产者工程步骤如下: 创建maven工程 引入springboot及RabbitMQ依 ...
- SpringBoot整合kafka实战之带回调的生产者
本文来说下SpringBoot整合kafka部分知识内容 文章目录 带回调的生产者 方式一 方式二 本文小结 带回调的生产者 前面我们说了简单的生产和消费,本文说下带回调的生产者.kafkaTempl ...
- Springboot整合一之Springboot整合RabbitMQ
前言 目前,springboot已然成为了最热的java开发整合框架,主要是因其简单的配置,并且本身提供了很多与第三方框架的整合,甚至可以让我们在短短的几分钟里就可以搭建一个完整的项目架构.所以,博主 ...
- Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合
一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...
最新文章
- log4j 压缩日志_Spring Boot 日志各种使用姿势,是时候捋清楚了!
- 在mybatis中模糊查询有三种写法
- TextView-- 测量文字宽度
- Spring 事务之@Transactional
- 三种方法实现CSS三栏布局
- Linux 802.11 Driver Developer’s Guide
- ios 自己服务器 苹果支付_iOS应用内支付(IAP)服务端端校验详解
- 在PHP服务器上使用JavaScript进行缓慢的Loris攻击[及其预防措施!]
- python3.6与3.7的区别_选择 Python3.6 还是 Python 3.7
- netbeans php 安装教程,php_xdebug安装+NetBeans的配置和使用
- SPRING IN ACTION 第4版笔记-第四章ASPECT-ORIENTED SPRING-008-带参数的ADVICE
- 初步解决leiningen配置到Eclipse中出错的问题
- LINUX下载编译libc(glibc)
- 微信绑定的卡服务器,微信亲属卡有什么作用 微信亲属卡怎么绑定
- Windows中I/O完成端口机制详解
- php网页读取sql数据库数据模板,discuz模板中直接读取数据库中的插件数据
- (Attention机制原文)论文阅读:Neural Machine Translation by Jointly Learning to Align and Translate
- Error 遇到错误:请求通道在等待 00:01:00 以后答复时超时。增加传递给请求调用的超时值,或者增加绑定上的 SendTimeout 值。分配给此操作的时间可能已经是更长超时的一部分
- 最新冰盾DDoS防火墙V9.1 新增防护功能更强大
- [附源码]Nodejs计算机毕业设计汽车维修服务系统Express(程序+LW)
热门文章
- JAVA小实验——接口与继承
- 星战 java_星战知识之多少 -- 暗黑原力西斯(Sith)篇
- [leetcode 面试题 17.17] -- 多次搜索,KMP与字典树
- git pull origin master与git pull --rebase origin master的区别
- 现代战争的制胜法宝?-黑科技原子无线电技术应用前景及最新研究进展
- 每一个软件开发人员绝对必须掌握的关于 Unicode 和字符集的最基础的知识 - A
- 学校计算机教室自查报告,多媒体教室自查报告
- 携手鸿蒙HarmonyOS背后,美的的阳谋
- 推荐算法之逻辑回归模型族
- 一个公众号,多个商户ID绑定