文章目录

    • MQ学习
    • rabbitMq软件上传到liunx服务器
      • 基础准备
        • 1. linux的服务器-创建RabbitMq文件夹-并进入文件夹
        • 2. 上传需要的文件
        • 3. 上传成功
      • mq安装
        • 1.在线安装依赖环境:
        • 2.安装Erlang
        • 3.安装RabbitMQ
        • 4. 开启管理界面及配置
        • 5. 启动
        • 6. 配置虚拟主机及用户
          • 新建用户
          • Virtual Hosts配置
        • 7. Overview配置文件 (not found)
        • 8.端口
  • RabbitMQ入门
    • 1.1. 搭建示例工程
      • 1.1.1. 创建工程
      • 1.1.2. 添加依赖
    • 1.2.编写工具类
    • 1.3编写生产者
    • 1.3. 编写消费者
    • 1.4. 小结
  • RabbitMQ的工作模式
    • rabbitmq的官网
    • mq第二种工作模式-(工作队列模式)[Work queues](https://www.rabbitmq.com/tutorials/tutorial-two-python.html)
      • rabbitmq-producer 模块
        • 1. 新建work包
        • 2. 复制简单的模式类-修改名字为Producer_WorkQueues
        • 3. 复制简单的模式类-修改名字为Producer_WorkQueues
      • rabbitmq-consumer模块
        • 1. 新建work包
        • 2. 复制简单的模式类-修改名字为Producer_WorkQueues
        • 3. 复制简单的模式类-修改名字为Consumer_WorkQueues1
        • 4. 复制Consumer_WorkQueues1 创建Consumer_WorkQueues2
      • 小结
    • mq第三种工作模式-[Publish/Subscribe](https://www.rabbitmq.com/tutorials/tutorial-three-python.html)
      • 生产者模块 Producer
        • 1. 新建ps包
        • 2. Producer_PublishSubscribe代码
      • 消费者模块consumer
        • 1. 新建ps包
        • 2. Consumer_PublishSubscribe1代码
        • 4. Consumer_PublishSubscribe2代码
      • 小结
    • mq第四种工作模式-[Routing](https://www.rabbitmq.com/tutorials/tutorial-four-python.html)
      • 生产者模块 Producer
        • 1. 新建routing包
        • 2. Producer_Routing代码
      • 消费者模块consumer
        • 1. 新建routing包
        • 2. Consumer_Routing1代码
        • 4. Consumer_Routing2代码
      • 小结
    • mq第五种工作模式-[Topics](https://www.rabbitmq.com/tutorials/tutorial-five-python.html)
      • 生产者模块 Producer
        • 1. 新建topics包
        • 2. Producer_Topics代码
      • 消费者模块consumer
        • 1. 新建topics包
        • 2. Consumer_Topic1代码
        • 4. Consumer_Topic2代码
      • 小结
    • 忘记改交换机的类型,这里
    • 模式总结
  • Springboot整合RabbitMQ
    • Springboot整合RabbitMQ生产者
      • 1.创建生产者Springboot工程
      • 2.引入依赖坐标
      • 3.编写yml配置文件
      • 4.编写启动类
      • 5.编写配置类
      • 6.编写测试类
      • 7.查看mq控制台的消息
    • Springboot整合RabbitMQ消费者
      • 1.创建消费者Springboot工程
      • 2.引入依赖坐标
      • 3.编写yml配置文件
      • 4.编写启动类
      • 5.创建监听类
      • 6.启动-启动类
    • 小结
  • SpringBoot整合RabbitMQ (交换机与多个队列绑定)
    • 1.将上面的springboot整合直接复制过来
    • 2.生产端
    • 3.消费端
    • 3.启动test测试类,进行测试
  • RabbitMQ-集群搭建
    • 概述
      • 前期准备-创建多节点
      • 设置主节点操作:
        • rabbit1操作作为主节点:
        • rabbit2操作作为从节点:
        • RabbitMQ镜像集群配置
          • 管理控制台设置
          • 含义解释
  • 负载均衡-HAProxy
    • 概述
    • 安装-HAProxy
    • 配置-HAProxy
    • Haproxy启动失败
      • 错误信息
        • 解决方案
    • 创建新工程-测试

MQ学习

rabbitMq软件上传到liunx服务器

基础准备

1. linux的服务器-创建RabbitMq文件夹-并进入文件夹

 mkdir rabbitmq

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FAVDxMuM-1663250323428)(img\创建文件夹.png)]

2. 上传需要的文件

erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fUg8vYcO-1663250323429)(img\上传文件到linux.png)]

3. 上传成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ir39NTsq-1663250323429)(img\上传成功.png)]

mq安装

1.在线安装依赖环境:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dnarnf7j-1663250323429)(img\安装日志.png)]

2.安装Erlang
# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
  • 如果出现如下错误

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PBwr3Dzd-1663250323430)(img\错误.png)]

  • 说明gblic 版本太低。我们可以查看当前机器的gblic 版本
strings /lib64/libc.so.6 | grep GLIBC

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Tk1zSpmC-1663250323430)(img\最高版本.png)]

需要升级glibc

  • 使用yum更新安装依赖
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
  • 安装rpm包
sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
  • 安装完毕后再查看glibc版本
strings /lib64/libc.so.6 | grep GLIBC
3.安装RabbitMQ
# 安装  -soca
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm# 安装  -mq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

执行 出现如下错误:

error: open of socat-1.7.3.2-1.1.el7.x86_64.rpm failed: No such file or directory

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PxNpoIvF-1663250323430)(img\错误2.png)]

  • 执行如下命令

    yum -y install tcp_wrappers
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-roKNBuZL-1663250323431)(img\执行.png)]

再执行安装RabbitMQ -socat:

rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm

再执行安装RabbitMQ :

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MzlRAZZk-1663250323431)(img\执行安装.png)]

4. 开启管理界面及配置
# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
linux不保存退出的命令是q!。
1、打开linux系统,在linux的桌面的空白处右击。2、在弹出的下拉选项里,点击打开终端。3、在终端窗口输入vi+文件名打开需要编辑的文件。4、通过vi更改文件后,按ESC键退出insert模式,输入冒号(:),在冒号后面输入q!命令,回车即可不保存退出。
linux保存退出
1、打开linux系统,在linux的桌面的空白处右击。2、在弹出的下拉选项里,点击打开终端。3、在终端窗口输入vi+文件名打开需要编辑的文件。4、通过vi更改文件后,按ESC键退出insert模式,输入冒号(:),在冒号后面输入wq命令,回车即可保存退出。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WuCVepr9-1663250323431)(img\放开.png)]

查看命令

 cat /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4xaHQAvU-1663250323431)(img\修改成功.png)]

5. 启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YYk3xc4C-1663250323432)(img\启动成功.png)]

如果这里访问不到: (http://ip:15672/ )关闭防火墙

service iptables stop

防火墙关闭后还是无法访问: 重启mq

service rabbitmq-server restart

访问 http://ip:15672/#/

登录名: guest
密码: guest

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CsicGaJs-1663250323432)(img\成功登录控制台.png)]

6. 配置虚拟主机及用户

RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码;

我们创建自定义用户;也可以登录管理界面:

新建用户

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2HQH5Nl5-1663250323432)(img\设置用户.png)]

角色说明

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

Virtual Hosts配置

像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器(类似于mysql的库),每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。

  • 创建Virtual Hosts

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4PZ9tJ52-1663250323432)(img\设置.png)]

  • 设置Virtual Hosts权限

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eHpNT4tW-1663250323433)(img\点击虚拟机.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J8hwCcan-1663250323433)(img\打开.png)]

点击set permission

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D2ecQYUW-1663250323433)(img\set.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KC6bGRO3-1663250323433)(img\成功.png)]

登录为abc用户

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZP8tlID-1663250323434)(img\登录为abc用户.png)]

7. Overview配置文件 (not found)

/etc/rabbitmq/rabbitmq.config (not found)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FP2tvwVF-1663250323434)(img\找不到配置.png)]

  • 设置配置文件步骤
  • 1.cd找到配置文件所在目录
cd /usr/share/doc/rabbitmq-server-3.6.5/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MBXk5p1B-1663250323434)(img\找到了文件.png)]

  • 2.将配置文件copy到指定位置
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
  • 3.重启生效

    service rabbitmq-server restart
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YzRonbiy-1663250323434)(img\成功解决.png)]

8.端口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OMLR2yz7-1663250323435)(img\端口.png)]

注意: 15672是控制台的端口. 25672是集群端口 5672是tcp连接的端口.

RabbitMQ入门

1.1. 搭建示例工程

1.1.1. 创建工程

  • 创建一个空工程rabbitmq

  • 创建rabbitmq-producer 工程

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cTruYwyJ-1663250323435)(img/创建工程1.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jJUESYZF-1663250323435)(img/创建工程2.png)]

  • 创建rabbitmq-consumer 工程

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HFXkXCUa-1663250323435)(img/创建工程3.png)]

1.1.2. 添加依赖

往powernode-rabbitmq的rabbitmq-producer 工程和rabbitmq-consumer 工程的pom.xml文件中添加如下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.powernode</groupId><artifactId>rabbitmq-consumer</artifactId><version>1.0-SNAPSHOT</version><dependencies><!--rabbitmq 的java客户端依赖--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZBCcw0E2-1663250323435)(img/添加pom的依赖.png)]

1.2.编写工具类

(com.powernode.rabbitmq.util.ConnectionUtil)

package com.powernode.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @program: rabbitmq* @ClassName: ConnectionUtil* @version: 1.0* @description:* @author: bjpowernode**/
public class ConnectionUtil {public static Connection getConnection() throws Exception {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//主机地址;默认为 localhostconnectionFactory.setHost("123.57.41.174");//连接端口;默认为 5672connectionFactory.setPort(5672);//虚拟主机名称;默认为 /connectionFactory.setVirtualHost("powernode");//连接用户名;默认为guestconnectionFactory.setUsername("powernode");//连接密码;默认为guestconnectionFactory.setPassword("powernode");//创建连接return connectionFactory.newConnection();}
}

1.3编写生产者

编写消息生产者com.powernode.rabbitmq.simple.Producer

package com.powernode.rabbitmq.simple;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** @program: rabbitmq* @ClassName: Producer* @version: 1.0* @description:* @author: bjpowernode**/
public class Producer {//队列名称 (simple_queue 简单队列)static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)参数:1. queue:队列名称2. durable:是否持久化,当mq重启之后,还在3. exclusive:* 是否独占本次连接。只能有一个消费者监听这队列* 当Connection关闭时,是否删除队列*4. autoDelete:是否在不使用的时候自动删除队列。当没有Consumer时,自动删除掉5. arguments:队列其它参数*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "hello rabbitmq~~~";/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)参数:1. exchange:交换机名称。简单模式下交换机会使用默认的 ""2. routingKey:路由名称3. props:配置信息4. body:发送消息数据*///6. 发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//7.释放资源channel.close();connection.close();}
}

1.3. 编写消费者

编写消息的消费者com.powernode.rabbitmq.simple.Consumer

package com.powernode.rabbitmq.simple;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** @program: rabbitmq* @ClassName: Consumer* @version: 1.0* @description:* @author: bjpowernode**/
public class Consumer {//队列名称 (simple_queue 简单队列)static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/***  回调方法,当收到消息后,会自动执行该方法* consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//consumerTagSystem.out.println("consumerTag:" + consumerTag);//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body));}};//监听消息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数1:queue:队列名称* 参数2: autoAck:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:callback:消息接收到后回调对象*/channel.basicConsume(QUEUE_NAME, true, consumer);//不关闭资源,应该一直监听消息
//        channel.close();
//        connection.close();}
}

1.4. 小结

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

RabbitMQ的工作模式

rabbitmq的官网

  • rabbitmq的官网:https://www.rabbitmq.com/
  • 进入MQ的工作模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wfGDYXOJ-1663250323439)(img/点击1.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xFinOBtV-1663250323439)(img/点击2.png)]

  • MQ的七种工作模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A9OSRSGB-1663250323439)(img/工作模式.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V5uJS7de-1663250323440)(img/第七种.png)]

工作模式概述:其实就是消息路由分发的一种方式

七种工作模式:(

​ 简单模式(Hello World)

​ 工作队列模式(Work queues)

​ 订阅模式(Publish/Subscribe)

​ 路由模式(Routing)

​ 主题模式(Topics)

​ 远程过程调用(RPC)

​ 发布者确认(Publisher Confirms))

刚才我们已经实现了第一种模式。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zVU88XkE-1663250323440)(img/01.png)]

(简单模式)“Hello World!”

做最简单的事情,一个生产者对应一个消费者,RabbitMQ 相当于一个消息代理,负责将 A 的消息转发给 B 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。

现在我们实现第二种

mq第二种工作模式-(工作队列模式)Work queues

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YNw5GVkG-1663250323440)(img/02.png)]

work_queues概念:

Work Queues在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理 应用场景:一个订单的处理需要 10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况。注意:一条消息只能被一个消费者消费,不能被多个消费者重复消费;应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

rabbitmq-producer 模块

1. 新建work包

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pHLlgBxR-1663250323440)(img/新建work包.png)]

2. 复制简单的模式类-修改名字为Producer_WorkQueues

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iciMR6li-1663250323441)(img/复制简单改名work.png)]

3. 复制简单的模式类-修改名字为Producer_WorkQueues
  • 修改队列名称-

  • 增加for循环,一次新建10条消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Qco47zr-1663250323441)(img/修改队列名称.png)]

  • 完整代码
package com.powernode.rabbitmq.work;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;/*** @program: rabbitmq* @ClassName: Producer* @version: 1.0* @description:* @author: bjpowernode**/
public class Producer_WorkQueues {//队列名称 (work_queues 工作队列)static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue//如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 0; i <= 10 ; i++) {String message = i+"你好,hello WorkQueues";//6. 发送消息channel.basicPublish("", QUEUE_NAME, null,message.getBytes("UTF-8"));}//7.释放资源channel.close();connection.close();}
}

rabbitmq-consumer模块

1. 新建work包

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h9OA60nV-1663250323441)(img/新建work包2.png)]

2. 复制简单的模式类-修改名字为Producer_WorkQueues

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Dcl6oPgE-1663250323441)(img/复制简单改名work.png)]

3. 复制简单的模式类-修改名字为Consumer_WorkQueues1
  • 修改队列名称-

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x9jqd28q-1663250323441)(img/修改队列名称1.png)]

  • 完整代码
package com.powernode.rabbitmq.work;import com.powernode.rabbitmq.simple.Producer;
import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** @program: rabbitmq* @ClassName: Consumer* @version: 1.0* @description:* @author: bjpowernode**/
public class Consumer_WorkQueues1 {//队列名称 (simple_queue 简单队列)static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws Exception {//工具类获取连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/***  回调方法,当收到消息后,会自动执行该方法* consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//consumerTag/*    System.out.println("consumerTag:" + consumerTag);//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());*///收到的消息System.out.println("接收到的消息为:" + new String(body,"UTF-8"));}};//监听消息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数1:queue:队列名称* 参数2: autoAck:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:callback:消息接收到后回调对象*/channel.basicConsume(QUEUE_NAME, true, consumer);//不关闭资源,应该一直监听消息
//        channel.close();
//        connection.close();}
}
4. 复制Consumer_WorkQueues1 创建Consumer_WorkQueues2

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fJEgUKlm-1663250323442)(img/新建2.png)]

首先启动两个客户端(消费者)-Consumer_WorkQueues2和Consumer_WorkQueues1

然后启动服务端(生产者)-Producer_WorkQueues

到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息,可以看到是顺序消费Consumer_WorkQueues1消费了消息0-2-4-6-8-10

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-251YZfhk-1663250323442)(img/消费成功.png)]

Consumer_WorkQueues2消费了消息1-3-5-7-9

小结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

mq第三种工作模式-Publish/Subscribe

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NYlpVGFL-1663250323442)(img/03.png)]

Publish/Subscribe(发布订阅模式)概念:

一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。 应用场景:更新商品库存后需要通知多个缓存和多个数据库

前面2个案例中,只有3个角色:

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

  • 发布订阅模式:
    1、每个消费者监听自己的队列。
    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
    到消息

生产者模块 Producer

1. 新建ps包
2. Producer_PublishSubscribe代码
  • 完整代码
package com.powernode.rabbitmq.ps;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 发布与订阅使用的交换机类型为:fanout*/
public class Producer_PublishSubscribe {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,*                      fanout、扇形(广播),发送消息到每一个与之绑定的队列*                      topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列*                      direct、定向 把消息交给符合指定routing key 的队列*                      headers 参数匹配(使用的比较少,不做赘述)*///5. 创建交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);//6. 创建队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//7.队列绑定交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*/channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");String message = "日志信息:A调用了findAll方法...日志级别:info...";//8. 发送消息channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());//9. 释放资源channel.close();connection.close();}
}

运行生产者–运行完成,队列多了两个

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0spEDPGM-1663250323442)(img/02运行完成,队列多了两个.png)]

消费者模块consumer

1. 新建ps包
2. Consumer_PublishSubscribe1代码
  • 完整代码
package com.powernode.rabbitmq.ps;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
//消费者1
public class Consumer_PublishSubscribe1 {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(FANOUT_QUEUE_1, true, consumer);//不需要关闭连接}
}
4. Consumer_PublishSubscribe2代码
  • 复制Consumer_PublishSubscribe1创建Consumer_PublishSubscribe2

  • 修改队列名称 为2

    package com.powernode.rabbitmq.ps;import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    //消费者2
    public class Consumer_PublishSubscribe2 {//交换机名称static final String FANOUT_EXCHAGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(FANOUT_QUEUE_2, true, consumer);//不需要关闭连接}
    }
    

启动两个(消费者)-

到IDEA的两个消费者对应的控制台查看

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9pv0hzW1-1663250323443)(img/02消费者01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5JYa6os6-1663250323443)(img/02消费者02.png)]

小结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

mq第四种工作模式-Routing

Routing(路由模式)概念:

有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由 key ,消费者将队列绑定到交换机时需要指定路由 key,仅消费指定路由 key 的消息 应用场景:如在商品库存中增加了 1 台 iphone12,iphone12 促销活动消费者指定 routing key 为 iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此 routing key 的消息

生产者模块 Producer

1. 新建routing包
2. Producer_Routing代码

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eiud32nH-1663250323443)(img/指定模式为direct01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MTKRt6L3-1663250323445)(img/指定模式为direct02.png)]

  • 完整代码
package com.powernode.rabbitmq.routing;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** 发布与订阅使用的交换机类型为:fanout*/
public class Producer_Routing {//交换机名称static final String DIRCET_EXCHAGE = "dircet_exchange";//队列名称static final String DIRCET_QUEUE_1 = "dircet_queue_1";//队列名称static final String DIRCET_QUEUE_2 = "dircet_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,*                      fanout、扇形(广播),发送消息到每一个与之绑定的队列*                      topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列*                      direct、定向 把消息交给符合指定routing key 的队列*                      headers 参数匹配(使用的比较少,不做赘述)*///5. 创建交换机channel.exchangeDeclare(DIRCET_EXCHAGE, BuiltinExchangeType.DIRECT);//6. 创建队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRCET_QUEUE_1, true, false, false, null);channel.queueDeclare(DIRCET_QUEUE_2, true, false, false, null);//7.队列绑定交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*///队列一 绑定了errorchannel.queueBind(DIRCET_QUEUE_1, DIRCET_EXCHAGE, "error");//队列二 绑定了 error  info warningchannel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "error");channel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "info");channel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "warning");String message = "日志信息:A调用了findAll方法...日志级别:info...";//8. 发送消息-并且指定 routingKey参数为channel.basicPublish(DIRCET_EXCHAGE, "info", null, message.getBytes());//9. 释放资源channel.close();connection.close();}
}

运行生产者–运行完成,队列多了两个

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BweBd865-1663250323446)(img/02运行完成,队列多了两个.png)]

消费者模块consumer

1. 新建routing包
2. Consumer_Routing1代码

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9S6tGd2t-1663250323446)(img/03客户端01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cGfolP42-1663250323446)(img/03客户端02.png)]

  • 完整代码
package com.powernode.rabbitmq.routing;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;//消费者1
public class Consumer_Routing1 {//交换机名称static final String DIRECT_EXCHAGE = "direct_exchange";//队列名称static final String DIRECT_QUEUE_1 = "direct_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息存储到数据库.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(DIRECT_QUEUE_1, true, consumer);//不需要关闭连接}
}
4. Consumer_Routing2代码

复制Consumer_Routing1创建Consumer_Routing2

修改队列名称 为2

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OYp8rXEw-1663250323446)(img/03-改成2.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Osjaw6DY-1663250323446)(img/03-改成打印到控制台.png)]

  • 代码
package com.powernode.rabbitmq.routing;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;//消费者1
public class Consumer_Routing2 {//交换机名称static final String DIRECT_EXCHAGE = "direct_exchange";//队列名称static final String DIRECT_QUEUE_2 = "direct_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(DIRECT_QUEUE_2, true, consumer);//不需要关闭连接}
}

启动两个(消费者)-

到IDEA的两个消费者对应的控制台查看

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y4QkV2mM-1663250323447)(img/查看结果1.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aDH6fyrv-1663250323447)(img/查看结果2.png)]

因为我们的2队列绑定了info级别的,所以在控制台打印了

小结

Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

mq第五种工作模式-Topics

Topics(主题模式)概念:

根据主题(Topics)来接收消息,将路由 key 和某模式进行匹配,此时队列需要绑定在一个模式上,
#匹配一个词或多个词,*只匹配一个词。
应用场景:同上,iphone 促销活动可以接收主题为 iphone 的消息,如 iphone12、iphone13 等

生产者模块 Producer

1. 新建topics包
2. Producer_Topics代码

从复制Producer_PublishSubscribe到Producer_Topics代码

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fejmlZbo-1663250323447)(img/复制到topics生产者.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ASlqUZa9-1663250323447)(img/修改生产者topic01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AckQdWlK-1663250323448)(img/修改生产者topic02.png)]

  • 完整代码
package com.powernode.rabbitmq.topics;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer_Topics {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_1 = "topic_queue_1";//队列名称static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,*                      fanout、扇形(广播),发送消息到每一个与之绑定的队列*                      topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列*                      direct、定向 把消息交给符合指定routing key 的队列*                      headers 参数匹配(使用的比较少,不做赘述)*///5. 创建交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);//6. 创建队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);//7.队列绑定交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*/// routing key  系统的名称.日志的级别。//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库//绑定队列1channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"#.error");channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"order.*");//绑定队列2channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.*");String message = "日志信息:A调用了findAll方法...日志级别:info...";//8. 发送消息 指定routingKey 为channel.basicPublish(TOPIC_EXCHAGE, "order.info", null, message.getBytes());//9. 释放资源channel.close();connection.close();}
}

运行生产者–运行完成,队列多了两个

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cocJTljz-1663250323448)(img/控制台拿到消息.png)]

消费者模块consumer

1. 新建topics包
2. Consumer_Topic1代码

从复制Consumer_Routing1到Consumer_Topic1代码

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2a6HGRc6-1663250323448)(img/复制到topics消费者01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wr9Gnw33-1663250323448)(img/复制到topics消费者02.png)]

  • 完整代码
package com.powernode.rabbitmq.topics;import com.powernode.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;//消费者1
public class Consumer_Topic1 {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_1 = "topic_queue_1";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息存储到数据库.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(TOPIC_QUEUE_1, true, consumer);//不需要关闭连接}
}
4. Consumer_Topic2代码
  • 复制Consumer_Topic1创建Consumer_Topic2

  • 修改队列名称 为2

    package com.powernode.rabbitmq.topics;import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;import java.io.IOException;//消费者1
    public class Consumer_Topic2 {//交换机名称static final String TOPIC_EXCHAGE = "topic_exchange";//队列名称static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(TOPIC_QUEUE_2, true, consumer);//不需要关闭连接}
    }

启动两个(消费者)-

到IDEA的两个消费者对应的控制台查看

因为满足两个路由条件,则两个控制台都收到消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WveUg37Z-1663250323448)(img/因为满足两个路由条件,则两个控制台都收到消息1.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jqrkRNrh-1663250323449)(img/因为满足两个路由条件,则两个控制台都收到消息2.png)]

小结

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

忘记改交换机的类型,这里

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-odkgolQS-1663250323449)(img/交换机类型错误.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yBGgDGtA-1663250323449)(img/交换机类型错误,01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NXEvhng5-1663250323449)(img/交换机类型错误02.png)]

模式总结

RabbitMQ工作模式:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)一对一

2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)一个队列的消息被多个人消费,每个人消费的消息是不够完整的,只能获取到队列的一半的消息–>例如 2-4-6-7-8

3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列 产生两个队列的消息被两个人消费,每个人的消息是完整的

4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列–只能指定一个参数,不能指定多个参数 ,例如,只能指定info

5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列可以指定多个参数

6、远程过程调用(RPC)
如果我们需要在远程计算机上运行功能并等待结果就可以使用 RPC,具体流程可以看图。

应用场景:需要等待接口返回数据。

7、发布者确认(Publisher Confirms)

与发布者进行可靠的发布确认,发布者确认是 RabbitMQ 扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ 将异步确认发送者发布的消息,这意味着它们已在服务器端处理

应用场景:对于消息可靠性要求较高,比如钱包扣款

Springboot整合RabbitMQ

Springboot整合RabbitMQ生产者

1.创建生产者Springboot工程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eDhBWa8m-1663250323450)(img/创建springboot的producer模块.jpg)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mBqqc5DT-1663250323450)(img/创建工程02.png)]

创建springboot-producer 工程,Groupld为com.powernode

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MhiudEeo-1663250323450)(img/创建工程03.png)]

2.引入依赖坐标
<!--引入Spring Boot的父级依赖--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><dependencies><!--引入整合mq的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--引入单元测试依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
3.编写yml配置文件

application.yml

spring:rabbitmq:host: ip #服务地址username: powernode #账户名password: powernode #密码virtual-host: powernode  # 虚拟机(默认/)port: 5672  #端口
4.编写启动类

com.powernode.ProducerApplication

  • 代码
package com.powernode;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @program: rabbitmq* @ClassName: ProducerApplication* @version: 1.0* @description: 主启动类* @author: bjpowernode**/
@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}
}
5.编写配置类

创建config包

创建RabbitMQConfig配置类 --(定义交换机,队列 和绑定关系)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bcWUX1Xp-1663250323450)(img/创建config包.png)]

  • 代码
package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @program: rabbitmq* @ClassName: RabbitMQConfig* @version: 1.0* @description: 配置类* @author: bjpowernode**/
@Configuration
public class RabbitMQConfig {//指定EXCHANGE_NAME交换机 的名称public static final String EXCHANGE_NAME = "boot_topic_exchange";//指定QUEUE_NAME队列 的名称public static final String QUEUE_NAME = "boot_queue";//1.交换机@Bean("bootExchange")public Exchange bootExchange(){/*** ExchangeBuilder构建交换机对象.*          topicExchange(String name) -->topicExchange-指定交换机的类型--> (String name)指定交换机的名称.*          directExchange(String name)*          fanoutExchange(String name)*          headersExchange(String name)*  .durable(true) 选择true指定为持久化*  .build()为构建.*/return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.Queue 队列@Bean("bootQueue")public Queue bootQueue(){/*** QueueBuilder构建队列的对象.*          durable()*          durable(String name) --> ()指定队列的名称.*          nonDurable()*          nonDurable(String name)*  .build()为构建.*/return QueueBuilder.durable(QUEUE_NAME).build();}//3. 队列和交互机绑定关系 Binding/**1. 指定队列2. 指定交换机3. routing key@Qualifier-->如果配置类出现多个队列,通过名称绑定参数*/@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*/return BindingBuilder.bind(queue).to(exchange).with("springboot.#").noargs();}}
6.编写测试类

com.powernode.ProducerTest

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4RWgzJRD-1663250323450)(img/编写测试类.png)]

  • 代码

注入RabbitTemplate–调用方法,完成发送

package com.powernode;import com.powernode.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** @program: rabbitmq* @ClassName: ProducerTest* @version: 1.0* @description: 测试类* @author: bjpowernode**/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {//1.注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend(){/*** convertAndSend 发送消息*      1.参数1 exchange交换机的名称*      2.参数2 routingKey*      3.参数3 消息*/rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"springboot.java","springboot mq hello~~~");}
}

启动测试类

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Kv0wkn3a-1663250323451)(img/测试启动成功.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aEScJaM6-1663250323451)(img/springboot测试开启.png)]

7.查看mq控制台的消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1Smet39p-1663250323451)(img/点击01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tJgnheOm-1663250323451)(img/点击02.png)]

Springboot整合RabbitMQ消费者

1.创建消费者Springboot工程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NcJoOYqO-1663250323451)(img/创建springboot的producer模块.jpg)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p9FIqxTS-1663250323452)(img/创建工程02.png)]

创建springboot-producer 工程,Groupld为com.powernode

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2GSxPn2X-1663250323452)(img/创建springboot消费者模块.png)]

2.引入依赖坐标
    <!--引入Spring Boot的父级依赖--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><dependencies><!--引入整合mq的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
3.编写yml配置文件

application.yml

spring:rabbitmq:host: ip #服务地址username: powernode #账户名password: powernode #密码virtual-host: powernode  # 虚拟机(默认/)port: 5672  #端口
4.编写启动类

com.powernode.ConsumerApplication

  • 代码
package com.powernode;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @program: rabbitmq* @ClassName: ConsumerApplication* @version: 1.0* @description: 主启动类* @author: bjpowernode**/
@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}
}
5.创建监听类
com.powernode.listener.RabbimtMQListener

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zS1XeBnN-1663250323452)(img/监听类.png)]

  • 代码

使用 完成监听

package com.powernode.listener;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @program: rabbitmq* @ClassName: RabbimtMQListener* @version: 1.0* @description: 监听* @author: bjpowernode**/
@Component
public class RabbimtMQListener {@RabbitListener(queues = "boot_queue")public void ListenerQueue(Message message){System.out.println(new String(message.getBody()));}
}
6.启动-启动类

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Bk87gAR-1663250323452)(img/取到消息.png)]

小结

SpringBoot提供了快速整合RabbitMQ的方式

基本信息yml配置 ,队列交互以及绑定关系在配置类中使用Bean注入的方式配置(配置类在)

生产端直接注入RabbitTemplate完成消息发送

消费端直接使用@RabbitListener完成消息的接收

SpringBoot整合RabbitMQ (交换机与多个队列绑定)

1.将上面的springboot整合直接复制过来

2.生产端

  • RabbitMQConfig类新增-队列名称的定义[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-srO0VUD7-1663250323453)(img/多个队列02.png)]

  • RabbitMQConfig类新增-构建bootQueue2的队列[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YZRNINwh-1663250323453)(img/多个队列03.png)]

  • RabbitMQConfig类新增- 队列和交互机绑定关系 Binding[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iYAmaTbI-1663250323453)(img/多个队列04.png)]

  • ​ 类完整代码

    package com.powernode.config;import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;/*** @program: rabbitmq* @ClassName: RabbitMQConfig* @version: 1.0* @description: 配置类* @author: bjpowernode**/
    @Configuration
    public class RabbitMQConfig {//指定EXCHANGE_NAME交换机 的名称public static final String EXCHANGE_NAME = "boot_topic_exchange";//指定QUEUE_NAME队列 的名称public static final String QUEUE_NAME = "boot_queue";//指定QUEUE_NAME队列 的名称public static final String QUEUE_NAME2 = "boot_queue2";//1.交换机@Bean("bootExchange")public Exchange bootExchange(){/*** ExchangeBuilder构建交换机对象.*          topicExchange(String name) -->topicExchange-指定交换机的类型--> (String name)指定交换机的名称.*          directExchange(String name)*          fanoutExchange(String name)*          headersExchange(String name)*  .durable(true) 选择true指定为持久化*  .build()为构建.*/return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.Queue 队列@Bean("bootQueue")public Queue bootQueue(){/*** QueueBuilder构建队列的对象.*          durable()*          durable(String name) --> ()指定队列的名称.*          nonDurable()*          nonDurable(String name)*  .build()为构建.*/return QueueBuilder.durable(QUEUE_NAME).build();}//2.Queue 队列@Bean("bootQueue2")public Queue bootQueue2(){/*** QueueBuilder构建队列的对象.*          durable()*          durable(String name) --> ()指定队列的名称.*          nonDurable()*          nonDurable(String name)*  .build()为构建.*/return QueueBuilder.durable(QUEUE_NAME2).build();}//3. 队列和交互机绑定关系 Binding/**1. 指定队列2. 指定交换机3. routing key@Qualifier-->如果配置类出现多个队列,通过名称绑定参数*/@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*//*    amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));*/return BindingBuilder.bind(queue).to(exchange).with("1.txt").noargs();}@Beanpublic Binding bindQueueExchange1(@Qualifier("bootQueue2") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*/return BindingBuilder.bind(queue).to(exchange).with("1.#").noargs();}@Beanpublic Binding bindQueueExchange2(@Qualifier("bootQueue2") Queue queue, @Qualifier("bootExchange") Exchange exchange){/*** BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();* .bind(queue)通过什么队列* .to(exchange)绑定什么交换机* .with("boot.#")指定routingKey* .noargs()不需要指定参数,如果需要指定参数,调用.and()*/return BindingBuilder.bind(queue).to(exchange).with("555.zzz").noargs();}}

3.消费端

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Z2z6Mui-1663250323453)(img/多个队列01.png)]

  • ​ 新增的代码
    //跟你消息生产者的队列名称一致@RabbitListener(queues = "boot_queue2")public void ListenerQueue2(Message message){System.out.println(new String(message.getBody())+ "222");}

3.启动test测试类,进行测试

(注意:先启动生产者,因为绑定关系在消费者未配置,则消费者不能先启动–>消费者先启动,会报找不到队列异常)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-71g7HkdX-1663250323453)(img/多队列执行.png)]

RabbitMQ-集群搭建

概述

当我们在项目开发中引入了mq中间件,那中间件的高可用就变得尤为重要,但是如果我们只有单台mq,则容易出现不可用的问题,导致我们整个项目宕机.这时候,我们就想到了要部署mq中间件集群.来提高我们项目中间件的高可用.

在我们的项目当中,单机多实例的集群部署方式,被称为伪集群

前期准备-创建多节点

  • 首先检查mq是否正常运行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ejmpgIes-1663250323454)(img/检查mq是否正常运行.png)]

检查命令: -确保RabbitMQ运行没有问题

rabbitmqctl  status

停止rabbitmq服务

service rabbitmq-server stop

**新建一个会话窗口–>**启动第一个节点:(前台启动方式)

RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
#web管理插件端口占用,所以还要指定其web插件占用的端口号。
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server start

使用:guest 账户登录(http://ip:15673/#/)控制台查看

**新建一个会话窗口–>**启动第二个节点:(前台启动方式)

#web管理插件端口占用,所以还要指定其web插件占用的端口号。
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start

使用:guest 账户登录(http://ip:15674/#/)控制台查看

结束命令:–(书写,但是这里不用,用下面的停止命令)

rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stop

设置主节点操作:

新建一个会话窗口–>

rabbit1操作作为主节点:
  • 停止rabbit1 节点
rabbitmqctl -n rabbit1 stop_app
  • 重置rabbit1 节点
rabbitmqctl -n rabbit1 reset
  • 重启rabbit1 节点
rabbitmqctl -n rabbit1 start_app
rabbit2操作作为从节点:
  • 停止rabbit1 节点
rabbitmqctl -n rabbit2 stop_app
  • 重置rabbit1 节点
rabbitmqctl -n rabbit2 reset
  • 将rabbit2加入到集群中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1sFquWgs-1663250323454)(img/主机名位置.png)]

rabbitmqctl -n rabbit2 join_cluster rabbit1@'iz2ze268ldc0zhjsfzajw4z'
###''内是主机名换成自己的 例如  iz2ze268ldc0zhjsfzajw4z
  • 重启rabbit2 节点
rabbitmqctl -n rabbit2 start_app

添加节点加入集群设置成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LVfRZySM-1663250323454)(img/节点添加成功01.png)]

从两个控制台查看,–(http://123.57.41.174:15673/#/) (http://123.57.41.173:15673/#/)均添加成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QVV1HZmv-1663250323454)(img/控制台查看01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FAq8jEDT-1663250323454)(img/控制台查看02.png)]

  • 新增队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nFVkDVXl-1663250323455)(img/新增队列.png)]

RabbitMQ镜像集群配置

上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一个项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。

设置的镜像队列-(设置集群队列信息同步)可以通过开启的网页的管理端Admin->Policies,也可以通过命令

管理控制台设置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5dSQpGOW-1663250323455)(img/设置集群队列信息同步 步骤.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mC4JQkEa-1663250323455)(img/设置集群队列信息同步01.png)]

含义解释
  • Name:策略名称

  • Pattern:匹配的规则,如果是匹配所有的队列,是^.

  • Definition: 镜像定义,包括三个部分 ha-mode,ha-params,ha-sync-mode

    • ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
    • all表示在集群所有的节点上进行镜像 ,也就是同步所有匹配的队列
    • exactly表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
    • nodes表示在指定的节点上进行镜像,节点名称通过ha-params指定
    • ha-params: ha-mode模式需要用到的参数
    • ha-sync-mode: 镜像队列中消息的同步方式,有效值为automatic(自动),manually (手动)
  • 成功设置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bN3Jxt0R-1663250323455)(img/成功设置01.png)]

  • +1 表示同步到一个队列。如果有多个队列,这里就是+几

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ERucpvfL-1663250323456)(img/到悬浮窗口,看到02.jpg)]

负载均衡-HAProxy

概述

HAProxy是一个使用C语言编写的自由及开放源代码软件[1],其提供高可用性、负载均衡,以及基于TCP和HTTP的应用程序代理。
HAProxy特别适用于那些负载特大的web站点,这些站点通常又需要会话保持或七层处理。HAProxy运行在当前的硬件上,完全可以支持数以万计的并发连接。并且它的运行模式使得它可以很简单安全的整合进您当前的架构中, 同时可以保护你的web服务器不被暴露到网络上。
HAProxy实现了一种事件驱动, 单一进程模型,此模型支持非常大的并发连接数。多进程或多线程模型受内存限制 、系统调度器限制以及无处不在的锁限制,很少能处理数千并发连接。事件驱动模型因为在有更好的资源和时间管理的用户空间(User-Space) 实现所有这些任务,所以没有这些问题。此模型的弊端是,在多核系统上,这些程序通常扩展性较差。这就是为什么他们必须进行优化以 使每个CPU时间片(Cycle)做更多的工作。
包括 GitHub、Bitbucket[3]、Stack Overflow[4]、Reddit、Tumblr、Twitter[5][6]和 Tuenti[7]在内的知名网站,及亚马逊网络服务系统都使用了HAProxy。 [1]

安装-HAProxy

  • 下载依赖包
yum install gcc vim wget
  • 上传haproxy源码包

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i4sVAtAa-1663250323456)(img/点击上传01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wBrdzYcA-1663250323456)(img/上传02.png)]

  • ls命令查看是否上传成功

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JvDsKHRl-1663250323456)(img/查看上传成功.png)]

  • 解压

tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
  • 进入目录
cd /usr/local/haproxy-1.6.5
  • 编译

    make TARGET=linux31 PREFIX=/usr/local/haproxy
    
  • 安装

    make install PREFIX=/usr/local/haproxy
    
  • 赋权

#添加组
groupadd -r -g 149 haproxy
#添加用户
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
  • 创建haproxy文件夹
mkdir /etc/haproxy
  • 进入haproxy配置文件
vim /etc/haproxy/haproxy.cfg

配置-HAProxy

配置文件路径:/etc/haproxy/haproxy.cfg

#logging options
globallog 127.0.0.1 local0 infomaxconn 5120chroot /usr/local/haproxyuid 99gid 99daemonquietnbproc 20pidfile /var/run/haproxy.piddefaultslog globalmode tcpoption tcplogoption dontlognullretries 3option redispatchmaxconn 2000contimeout 5sclitimeout 60ssrvtimeout 15s
#front-end IP for consumers and producters
#rabbitmq_cluster 是个名字,随意
listen rabbitmq_cluster#对外提供服务的端口号bind 0.0.0.0:5672mode tcp#balance url_param userid#balance url_param session_id check_post 64#balance hdr(User-Agent)#balance hdr(host)#balance hdr(Host) use_domain_only#balance rdp-cookie#balance leastconn#balance source //ipbalance roundrobin#两个rabbit节点和haproxy在同一个服务器可以使用127.0.0.1 当三台不在同一个服务器的时候,写具体的IP地址server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2listen stats
#Haproxy控制台的访问地址bind ip:8100mode httpoption httplogstats enablestats uri /stats refresh 5s
  • 保存并退出
#esc(键退出)->:(符号输入)->wq(保存编辑操作退出)
#:wq!保存编辑强制退出
:wq
  • 启动HAproxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
  • 查看haproxy进程状态
ps -ef | grep haproxy

访问如下地址对mq节点进行监控
http://ip:8100/

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SYsJZ8Io-1663250323456)(img/启动成功01.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jmIOJmGR-1663250323457)(img/启动成功02.png)]

注意:代码中访问mq集群地址,则变为访问haproxy地址:5672

Haproxy启动失败

haproxy配置本机IP或0.0.0.0以外的IP,启动时报错,

错误信息
[ALERT] 252/225311 (24204) : Starting proxy stats: cannot bind socket [43.109.197.238:8100]

错误的原因

高可用虚IP配置后,无法启动。
解决方案

绑定非本机的IP需要在sysctl.conf文件中配置

vi /etc/sysctl.conf #修改内核参数
net.ipv4.ip_nonlocal_bind = 1 #没有就新增此条记录
sysctl -p #保存结果,使结果生效

创建新工程-测试

colony-rabbitmq

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xM9oeML6-1663250323457)(img/创建新工程.png)]

测试代码

package com.powernode;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @program: rabbitmq* @ClassName: Demo* @version: 1.0* @description:* @author: bjpowernode**/
public class Demo {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂11ConnectionFactory factory = new ConnectionFactory();// 2.设置参数factory.setHost("ip");//设置ipfactory.setPort(5672);//创建连接connectionConnection connection = factory.newConnection();//创建channelChannel channel = connection.createChannel();//创建队列channel.queueDeclare("hello world",true,false,false,null);String  boby="hello rabbit~";//发送消息channel.basicPublish("","hello world",null,boby.getBytes());//释放资源channel.close();connection.close();System.out.println("send success....");}
}

rabbitmq完整学习-springboot整合rabbitmq相关推荐

  1. RabbitMQ原理及SpringBoot整合RabbitMQ

    RabbitMQ原理及SpringBoot整合RabbitMQ 1. RabbitMQ环境搭建 参考:https://blog.csdn.net/u013071014/article/details/ ...

  2. RabbitMq详解+SpringBoot整合RabbitMq快速入门

    1概述: 1.1.什么是MQ 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已. 其主要用途:不同进程Pro ...

  3. RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ

    RabbitMQ [黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战] 文章目录 RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ 6.1 Sprin ...

  4. Springboot整合一之Springboot整合RabbitMQ

    前言 目前,springboot已然成为了最热的java开发整合框架,主要是因其简单的配置,并且本身提供了很多与第三方框架的整合,甚至可以让我们在短短的几分钟里就可以搭建一个完整的项目架构.所以,博主 ...

  5. Spring Boot---(10)SpringBoot整合RabbitMQ

    请参考:Spring Boot---(24)springboot整合RabbitMQ 由于docker安装非常方便,这里就用docker来安装和启动了.没接触过docker的可以参考这里:零基础学习D ...

  6. SpringBoot整合RabbitMq实战(一)

    1 Spring AMQP 简介 Spring AMQP项目是一个引入Spring核心概念用于基于高级消息队列(AMQP)的解决方案的开发,它提供了一个模板用于发送和接受消息的高级抽象.它对基于消息驱 ...

  7. Springboot——整合Rabbitmq之Confirm和Return详解

    文章目录 前言 为什么会有Confirm Springboot 整合 Mq 实现 Confirm 监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服 ...

  8. 九、springboot整合rabbitMQ

    springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...

  9. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  10. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

最新文章

  1. Linux(lamp安装)
  2. 使用session防止表单进行重复提交
  3. phpstrom xdebug配置
  4. Vmware安装与使用
  5. mysql 分组占比_含泪整理MySQL索引
  6. 漫游Kafka实现篇之分布式
  7. 苹果mac闪退_自从Mac有了WPS,从此和双系统说再见!
  8. PHP设计模式之建造者模式
  9. violate、内存屏障
  10. opencv怎么2个摄像头_内脏脂肪过高怎么办?从2个方法入手,坚持3个月甩掉小肚腩...
  11. matlab 双曲线拟合,利用MATLAB进行logistic曲线拟合
  12. windows无法完成格式化怎么办?
  13. python实现之极限
  14. mysql 5.6 登录 警告_mysql登录警告问题的解决方法
  15. 计算机表格 求差,教大家Excel2013中表格求差函数公式怎么使用
  16. 云原生应用实践与未来趋势
  17. 利用燕尾花数据集画出P-R曲线
  18. 数字经济|引领建筑业数字化信息化转型
  19. 二维卷积网络函数con2d
  20. 在python中设置密码登录_如何从python脚本在linux中设置用户密码?

热门文章

  1. lamp phpstudy mysql_Phpstudy 搭建服务器教程
  2. iOS - Carthage的安装和使用,以及常见报错解决
  3. 【Tensorflow2.0】8、tensorflow2.0_hdf5_savedmodel_pb模型转换[2]
  4. debian6安装nvidia GT620显卡 驱动
  5. ENSP直连路由和静态路由配置(含路由表结构分析)
  6. 电脑位数(32位或者64位)问题导致eclipse不能正常启动
  7. 楼板计算塑形弹性_土木吧丨弹性与弹塑性计算差异性分析
  8. 8进制的乘法计算、加法计算
  9. python京东预约抢购_python 脚本实现京东抢购
  10. linux发行版_看一看2020年最漂亮的Linux发行版