rabbitmq完整学习-springboot整合rabbitmq
文章目录
- MQ学习
- rabbitMq软件上传到liunx服务器
- 基础准备
- 1. linux的服务器-创建RabbitMq文件夹-并进入文件夹
- 2. 上传需要的文件
- 3. 上传成功
- mq安装
- 1.在线安装依赖环境:
- 2.安装Erlang
- 3.安装RabbitMQ
- 4. 开启管理界面及配置
- 5. 启动
- 6. 配置虚拟主机及用户
- 新建用户
- Virtual Hosts配置
- 7. Overview配置文件 (not found)
- 8.端口
- RabbitMQ入门
- 1.1. 搭建示例工程
- 1.1.1. 创建工程
- 1.1.2. 添加依赖
- 1.2.编写工具类
- 1.3编写生产者
- 1.3. 编写消费者
- 1.4. 小结
- RabbitMQ的工作模式
- rabbitmq的官网
- mq第二种工作模式-(工作队列模式)[Work queues](https://www.rabbitmq.com/tutorials/tutorial-two-python.html)
- rabbitmq-producer 模块
- 1. 新建work包
- 2. 复制简单的模式类-修改名字为Producer_WorkQueues
- 3. 复制简单的模式类-修改名字为Producer_WorkQueues
- rabbitmq-consumer模块
- 1. 新建work包
- 2. 复制简单的模式类-修改名字为Producer_WorkQueues
- 3. 复制简单的模式类-修改名字为Consumer_WorkQueues1
- 4. 复制Consumer_WorkQueues1 创建Consumer_WorkQueues2
- 小结
- mq第三种工作模式-[Publish/Subscribe](https://www.rabbitmq.com/tutorials/tutorial-three-python.html)
- 生产者模块 Producer
- 1. 新建ps包
- 2. Producer_PublishSubscribe代码
- 消费者模块consumer
- 1. 新建ps包
- 2. Consumer_PublishSubscribe1代码
- 4. Consumer_PublishSubscribe2代码
- 小结
- mq第四种工作模式-[Routing](https://www.rabbitmq.com/tutorials/tutorial-four-python.html)
- 生产者模块 Producer
- 1. 新建routing包
- 2. Producer_Routing代码
- 消费者模块consumer
- 1. 新建routing包
- 2. Consumer_Routing1代码
- 4. Consumer_Routing2代码
- 小结
- mq第五种工作模式-[Topics](https://www.rabbitmq.com/tutorials/tutorial-five-python.html)
- 生产者模块 Producer
- 1. 新建topics包
- 2. Producer_Topics代码
- 消费者模块consumer
- 1. 新建topics包
- 2. Consumer_Topic1代码
- 4. Consumer_Topic2代码
- 小结
- 忘记改交换机的类型,这里
- 模式总结
- Springboot整合RabbitMQ
- Springboot整合RabbitMQ生产者
- 1.创建生产者Springboot工程
- 2.引入依赖坐标
- 3.编写yml配置文件
- 4.编写启动类
- 5.编写配置类
- 6.编写测试类
- 7.查看mq控制台的消息
- Springboot整合RabbitMQ消费者
- 1.创建消费者Springboot工程
- 2.引入依赖坐标
- 3.编写yml配置文件
- 4.编写启动类
- 5.创建监听类
- 6.启动-启动类
- 小结
- SpringBoot整合RabbitMQ (交换机与多个队列绑定)
- 1.将上面的springboot整合直接复制过来
- 2.生产端
- 3.消费端
- 3.启动test测试类,进行测试
- RabbitMQ-集群搭建
- 概述
- 前期准备-创建多节点
- 设置主节点操作:
- rabbit1操作作为主节点:
- rabbit2操作作为从节点:
- RabbitMQ镜像集群配置
- 管理控制台设置
- 含义解释
- 负载均衡-HAProxy
- 概述
- 安装-HAProxy
- 配置-HAProxy
- Haproxy启动失败
- 错误信息
- 解决方案
- 创建新工程-测试
MQ学习
rabbitMq软件上传到liunx服务器
基础准备
1. linux的服务器-创建RabbitMq文件夹-并进入文件夹
mkdir rabbitmq
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FAVDxMuM-1663250323428)(img\创建文件夹.png)]
2. 上传需要的文件
erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fUg8vYcO-1663250323429)(img\上传文件到linux.png)]
3. 上传成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ir39NTsq-1663250323429)(img\上传成功.png)]
mq安装
1.在线安装依赖环境:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dnarnf7j-1663250323429)(img\安装日志.png)]
2.安装Erlang
# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
- 如果出现如下错误
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PBwr3Dzd-1663250323430)(img\错误.png)]
- 说明gblic 版本太低。我们可以查看当前机器的gblic 版本
strings /lib64/libc.so.6 | grep GLIBC
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Tk1zSpmC-1663250323430)(img\最高版本.png)]
需要升级glibc
- 使用yum更新安装依赖
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
- 安装rpm包
sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
- 安装完毕后再查看glibc版本
strings /lib64/libc.so.6 | grep GLIBC
3.安装RabbitMQ
# 安装 -soca
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm# 安装 -mq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
执行 出现如下错误:
error: open of socat-1.7.3.2-1.1.el7.x86_64.rpm failed: No such file or directory
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PxNpoIvF-1663250323430)(img\错误2.png)]
执行如下命令
yum -y install tcp_wrappers
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-roKNBuZL-1663250323431)(img\执行.png)]
再执行安装RabbitMQ -socat:
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
再执行安装RabbitMQ :
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MzlRAZZk-1663250323431)(img\执行安装.png)]
4. 开启管理界面及配置
# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
linux不保存退出的命令是q!。
1、打开linux系统,在linux的桌面的空白处右击。2、在弹出的下拉选项里,点击打开终端。3、在终端窗口输入vi+文件名打开需要编辑的文件。4、通过vi更改文件后,按ESC键退出insert模式,输入冒号(:),在冒号后面输入q!命令,回车即可不保存退出。
linux保存退出
1、打开linux系统,在linux的桌面的空白处右击。2、在弹出的下拉选项里,点击打开终端。3、在终端窗口输入vi+文件名打开需要编辑的文件。4、通过vi更改文件后,按ESC键退出insert模式,输入冒号(:),在冒号后面输入wq命令,回车即可保存退出。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WuCVepr9-1663250323431)(img\放开.png)]
查看命令
cat /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4xaHQAvU-1663250323431)(img\修改成功.png)]
5. 启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YYk3xc4C-1663250323432)(img\启动成功.png)]
如果这里访问不到: (http://ip:15672/ )关闭防火墙
service iptables stop
防火墙关闭后还是无法访问: 重启mq
service rabbitmq-server restart
访问 http://ip:15672/#/
登录名: guest
密码: guest
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CsicGaJs-1663250323432)(img\成功登录控制台.png)]
6. 配置虚拟主机及用户
RabbitMQ在安装好后,可以访问http://ip地址:15672
;其自带了guest/guest的用户名和密码;
我们创建自定义用户;也可以登录管理界面:
新建用户
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2HQH5Nl5-1663250323432)(img\设置用户.png)]
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器(类似于mysql的库),每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
创建Virtual Hosts
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4PZ9tJ52-1663250323432)(img\设置.png)]
设置Virtual Hosts权限
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eHpNT4tW-1663250323433)(img\点击虚拟机.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J8hwCcan-1663250323433)(img\打开.png)]
点击set permission
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D2ecQYUW-1663250323433)(img\set.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KC6bGRO3-1663250323433)(img\成功.png)]
登录为abc用户
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZP8tlID-1663250323434)(img\登录为abc用户.png)]
7. Overview配置文件 (not found)
/etc/rabbitmq/rabbitmq.config (not found)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FP2tvwVF-1663250323434)(img\找不到配置.png)]
- 设置配置文件步骤
- 1.cd找到配置文件所在目录
cd /usr/share/doc/rabbitmq-server-3.6.5/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MBXk5p1B-1663250323434)(img\找到了文件.png)]
- 2.将配置文件copy到指定位置
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
3.重启生效
service rabbitmq-server restart
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YzRonbiy-1663250323434)(img\成功解决.png)]
8.端口
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OMLR2yz7-1663250323435)(img\端口.png)]
注意: 15672是控制台的端口. 25672是集群端口 5672是tcp连接的端口.
RabbitMQ入门
1.1. 搭建示例工程
1.1.1. 创建工程
创建一个空工程rabbitmq
创建rabbitmq-producer 工程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cTruYwyJ-1663250323435)(img/创建工程1.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jJUESYZF-1663250323435)(img/创建工程2.png)]
创建rabbitmq-consumer 工程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HFXkXCUa-1663250323435)(img/创建工程3.png)]
1.1.2. 添加依赖
往powernode-rabbitmq的rabbitmq-producer 工程和rabbitmq-consumer 工程的pom.xml文件中添加如下依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.powernode</groupId><artifactId>rabbitmq-consumer</artifactId><version>1.0-SNAPSHOT</version><dependencies><!--rabbitmq 的java客户端依赖--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZBCcw0E2-1663250323435)(img/添加pom的依赖.png)]
1.2.编写工具类
(com.powernode.rabbitmq.util.ConnectionUtil)
package com.powernode.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @program: rabbitmq* @ClassName: ConnectionUtil* @version: 1.0* @description:* @author: bjpowernode**/
public class ConnectionUtil {public static Connection getConnection() throws Exception {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//主机地址;默认为 localhostconnectionFactory.setHost("123.57.41.174");//连接端口;默认为 5672connectionFactory.setPort(5672);//虚拟主机名称;默认为 /connectionFactory.setVirtualHost("powernode");//连接用户名;默认为guestconnectionFactory.setUsername("powernode");//连接密码;默认为guestconnectionFactory.setPassword("powernode");//创建连接return connectionFactory.newConnection();}
}
1.3编写生产者
编写消息生产者com.powernode.rabbitmq.simple.Producer
package com.powernode.rabbitmq.simple;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** @program: rabbitmq* @ClassName: Producer* @version: 1.0* @description:* @author: bjpowernode**/
public class Producer {//队列名称 (simple_queue 简单队列)static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)参数:1. queue:队列名称2. durable:是否持久化,当mq重启之后,还在3. exclusive:* 是否独占本次连接。只能有一个消费者监听这队列* 当Connection关闭时,是否删除队列*4. autoDelete:是否在不使用的时候自动删除队列。当没有Consumer时,自动删除掉5. arguments:队列其它参数*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "hello rabbitmq~~~";/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)参数:1. exchange:交换机名称。简单模式下交换机会使用默认的 ""2. routingKey:路由名称3. props:配置信息4. body:发送消息数据*///6. 发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//7.释放资源channel.close();connection.close();}
}
1.3. 编写消费者
编写消息的消费者com.powernode.rabbitmq.simple.Consumer
package com.powernode.rabbitmq.simple;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** @program: rabbitmq* @ClassName: Consumer* @version: 1.0* @description:* @author: bjpowernode**/
public class Consumer {//队列名称 (simple_queue 简单队列)static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 回调方法,当收到消息后,会自动执行该方法* consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//consumerTagSystem.out.println("consumerTag:" + consumerTag);//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body));}};//监听消息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数1:queue:队列名称* 参数2: autoAck:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:callback:消息接收到后回调对象*/channel.basicConsume(QUEUE_NAME, true, consumer);//不关闭资源,应该一直监听消息
// channel.close();
// connection.close();}
}
1.4. 小结
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
RabbitMQ的工作模式
rabbitmq的官网
- rabbitmq的官网:https://www.rabbitmq.com/
- 进入MQ的工作模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wfGDYXOJ-1663250323439)(img/点击1.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xFinOBtV-1663250323439)(img/点击2.png)]
- MQ的七种工作模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A9OSRSGB-1663250323439)(img/工作模式.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V5uJS7de-1663250323440)(img/第七种.png)]
工作模式概述:其实就是消息路由分发的一种方式
七种工作模式:(
简单模式(Hello World)
工作队列模式(Work queues)
订阅模式(Publish/Subscribe)
路由模式(Routing)
主题模式(Topics)
远程过程调用(RPC)
发布者确认(Publisher Confirms))
刚才我们已经实现了第一种模式。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zVU88XkE-1663250323440)(img/01.png)]
(简单模式)“Hello World!”
做最简单的事情,一个生产者对应一个消费者,RabbitMQ 相当于一个消息代理,负责将 A 的消息转发给 B 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。
现在我们实现第二种
mq第二种工作模式-(工作队列模式)Work queues
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YNw5GVkG-1663250323440)(img/02.png)]
Work Queues在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理 应用场景:一个订单的处理需要 10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况。注意:一条消息只能被一个消费者消费,不能被多个消费者重复消费;应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
rabbitmq-producer 模块
1. 新建work包
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pHLlgBxR-1663250323440)(img/新建work包.png)]
2. 复制简单的模式类-修改名字为Producer_WorkQueues
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iciMR6li-1663250323441)(img/复制简单改名work.png)]
3. 复制简单的模式类-修改名字为Producer_WorkQueues
修改队列名称-
增加for循环,一次新建10条消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Qco47zr-1663250323441)(img/修改队列名称.png)]
- 完整代码
package com.powernode.rabbitmq.work;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;/*** @program: rabbitmq* @ClassName: Producer* @version: 1.0* @description:* @author: bjpowernode**/
public class Producer_WorkQueues {//队列名称 (work_queues 工作队列)static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue//如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 0; i <= 10 ; i++) {String message = i+"你好,hello WorkQueues";//6. 发送消息channel.basicPublish("", QUEUE_NAME, null,message.getBytes("UTF-8"));}//7.释放资源channel.close();connection.close();}
}
rabbitmq-consumer模块
1. 新建work包
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h9OA60nV-1663250323441)(img/新建work包2.png)]
2. 复制简单的模式类-修改名字为Producer_WorkQueues
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Dcl6oPgE-1663250323441)(img/复制简单改名work.png)]
3. 复制简单的模式类-修改名字为Consumer_WorkQueues1
- 修改队列名称-
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x9jqd28q-1663250323441)(img/修改队列名称1.png)]
- 完整代码
package com.powernode.rabbitmq.work;import com.powernode.rabbitmq.simple.Producer;
import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** @program: rabbitmq* @ClassName: Consumer* @version: 1.0* @description:* @author: bjpowernode**/
public class Consumer_WorkQueues1 {//队列名称 (simple_queue 简单队列)static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 回调方法,当收到消息后,会自动执行该方法* consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//consumerTag/* System.out.println("consumerTag:" + consumerTag);//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());*///收到的消息System.out.println("接收到的消息为:" + new String(body,"UTF-8"));}};//监听消息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数1:queue:队列名称* 参数2: autoAck:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:callback:消息接收到后回调对象*/channel.basicConsume(QUEUE_NAME, true, consumer);//不关闭资源,应该一直监听消息
// channel.close();
// connection.close();}
}
4. 复制Consumer_WorkQueues1 创建Consumer_WorkQueues2
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fJEgUKlm-1663250323442)(img/新建2.png)]
首先启动两个客户端(消费者)-Consumer_WorkQueues2和Consumer_WorkQueues1
然后启动服务端(生产者)-Producer_WorkQueues
到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息,可以看到是顺序消费Consumer_WorkQueues1消费了消息0-2-4-6-8-10
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-251YZfhk-1663250323442)(img/消费成功.png)]
Consumer_WorkQueues2消费了消息1-3-5-7-9
小结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
mq第三种工作模式-Publish/Subscribe
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NYlpVGFL-1663250323442)(img/03.png)]
一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。 应用场景:更新商品库存后需要通知多个缓存和多个数据库
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- headers: 参数匹配 , (使用的比较少,不做赘述)
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
- 发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
生产者模块 Producer
1. 新建ps包
2. Producer_PublishSubscribe代码
- 完整代码
package com.powernode.rabbitmq.ps;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 发布与订阅使用的交换机类型为:fanout*/
public class Producer_PublishSubscribe {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,* fanout、扇形(广播),发送消息到每一个与之绑定的队列* topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列* direct、定向 把消息交给符合指定routing key 的队列* headers 参数匹配(使用的比较少,不做赘述)*///5. 创建交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);//6. 创建队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//7.队列绑定交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*/channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");String message = "日志信息:A调用了findAll方法...日志级别:info...";//8. 发送消息channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());//9. 释放资源channel.close();connection.close();}
}
运行生产者–运行完成,队列多了两个
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0spEDPGM-1663250323442)(img/02运行完成,队列多了两个.png)]
消费者模块consumer
1. 新建ps包
2. Consumer_PublishSubscribe1代码
- 完整代码
package com.powernode.rabbitmq.ps;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
//消费者1
public class Consumer_PublishSubscribe1 {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(FANOUT_QUEUE_1, true, consumer);//不需要关闭连接}
}
4. Consumer_PublishSubscribe2代码
复制Consumer_PublishSubscribe1创建Consumer_PublishSubscribe2
修改队列名称 为2
package com.powernode.rabbitmq.ps;import com.powernode.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; //消费者2 public class Consumer_PublishSubscribe2 {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(FANOUT_QUEUE_2, true, consumer);//不需要关闭连接} }
启动两个(消费者)-
到IDEA的两个消费者对应的控制台查看
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9pv0hzW1-1663250323443)(img/02消费者01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5JYa6os6-1663250323443)(img/02消费者02.png)]
小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
mq第四种工作模式-Routing
有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由 key ,消费者将队列绑定到交换机时需要指定路由 key,仅消费指定路由 key 的消息 应用场景:如在商品库存中增加了 1 台 iphone12,iphone12 促销活动消费者指定 routing key 为 iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此 routing key 的消息
生产者模块 Producer
1. 新建routing包
2. Producer_Routing代码
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eiud32nH-1663250323443)(img/指定模式为direct01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MTKRt6L3-1663250323445)(img/指定模式为direct02.png)]
- 完整代码
package com.powernode.rabbitmq.routing;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 发布与订阅使用的交换机类型为:fanout*/
public class Producer_Routing {//交换机名称static final String DIRCET_EXCHAGE = "dircet_exchange";//队列名称static final String DIRCET_QUEUE_1 = "dircet_queue_1";//队列名称static final String DIRCET_QUEUE_2 = "dircet_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,* fanout、扇形(广播),发送消息到每一个与之绑定的队列* topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列* direct、定向 把消息交给符合指定routing key 的队列* headers 参数匹配(使用的比较少,不做赘述)*///5. 创建交换机channel.exchangeDeclare(DIRCET_EXCHAGE, BuiltinExchangeType.DIRECT);//6. 创建队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRCET_QUEUE_1, true, false, false, null);channel.queueDeclare(DIRCET_QUEUE_2, true, false, false, null);//7.队列绑定交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*///队列一 绑定了errorchannel.queueBind(DIRCET_QUEUE_1, DIRCET_EXCHAGE, "error");//队列二 绑定了 error info warningchannel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "error");channel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "info");channel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "warning");String message = "日志信息:A调用了findAll方法...日志级别:info...";//8. 发送消息-并且指定 routingKey参数为channel.basicPublish(DIRCET_EXCHAGE, "info", null, message.getBytes());//9. 释放资源channel.close();connection.close();}
}
运行生产者–运行完成,队列多了两个
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BweBd865-1663250323446)(img/02运行完成,队列多了两个.png)]
消费者模块consumer
1. 新建routing包
2. Consumer_Routing1代码
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9S6tGd2t-1663250323446)(img/03客户端01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cGfolP42-1663250323446)(img/03客户端02.png)]
- 完整代码
package com.powernode.rabbitmq.routing;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;//消费者1
public class Consumer_Routing1 {//交换机名称static final String DIRECT_EXCHAGE = "direct_exchange";//队列名称static final String DIRECT_QUEUE_1 = "direct_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息存储到数据库.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(DIRECT_QUEUE_1, true, consumer);//不需要关闭连接}
}
4. Consumer_Routing2代码
复制Consumer_Routing1创建Consumer_Routing2
修改队列名称 为2
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OYp8rXEw-1663250323446)(img/03-改成2.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Osjaw6DY-1663250323446)(img/03-改成打印到控制台.png)]
- 代码
package com.powernode.rabbitmq.routing;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;//消费者1
public class Consumer_Routing2 {//交换机名称static final String DIRECT_EXCHAGE = "direct_exchange";//队列名称static final String DIRECT_QUEUE_2 = "direct_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(DIRECT_QUEUE_2, true, consumer);//不需要关闭连接}
}
启动两个(消费者)-
到IDEA的两个消费者对应的控制台查看
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y4QkV2mM-1663250323447)(img/查看结果1.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aDH6fyrv-1663250323447)(img/查看结果2.png)]
因为我们的2队列绑定了info级别的,所以在控制台打印了
小结
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
mq第五种工作模式-Topics
根据主题(Topics)来接收消息,将路由 key 和某模式进行匹配,此时队列需要绑定在一个模式上,
#匹配一个词或多个词,*只匹配一个词。
应用场景:同上,iphone 促销活动可以接收主题为 iphone 的消息,如 iphone12、iphone13 等
生产者模块 Producer
1. 新建topics包
2. Producer_Topics代码
从复制Producer_PublishSubscribe到Producer_Topics代码
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fejmlZbo-1663250323447)(img/复制到topics生产者.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ASlqUZa9-1663250323447)(img/修改生产者topic01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AckQdWlK-1663250323448)(img/修改生产者topic02.png)]
- 完整代码
package com.powernode.rabbitmq.topics;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer_Topics {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_1 = "topic_queue_1";//队列名称static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,* fanout、扇形(广播),发送消息到每一个与之绑定的队列* topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列* direct、定向 把消息交给符合指定routing key 的队列* headers 参数匹配(使用的比较少,不做赘述)*///5. 创建交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);//6. 创建队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);//7.队列绑定交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*/// routing key 系统的名称.日志的级别。//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库//绑定队列1channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"#.error");channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"order.*");//绑定队列2channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.*");String message = "日志信息:A调用了findAll方法...日志级别:info...";//8. 发送消息 指定routingKey 为channel.basicPublish(TOPIC_EXCHAGE, "order.info", null, message.getBytes());//9. 释放资源channel.close();connection.close();}
}
运行生产者–运行完成,队列多了两个
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cocJTljz-1663250323448)(img/控制台拿到消息.png)]
消费者模块consumer
1. 新建topics包
2. Consumer_Topic1代码
从复制Consumer_Routing1到Consumer_Topic1代码
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2a6HGRc6-1663250323448)(img/复制到topics消费者01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wr9Gnw33-1663250323448)(img/复制到topics消费者02.png)]
- 完整代码
package com.powernode.rabbitmq.topics;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;//消费者1
public class Consumer_Topic1 {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_1 = "topic_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息存储到数据库.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(TOPIC_QUEUE_1, true, consumer);//不需要关闭连接}
}
4. Consumer_Topic2代码
复制Consumer_Topic1创建Consumer_Topic2
修改队列名称 为2
package com.powernode.rabbitmq.topics;import com.powernode.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*;import java.io.IOException;//消费者1 public class Consumer_Topic2 {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(TOPIC_QUEUE_2, true, consumer);//不需要关闭连接} }
启动两个(消费者)-
到IDEA的两个消费者对应的控制台查看
因为满足两个路由条件,则两个控制台都收到消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WveUg37Z-1663250323448)(img/因为满足两个路由条件,则两个控制台都收到消息1.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jqrkRNrh-1663250323449)(img/因为满足两个路由条件,则两个控制台都收到消息2.png)]
小结
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式
和 Routing路由模式
的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
忘记改交换机的类型,这里
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-odkgolQS-1663250323449)(img/交换机类型错误.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yBGgDGtA-1663250323449)(img/交换机类型错误,01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NXEvhng5-1663250323449)(img/交换机类型错误02.png)]
模式总结
RabbitMQ工作模式:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)一对一
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)一个队列的消息被多个人消费,每个人消费的消息是不够完整的,只能获取到队列的一半的消息–>例如 2-4-6-7-8
3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列 产生两个队列的消息被两个人消费,每个人的消息是完整的
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列–只能指定一个参数,不能指定多个参数 ,例如,只能指定info
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列可以指定多个参数
6、远程过程调用(RPC)
如果我们需要在远程计算机上运行功能并等待结果就可以使用 RPC,具体流程可以看图。
应用场景:需要等待接口返回数据。
7、发布者确认(Publisher Confirms)
与发布者进行可靠的发布确认,发布者确认是 RabbitMQ 扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ 将异步确认发送者发布的消息,这意味着它们已在服务器端处理
应用场景:对于消息可靠性要求较高,比如钱包扣款
Springboot整合RabbitMQ
Springboot整合RabbitMQ生产者
1.创建生产者Springboot工程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eDhBWa8m-1663250323450)(img/创建springboot的producer模块.jpg)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mBqqc5DT-1663250323450)(img/创建工程02.png)]
创建springboot-producer 工程,Groupld为com.powernode
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MhiudEeo-1663250323450)(img/创建工程03.png)]
2.引入依赖坐标
<!--引入Spring Boot的父级依赖--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><dependencies><!--引入整合mq的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--引入单元测试依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
3.编写yml配置文件
application.yml
spring:rabbitmq:host: ip #服务地址username: powernode #账户名password: powernode #密码virtual-host: powernode # 虚拟机(默认/)port: 5672 #端口
4.编写启动类
com.powernode.ProducerApplication
- 代码
package com.powernode;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @program: rabbitmq* @ClassName: ProducerApplication* @version: 1.0* @description: 主启动类* @author: bjpowernode**/
@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}
}
5.编写配置类
创建config包
创建RabbitMQConfig配置类 --(定义交换机,队列 和绑定关系)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bcWUX1Xp-1663250323450)(img/创建config包.png)]
- 代码
package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @program: rabbitmq* @ClassName: RabbitMQConfig* @version: 1.0* @description: 配置类* @author: bjpowernode**/
@Configuration
public class RabbitMQConfig {//指定EXCHANGE_NAME交换机 的名称public static final String EXCHANGE_NAME = "boot_topic_exchange";//指定QUEUE_NAME队列 的名称public static final String QUEUE_NAME = "boot_queue";//1.交换机@Bean("bootExchange")public Exchange bootExchange(){/*** ExchangeBuilder构建交换机对象.* topicExchange(String name) -->topicExchange-指定交换机的类型--> (String name)指定交换机的名称.* directExchange(String name)* fanoutExchange(String name)* headersExchange(String name)* .durable(true) 选择true指定为持久化* .build()为构建.*/return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.Queue 队列@Bean("bootQueue")public Queue bootQueue(){/*** QueueBuilder构建队列的对象.* durable()* durable(String name) --> ()指定队列的名称.* nonDurable()* nonDurable(String name)* .build()为构建.*/return QueueBuilder.durable(QUEUE_NAME).build();}//3. 队列和交互机绑定关系 Binding/**1. 指定队列2. 指定交换机3. routing key@Qualifier-->如果配置类出现多个队列,通过名称绑定参数*/@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*/return BindingBuilder.bind(queue).to(exchange).with("springboot.#").noargs();}}
6.编写测试类
com.powernode.ProducerTest
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4RWgzJRD-1663250323450)(img/编写测试类.png)]
- 代码
注入RabbitTemplate–调用方法,完成发送
package com.powernode;import com.powernode.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** @program: rabbitmq* @ClassName: ProducerTest* @version: 1.0* @description: 测试类* @author: bjpowernode**/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {//1.注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend(){/*** convertAndSend 发送消息* 1.参数1 exchange交换机的名称* 2.参数2 routingKey* 3.参数3 消息*/rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"springboot.java","springboot mq hello~~~");}
}
启动测试类
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Kv0wkn3a-1663250323451)(img/测试启动成功.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aEScJaM6-1663250323451)(img/springboot测试开启.png)]
7.查看mq控制台的消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1Smet39p-1663250323451)(img/点击01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tJgnheOm-1663250323451)(img/点击02.png)]
Springboot整合RabbitMQ消费者
1.创建消费者Springboot工程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NcJoOYqO-1663250323451)(img/创建springboot的producer模块.jpg)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p9FIqxTS-1663250323452)(img/创建工程02.png)]
创建springboot-producer 工程,Groupld为com.powernode
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2GSxPn2X-1663250323452)(img/创建springboot消费者模块.png)]
2.引入依赖坐标
<!--引入Spring Boot的父级依赖--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><dependencies><!--引入整合mq的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
3.编写yml配置文件
application.yml
spring:rabbitmq:host: ip #服务地址username: powernode #账户名password: powernode #密码virtual-host: powernode # 虚拟机(默认/)port: 5672 #端口
4.编写启动类
com.powernode.ConsumerApplication
- 代码
package com.powernode;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @program: rabbitmq* @ClassName: ConsumerApplication* @version: 1.0* @description: 主启动类* @author: bjpowernode**/
@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}
}
5.创建监听类
com.powernode.listener.RabbimtMQListener
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zS1XeBnN-1663250323452)(img/监听类.png)]
- 代码
使用 完成监听
package com.powernode.listener;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @program: rabbitmq* @ClassName: RabbimtMQListener* @version: 1.0* @description: 监听* @author: bjpowernode**/
@Component
public class RabbimtMQListener {@RabbitListener(queues = "boot_queue")public void ListenerQueue(Message message){System.out.println(new String(message.getBody()));}
}
6.启动-启动类
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Bk87gAR-1663250323452)(img/取到消息.png)]
小结
SpringBoot提供了快速整合RabbitMQ的方式
基本信息yml配置 ,队列交互以及绑定关系在配置类中使用Bean注入的方式配置(配置类在)
生产端直接注入RabbitTemplate完成消息发送
消费端直接使用@RabbitListener完成消息的接收
SpringBoot整合RabbitMQ (交换机与多个队列绑定)
1.将上面的springboot整合直接复制过来
2.生产端
RabbitMQConfig类新增-队列名称的定义[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-srO0VUD7-1663250323453)(img/多个队列02.png)]
RabbitMQConfig类新增-构建bootQueue2的队列[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YZRNINwh-1663250323453)(img/多个队列03.png)]
RabbitMQConfig类新增- 队列和交互机绑定关系 Binding[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iYAmaTbI-1663250323453)(img/多个队列04.png)]
类完整代码
package com.powernode.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @program: rabbitmq* @ClassName: RabbitMQConfig* @version: 1.0* @description: 配置类* @author: bjpowernode**/ @Configuration public class RabbitMQConfig {//指定EXCHANGE_NAME交换机 的名称public static final String EXCHANGE_NAME = "boot_topic_exchange";//指定QUEUE_NAME队列 的名称public static final String QUEUE_NAME = "boot_queue";//指定QUEUE_NAME队列 的名称public static final String QUEUE_NAME2 = "boot_queue2";//1.交换机@Bean("bootExchange")public Exchange bootExchange(){/*** ExchangeBuilder构建交换机对象.* topicExchange(String name) -->topicExchange-指定交换机的类型--> (String name)指定交换机的名称.* directExchange(String name)* fanoutExchange(String name)* headersExchange(String name)* .durable(true) 选择true指定为持久化* .build()为构建.*/return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.Queue 队列@Bean("bootQueue")public Queue bootQueue(){/*** QueueBuilder构建队列的对象.* durable()* durable(String name) --> ()指定队列的名称.* nonDurable()* nonDurable(String name)* .build()为构建.*/return QueueBuilder.durable(QUEUE_NAME).build();}//2.Queue 队列@Bean("bootQueue2")public Queue bootQueue2(){/*** QueueBuilder构建队列的对象.* durable()* durable(String name) --> ()指定队列的名称.* nonDurable()* nonDurable(String name)* .build()为构建.*/return QueueBuilder.durable(QUEUE_NAME2).build();}//3. 队列和交互机绑定关系 Binding/**1. 指定队列2. 指定交换机3. routing key@Qualifier-->如果配置类出现多个队列,通过名称绑定参数*/@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*//* amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));*/return BindingBuilder.bind(queue).to(exchange).with("1.txt").noargs();}@Beanpublic Binding bindQueueExchange1(@Qualifier("bootQueue2") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*/return BindingBuilder.bind(queue).to(exchange).with("1.#").noargs();}@Beanpublic Binding bindQueueExchange2(@Qualifier("bootQueue2") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*/return BindingBuilder.bind(queue).to(exchange).with("555.zzz").noargs();}}
3.消费端
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Z2z6Mui-1663250323453)(img/多个队列01.png)]
- 新增的代码
//跟你消息生产者的队列名称一致@RabbitListener(queues = "boot_queue2")public void ListenerQueue2(Message message){System.out.println(new String(message.getBody())+ "222");}
3.启动test测试类,进行测试
(注意:先启动生产者,因为绑定关系在消费者未配置,则消费者不能先启动–>消费者先启动,会报找不到队列异常)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-71g7HkdX-1663250323453)(img/多队列执行.png)]
RabbitMQ-集群搭建
概述
当我们在项目开发中引入了mq中间件,那中间件的高可用就变得尤为重要,但是如果我们只有单台mq,则容易出现不可用的问题,导致我们整个项目宕机.这时候,我们就想到了要部署mq中间件集群.来提高我们项目中间件的高可用.
在我们的项目当中,单机多实例的集群部署方式,被称为伪集群
前期准备-创建多节点
- 首先检查mq是否正常运行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ejmpgIes-1663250323454)(img/检查mq是否正常运行.png)]
检查命令: -确保RabbitMQ运行没有问题
rabbitmqctl status
停止rabbitmq服务
service rabbitmq-server stop
**新建一个会话窗口–>**启动第一个节点:(前台启动方式)
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
#web管理插件端口占用,所以还要指定其web插件占用的端口号。
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
使用:guest 账户登录(http://ip:15673/#/)控制台查看
**新建一个会话窗口–>**启动第二个节点:(前台启动方式)
#web管理插件端口占用,所以还要指定其web插件占用的端口号。
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
使用:guest 账户登录(http://ip:15674/#/)控制台查看
结束命令:–(书写,但是这里不用,用下面的停止命令)
rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stop
设置主节点操作:
新建一个会话窗口–>
rabbit1操作作为主节点:
- 停止rabbit1 节点
rabbitmqctl -n rabbit1 stop_app
- 重置rabbit1 节点
rabbitmqctl -n rabbit1 reset
- 重启rabbit1 节点
rabbitmqctl -n rabbit1 start_app
rabbit2操作作为从节点:
- 停止rabbit1 节点
rabbitmqctl -n rabbit2 stop_app
- 重置rabbit1 节点
rabbitmqctl -n rabbit2 reset
- 将rabbit2加入到集群中
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1sFquWgs-1663250323454)(img/主机名位置.png)]
rabbitmqctl -n rabbit2 join_cluster rabbit1@'iz2ze268ldc0zhjsfzajw4z'
###''内是主机名换成自己的 例如 iz2ze268ldc0zhjsfzajw4z
- 重启rabbit2 节点
rabbitmqctl -n rabbit2 start_app
添加节点加入集群设置成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LVfRZySM-1663250323454)(img/节点添加成功01.png)]
从两个控制台查看,–(http://123.57.41.174:15673/#/) (http://123.57.41.173:15673/#/)均添加成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QVV1HZmv-1663250323454)(img/控制台查看01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FAq8jEDT-1663250323454)(img/控制台查看02.png)]
- 新增队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nFVkDVXl-1663250323455)(img/新增队列.png)]
RabbitMQ镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一个项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。
设置的镜像队列-(设置集群队列信息同步)可以通过开启的网页的管理端Admin->Policies,也可以通过命令
管理控制台设置
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5dSQpGOW-1663250323455)(img/设置集群队列信息同步 步骤.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mC4JQkEa-1663250323455)(img/设置集群队列信息同步01.png)]
含义解释
Name:策略名称
Pattern:匹配的规则,如果是匹配所有的队列,是^.
Definition: 镜像定义,包括三个部分 ha-mode,ha-params,ha-sync-mode
- ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
- all表示在集群所有的节点上进行镜像 ,也就是同步所有匹配的队列
- exactly表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
- nodes表示在指定的节点上进行镜像,节点名称通过ha-params指定
- ha-params: ha-mode模式需要用到的参数
- ha-sync-mode: 镜像队列中消息的同步方式,有效值为automatic(自动),manually (手动)
成功设置
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bN3Jxt0R-1663250323455)(img/成功设置01.png)]
- +1 表示同步到一个队列。如果有多个队列,这里就是+几
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ERucpvfL-1663250323456)(img/到悬浮窗口,看到02.jpg)]
负载均衡-HAProxy
概述
HAProxy是一个使用C语言编写的自由及开放源代码软件[1],其提供高可用性、负载均衡,以及基于TCP和HTTP的应用程序代理。
HAProxy特别适用于那些负载特大的web站点,这些站点通常又需要会话保持或七层处理。HAProxy运行在当前的硬件上,完全可以支持数以万计的并发连接。并且它的运行模式使得它可以很简单安全的整合进您当前的架构中, 同时可以保护你的web服务器不被暴露到网络上。
HAProxy实现了一种事件驱动, 单一进程模型,此模型支持非常大的并发连接数。多进程或多线程模型受内存限制 、系统调度器限制以及无处不在的锁限制,很少能处理数千并发连接。事件驱动模型因为在有更好的资源和时间管理的用户空间(User-Space) 实现所有这些任务,所以没有这些问题。此模型的弊端是,在多核系统上,这些程序通常扩展性较差。这就是为什么他们必须进行优化以 使每个CPU时间片(Cycle)做更多的工作。
包括 GitHub、Bitbucket[3]、Stack Overflow[4]、Reddit、Tumblr、Twitter[5][6]和 Tuenti[7]在内的知名网站,及亚马逊网络服务系统都使用了HAProxy。 [1]
安装-HAProxy
- 下载依赖包
yum install gcc vim wget
- 上传haproxy源码包
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i4sVAtAa-1663250323456)(img/点击上传01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wBrdzYcA-1663250323456)(img/上传02.png)]
ls命令查看是否上传成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JvDsKHRl-1663250323456)(img/查看上传成功.png)]
解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
- 进入目录
cd /usr/local/haproxy-1.6.5
编译
make TARGET=linux31 PREFIX=/usr/local/haproxy
安装
make install PREFIX=/usr/local/haproxy
赋权
#添加组
groupadd -r -g 149 haproxy
#添加用户
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
- 创建haproxy文件夹
mkdir /etc/haproxy
- 进入haproxy配置文件
vim /etc/haproxy/haproxy.cfg
配置-HAProxy
配置文件路径:/etc/haproxy/haproxy.cfg
#logging options
globallog 127.0.0.1 local0 infomaxconn 5120chroot /usr/local/haproxyuid 99gid 99daemonquietnbproc 20pidfile /var/run/haproxy.piddefaultslog globalmode tcpoption tcplogoption dontlognullretries 3option redispatchmaxconn 2000contimeout 5sclitimeout 60ssrvtimeout 15s
#front-end IP for consumers and producters
#rabbitmq_cluster 是个名字,随意
listen rabbitmq_cluster#对外提供服务的端口号bind 0.0.0.0:5672mode tcp#balance url_param userid#balance url_param session_id check_post 64#balance hdr(User-Agent)#balance hdr(host)#balance hdr(Host) use_domain_only#balance rdp-cookie#balance leastconn#balance source //ipbalance roundrobin#两个rabbit节点和haproxy在同一个服务器可以使用127.0.0.1 当三台不在同一个服务器的时候,写具体的IP地址server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2listen stats
#Haproxy控制台的访问地址bind ip:8100mode httpoption httplogstats enablestats uri /stats refresh 5s
- 保存并退出
#esc(键退出)->:(符号输入)->wq(保存编辑操作退出)
#:wq!保存编辑强制退出
:wq
- 启动HAproxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
- 查看haproxy进程状态
ps -ef | grep haproxy
访问如下地址对mq节点进行监控
http://ip:8100/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SYsJZ8Io-1663250323456)(img/启动成功01.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jmIOJmGR-1663250323457)(img/启动成功02.png)]
注意:代码中访问mq集群地址,则变为访问haproxy地址:5672
Haproxy启动失败
haproxy配置本机IP或0.0.0.0以外的IP,启动时报错,
错误信息
[ALERT] 252/225311 (24204) : Starting proxy stats: cannot bind socket [43.109.197.238:8100]
错误的原因
高可用虚IP配置后,无法启动。
解决方案
绑定非本机的IP需要在sysctl.conf文件中配置
vi /etc/sysctl.conf #修改内核参数
net.ipv4.ip_nonlocal_bind = 1 #没有就新增此条记录
sysctl -p #保存结果,使结果生效
创建新工程-测试
colony-rabbitmq
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xM9oeML6-1663250323457)(img/创建新工程.png)]
测试代码
package com.powernode;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @program: rabbitmq* @ClassName: Demo* @version: 1.0* @description:* @author: bjpowernode**/
public class Demo {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂11ConnectionFactory factory = new ConnectionFactory();// 2.设置参数factory.setHost("ip");//设置ipfactory.setPort(5672);//创建连接connectionConnection connection = factory.newConnection();//创建channelChannel channel = connection.createChannel();//创建队列channel.queueDeclare("hello world",true,false,false,null);String boby="hello rabbit~";//发送消息channel.basicPublish("","hello world",null,boby.getBytes());//释放资源channel.close();connection.close();System.out.println("send success....");}
}
rabbitmq完整学习-springboot整合rabbitmq相关推荐
- RabbitMQ原理及SpringBoot整合RabbitMQ
RabbitMQ原理及SpringBoot整合RabbitMQ 1. RabbitMQ环境搭建 参考:https://blog.csdn.net/u013071014/article/details/ ...
- RabbitMq详解+SpringBoot整合RabbitMq快速入门
1概述: 1.1.什么是MQ 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已. 其主要用途:不同进程Pro ...
- RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ
RabbitMQ [黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战] 文章目录 RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ 6.1 Sprin ...
- Springboot整合一之Springboot整合RabbitMQ
前言 目前,springboot已然成为了最热的java开发整合框架,主要是因其简单的配置,并且本身提供了很多与第三方框架的整合,甚至可以让我们在短短的几分钟里就可以搭建一个完整的项目架构.所以,博主 ...
- Spring Boot---(10)SpringBoot整合RabbitMQ
请参考:Spring Boot---(24)springboot整合RabbitMQ 由于docker安装非常方便,这里就用docker来安装和启动了.没接触过docker的可以参考这里:零基础学习D ...
- SpringBoot整合RabbitMq实战(一)
1 Spring AMQP 简介 Spring AMQP项目是一个引入Spring核心概念用于基于高级消息队列(AMQP)的解决方案的开发,它提供了一个模板用于发送和接受消息的高级抽象.它对基于消息驱 ...
- Springboot——整合Rabbitmq之Confirm和Return详解
文章目录 前言 为什么会有Confirm Springboot 整合 Mq 实现 Confirm 监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服 ...
- 九、springboot整合rabbitMQ
springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...
- RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ
什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...
- SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka
1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...
最新文章
- Linux(lamp安装)
- 使用session防止表单进行重复提交
- phpstrom xdebug配置
- Vmware安装与使用
- mysql 分组占比_含泪整理MySQL索引
- 漫游Kafka实现篇之分布式
- 苹果mac闪退_自从Mac有了WPS,从此和双系统说再见!
- PHP设计模式之建造者模式
- violate、内存屏障
- opencv怎么2个摄像头_内脏脂肪过高怎么办?从2个方法入手,坚持3个月甩掉小肚腩...
- matlab 双曲线拟合,利用MATLAB进行logistic曲线拟合
- windows无法完成格式化怎么办?
- python实现之极限
- mysql 5.6 登录 警告_mysql登录警告问题的解决方法
- 计算机表格 求差,教大家Excel2013中表格求差函数公式怎么使用
- 云原生应用实践与未来趋势
- 利用燕尾花数据集画出P-R曲线
- 数字经济|引领建筑业数字化信息化转型
- 二维卷积网络函数con2d
- 在python中设置密码登录_如何从python脚本在linux中设置用户密码?
热门文章
- lamp phpstudy mysql_Phpstudy 搭建服务器教程
- iOS - Carthage的安装和使用,以及常见报错解决
- 【Tensorflow2.0】8、tensorflow2.0_hdf5_savedmodel_pb模型转换[2]
- debian6安装nvidia GT620显卡 驱动
- ENSP直连路由和静态路由配置(含路由表结构分析)
- 电脑位数(32位或者64位)问题导致eclipse不能正常启动
- 楼板计算塑形弹性_土木吧丨弹性与弹塑性计算差异性分析
- 8进制的乘法计算、加法计算
- python京东预约抢购_python 脚本实现京东抢购
- linux发行版_看一看2020年最漂亮的Linux发行版