RabbitMQ实战
RabbitMQ
文章目录
- RabbitMQ
- 一、RabbitMQ介绍
- 1.1 现存问题
- 1.2 处理问题
- 1.3 RabbitMQ介绍
- 二、RabbitMQ安装
- 2.1 Docker安装RabbitMQ
- 2.2 启动图形化界面
- 三、RabbitMQ架构
- 四、RabbitMQ通讯方式
- 4.1 RabbitMQ提供的通讯方式
- 4.2 构建Connection工具类
- 4.3 Hello World
- 4.4 Work Queues
- 4.5 Publish/Subscribe
- 4.6 Routing
- 4.7 Topic
- 4.8 RPC(了解)
- 五、SpringBoot操作RabbitMQ
- 5.1 SpringBoot声明信息
- 5.2 生产者操作
- 5.3 消费者操作
- 六、RabbitMQ保证消息可靠性
- 6.1 保证消息一定送达到Exchange
- 6.2 保证消息可以路由到Queue
- 6.3 保证Queue可以持久化消息
- 6.4 保证消费者可以正常消费消息
- 6.5 SpringBoot实现上述操作
- 6.5.1 Confirm
- 6.5.2 Return
- 6.5.3 消息持久化
- 七、RabbitMQ死信队列&延迟交换机
- 7.1 什么是死信
- 7.2 实现死信队列
- 7.2.1 准备Exchange&Queue
- 7.2.2 实现效果
- 7.3 延迟交换机
- 八、RabbitMQ的集群
- 九、RabbitMQ其他内容
- 9.1 Headers类型Exchange
一、RabbitMQ介绍
1.1 现存问题
服务异步调用:服务A如何保证异步请求一定能被服务B接收到并处理
服务异步调用
削峰:海量请求,如何实现削峰的效果,将请求全部放到一个队列中,慢慢的消费,这个队列怎么实现?
削峰 ![ 服务解耦:如何尽量的降低服务之间的耦合问题,如果在订单服务与积分和商家服务解耦,需要一个队列,而这个队列依然需要实现上述两种情况功能。
服务解耦 ![
1.2 处理问题
异步调用
异步调用 ![ 削峰
削峰 ![ 服务解耦
服务解耦 ![
1.3 RabbitMQ介绍
百度百科:
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
AMQP协议:
AMQP |
---|
Erlang:Erlang是一种通用的面向并发的编程语言,Erlang充分发挥CPU的性能,延迟特别低,相比其他的MQ(Kafka,RocketMQ)延迟是最低的。
RabbitMQ支持多种语言通讯:Java,Python…………都有响应的API
RabbitMQ支持海量的插件去实现一些特殊功能,RabbitMQ自带了一款图形化界面,操作异常的简单。
二、RabbitMQ安装
2.1 Docker安装RabbitMQ
docker-compose.yml文件
version: "3.1"
services:rabbitmq:image: daocloud.io/library/rabbitmq:3.8.5container_name: rabbitmqrestart: alwaysvolumes:- ./data/:/var/lib/rabbitmq/ports:- 5672:5672- 15672:15672
在Linux内部执行:curl localhost:5672
2.2 启动图形化界面
启动图形化界面 |
---|
![ |
访问15672端口:默认的用户名和密码均为:guest
访问15672端口 |
---|
三、RabbitMQ架构
完整架构 |
---|
四、RabbitMQ通讯方式
4.1 RabbitMQ提供的通讯方式
Hello World!:为了入门操作!
Work queues:一个队列被多个消费者消费
Publish/Subscribe:手动创建Exchange(FANOUT)
Routing:手动创建Exchange(DIRECT)
Topics:手动创建Exchange(TOPIC)
RPC:RPC方式
Publisher Confirms:保证消息可靠性
4.2 构建Connection工具类
导入依赖:amqp-client,junit
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency> </dependencies>
构建工具类:
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class RabbitMQConnectionUtil {public static final String RABBITMQ_HOST = "192.168.11.32";public static final int RABBITMQ_PORT = 5672;public static final String RABBITMQ_USERNAME = "guest";public static final String RABBITMQ_PASSWORD = "guest";public static final String RABBITMQ_VIRTUAL_HOST = "/";/*** 构建RabbitMQ的连接对象* @return*/public static Connection getConnection() throws Exception {//1. 创建Connection工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置RabbitMQ的连接信息factory.setHost(RABBITMQ_HOST);factory.setPort(RABBITMQ_PORT);factory.setUsername(RABBITMQ_USERNAME);factory.setPassword(RABBITMQ_PASSWORD);factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);//3. 返回连接对象Connection connection = factory.newConnection();return connection;}}
4.3 Hello World
通讯方式 |
---|
生产者:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;public class Publisher {public static final String QUEUE_NAME = "hello";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息String message = "Hello World!";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功!");}
}
消费者:
import com.rabbitmq.client.*;
import org.junit.Test;import java.io.IOException;public class Consumer {@Testpublic void consume() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);System.out.println("开始监听队列");System.in.read();}
}
4.4 Work Queues
WorkQueues需要学习的内容 |
---|
生产者:生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。
消费者:让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
import com.rabbitmq.client.*; import org.junit.Test;import java.io.IOException;public class Consumer {@Testpublic void consume1() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.5 设置消息的流控channel.basicQos(3);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1号-获取到消息:" + new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听队列");System.in.read();}@Testpublic void consume2() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);channel.basicQos(3);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2号-获取到消息:" + new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听队列");System.in.read();} }
4.5 Publish/Subscribe
自定义一个交换机 |
---|
生产者:自行构建Exchange并绑定指定队列(FANOUT类型)
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class Publisher {public static final String EXCHANGE_NAME = "pubsub";public static final String QUEUE_NAME1 = "pubsub-one";public static final String QUEUE_NAME2 = "pubsub-two";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1,false,false,false,null);channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");//6. 发消息到交换机channel.basicPublish(EXCHANGE_NAME,"45jk6h645jk",null,"publish/subscribe!".getBytes());System.out.println("消息成功发送!");}
}
4.6 Routing
DIRECT类型Exchange |
---|
生产者:在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;public class Publisher {public static final String EXCHANGE_NAME = "routing";public static final String QUEUE_NAME1 = "routing-one";public static final String QUEUE_NAME2 = "routing-two";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1,false,false,false,null);channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");//6. 发消息到交换机channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林大狸子".getBytes());channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());System.out.println("消息成功发送!");}}
4.7 Topic
Topic模式 |
---|
生产者:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;public class Publisher {public static final String EXCHANGE_NAME = "topic";public static final String QUEUE_NAME1 = "topic-one";public static final String QUEUE_NAME2 = "topic-two";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//4. 构建队列channel.queueDeclare(QUEUE_NAME1,false,false,false,null);channel.queueDeclare(QUEUE_NAME2,false,false,false,null);//5. 绑定交换机和队列,// TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey// 其中有两个特殊字符:*(相当于占位符),#(相当通配符)channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");//6. 发消息到交换机channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子!".getBytes());channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog.dog.dog",null,"懒狗狗狗狗狗狗".getBytes());System.out.println("消息成功发送!");}
}
4.8 RPC(了解)
因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作
需要让Client发送消息时,携带两个属性:
- replyTo告知Server将相应信息放到哪个队列
- correlationId告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息
RPC方式 |
---|
客户端:
import com.rabbitmq.client.*;
import org.junit.Test;import java.io.IOException;
import java.util.UUID;public class Publisher {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void publish() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 发布消息String message = "Hello RPC!";String uuid = UUID.randomUUID().toString();AMQP.BasicProperties props = new AMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();channel.basicPublish("",QUEUE_PUBLISHER,props,message.getBytes());channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String id = properties.getCorrelationId();if(id != null && id.equalsIgnoreCase(uuid)){System.out.println("接收到服务端的响应:" + new String(body,"UTF-8"));}channel.basicAck(envelope.getDeliveryTag(),false);}});System.out.println("消息发送成功!");System.in.read();}}
服务端:
import com.rabbitmq.client.*;
import org.junit.Test;import java.io.IOException;public class Consumer {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void consume() throws Exception {//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 监听消息DefaultConsumer callback = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));String resp = "获取到了client发出的请求,这里是响应的信息";String respQueueName = properties.getReplyTo();String uuid = properties.getCorrelationId();AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(uuid).build();channel.basicPublish("",respQueueName,props,resp.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE_PUBLISHER,false,callback);System.out.println("开始监听队列");System.in.read();}
}
五、SpringBoot操作RabbitMQ
5.1 SpringBoot声明信息
创建项目
导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置RabbitMQ信息
spring:rabbitmq:host: 192.168.11.32port: 5672username: guestpassword: guestvirtual-host: /
声明交换机&队列
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitMQConfig {public static final String EXCHANGE = "boot-exchange";public static final String QUEUE = "boot-queue";public static final String ROUTING_KEY = "*.black.*";@Beanpublic Exchange bootExchange(){// channel.DeclareExchangereturn ExchangeBuilder.topicExchange(EXCHANGE).build();}@Beanpublic Queue bootQueue(){return QueueBuilder.durable(QUEUE).build();}@Beanpublic Binding bootBinding(Exchange bootExchange,Queue bootQueue){return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();} }
5.2 生产者操作
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void publish(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");}@Testpublic void publishWithProps(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProps", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setCorrelationId("123");return message;}});System.out.println("消息发送成功");}
}
5.3 消费者操作
import com.rabbitmq.client.Channel;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class ConsumeListener {@RabbitListener(queues = RabbitMQConfig.QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("队列的消息为:" + msg);String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("唯一标识为:" + correlationId);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
六、RabbitMQ保证消息可靠性
6.1 保证消息一定送达到Exchange
Confirm机制
可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果
//4. 开启confirms
channel.confirmSelect();//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息成功的发送到Exchange!");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");}
});
6.2 保证消息可以路由到Queue
Return机制
为了保证Exchange上的消息一定可以送达到Queue
//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");}
});
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
6.3 保证Queue可以持久化消息
DeliveryMode设置消息持久化
DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。
//7. 设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());
6.4 保证消费者可以正常消费消息
详情看WorkQueue模式
6.5 SpringBoot实现上述操作
6.5.1 Confirm
编写配置文件开启Confirm机制
spring:rabbitmq:publisher-confirm-type: correlated # 新版本publisher-confirms: true # 老版本
在发送消息时,配置RabbitTemplate
@Test public void publishWithConfirms() throws IOException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到交换机!!");}else{System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");}}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read(); }
6.5.2 Return
编写配置文件开启Return机制
spring:rabbitmq:publisher-returns: true # 开启Return机制
在发送消息时,配置RabbitTemplate
@Test public void publishWithReturn() throws IOException {// 新版本用 setReturnsCallback ,老版本用setReturnCallbackrabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {String msg = new String(returned.getMessage().getBody());System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read(); }
6.5.3 消息持久化
@Test
public void publishWithBasicProperties() throws IOException {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息的持久化!message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});System.out.println("消息发送成功");
}
七、RabbitMQ死信队列&延迟交换机
7.1 什么是死信
死信&死信队列 |
---|
死信队列的应用:
- 基于死信队列在队列消息已满的情况下,消息也不会丢失
- 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间
7.2 实现死信队列
7.2.1 准备Exchange&Queue
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DeadLetterConfig {public static final String NORMAL_EXCHANGE = "normal-exchange";public static final String NORMAL_QUEUE = "normal-queue";public static final String NORMAL_ROUTING_KEY = "normal.#";public static final String DEAD_EXCHANGE = "dead-exchange";public static final String DEAD_QUEUE = "dead-queue";public static final String DEAD_ROUTING_KEY = "dead.#";@Beanpublic Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}@Beanpublic Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();}@Beanpublic Binding normalBinding(Queue normalQueue,Exchange normalExchange){return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();}@Beanpublic Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}@Beanpublic Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}@Beanpublic Binding deadBinding(Queue deadQueue,Exchange deadExchange){return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}}
7.2.2 实现效果
基于消费者进行reject或者nack实现死信效果
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component public class DeadListener {@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到normal队列的消息:" + msg);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);} }
消息的生存时间
给消息设置生存时间
@Test public void publishExpire(){String msg = "dead letter expire";rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");return message;}}); }
给队列设置消息的生存时间
@Bean public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").ttl(10000).build(); }
设置Queue中的消息最大长度
@Bean public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").maxLength(1).build(); }
只要Queue中已经有一个消息,如果再次发送一个消息,这个消息会变为死信!
7.3 延迟交换机
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。
构建延迟交换机
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;@Configuration public class DelayedConfig {public static final String DELAYED_EXCHANGE = "delayed-exchange";public static final String DELAYED_QUEUE = "delayed-queue";public static final String DELAYED_ROUTING_KEY = "delayed.#";@Beanpublic Exchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","topic");Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);return exchange;}@Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE).build();}@Beanpublic Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} }
发送消息
import org.junit.jupiter.api.Test; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class DelayedPublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void publish(){rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(30000);return message;}});} }
八、RabbitMQ的集群
RabbitMQ的镜像模式
RabbitMQ的集群 |
---|
高可用
提升RabbitMQ的效率
搭建RabbitMQ集群
准备两台虚拟机(克隆)
准备RabbitMQ的yml文件
rabbitmq1:
version: '3.1' services:rabbitmq1:image: rabbitmq:3.8.5-management-alpinecontainer_name: rabbitmq1hostname: rabbitmq1extra_hosts:- "rabbitmq1:192.168.11.32"- "rabbitmq2:192.168.11.33"environment: - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFSports:- 5672:5672- 15672:15672- 4369:4369- 25672:25672
rabbitmq2:
version: '3.1' services:rabbitmq2:image: rabbitmq:3.8.5-management-alpinecontainer_name: rabbitmq2hostname: rabbitmq2extra_hosts:- "rabbitmq1:192.168.11.32"- "rabbitmq2:192.168.11.33"environment: - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFSports:- 5672:5672- 15672:15672- 4369:4369- 25672:25672
准备完毕之后,启动两台RabbitMQ
启动效果
让RabbitMQ服务实现join操作
需要四个命令完成join操作
让rabbitmq2 join rabbitmq1,需要进入到rabbitmq2的容器内部,去执行下述命令
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbitmq1 rabbitmqctl start_app
执行成功后:
执行成功后
设置镜像模式
在指定的RabbitMQ服务中设置好镜像策略即可
镜像模式
九、RabbitMQ其他内容
9.1 Headers类型Exchange
headers就是一个基于key-value的方式,让Exchange和Queue绑定的到一起的一种规则
相比Topic形式,可以采用的类型更丰富。
headers绑定方式 |
---|
具体实现方式
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;import java.util.HashMap;
import java.util.Map;public class Publisher {public static final String HEADER_EXCHANGE = "header_exchange";public static final String HEADER_QUEUE = "header_queue";@Testpublic void publish()throws Exception{//1. 获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannelChannel channel = connection.createChannel();//3. 构建交换机和队列并基于header的方式绑定channel.exchangeDeclare(HEADER_EXCHANGE, BuiltinExchangeType.HEADERS);channel.queueDeclare(HEADER_QUEUE,true,false,false,null);Map<String,Object> args = new HashMap<>();// 多个header的key-value只要可以匹配上一个就可以// args.put("x-match","any");// 多个header的key-value要求全部匹配上!args.put("x-match","all");args.put("name","jack");args.put("age","23");channel.queueBind(HEADER_QUEUE,HEADER_EXCHANGE,"",args);//4. 发送消息String msg = "header测试消息!";Map<String, Object> headers = new HashMap<>();headers.put("name","jac");headers.put("age","2");AMQP.BasicProperties props = new AMQP.BasicProperties().builder().headers(headers).build();channel.basicPublish(HEADER_EXCHANGE,"",props,msg.getBytes());System.out.println("发送消息成功,header = " + headers);}
}
RabbitMQ实战相关推荐
- rabbitmq实战指南_RabbitMQ之脑裂
点击上方蓝色字体,选择"设为星标" 9 10 本文总结<RabbitMQ实战指南>网络分区章节,并亲自实践才有这篇文章,手动处理章节详细记录了操作过程中的注意事项.如果 ...
- 《RabbitMQ实战指南》笔误及改进记录
2017年12月上旬笔者的一本新书--<RabbitMQ实战指南>上架,里面的校稿都是自己独自完成的,一共进行了15遍,但还是会有漏网之鱼.本篇博文用来记录现在发现的一些笔误,一是给购书的 ...
- RabbitMQ实战经验分享
RabbitMQ实战经验分享 原文:RabbitMQ实战经验分享 前言 最近在忙一个高考项目,看着系统顺利完成了这次高考,终于可以松口气了.看到那些即将参加高考的学生,也想起当年高三的自己. 下面分享 ...
- 《RabbitMQ 实战指南》第五章 RabbitMQ 进阶(下)
<RabbitMQ 实战指南>第五章 RabbitMQ 进阶(下) 文章目录 <RabbitMQ 实战指南>第五章 RabbitMQ 进阶(下) 一.持久化 二.生产者确认 1 ...
- 《RabbitMQ 实战指南》第四章 RabbitMQ进阶(上)
<RabbitMQ 实战指南>第四章 RabbitMQ进阶(上) 文章目录 <RabbitMQ 实战指南>第四章 RabbitMQ进阶(上) 一.简介 二.消息何去何从 1.m ...
- 《RabbitMQ 实战指南》第三章 客户端开发向导
<RabbitMQ 实战指南> 文章目录 <RabbitMQ 实战指南> 一.简介 二.连接 RabbitMQ 三.使用交换器和队列 1.exchangeDeclare 方法详 ...
- 《RabbitMQ 实战指南》第二章 RabbitMQ 入门
<RabbitMQ 实战指南> 文章目录 <RabbitMQ 实战指南> 一.相关概念介绍 1.生产者和消费者 2.队列 3.交换器.路由键.绑定 4.交换器类型 5.Rabb ...
- 《RabbitMQ 实战指南》第一章 RabbitMQ 简介
<RabbitMQ 实战指南>第一章 RabbitMQ 简介 文章目录 <RabbitMQ 实战指南>第一章 RabbitMQ 简介 一.什么是消息中间件 二.消息中间件的作用 ...
- RabbitMQ实战指南之RabbitMQ架构及运转流程
相关概念 RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收.存储和转发消息.可以把消息传递的过程想象成∶ 当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,Rab ...
- RabbitMQ实战教程
RabbitMQ实战教程 1.什么是RabbitMQ 1.1 MQ(Message Queue)消息队列 1.1.1 异步处理 1.1.2 应用解耦 1.1.3 流量削峰 1.2 背景知识介绍 1.2 ...
最新文章
- mysql 5.1 禁用innodb
- 零基础可以学python吗-零基础适合学习python吗?
- 15、子查询注意事项
- Leetcode题库 744.寻找比目标字母大的最小字母(C实现)
- Spring框架—基础介绍
- python取前三位_python3 获取前几个高频列表元素
- OpenCV---ROI(region of interest)和泛洪填充
- [收藏]Visual Component Framework
- 分布式存储系统Minio简介
- springboot优雅的加载海康sdk
- unity, Animation crossfade需要两动画在时间上确实有交叠
- 在Visual Studio上开启自己的C++学习之旅
- 做自媒体视频剪辑怎么赚钱呢?
- 海思高校合作——QA培训资料
- threejs(webgl)-shader入门教程(1)
- 使用vagrant搭建三台虚拟机环境
- 聚名:拼音域名选择和投资的技巧
- c语言程序设计臧,臧学莲
- RT_thread 独立看门狗 watchdog 不断自动复位的解决方法
- 长沙尚学堂python培训学校
热门文章
- Flink使用connect实现双流join全外连接
- 人体红外传感器HC-SR501
- Django项目实现验证码
- keil中创建lib库,使用lib库
- 弹性法计算方法的mck法_SAM4E单片机之旅——9、UART与MCK之MAINCK
- [论文总结]:faster cnns with direct sparse convolutions and guided pruning 直接稀疏卷积和引导剪枝
- uniapp 小程序 加载显示激励视频广告
- Python入门教程NO.5 用python写个自动选择加油站的小程序
- 自制stm32平衡车
- 如何实现基于微信小程序的人脸识别