rabbitmq

  • 一、简介
  • 二、业务场景
    • 1、异步
    • 2、应用解耦
    • 3、流量削峰
  • 三、下载
  • 四、界面认识
  • 五、五种模型示例
    • 0、springboot依赖配置
    • 1、Hello World简单模型
    • 2、Work queues工作队列
    • 3、Publish/Subscribe发布订阅模型
    • 4、Routing路由模型
    • 5、Topics主题模型
    • 6、消息转换器
  • 六、进阶
    • 1、基于插件延迟队列
    • 2、TTL队列
    • 3、死信队列
    • 4、消息确认
      • 1、发送消息确认机制
      • 2、消费者消息确认机制
  • 七、rabbitmq集群搭建
    • 1、普通集群
    • 2、镜像集群(高可用)(推荐)
  • 八、与其他mq的区别
  • 九、rabbitmq常见面试题

一、简介

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。是面向消息的中间件。

你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。

主要流程:生产者(Producer)与消费者(Consumer)和 RabbitMQ 服务(Broker)建立连接, 然后生产者发布消息(Message)同时需要携带交换机(Exchange) 名称以及路由规则(Routing Key),这样消息会到达指定的交换机,然后交换机根据路由规则匹配对应的 Binding,最终将消息发送到匹配的消息队列(Quene),最后 RabbitMQ 服务将队列中的消息投递给订阅了该队列的消费者(消费者也可以主动拉取消息)。

二、业务场景

1、异步

如: 用户注册发送,注册邮件、注册短信,
传统做法

1、串行 (先发送邮件、再发短信)。问题:持续时间长

2、并行(将注册信息写入数据库后,同时发送邮件、短信),速度快、但不能满足高吞吐需求。

消息队列做法
将数据写入数据库、同时发送消息给发送邮件和注册,异步处理

2、应用解耦

如:双十一购物节,用户下单后、订单系统通知库存系统。

传统做法:
订单系统调用库存系统接口。问题:库存接口故障,订单就会失败,而损失大量订单

消息队列做法

订单系统:下单,订单系统完成持久化,将消息写入队列,返回下单成功给用户
库存系统:订阅下单的消息,获取下单消息,进行库操作,就算库存系统故障,消息队列也能保证消息可靠投递,不会导致消息丢失。

3、流量削峰

如:秒杀活动、一般会因为流量过大,导致应用挂掉,一般在应用前端加入消息队列。

作用:1、可以控制活动人数,超过一定阈值,订单直接丢弃
2、可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

消息队列做法
1、用户的请求,服务器收到后,首先写入消息队列,加入消息队列长度最大值,则直接抛弃用户请求或跳转到错误页面
2、秒杀业务根据消息队列中的请求信息,再做后续处理

三、下载

1、docker 安装 rabbitmq

docker pull rabbitmq:3.7.7-management

2、​​启动镜像(用户名和密码设置为 guest guest)

docker run -dit --restart=always --name rabbitmq3.7.7 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest  -v /home/rabbitmq/data:/var/lib/rabbitmq   -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management

3、访问 rabbitmq 管理界面

http://127.0.0.1:15672 账号密码都是 guest

4、docker 安装 rabbitMQ 延时队列插件(delayed_message_exchange)

下载解压文件 链接:https://pan.baidu.com/s/1PpeOn8NJT4hgh7ZBP0J0OA?pwd=u2gu
提取码:u2gu

拷贝插件文件到 rabbitMQ 的 Docker 容器中
先解压

unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip

拷贝插件

docker cp rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbitmq3.7.7:/plugins

进入容器:

docker ps  // 查看启动容器信息
docker exec -it 镜像ID /bin/bash    //开启进入终端

查看插件列表

rabbitmq-plugins list

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

四、界面认识

1、概要

2、连接

3、通道

4、交换机

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

Type 解释
direct 它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中
fanout 它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
headers headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。(headers 类型的交换器性能差,不实用,基本不会使用。)
topic 与direct模型相比,多了个可以使用通配符!,这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以"."分割,例如:item.insert ---------星号 匹配一个1词 , 例audit.* ------- #号匹配一个或多个词 audit.#
x-delayed-message 延迟交换机,可以延迟接收消息
Features 解释
D d 是 durable 的缩写,代表这个队列中的消息支持持久化
AD ad 是 autoDelete 的缩写。代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除。
excl 是 exclusive 的缩写。代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
Args 是 arguments 的缩写。代表该队列配置了 arguments 参数。
TTL 是 x-message-ttl 的缩写。设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒。
Exp Auto Expire,是 x-expires 配置的缩写。当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp。注意这里是删除队列,不是队列中的消息。
Lim 说明该队列配置了 x-max-length。限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉。
Lim B 说明队列配置了 x-max-length-bytes。限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小。
DLX 说明该队列配置了 x-dead-letter-exchange。当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉。
DLK x-dead-letter-routing-key 的缩写,将删除的消息推送到指定交换机的指定路由键的队列中去。
Pri x-max-priority 的缩写,优先级队列。表明该队列支持优先级,先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。
Ovfl x-overflow 的缩写。队列中的消息溢出时,如何处理这些消息。要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息。有两个配置项:drop-head,代表丢弃队列头部的消息,默认行为;reject-publish 设置队列中的消息溢出后,该队列的行为:”拒绝接收”(所有消息)。
ha-all 镜像队列。all 表示镜像到集群上的所有节点,ha-params 参数忽略。

5、队列


点击名称进去,可以看到队列的详细信息

get Message可以看到消息的内容

arguments具体参数如下:

参数名 作用
x-message-ttl 发送到队列的消息在丢弃之前可以存活时间(毫秒)
x-max-length 队列最大长度
x-expires 队列在被自动删除(毫秒)之前可以使用多长时间
x-max-length-bytes 消息容量限制,该参数是非负整数值。该参数和x-max-length目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。
x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称
x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,叫使用消息的原始路由密钥
x-max-priority 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级
x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用,如果未设置,队列将保留内存缓存以尽快传递消息
x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则

6、用户

就是添加用户和设置用户权限

7、开启日志:
进入容器,输入

rabbitmq-plugins enable rabbitmq_tracing

此时会多一个tracing标签,输入信息添加日志。
Name: 自定义,建议标准点容易区分
Format: 表示输出的消息日志格式,有Text和JSON两种,Text格式的日志方便人类阅读,JSON的方便程序解析。
JSON格式的payload(消息体)默认会采用Base64进行编码,如上面的“trace test payload.”会被编码成“dHJhY2Ug dGVzdCBwYXlsb2FkLg==”。

Max payload bytes: 表示每条消息的最大限制,单位为B。比如设置了了此值为10,那么当有超过10B的消息经过Rabbit MQ流转时,在记录到trace文件的时候会被截断。如上text日志格式中“trace test payload.”会被截断成“trace tes
Pattern:
# 追踪所有进入和离开MQ的消息
publish.# 追踪所有进入MQ的消息
publish.myExchage 追踪所有进入到myExchange的消息
deliver.# 跟踪所有离开MQ的消息
deliver.myQueue 追踪所有从myQueue离开的消息
#.myQueue实测效果等同于deliver.myQueue

添加后,点击即可查看日志
如果出现错误,是因为插件默认是使用 guest 用户,是因为把 guest 用户删除了,或者在配置文件里面使用其他用户

解决: 配置/etc/rabbitmq/rabbitmq.config,添加配置(未尝试,可以或者不行或者有其他解决方法请留言)

{rabbitmq_tracing,[{directory, "/var/vcap/sys/log/rabbitmq-server/tracing"},{username, <<"admin">>},{password, <<"password">>}]
},

五、五种模型示例

0、springboot依赖配置

依赖

<!-- amqp依赖,包含Rabbitmq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

yml配置

spring:application:name: rabbitmqrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /

1、Hello World简单模型

一对一消费,只有一个消费者能接收到

消费者

@Component
public class HolloWordListener {// @RabbitListener(queues = ("simple.queue")) // queues需手动先创建队列@RabbitListener(queuesToDeclare = @Queue("simple.queue"))  // queuesToDeclare 自动声明队列public void holloWordListener(String message){System.out.println("message = " + message);}
}

生产者

 @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {String queueName = "simple.queue"; // 队列名称String message = "heel,simple.queue"; // 要发送的消息rabbitTemplate.convertAndSend(queueName,message);}

2、Work queues工作队列

多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置能者多劳模式,谁完成的快,谁多做一点

消费者

@Component
public class WoekWordListener {@RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列public void holloWordListener(String message) throws InterruptedException {Thread.sleep(200);System.out.println("message1 = " + message);}@RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列public void holloWordListener1(String message) throws InterruptedException {Thread.sleep(400);System.out.println("message2 = " + message);}
}

生产者

 @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue(){String queueName = "workQueue";String message = "hello,work.queue__";for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(queueName,message+i);System.out.println("i = " + i);}}

取消预取机制,能者多劳配置

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /listener: simple:prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条

3、Publish/Subscribe发布订阅模型

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。
实现方式是加入了exchange(交换机),注意:交换机是不缓存消息的

使用fanout交换机,会将接收到的消息路由到每一个跟其绑定的queue(队列)

消费者

// 消费者直接绑定交换机,指定类型为fanout
@Component
public class FanoutExchangeListener {// 不指定队列,消息过了就没了//  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})// 指定队列,可以接收缓存到队列里的消息@RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test",durable = "true" ),exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})public void reveivel(String message){System.out.println("message = " + message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})public void reveivel2(String message){System.out.println("message1 = " + message);}
}

生产者

 @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void tesyPubSubQueue(){// 参数1:交换机名称 , 参数2routingKey,(fanout类型可不写) , 参数3,消息内容rabbitTemplate.convertAndSend("fanoutTest","","消息内容");}

4、Routing路由模型

routing模型也是将消息发送到交换机

使用的是Direct类型的交换机,会将接收到的消息根据规则路由到指定的Queue(队列),因此称为路由模式

消费者

// 消费者直接绑定交换机,指定类型为direct,并指定key表示能消费的key
@Component
public class RoutingExchangeListener {// 不指定队列,消息过了就没了//  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"info","error"})})// 指定队列,可以接收缓存到队列里的消息// key = {"info","error"} 表示我能接收到routingKey为 info和error的消息@RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test1",durable = "true" ),exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"info","error"})})public void receivel(String message){System.out.println("message = " + message);}// key = {"error"} 表示我只能接收到routingKey为 error的消息@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"error"})})public void receivel1(String message){System.out.println("message1 = " + message);}
}

生产者

 @Autowiredprivate RabbitTemplate rabbitTemplate;// 路由模型@Testpublic void direstExchangeTest(){rabbitTemplate.convertAndSend("direstTest","info","发送info的key的路由消息");}// 路由模型@Testpublic void direstExchangeTest1(){rabbitTemplate.convertAndSend("direstTest","error","发送error的key的路由消息");}

5、Topics主题模型

topicExchange与directExchange类型,区别在于routingKey必须是多个单词的列表,并且以 . 分隔

*(代表通配符,任意一个字段)
#(号代表一个或多个字段)

消费者

@Component
public class TopicsExchangeListener {// 不指定队列,消息过了就没了//  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})// 指定队列,可以接收缓存到队列里的消息// key = {"user.save","user.*"} 表示能消费 routingkey为  user.save 和 user.任意一个字符  的消息@RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test2",durable = "true" ),exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})public void recevicel(String message){System.out.println("message = " + message);}// key = {"order.#","user.*"} 表示能消费 routingkey为  order.一个或多个字符   和  user.任意一个字符  的消息@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"order.#","user.*"})})public void recevicel1(String message){System.out.println("message1 = " + message);}
}

生产者

 @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void topicTest(){rabbitTemplate.convertAndSend("topicTest","user.save","topic路由消息,use.save");}@Testpublic void topicTest1(){rabbitTemplate.convertAndSend("topicTest","order.select.getone","topic路由消息,order.select.getone");}

6、消息转换器

代码里直接发送对象,虽然接收的到消息,但是rabbitmq的界面上看到的消息会是乱码

依赖

 <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>

配置

@Configuration
public class rabbitmqConfig {// 消息转换配置@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}

再次发送就会是转换好的消息

六、进阶

1、基于插件延迟队列

延迟队列非常常用且好用,可以将消息发送后使消费者延迟接收

RabbitAdmin配置

RabbitAdmin是用于对交换机和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息的组件。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitAdminConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtualhost}")private String virtualhost;@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(host);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualhost);return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
}

封装发送延迟队列工具类

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;@Component
public class DelayedQueue {// routingKeyprivate static final String DELAYED_ROUTING_KEY = "delayed.routingkey";// 延迟队列交换机private static final String DELAYED_EXCHANGE = "delayed.exchange";@AutowiredRabbitTemplate rabbitTemplate;@ResourceRabbitAdmin rabbitAdmin;/*** 发送延迟队列* @param queueName 队列名称* @param params 消息内容* @param expiration 延迟时间 毫秒*/public void sendDelayedQueue(String queueName, Object params, Integer expiration) {// 先创建一个队列Queue queue = new Queue(queueName);rabbitAdmin.declareQueue(queue);// 创建延迟队列交换机CustomExchange customExchange = createCustomExchange();rabbitAdmin.declareExchange(customExchange);// 将队列和交换机绑定Binding binding = BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();rabbitAdmin.declareBinding(binding);// 发送延迟消息rabbitTemplate.convertAndSend(DELAYED_EXCHANGE, DELAYED_ROUTING_KEY, params, msg -> {// 发送消息的时候 延迟时长msg.getMessageProperties().setDelay(expiration);return msg;});}public CustomExchange createCustomExchange() {Map<String, Object> arguments = new HashMap<>();/*** 参数说明:* 1.交换机的名称* 2.交换机的类型* 3.是否需要持久化* 4.是否自动删除* 5.其它参数*/arguments.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", true, false, arguments);}}

生产者

 @Autowiredprivate DelayedQueue delayedQueue;/*** 发送延迟队列* @param queueName 队列名称* @param params 消息内容* @param expiration 延迟时间 毫秒*/@GetMapping("/test9")public void topicTest8() {delayedQueue.sendDelayedQueue("delayTest2","这是消息",5000);}

消费者

 @RabbitListener(queuesToDeclare = @Queue(value = "delayTest2",durable = "true"))public void declareExchange2(String message){System.out.println("delayTest2 = " + message);}

2、TTL队列

TTL是time to live的缩写,生存时间,RabbitMQ支持消息的过期时间,消息发送时可以指定,从消息入队列开始计算,只要超过队列的超时时间配置,消息没被接收,消息就会自动清除

封装发送TTL队列工具类

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Component
public class TtlQueue {// routingKeyprivate static final String TTL_KEY = "ttl.routingkey";private static final String TTL_EXCHANGE = "ttl.exchange";@AutowiredRabbitTemplate rabbitTemplate;@ResourceRabbitAdmin rabbitAdmin;/*** 发送TTL队列* @param queueName 队列名称* @param params 消息内容* @param expiration 过期时间 毫秒*/public void sendTtlQueue(String queueName, Object params, Integer expiration) {/*** ----------------------------------先创建一个ttl队列--------------------------------------------*/Map<String, Object> map = new HashMap<>();// 队列设置存活时间,单位ms,必须是整形数据。map.put("x-message-ttl",expiration);/*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/Queue queue = new Queue(queueName,true,false,false,map);rabbitAdmin.declareQueue(queue);/*** ---------------------------------创建交换机---------------------------------------------*/DirectExchange directExchange = new DirectExchange(TTL_EXCHANGE, true, false);rabbitAdmin.declareExchange(directExchange);/*** ---------------------------------队列绑定交换机---------------------------------------------*/// 将队列和交换机绑定Binding binding = BindingBuilder.bind(queue).to(directExchange).with(TTL_KEY);rabbitAdmin.declareBinding(binding);// 发送消息rabbitTemplate.convertAndSend(TTL_EXCHANGE,TTL_KEY,params);}
}

生产者

 @Autowiredprivate TtlQueue ttlQueue;/*** 发送TTL队列* @param queueName 队列名称* @param params 消息内容* @param expiration 过期时间 毫秒*/@GetMapping("/test10")public void topicTest10() {ttlQueue.sendTtlQueue("ttlQueue","这是消息内容",5000);}

消费者

 @RabbitListener(queues = "ttlQueue" )public void ttlQueue(String message){System.out.println("message = " + message);}

3、死信队列

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器。队列消息变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信的几种情况:

1.消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2. 消息TTL过期
3. 队列达到最大长度

流程:发送消息,消息过期后进入到另一个队列(这个队列设置持久化,不过期)的过程。

封装发送死信队列工具类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;@Component
public class DLXQueue {// routingKeyprivate static final String DEAD_ROUTING_KEY = "dead.routingkey";private static final String ROUTING_KEY = "routingkey";private static final String DEAD_EXCHANGE = "dead.exchange";private static final String EXCHANGE = "common.exchange";@AutowiredRabbitTemplate rabbitTemplate;@ResourceRabbitAdmin rabbitAdmin;/*** 发送死信队列,过期后进入死信交换机,进入死信队列* @param queueName 队列名称* @param deadQueueName 死信队列名称* @param params 消息内容* @param expiration 过期时间 毫秒*/public void sendDLXQueue(String queueName, String deadQueueName,Object params, Integer expiration){/*** ----------------------------------先创建一个ttl队列和死信队列--------------------------------------------*/Map<String, Object> map = new HashMap<>();// 队列设置存活时间,单位ms,必须是整形数据。map.put("x-message-ttl",expiration);// 设置死信交换机map.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信交换器路由键map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);/*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/Queue queue = new Queue(queueName,true,false,false,map);rabbitAdmin.declareQueue(queue);/*** ---------------------------------创建交换机---------------------------------------------*/DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);rabbitAdmin.declareExchange(directExchange);/*** ---------------------------------队列绑定交换机---------------------------------------------*/Binding binding = BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);rabbitAdmin.declareBinding(binding);/*** ---------------------------------在创建一个死信交换机和队列,接收死信队列---------------------------------------------*/DirectExchange deadExchange = new DirectExchange(DEAD_EXCHANGE, true, false);rabbitAdmin.declareExchange(deadExchange);Queue deadQueue = new Queue(deadQueueName,true,false,false);rabbitAdmin.declareQueue(deadQueue);/*** ---------------------------------队列绑定死信交换机---------------------------------------------*/// 将队列和交换机绑定Binding deadbinding = BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);rabbitAdmin.declareBinding(deadbinding);// 发送消息rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,params);}

生产者

 @Autowiredprivate DLXQueue dlxQueue;/*** 发送死信队列,过期后进入死信交换机,进入死信队列* @param queueName 队列名称* @param deadQueueName 死信队列名称* @param params 消息内容* @param expiration 过期时间 毫秒*/@GetMapping("/test11")public void topicTest11() {dlxQueue.sendDLXQueue("queue","deadQueue","这是消息内容",5000);}

消费者

 // 接收转移后的队列消息@RabbitListener(queuesToDeclare = @Queue(value = "deadQueue",durable = "true"))public void ttlQueue(String message){System.out.println("message = " + message);}

4、消息确认

1、发送消息确认机制

为确保消息发送有真的发送出去,设置发布时确认,确认消息是否到达 Broker 服务器

配置

spring:rabbitmq:host: 47.99.110.29port: 5672username: guestpassword: guestvirtual-host: /listener:simple:prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)publisher-returns: true  #确认消息已发送到队列(Queue)

如果有使用rabbitAdmin配置的话,那里也需要加配置
修改RabbitAdmin配置

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitAdminConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtualhost}")private String virtualhost;@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(host);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualhost);// 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);connectionFactory.setPublisherReturns(true);return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
}

实现发送消息确认接口

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

/*** 消息发送确认配置*/
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执public void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交换机不管是否收到消息的一个回调方法* @param correlationData 消息相关数据* @param ack 交换机是否收到消息* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){ // 消息投递到broker 的状态,true表示成功System.out.println("消息发送成功!");}else { // 发送异常System.out.println("发送异常原因 = " + cause);}}
}

实现发送消息回调接口

如果消息未能投递到目标queue里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执public void init(){rabbitTemplate.setReturnsCallback(this);}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息"+returnedMessage.getMessage().toString()+"被交换机"+returnedMessage.getExchange()+"回退!"+"退回原因为:"+returnedMessage.getReplyText());// 回退了所有的信息,可做补偿机制}
}

2、消费者消息确认机制

为确保消息消费成功,需设置消费者消息确认机制,如果消费失败或异常了,可做补偿机制。

配置

spring:rabbitmq:host: 47.99.110.29port: 5672username: guestpassword: guestvirtual-host: /# 消费者配置listener:simple:prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条acknowledge-mode: manual # 设置消费端手动ack确认retry:enabled: true # 是否支持重试# 生产者配置      publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)publisher-returns: true  #确认消息已发送到队列(Queue)

channel.basicAck消息确认

消费者修改,利用消费者参数Channel 进行消息确认操作

 @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列public void holloWordListener(String msg, Channel channel, Message message) throws IOException {// 消息System.out.println("msg = " + msg);/*** 确认* deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加* multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。*/ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

channel.basicNack消息回退

将消息重返队列

@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列public void holloWordListener(String msg, Channel channel, Message message) throws IOException {try {// 消息System.out.println("msg = " + msg);throw new RuntimeException("来个异常");} catch (Exception e) {e.printStackTrace();System.out.println("消息消费异常,重回队列");/*** deliveryTag:表示消息投递序号。* multiple:是否批量确认。* requeue:值为 true 消息将重新入队列。*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}// 确认/*** deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加* multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

channel.basicReject消息拒绝

拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

/*** 消息拒绝* deliveryTag:表示消息投递序号。* requeue:值为 true 消息将重新入队列。*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

封装消息确认处理类

链接: https://blog.csdn.net/qq_48721706/article/details/125709761

七、rabbitmq集群搭建

1、普通集群

1、新建三个docker容器

docker run -d --hostname rabbit1 --name myrabbit1  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15671:15672 -p 5671:5672 rabbitmqdocker run -d --hostname rabbit2 --name myrabbit2  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 --link myrabbit1:rabbit1 rabbitmqdocker run -d --hostname rabbit3 --name myrabbit3  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 rabbitmq

2、三个都进入容器下载可视化工具

3、进入第一个mq容器重启

docker exec -it ef4a1f0fade7 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

4、进入第二个 和 第三个 mq容器执行

docker exec -it e36d94d40008 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit1   //如遇到报错再执行上句、再继续执行
rabbitmqctl start_app
exit

5、进去mq可视化界面,overview面板中的Nodes可查看到节点信息。

6、测试,在mq上新建交换机、其余两个也出现新建的交换机
此时普通集群以构建完成

1、此种集群主节点down掉后,消费者也无法消费从节点的消息,不能做故障转移,只能当作备份。
2、主节点正常,从节点则可以消费消息

2、镜像集群(高可用)(推荐)

这种集群弥补第一种的缺陷,需在普通集群的基础下搭建(确保第一种集群可用)

镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升mq集群的高可用性。

1、配置集群架构
2、进入任意节点配置策略

docker exec -it ef4a1f0fade7 /bin/bashrabbitmqctl set_policy ha-all "^rabbitmq" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

3、测试,新建一个rabbitmq开头的队列

此时某个节点down掉(包括主节点),其余节点也能消费

将主节点down掉,节点自动切换
4、清除策略

rabbitmqctl clear_policy ha-all

八、与其他mq的区别

九、rabbitmq常见面试题

链接: 大神整理面试资料

rabbitmq详解相关推荐

  1. spring boot rabbitmq_Spring Boot2(十):RabbitMQ 详解

    关于SpringBoot整合RabbitMQ,我看了微笑哥的博文,实在不知道还有比这更加全面的了.于是我转载过来了.. RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起 ...

  2. Linux中级实战专题篇:rabbitmq(消息中间件p2p模式和pub模式,消息队列rabbitmq详解,单机安装,集群部署以及配置实战)

    一.消息中间件相关概念 1.简介 消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台相关 的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息 队列模型,可以在分布 ...

  3. RabbitMQ详解(一)

    一.消息队列相关概念 二.安装rabbitmq 三.配置rabbitmq 四.运行时参数配置 五.rabbitmq集群 一.消息队列相关概念 消息中间件: AMQP:高级消息队列协议 MQ是消费-生产 ...

  4. RabbitMQ详解以及spring对RabbitMQ的集成(附带部分源码解读)

    一·简介 1丶为什么要使用消息队列 https://wenku.baidu.com/view/e297236f83c4bb4cf7ecd193.html ①异步处理(高并发) ②系统解耦 ③流量削锋 ...

  5. 史上最全RabbitMq详解

    RabbitMq 资料 1.win 安装 第一步:下载并安装erlang RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装RabbitMQ的前提是安装Erlan ,下载地址为 :ht ...

  6. rabbitmq 限制速度_=(:) RabbitMQ详解

    本篇包含了RabbitMQ概念的一些东西,下篇会整理出SpringBoot结合RabbitMQ的使用案例. 目录 一.MQ概述 1.什么是消息 2.什么是消息队列 3.MQ的特点 二.MQ适用场景 1 ...

  7. RabbitMq 详解

    1.RabbitMq 原理 1.Message 消息,消息是不具名的,它由消息头和消息体组成.消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键).prio ...

  8. .Net使用RabbitMQ详解

    序言 这里原来有一句话,触犯啦天条,被阉割!!!! 首先不去讨论我的日志组件怎么样.因为有些日志需要走网络,有的又不需要走网路,也是有性能与业务场景的多般变化在其中,就把他抛开,我们只谈消息Rabbi ...

  9. RabbitMQ详解(三)

    一.分发到多Consumer(fanout) 二.Routing路由(Direct) 三.主题路由(Topic) 一.分发到多Consumer(fanout)将同一个Message deliver到多 ...

最新文章

  1. ▽算符在球坐标系_球坐标系下的角动量算符
  2. EFI Shell 命令参考
  3. 使用String.format简化代码
  4. Ubuntu上安装TensorFlow(python2.7版)
  5. ​【Python基础】告别枯燥,60 秒学会一个 Python 小例子(文末下载)
  6. sql2008能否打开mysql数据库_mysql数据库数据能不能导入到sql server中
  7. oracle exp执行失败,EXP-00056: 遇到 ORACLE 错误 25153
  8. SOAP、WSDL、 UDDI之间的关系
  9. html5学习笔记(progress)
  10. 海尔微型计算机hdp-9108,9108能安装内置声卡?
  11. android读取主板数据恢复,重磅干货!高通9008模式与数据提取用于恢复数据
  12. 爱可生 mysql监控_actiontech-zabbix-mysql-monitor
  13. sql导出的身份证后几位是000
  14. 挣脱“数据沼泽”,重获用云自由
  15. Java开发必读--初识微服务一定要阅读这篇文章
  16. 诺基亚如何利用计算机上网,诺基亚手机连接wifi的方法步骤
  17. matlab拉普拉斯因式分解,拉氏变换与反变换
  18. Qt数字电子钟(根据进位计算实现)
  19. ZIF-8包裹溶菌酶作配体的金纳米颗粒(Lys-AuNPs)|磁性g-C3N4/Fe3O4@ZIF-8纳米复合材料|ZIF-8@银基SERS基底|齐岳试剂
  20. 《硅谷之谜》读书笔记:追求卓越,改变自己

热门文章

  1. plc-300c语言编程,PLC初学者必备:7个PLC经典编程
  2. USACO The castle 小白代码-供参考(会不断更改代码)
  3. js实现每日签到功能
  4. 标定mynt小觅相机(仅双目)
  5. 计算机组成原理之字和字节
  6. MYSQL新特性secure_file_priv 读写文件
  7. strlen ,strcmp,strcat,strcpy函数以及实现
  8. C语言学习Day23 递归函数、局部变量、全局变量
  9. 支付宝开放平台应用— 乡镇卫生院申请
  10. mybatisplus打印sql语句不带问号