项目地址:
https://gitee.com/michael-spica-micro-service/michael-spica-rabbitmq.git

简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。

特性

  • 可伸缩性:集群服务
  • 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存

实战

概述

管理后台介绍

RabbitMQ安装完后,输入http://localhost:15672/ ,可以访问后台的管理界面,如:

在这个管理后台,可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等;还能查看队列消息、消费效率、推送效率等。

消息推送到接收的流程简介


黄色的生产者就是消息推送服务,将消息推送到中间方框里面,也就是RabbitMQ的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理放入队列后,最终右边的蓝色圈消费者获取对应监听的消息。

交换机简介

常用交换机有以下3种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:

Direct Exchange 直连型交换机 根据消息携带的路由键将消息投递给对应队列。

大致流程,将一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key;然后当一个消息携带着路由值为x,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值x去寻找绑定值也是x的队列。

Topic Exchange 主题交换机 其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。

#(井号) 用来表示任意数量(零个或多个)单词
*(星号) 用来表示一个单词 (必须出现的)

Fanout Exchange 扇型交换机 没有路由键概念,就算你绑了路由键也是无视的;交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

编码

此次实例共3个模块:
michael-spica-rabbitmq-common 公共模块
michael-spica-rabbitmq-producer 生产者 - 消息生产推送实例
michael-spica-rabbitmq-consumer生产者 - 消息消费实例

首先,创建 michael-spica-rabbitmq-common

创建Constants.java

package michael.spica.rabbitmq.common;/*** 常量* <p>* Created by michael on 2019-04-16.*/
public class Constants {public static final int MESSAGE_NUMBER = 3;// 消息数量/*** 直接*/public interface Direct {// 交换机名称String EXCHANGE = "TestDirectExchange";String EXCHANGE_SPECIAL = "specialDirectExchange";// 队列名称String QUEUE = "TestDirectQueue";// 绑定「队列」与「交换机」名称String ROUTING_KEY = "TestDirectRouting";}/*** 主题*/public interface Topic {// 交换机名称String EXCHANGE = "TestTopicExchange";// 绑定键String MAN = "topic.man";String WOMAN = "topic.woman";// 绑定「队列」与「交换机」名称String ROUTING_KEY = "topic.#";}/*** 扇形*/public interface Fanout {// 交换机名称String EXCHANGE = "TestFanoutExchange";// 队列名称String QUEUE_A = "fanout.A";String QUEUE_B = "fanout.B";String QUEUE_C = "fanout.C";}
}

创建 michael-spica-rabbitmq-producer
pom.xml依赖

<dependency><groupId>michael.spica.rabbitmq.common</groupId><artifactId>michael-spica-rabbitmq-common</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.yml 配置

server:port: 8082spring:application:name: rabbitmq-producer# 配置 RabbitMQ 服务器rabbitmq:host: 127.0.0.1port: 5672 # 注意:这里的端口不要写成「15672」;5672 为 AMQP端口;15672 为网页管理username: rootpassword: root

直连型交换机「Direct Exchange」

package michael.spica.rabbitmq.producer.config;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Created by michael on 2019-04-16.*/
@Configuration
public class RabbitMQOfDirectConfig {/*** 队列 起名:TestDirectQueue** @return*/@Beanpublic Queue directQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//        return new Queue(Constants.Direct.QUEUE, true, true, false);// 一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue(Constants.Direct.QUEUE, true);}/*** Direct交换机 起名:TestDirectExchange** @return*/@Beanpublic DirectExchange directExchange() {//        return new DirectExchange(Constants.EXCHANGE_NAME,true,true);return new DirectExchange(Constants.Direct.EXCHANGE, true, false);}@Beanpublic DirectExchange directExchangeOfSpecial() {return new DirectExchange(Constants.Direct.EXCHANGE_SPECIAL);}/*** 绑定* 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting** @return*/@Beanpublic Binding bindingDirect() {return BindingBuilder.bind(directQueue()).to(directExchange()).with(Constants.Direct.ROUTING_KEY);}
}

通过api进行消息推送,也可以根据需求改成定时任务,看具体需求

package michael.spica.rabbitmq.producer.controller;import michael.spica.rabbitmq.common.Constants;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 直连交换机* <p>* Created by michael on 2019-04-16.*/
@RestController
@RequestMapping("/api/rabbit/direct/exchange")
public class DirectExchangeController extends BaseController {@ResponseBody@GetMapping("/sendMessage")public Map<String, Object> sendMessage() {String msgId = String.valueOf(UUID.randomUUID());String msgData = "hello, I come from direct exchange.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("msgData", msgData);map.put("createTime", createTime);// 将消息携带绑定键值:TestDirectRouting 发送到指定交换机TestDirectExchangerabbitTemplate.convertAndSend(Constants.Direct.EXCHANGE, Constants.Direct.ROUTING_KEY, map);return map;}
}

运行michael-spica-rabbitmq-producer项目,调用api

因为没有消费者,可以到RabbitMQ后台去看消息是否被推送成功,如:

队列也已经创建,如:

由此可见,消息已经推送到RabbitMQ服务器上了~~

创建 michael-spica-rabbitmq-consumer

pom.xml依赖

<dependency><groupId>michael.spica.rabbitmq.common</groupId><artifactId>michael-spica-rabbitmq-common</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml 配置

server:port: 8081spring:application:name: rabbitmq-consumer# 配置 RabbitMQ 服务器rabbitmq:host: 127.0.0.1port: 5672 # 注意:这里的端口不要写成「15672」;5672 为 AMQP端口;15672 为网页管理username: rootpassword: root

创建消息接收监听类:

package michael.spica.rabbitmq.consumer.listener.direct;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** Created by michael on 2019-04-16.*/
@Component
@RabbitListener(queues = Constants.Direct.QUEUE)// 监听的队列名称 TestDirectQueue
public class DirectReceiver {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("DirectReceiver消费者收到消息: " + message.toString());}
}

启动项目,然后就可以看到之前推送的那条消息被消费了,如:

DirectReceiver消费者收到消息: {createTime=2021-04-19 15:47:52, msgId=bf6230b7-53e7-499d-a0e6-5879dfc17170, msgData=hello, I come from direct exchange.}

直连交换机是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?

直连交换机是一对一的,如果配置多台监听,并绑定到同一个直连交互的同一个队列,如:

可以看出是实现了轮询的方式对消息进行消费,且不存在重复消费。

主题交换机「Topic Exchange」

michael-spica-rabbitmq-producer项目里面创建RabbitMQOfTopicConfig,如:

package michael.spica.rabbitmq.producer.config;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Created by michael on 2019-04-16.*/
@Configuration
public class RabbitMQOfTopicConfig {@Beanpublic Queue firstQueue() {return new Queue(Constants.Topic.MAN);}@Beanpublic Queue secondQueue() {return new Queue(Constants.Topic.WOMAN);}@Beanpublic TopicExchange exchange() {return new TopicExchange(Constants.Topic.EXCHANGE);}/*** 将firstQueue和topicExchange绑定,而且绑定的键值为 topic.man* 这样只要是消息携带的路由键是 topic.man, 才会分发到该队列** @return*/@Beanpublic Binding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(Constants.Topic.MAN);}/*** 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#* 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列** @return*/@Beanpublic Binding bindingExchangeMessage2() {return BindingBuilder.bind(secondQueue()).to(exchange()).with(Constants.Topic.ROUTING_KEY);}
}

主题交换机推送消息:

package michael.spica.rabbitmq.producer.controller;import michael.spica.rabbitmq.common.Constants;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Stream;/*** 主题交换机* <p>* Created by michael on 2019-04-16.*/
@RestController
@RequestMapping("/api/rabbit/topic/exchange")
public class TopicExchangeController extends BaseController {@ResponseBody@GetMapping("/sendMessage/man")public List<Map<String, Object>> sendMessageForMan() {List<Map<String, Object>> mapList = new ArrayList<>();Stream.iterate(0, i -> i + 1).limit(Constants.MESSAGE_NUMBER).forEach(i -> {String msgId = String.valueOf(UUID.randomUUID());String msgData = "hello, I come from topic exchange「man」.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("msgData", msgData);map.put("createTime", createTime);rabbitTemplate.convertAndSend(Constants.Topic.EXCHANGE, Constants.Topic.MAN, map);mapList.add(map);});return mapList;}@ResponseBody@GetMapping("/sendMessage/woman")public List<Map<String, Object>> sendMessageForWoman() {List<Map<String, Object>> mapList = new ArrayList<>();Stream.iterate(0, i -> i + 1).limit(Constants.MESSAGE_NUMBER).forEach(i -> {String msgId = String.valueOf(UUID.randomUUID());String msgData = "hello, I come from topic exchange「woman」.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("msgData", msgData);map.put("createTime", createTime);rabbitTemplate.convertAndSend(Constants.Topic.EXCHANGE, Constants.Topic.WOMAN, map);mapList.add(map);});return mapList;}
}

michael-spica-rabbitmq-consumer项目上,创建TopicManReceiver,如:

package michael.spica.rabbitmq.consumer.listener.topic;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** Created by michael on 2019-04-19.*/
@Component
@RabbitListener(queues = Constants.Topic.MAN)
public class TopicManReceiver {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("TopicManReceiver消费者收到消息: " + message.toString());}
}

再创建TopicTotalReceiver,如:

package michael.spica.rabbitmq.consumer.listener.topic;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** Created by michael on 2019-04-19.*/
@Component
@RabbitListener(queues = Constants.Topic.WOMAN)
public class TopicTotalReceiver {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("TopicTotalReceiver消费者收到消息: " + message.toString());}
}

michael-spica-rabbitmq-producermichael-spica-rabbitmq-consumer都运行起来,然后分别调用sendMessageForMan()sendMessageForWoman()方法:

调用sendMessageForMan()
然后看消费者michael-spica-rabbitmq-consumer的控制台输出情况:
TopicManReceiver监听队列1,绑定键为:topic.man
TopicTotalReceiver监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.man

所以,可以看到两个监听消费者receiver都成功消费到了消息,因为这两个recevier监听的队列的绑定键都能与这条消息携带的路由键匹配上。

调用sendMessageForWoman() 方法:

然后看消费者michael-spica-rabbitmq-consumer的控制台输出情况:
TopicManReceiver监听队列1,绑定键为:topic.man
TopicTotalReceiver监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.woman

所以,可以看到两个监听消费者只有TopicTotalReceiver成功消费到了消息。

扇型交换机「Fanout Exchang」

michael-spica-rabbitmq-producer项目上创建RabbitMQOfFanoutConfig :

package michael.spica.rabbitmq.producer.config;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Created by michael on 2019-04-19.*/
@Configuration
public class RabbitMQOfFanoutConfig {/*** 创建三个队列 :fanout.A   fanout.B  fanout.C* 将三个队列都绑定在交换机 fanoutExchange 上* 因为是扇型交换机, 路由键无需配置, 配置也不起作用*/@Beanpublic Queue queueA() {return new Queue(Constants.Fanout.QUEUE_A);}@Beanpublic Queue queueB() {return new Queue(Constants.Fanout.QUEUE_B);}@Beanpublic Queue queueC() {return new Queue(Constants.Fanout.QUEUE_C);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(Constants.Fanout.EXCHANGE);}@Beanpublic Binding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@Beanpublic Binding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@Beanpublic Binding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}
}

通过api进行消息推送:

package michael.spica.rabbitmq.producer.controller;import michael.spica.rabbitmq.common.Constants;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Stream;/*** 扇型交换机* <p>* Created by michael on 2019-04-19.*/
@RestController
@RequestMapping("/api/rabbit/fanout/exchange")
public class FanoutExchangController extends BaseController {@ResponseBody@GetMapping("/sendMessage")public Map<String, Object> sendMessage() {String msgId = String.valueOf(UUID.randomUUID());String msgData = "message: test Fanout Message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("msgData", msgData);map.put("createTime", createTime);rabbitTemplate.convertAndSend(Constants.Fanout.EXCHANGE, null, map);return map;}
}

michael-spica-rabbitmq-consumer项目里加上消息消费类,如:

FanoutReceiverA.java:

package michael.spica.rabbitmq.consumer.listener.fanout;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** Created by michael on 2019-04-19.*/
@Component
@RabbitListener(queues = Constants.Fanout.QUEUE_A)
public class FanoutReceiverA {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("FanoutReceiverA消费者收到消息: " + message.toString());}
}

FanoutReceiverB.java:

package michael.spica.rabbitmq.consumer.listener.fanout;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** Created by michael on 2019-04-19.*/
@Component
@RabbitListener(queues = Constants.Fanout.QUEUE_B)
public class FanoutReceiverB {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("FanoutReceiverB消费者收到消息: " + message.toString());}
}

FanoutReceiverC.java:

package michael.spica.rabbitmq.consumer.listener.fanout;import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** Created by michael on 2019-04-19.*/
@Component
@RabbitListener(queues = Constants.Fanout.QUEUE_C)
public class FanoutReceiverC {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("FanoutReceiverC消费者收到消息: " + message.toString());}
}

最后将michael-spica-rabbitmq-producermichael-spica-rabbitmq-consumer项目都运行起来,调用sendMessage()方法,如:

然后,看michael-spica-rabbitmq-consumer项目的控制台情况:

所以,可以看到只要发送到fanoutExchange这个扇型交换机的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息。

消息确认(即:消息回调)

即:生产者推送消息成功,消费者接收消息成功

ps:本文实例使用的是SpringBoot 2.3.2.RELEASE;如果在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为版本导致的配置项不起效,可以把 publisher-confirms: true 替换为 publisher-confirm-type: correlated

michael-spica-rabbitmq-producer项目的application.yml文件上,加上消息确认的配置项:

server:port: 8082spring:application:name: rabbitmq-producer# 配置 RabbitMQ 服务器rabbitmq:host: 127.0.0.1port: 5672 # 注意:这里的端口不要写成「15672」;5672 为 AMQP端口;15672 为网页管理
#    username: guest
#    password: guestusername: rootpassword: root# 消息确认配置项# ps: 如果在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为版本导致的配置项不起效,可以把publisher-confirms: true 替换为  publisher-confirm-type: correlated# publisher-confirms: true # 确认消息已发送到交换机(Exchange)publisher-confirm-type: correlatedpublisher-returns: true # 确认消息已发送到队列(Queue)

配置相关的消息确认回调函数,RabbitMQConfig.java

package michael.spica.rabbitmq.producer.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Created by michael on 2019-04-19.*/
@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback: 相关数据:" + correlationData);System.out.println("ConfirmCallback: 确认情况:" + ack);System.out.println("ConfirmCallback: 原因:" + cause);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("ReturnCallback: 回应码:" + replyCode);System.out.println("ReturnCallback: 回应信息:" + replyText);System.out.println("ReturnCallback: 消息:" + message);System.out.println("ReturnCallback: 交换机:" + exchange);System.out.println("ReturnCallback: 路由键:" + routingKey);}});return rabbitTemplate;}
}

至此,生产者推送消息的消息确认调用回调函数已经完毕。

可以看到上面写了两个回调函数,一个叫 ConfirmCallback,一个叫 RetrunCallback;那么以上这两种回调函数都是在什么情况会触发呢?

先从总体的情况分析,推送消息存在四种情况:

1.消息推送到server,但是在server里找不到交换机
把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的)

@GetMapping("/testMessageAck")
public String TestMessageAck() {String msgId = String.valueOf(UUID.randomUUID());String msgData = "message: non-existent-exchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("messageData", msgData);map.put("createTime", createTime);// 把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的)// 这种情况触发的是 ConfirmCallback 回调函数rabbitTemplate.convertAndSend("non-existent-exchange", Constants.Direct.ROUTING_KEY, map);return "ok";
}

调用接口,查看michael-spica-rabbitmq-producer项目的控制台输出情况(原因里面有说,没有找到交换机’non-existent-exchange’):

ConfirmCallback: 相关数据:null
ConfirmCallback: 确认情况:false
ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

结论: 这种情况触发的是 ConfirmCallback回调函数。

2.消息推送到server,找到交换机了,但是没找到队列
把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的)

这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在RabbitMQOfDirectConfig里面新增一个直连交换机,名叫specialDirectExchange,但没给它做任何绑定配置操作:

@Bean
public DirectExchange directExchangeOfSpecial() {return new DirectExchange(Constants.Direct.EXCHANGE_SPECIAL);
}
@GetMapping("/testMessageAck2")
public String TestMessageAck2() {String msgId = String.valueOf(UUID.randomUUID());String msgData = "message: specialDirectExchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("msgData", msgData);map.put("createTime", createTime);// 把消息推送到名为‘specialDirectExchange’的交换机上(这个交换机是没有任何队列配置的)// 这种情况触发的是 ConfirmCallback和RetrunCallback 两个回调函数rabbitTemplate.convertAndSend(Constants.Direct.EXCHANGE_SPECIAL, Constants.Direct.ROUTING_KEY, map);return "ok";
}

调用接口,查看michael-spica-rabbitmq-producer项目的控制台输出情况:

ReturnCallback: 回应码:312
ReturnCallback: 回应信息:NO_ROUTE
ReturnCallback: 消息:(Body:'{createTime=2021-04-19 17:36:29, messageId=65166056-dcd0-4167-806d-65da877eab92, messageData=message: specialDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 交换机:specialDirectExchange
ReturnCallback: 路由键:TestDirectRouting
ConfirmCallback: 相关数据:null
ConfirmCallback: 确认情况:true
ConfirmCallback: 原因:null

可以看到这种情况,两个函数都被调用了;这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE

结论: 这种情况触发的是 ConfirmCallbackRetrunCallback两个回调函数。

3.消息推送到sever,交换机和队列都没找到
把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机不存在,且没有任何队列配置的)

这种情况其实一看就觉得跟①很像,没错 ,③和①情况回调是一致的

@GetMapping("/testMessageAck3")
public String TestMessageAck3() {String msgId = String.valueOf(UUID.randomUUID());String msgData = "message: non-existent-exchange && non-existent-routingKey test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("msgData", msgData);map.put("createTime", createTime);// 把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机不存在,且没有任何队列配置的)// 这种情况触发的是 ConfirmCallback 回调函数rabbitTemplate.convertAndSend("non-existent-exchange", "non-existent-routingKey", map);return "ok";
}

调用接口,查看michael-spica-rabbitmq-producer项目的控制台输出情况(原因里面有说,没有找到交换机’non-existent-exchange’):

ConfirmCallback: 相关数据:null
ConfirmCallback: 确认情况:false
ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

结论: 这种情况触发的是 ConfirmCallback回调函数。

4.消息推送成功

@GetMapping("/testMessageAck4")
public String TestMessageAck4() {String msgId = String.valueOf(UUID.randomUUID());String msgData = "message: non-existent-exchange && non-existent-routingKey test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("msgId", msgId);map.put("msgData", msgData);map.put("createTime", createTime);// 把消息推送到名为‘TestFanoutExchange’的交换机上// 这种情况触发的是 ConfirmCallback 回调函数rabbitTemplate.convertAndSend(Constants.Fanout.EXCHANGE, null, map);return "ok";
}

调用接口,查看michael-spica-rabbitmq-producer项目的控制台输出情况:

ConfirmCallback: 相关数据:null
ConfirmCallback: 确认情况:true
ConfirmCallback: 原因:null

结论: 这种情况触发的是 ConfirmCallback 回调函数。

以上是生产者推送消息的消息确认、回调函数的使用(可以在回调函数根据需求做对应的扩展或者业务数据处理)。

消费者接收到消息的消息确认机制

和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来,所以,消息接收的确认机制主要存在两种模式:

  • 自动确认
    这也是默认的消息确认情况(AcknowledgeMode.NONE
    RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
    所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
    一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  • 手动确认
    这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
    消费者收到消息后,手动调用basic.ack basic.nack basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
    basic.ack用于肯定确认
    basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
    basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息

消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
basic.nack basic.reject表示没有被正确处理:

着重看下reject,因为有时候一些场景是需要重新入列的:
channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。

使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。

但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。

nack 这个也是相当于设置不消费某条消息:
channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。

同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

一般的消息接收 手动确认是怎么样的,在消费者michael-spica-rabbitmq-consumer项目里,新建MessageListenerConfig.java上添加代码相关的配置代码:

package michael.spica.rabbitmq.consumer.config;import michael.spica.rabbitmq.common.Constants;
import michael.spica.rabbitmq.consumer.listener.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Created by michael on 2019-04-19.*/
@Configuration
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate MyAckReceiver myAckReceiver;// 消息接收处理类@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息// 设置一个队列
//        container.setQueueNames(Constants.Direct.QUEUE);container.setQueueNames(Constants.Direct.QUEUE, Constants.Fanout.QUEUE_A);// 如果同时设置多个如下: 前提是队列都是必须已经创建存在的
//        container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");// 另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
//        container.setQueues(new Queue("TestDirectQueue", true));
//        container.addQueues(new Queue("TestDirectQueue2", true));
//        container.addQueues(new Queue("TestDirectQueue3", true));container.setMessageListener(myAckReceiver);return container;}
}

对应的手动确认消息监听类,MyAckReceiver.java(手动确认模式需要实现 ChannelAwareMessageListener):

之前的相关监听器可以先注释掉,以免造成多个同类型监听器都监听同一个队列。
这里的获取消息转换,只作参考,如果报数组越界可以自己根据格式去调整。

package michael.spica.rabbitmq.consumer.listener;import com.rabbitmq.client.Channel;
import michael.spica.rabbitmq.common.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** Created by michael on 2019-04-19.*/
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理String msg = message.toString();String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据Map<String, String> msgMap = toMap(msgArray[1].trim(), 3);String msgId = msgMap.get("msgId");String msgData = msgMap.get("msgData");String createTime = msgMap.get("createTime");String consumerQueue = message.getMessageProperties().getConsumerQueue();//            System.out.println("消费的主题消息来自:" + consumerQueue);
//            System.out.println("MyAckReceiver  msgId:" + msgId + "  msgData:" + msgData + "  createTime:" + createTime);switch (consumerQueue) {case Constants.Direct.QUEUE:System.out.println("消费的消息来自的队列名为:" + consumerQueue);System.out.println("消息成功消费到  msgId:" + msgId + "  msgData:" + msgData + "  createTime:" + createTime);System.out.println("执行TestDirectQueue中的消息的业务处理流程......");break;case Constants.Fanout.QUEUE_A:System.out.println("消费的消息来自的队列名为:" + consumerQueue);System.out.println("消息成功消费到  msgId:" + msgId + "  msgData:" + msgData + "  createTime:" + createTime);System.out.println("执行fanout.A中的消息的业务处理流程......");break;default:break;}channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
//          channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}/*** {key=value,key=value,key=value} 格式转换成map** @param str* @param entryNum* @return*/private Map<String, String> toMap(String str, int entryNum) {str = str.substring(1, str.length() - 1);String[] strs = str.split(",", entryNum);Map<String, String> map = new HashMap<>();for (String string : strs) {String[] result = string.split("=");String key = result[0].trim();String value = result[1];map.put(key, value);}return map;}
}

分别往Direct ExchangeFanout Exchange 放入一条消息,可看michael-spica-rabbitmq-consumer项目的控制台,如:

消费的消息来自的队列名为:fanout.A
消息成功消费到  msgId:a42fa2e3-eec8-4074-9635-5e3073fe9350  msgData:hello, I come from fanout exchange.  createTime:2021-04-19 18:04:16
执行fanout.A中的消息的业务处理流程......
FanoutReceiverB消费者收到消息: {createTime=2021-04-19 18:04:16, msgId=a42fa2e3-eec8-4074-9635-5e3073fe9350, msgData=hello, I come from fanout exchange.}
FanoutReceiverC消费者收到消息: {createTime=2021-04-19 18:04:16, msgId=a42fa2e3-eec8-4074-9635-5e3073fe9350, msgData=hello, I come from fanout exchange.}
消费的消息来自的队列名为:TestDirectQueue
消息成功消费到  msgId:88967b14-f99b-43a8-94c8-bed8d850139f  msgData:hello, I come from direct exchange.  createTime:2021-04-19 18:04:28
执行TestDirectQueue中的消息的业务处理流程......

Springboot 整合 RabbitMQ「三种模式使用」相关推荐

  1. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  2. SpringBoot整合Dubbo的三种(配置)

    SpringBoot整合Dubbo的三种方式 1.使用application.properties 1.提供者 2.消费者 2.使用dubbo.xml 1.提供者 2.消费者 3.使用注解配置 1.提 ...

  3. SpringBoot整合RabbitMQ 实现五种消息模型

    目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...

  4. SpringBoot整合RocketMQ,三种测试附带源码【rocketmq-spring-boot-starter】

    我们整合boot项目的时候都是引入 xxx-start 依赖,但是现在大多数的整合RocketMQ都还不是这样. 我花了一天时间使用rocketmq-spring-boot-starter整合,使得操 ...

  5. SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)

    目录 1.环境搭建 2.队列模式 3.发布订阅模式 4.路由模式 5.主题模式 6.消息手动应答机制 7.回调函数-确认机制(发布确认模式) 1.环境搭建 引入pom: <!-- rabbitM ...

  6. rabbitmq Confirm三种模式

    生产者端confirm模式的实现原理 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布 的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的 ...

  7. RabbitMQ的三种模式-----主题模式(Topic)

    主题模式(Topic): 任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue 上 1:简述 如上图所示 此类交换器使得来自不同的源头的消息可以到达一 ...

  8. 【夏目鬼鬼分享】StringBoot整合RabbitMQ,使用Direct、Fanout、Topic三种模式

    RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的 ...

  9. Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合

    一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...

  10. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

最新文章

  1. LeetCode_每日一题今日份_312.戳气球(没懂)
  2. Fragment的setUserVisibleHint方法实现延时加载
  3. 如果你是IT技术人员,请思考这15个问题
  4. Java 接受reactjs数据_ReactJS:从API获取数据
  5. GroupBy()方法
  6. JMV监控工具之JConsole
  7. d3.js——饼状图
  8. iMeta封面 | 宏蛋白质组学分析一站式工具集iMetaLab Suite(加拿大渥太华大学Figeys组)...
  9. 机器人仿真的应用价值
  10. Operator ‘==‘ cannot be applied to operands of type ‘byte[]‘ and ‘string‘
  11. 一文彻底搞懂替罪羊树
  12. 安徽计算机软件工程学院,安徽软件工程专业大学排名
  13. OJ 2310 Problem D Mandarin
  14. mt6592android7,实用八核处理器 MTK MT6592M完全解析
  15. vs2017旗舰版_2017年的所有旗舰手机都有明显的妥协。 这是他们告诉我们有关设计的信息。...
  16. SwitchyOmega情景模式
  17. 利用Poi-tl将富文本Html转换为Word文件
  18. 基于STM32的无刷电机驱动板 无感/有感制作
  19. python语言二分之一_Jython
  20. 基于Matlab实现的可视密码图示法设计

热门文章

  1. 【Java 8 新特性】Java LocalDate 和 Epoch 互相转换
  2. 内存颗粒位宽和容量_【科普】内存颗粒版本判断方法和编号解析V2.0
  3. BUAA_4:Kevin·Feng的正确@姿势
  4. php开发电脑i56200u,Intel Core i5-6200U性能跑分和评测 | ZMMOO
  5. 三维视觉之结构光原理详解
  6. 如何解决EXCEL中弹出“信息检索”的信息
  7. nmake -f ms\ntdll.mak 模块计算机类型“X86”与目标计算机类型“x64”冲突
  8. FineBI教程之入门例子
  9. ensp华为防火墙的简单区域划分和配置
  10. Linux查看mpp数据库地址,Linux环境搭建DM8 MPP双节点集群