Java分布式篇6——RabbitMQ

1、MQ(Message Queue)消息队列

  • 消息队列中间件,是分布式系统中的重要组件
  • 主要解决,异步处理,应用解耦,流量削峰等问题
  • 实现高性能,高可用,可伸缩和最终一致性的架构
  • 使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等

2、应用场景

2.1、异步处理

2.2、应用解耦

2.3、流量削峰

3、AMQP、JMS、Erlang

3.1、AMQP高级消息队列协议

  • Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议
  • 协议:数据在传输的过程中必须要遵守的规则
  • 基于此协议的客户端可以与消息中间件传递消息
  • 并不受产品、开发语言等条件的限制

3.2、JMS

  • Java Message Server,Java消息服务应用程序接口, 一种规范,和JDBC担任的角色类似
  • 是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消 息,进行异步通信

JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式 JMS必须是java语言;AMQP只是协议,与语言无关

3.3、Erlang

  • Erlang(['ə:læŋ])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CSLab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境
  • 最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适合构建分布式,实时软并行计算系统
  • Erlang运行时环境是一个虚拟机,有点像Java的虚拟机,这样代码一经编译,同样可以随处运行

4、RabbitMQ

  • 安装部署简单,上手门槛低
  • 有强大的WEB管理页面
  • 强大的社区支持,为技术进步提供动力
  • 支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能非常丰富
  • 集群扩展很容易,并且可以通过增加节点实现成倍的性能提升

RabbitMQ:可靠性高、功能强大、易于管理

kafka:性能高,会有数据丢失

5、RabbitMQ组件介绍

  • Broker:消息队列服务器实体
  • Virtual Host:虚拟主机
    • 每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权 限机制
    • vhost是AMQP概念的基础,RabbitMQ默认的vhost是 /,必须在链接时指定
  • Exchange:交换器(路由)用来接收生产者发送的消息并将这些消息路由给服务器中的队列
  • Queue:消息队列
    • 用来保存消息直到发送给消费者
    • 它是消息的容器,也是消息的终点
    • 一个消息可投入一个或多个队列
    • 消息一直在队列里面,等待消费者连接到这个队列将其取走
  • Banding:绑定,用于消息队列和交换机之间的关联
  • Channel:通道(信道)
    • 建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概 念,用来复用TCP连接。
    • 信道是建立在真实的TCP连接内的虚拟链接
    • AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信 道完成的
  • Connection:网络连接,比如一个TCP连接
  • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序
  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
  • Message:消息
    • 消息是不具名的,它是由消息头和消息体组成
    • 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由 键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等

6、RabbitMQ安装

Erlang 百度云:https://pan.baidu.com/s/11FSrNOYLOMw4wGxyoiLvBw提取码:ntml

RabbitMQ 百度云:https://pan.baidu.com/s/1Es9PxyigFipNUQF7BoZ2DA提取码:5cml

socat 百度云:https://pan.baidu.com/s/1JHfLzarEEhOw_MSk2DT7lQ提取码:qtlb

6.1、RabbitMQ Erlang 版本要求

https://www.rabbitmq.com/which-erlang.html

安装之前请详细阅读上方链接内的版本要求(提供的安装包已经调好版本)

6.2、安装

rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm

6.3、安装后台管理插件

rabbitmq-plugins enable rabbitmq_management

6.4、启动RabbitMQ

#启动
systemctl start rabbitmq-server.service
#重启
systemctl restart rabbitmq-server.service
#关闭
systemctl stop rabbitmq-server.service
#状态
systemctl status rabbitmq-server.service

6.5、测试

  • 5672:RabbitMQ提供给编程语言客户端链接的端口
  • 15672:RabbitMQ管理界面的端口
  • 25672:RabbitMQ集群的端口
ps -ef | grep rabbitmq

访问

http://101.34.116.9:15672/

默认帐号密码:guest,guest用户默认不允许远程连接

创建账户并授权

#创建角色
[root@VM-0-3-centos rabbitmq]# rabbitmqctl add_user winkto blingbling
Adding user "winkto" ...
#授权
[root@VM-0-3-centos rabbitmq]# rabbitmqctl set_user_tags winkto administrator
Setting tags for user "winkto" to [administrator] ...
[root@VM-0-3-centos rabbitmq]# rabbitmqctl set_permissions -p "/" winkto ".*" ".*" ".*"
Setting permissions for user "winkto" in vhost "/" ...
#查看用户列表
[root@VM-0-3-centos rabbitmq]#  rabbitmqctl list_users
Listing users ...
user    tags
winkto  [administrator]
guest   [administrator]

用设定的账号密码登录即可

访问不成功,原因:

确定 erlang 和 rabbitmq 正确安装后 无法远程连接web管理界面。 (本人 centos7 阿里云的服务器)

个人总结以下三个原因:

1、确定防火墙开放端口

开放端口:firewall-cmd --zone=public --add-port=15672/tcp --permanent

查看端口:firewall-cmd --zone=public --list-ports

2、确定开启web管理组件

开启组件: rabbitmq-plugins enable rabbitmq_management

查看组件: rabbitmq-plugins list

3、最重要的一点,检查服务器安全组配置了没

当在 Linux 上配置好 Rabbitmq服务器后,如果从主机中无法访问到 Linux 中的Rabbitmq服务器时,需要做如下的检查:

1.Rabbitmq是否启动成功

在控制台输入:

ps -ef | grep rabbitmq

命令含义:从当前所有进程中查找是否含有rabbitmq进程

如果有内容显示,则说明 Rabbitmq启动成功

否则,重新启动 Rabbitmq

2.检查能否从 Linux 本地中访问到 Rabbitmq

从控制台输入命令:

wget http://localhost:15672

命令含义:访问 http://localhost:15672

否则,检查 Rabbitmq端口号是否正确

3.检查 Rabbitmq启动端口号

Rabbitmq 默认的启动端口号是 15672,如果你没有对 Rabbitmq 的配置文件做修改的话应该是没有问题的
输入命令:

ps -ef | grep rabbitmq

命令含义:查看 Rabbitmq进程信息

查看进程号(图中红色框位置)

接着输入命令:

netstat -apn | grep 9810

(注:grep 后跟的就是上一步所查的进程号)
命令含义:查看 9810进程占用的端口号

图中红色框位置即是你的 Rabbitmq的启动端口号

4.检查远程访问的 ip 地址是否正确

如果从 Linux 本地可以成功访问 Rabbitmq服务器,而从 Windows(主机) 上无法访问,那么首先检查远程访问的 ip 地址是否正确
在 Liunx 控制台上输入命令:

ifconfig

图中位置即是 Linux 的 ip 地址,若此处没出现ens33的ip地址,可能是网络配置没配置好,解决方法具体请见https://www.cnblogs.com/zipxzf/p/11237269.html

5.检查 Linux 防火墙是否开放 Rabbitmq端口号

当window能够ping通linux的ip,而还是不能通过ip在windows上访问linux的一些服务,如tomcat、mysql、nginx、rabbitmq等服务,最可能的原因是linux的防火墙问题。

如果你没有修改过 Linux 防火墙配置的话,那么 Rabbitmq 端口号一定是被禁用了 ,因为 Linux 防火墙默认只开启 22 号端口。

你需要设置防火墙配置,开放 Rabbitmq的端口号 (注:网上有其他解决方法说直接关闭防火墙,这种方法很不可取

我的 Linux 版本是 CentOS 7 ,在CentOS 7或RHEL 7或Fedora中防火墙由firewalld来管理,如果要添加范围例外端口 如 1000-2000

语法命令如下:启用区域端口和协议组合

firewall-cmd [--zone=<zone>] --add-port=<port>[-<port>]/<protocol> [--timeout=<seconds>]

此举将启用端口和协议的组合。端口可以是一个单独的端口 或者是一个端口范围 - 。协议可以是 tcp 或 udp。

实际命令如下:

添加

firewall-cmd --zone=public --add-port=80/tcp --permanent (--permanent永久生效,没有此参数重启后失效)firewall-cmd --zone=public --add-port=1000-2000/tcp --permanent

重新载入

firewall-cmd --reload

查看

firewall-cmd --zone=public --query-port=80/tcp

删除

firewall-cmd --zone=public --remove-port=80/tcp --permanent

此处的解决方案是开放 15672端口号只需输入命令:

firewall-cmd --zone=public --add-port=15672/tcp --permanent

然后重启防火墙,即可解决:

firewall-cmd --reload

成功访问rabbitmq管理页面

7、快速入门

7.1、创建虚拟主机

7.2、导入依赖

<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.32</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.11</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version><scope>test</scope></dependency></dependencies>

7.3、日志

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file

7.4、MQ工具类

public class MQUtil {public static Connection rabbitMQConection() throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("101.34.116.9");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/winkto");connectionFactory.setUsername("winkto");connectionFactory.setPassword("blingbling");return connectionFactory.newConnection();}
}

7.5、测试

public class MQTest {@Testpublic void connectionTest() throws IOException {Connection connection = null;try {connection = MQUtil.rabbitMQConection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}System.out.println(connection);connection.close();}
}

8、RabbitMQ模式

  • 点对点模式:P2P(point to point)模式(1、2)

    • 消息队列(queue),发送者(sender),接收者(receiver)
    • 每个消息发送到一个特定的队列中,接收者从中获得消息
    • 队列中保留这些消息,直到他们被消费或超时
    • 特点
      • 每个消息只有一个消费者,一旦消费,消息就不在队列中了
      • 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中
      • 接收者成功接收消息之后需向对象应答成功(确认)
    • 如果希望发送的每个消息都会被成功处理,那需要P2P
  • 发布订阅模式:publish(Pub)/subscribe(Sub)(3、4、5)
    • 交换机(exchange),发布者(publisher),订阅者(subcriber)
    • 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
    • 特点
      • 每个消息可以有多个订阅者
      • 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
      • 为了消费消息,订阅者必须保持运行状态
    • 如果希望发送的消息被多个消费者处理,可采用本模式

8.1、简单模型

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman.

译文:RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把你想要 寄的邮件放到一个邮箱里,你可以确定邮递员先生或女士最终会把邮件送到你的收件人那里。在 这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。

RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!

8.1.1、发送消息

public class Publisher {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道Channel channel=connection.createChannel();// 创建队列// 参数1:队列的名称// 参数2:队列中的数据是否持久化// 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)// 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)// 参数5:队列参数(没有参数为null)channel.queueDeclare("queue1",false,false,false,null);// 向指定的队列发送消息// 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""// 参数2:目标队列的名称// 参数3:设置消息的属性(没有属性则为null)// 参数4:消息的内容(只接收字节数组)channel.basicPublish("","queue1",null,"hello winkto!".getBytes());System.out.println("发送成功!");channel.close();connection.close();}
}

8.1.2、控制台查看消息

8.1.3、接收消息

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道Channel channel=connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息:" + new String(body));}};// true:自动消息确认channel.basicConsume("queue1",true,consumer);}
}
收到消息:hello winkto!

8.1.4、消息确认机制

消息一旦被消费,消息就会立刻从队列中移除,RabbitMQ如何得知消息被消费者接收?

因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收

ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200 就是告诉我们服务器执行成功

  • 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
  • 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道final Channel channel=connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息:" + new String(body));// 手动确认channel.basicAck(envelope.getDeliveryTag(),false);}};// true:自动消息确认channel.basicConsume("queue1",false,consumer);}
}

8.2、工作队列模式

8.2.1、生产者

public class Publisher {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道Channel channel=connection.createChannel();// 创建队列// 参数1:队列的名称// 参数2:队列中的数据是否持久化// 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)// 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)// 参数5:队列参数(没有参数为null)channel.queueDeclare("queue2",false,false,false,null);// 向指定的队列发送消息// 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""// 参数2:目标队列的名称// 参数3:设置消息的属性(没有属性则为null)// 参数4:消息的内容(只接收字节数组)for (int i = 0; i < 100; i++) {String letter="哈尔滨啤酒-"+i;channel.basicPublish("","queue2",null,letter.getBytes());System.out.println(letter+" 发送成功!");}channel.close();connection.close();}
}

8.2.2、消费者

public class Consumer {private static int count=0;public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道final Channel channel=connection.createChannel();channel.queueDeclare("queue2",false,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {count++;System.out.println("喝了一瓶:" + new String(body)+",共计喝了"+count+"瓶啤酒");// 手动确认try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}};// true:自动消息确认channel.basicConsume("queue2",false,consumer);}
}
public class Consumer1 {private static int count=0;public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道final Channel channel=connection.createChannel();channel.queueDeclare("queue2",false,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {count++;System.out.println("喝了一瓶:" + new String(body)+",共计喝了"+count+"瓶啤酒");// 手动确认try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}};// true:自动消息确认channel.basicConsume("queue2",false,consumer);}
}

运行后,结果是平均分配,有时候这并是我们想要的的结果,我们希望效率高的多干点,效率低的少干点,必须要配合手动的ACK机制才生效

You might have noticed that the dispatching still doesn’t work exactly as we want.For example in a situation with twoworkers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the otherone will do hardly any work. Well,RabbitMa doesn’t know anything about that and willstill dispatch messages evenlyThis happens because RabbitMQjust dispatches a message when the message enters the queue.lt doesnt look at thenumber of unacknowledged messages for a consumer.ltjust blindly dispatches every n-th message to the n-th consumer.

您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有 奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。 这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它只是盲目地将每条第n个消息分派给第n个消费者

In order to defeat that we can use the basicOos method with the prefet clCount -setting.This tels RabbitMa not to givemore than one message to a worker at a time. Or.in other words, dont dispatch a new message to a worker untit it hasprocessed and acknowledged the previous one.Instead. it will dispatchit to the next worker that is not still busy.

为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉 RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的 worker

8.2.3、按劳分配消费者

public class Consumer {private static int count=0;public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道final Channel channel=connection.createChannel();channel.queueDeclare("queue2",false,false,false,null);// 可以理解为:送完一个再送下一个,速度快的送的就多channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {count++;System.out.println("喝了一瓶:" + new String(body)+",共计喝了"+count+"瓶啤酒");// 手动确认try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}};// true:自动消息确认channel.basicConsume("queue2",false,consumer);}
}
public class Consumer1 {private static int count=0;public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();// 获取信道final Channel channel=connection.createChannel();channel.queueDeclare("queue2",false,false,false,null);// 可以理解为:快递一个一个送,送完一个再送下一个,速度快的送件就多channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {count++;System.out.println("喝了一瓶:" + new String(body)+",共计喝了"+count+"瓶啤酒");// 手动确认try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}};// true:自动消息确认channel.basicConsume("queue2",false,consumer);}
}

测试结果约为2:1

8.3、发布订阅模式

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we’ll do something completely different – we’ll deliver a message to multiple consumers. This pattern is known as “publish/subscribe”.

To illustrate the pattern, we’re going to build a simple logging system. It will consist of two programs – the first will emit log messages and the second will receive and print them.

In our logging system every running copy of the receiver program will get the messages. That way we’ll be able to run one receiver and direct the logs to disk; and at the same time we’ll be able to run another receiver and see the logs on the screen.

Essentially, published log messages are going to be broadcast to all the receivers.

在上一篇教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都被准确地交 付给一个工作者。在这一部分中,我们将做一些完全不同的事情——将消息传递给多个消费者。 此模式称为“发布/订阅”。

为了演示这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将 发送日志消息,第二个将接收和打印它们。

在我们的日志系统中,接收程序的每一个正在运行的副本都将获得消息。这样我们就可以运行 一个接收器并将日志指向磁盘;与此同时,我们可以运行另一个接收器并在屏幕上看到日志。

基本上,发布的日志消息将广播到所有接收方。

整个过程,必须先创建路由,路由在生产者程序中创建

8.3.1、生产者(包含路由)

public class Publisher {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();Channel channel = connection.createChannel();// 声明路由(路由名,路由类型)// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)channel.exchangeDeclare("fanout1","fanout");// 向指定的队列发送消息// 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""// 参数2:目标队列的名称// 参数3:设置消息的属性(没有属性则为null)// 参数4:消息的内容(只接收字节数组)channel.basicPublish("fanout1","",null,"大家好,我是眼眸流转!".getBytes());channel.close();connection.close();System.out.println("发送成功");}
}

8.3.2、消费者

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();final Channel channel = connection.createChannel();// 创建队列// 参数1:队列的名称// 参数2:队列中的数据是否持久化// 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)// 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)// 参数5:队列参数(没有参数为null)channel.queueDeclare("queue3",false,false,false,null);// 绑定路由// 参数1:队列名// 参数2:交换器名称// 参数3:路由key(暂时无用,""即可)channel.queueBind("queue3","fanout1","");DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息" + new String(body));}};channel.basicConsume("queue3",true,consumer);}
}
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();final Channel channel = connection.createChannel();// 创建队列// 参数1:队列的名称// 参数2:队列中的数据是否持久化// 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)// 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)// 参数5:队列参数(没有参数为null)channel.queueDeclare("queue4",false,false,false,null);// 绑定路由// 参数1:队列名// 参数2:交换器名称// 参数3:路由key(暂时无用,""即可)channel.queueBind("queue4","fanout1","");DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息" + new String(body));}};channel.basicConsume("queue4",true,consumer);}
}

8.4、路由模式

路由会根据类型进行定向分发消息给不同的队列

8.4.1、生产者

public class Publisher {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();Channel channel = connection.createChannel();// 声明路由(路由名,路由类型)// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)channel.exchangeDeclare("fanout2","fanout");// 参数2表示分类channel.basicPublish("fanout2","csdn",null,"大家好,我是眼眸流转!".getBytes());channel.close();connection.close();System.out.println("发送成功");}
}

8.4.2、消费者

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();final Channel channel = connection.createChannel();// 创建队列// 参数1:队列的名称// 参数2:队列中的数据是否持久化// 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)// 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)// 参数5:队列参数(没有参数为null)channel.queueDeclare("queue5",false,false,false,null);// 绑定路由// 参数1:队列名// 参数2:交换器名称// 参数3:路由key(暂时无用,""即可)绑定路由分类channel.queueBind("queue5","fanout2","csdn");DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息" + new String(body));}};channel.basicConsume("queue5",true,consumer);}
}

8.5、通配符模式

模糊匹配路由分类(*,#)

  • *:只能匹配一个词(mysql%)
  • #:匹配0个或更多个词(mysql_)

8.5.1、生产者

public class Publisher {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();Channel channel = connection.createChannel();// 声明路由(路由名,路由类型)// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)channel.exchangeDeclare("fanout4","topic");// 向指定的队列发送消息// 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""// 参数2:目标队列的名称// 参数3:设置消息的属性(没有属性则为null)// 参数4:消息的内容(只接收字节数组)channel.basicPublish("fanout4","sdn",null,"大家好,我是眼眸流转!".getBytes());channel.close();connection.close();System.out.println("发送成功");}
}

8.5.2、消费者

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();final Channel channel = connection.createChannel();// 创建队列// 参数1:队列的名称// 参数2:队列中的数据是否持久化// 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)// 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)// 参数5:队列参数(没有参数为null)channel.queueDeclare("queue6",false,false,false,null);// 绑定路由// 参数1:队列名// 参数2:交换器名称// 参数3:路由key(暂时无用,""即可)channel.queueBind("queue6","fanout4","*dn");DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息" + new String(body));}};channel.basicConsume("queue6",true,consumer);}
}

9、持久化

消息的可靠性是RabbitMQ的一大特色

  • 消费者的ACK确认机制,可以防止消费者丢失消息
  • 路由和队列持久化

9.1、路由持久化

// 路由名称 路由类型 是否持久化
channel.exchangeDeclare("topic","topic",true);

9.2、队列持久化

// 队列名 是否持久化 是否排外 是否自动删除 队列参数
channel.queueDeclare("queue",true,false,false,null);

9.3、消息持久化

//路由名称 队列名 持久化 消息字节数组
channel.basicPublish("topic", "product",MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

10、Spring整合RabbitMQ

10.1、导入依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.3.6</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.32</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.11</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.6</version></dependency>
</dependencies>

10.2、生产者

10.2.1、配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto" /><!-- 配置队列 --><rabbit:queue name="spring_queue" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置topic类型exchange;队列绑定到交换机--><rabbit:topic-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding queue="spring_queue" pattern="msg.#" /></rabbit:bindings></rabbit:topic-exchange><!--配置消息对象json转换类--><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange"message-converter="jsonMessageConverter" />
</beans>

10.2.2、生产者

public class Publisher {public static void main(String[] args) {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("name","winkto");hashMap.put("password","blingbling");rabbitTemplate.convertAndSend("msg.user",hashMap);classPathXmlApplicationContext.close();}
}

10.2.3、控制台

10.3、消费者

10.3.1、配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto" /><!-- 配置队列 --><rabbit:queue name="spring_queue" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!-- 配置扫描 --><context:component-scan base-package="winkto"/><!-- 配置监听 --><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="consumerListener" queue-names="spring_queue"/></rabbit:listener-container><bean id="jackson" class="com.fasterxml.jackson.databind.ObjectMapper" />
</beans>

10.3.2、消费者

@Component
public class ConsumerListener implements MessageListener {@Autowiredprivate ObjectMapper objectMapper;public void onMessage(Message message) {System.out.println("收到消息");try {JsonNode jsonNode = objectMapper.readTree(message.getBody());System.out.println(jsonNode.get("name").asText());System.out.println(jsonNode.get("password").asText());} catch (IOException e) {e.printStackTrace();}}
}
public class RabbitMQTest {public static void main(String[] args) throws IOException {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-rabbitmq-consumer.xml");System.in.read();}
}

10.3.3、控制台

11、消息确认机制

在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢

  • 事务机制(很少使用)
  • 发布确认机制

11.1、事务机制

AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式,并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保 消息成功投递

  • 开启事务:channel.txSelect()
  • 提交事务:channel.txCommit()
  • 回滚事务: channel.txRollback()

11.1.1、事务实现(未集成spring)

public class Publisher {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();Channel channel = connection.createChannel();// 声明路由(路由名,路由类型)// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)channel.exchangeDeclare("topic","topic");// 向指定的队列发送消息// 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""// 参数2:目标队列的名称// 参数3:设置消息的属性(没有属性则为null)// 参数4:消息的内容(只接收字节数组)channel.txSelect();try {channel.basicPublish("topic","s.dn",null,"大家好,我是眼眸流转!".getBytes());channel.basicPublish("topic","s.dn",null,"很高兴见到大家".getBytes());// int a=1/0channel.txCommit();System.out.println("发送成功");} catch (IOException e) {e.printStackTrace();channel.txRollback();System.out.println("发送失败");}channel.close();connection.close();}
}
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = MQUtil.rabbitMQConection();final Channel channel = connection.createChannel();// 创建队列// 参数1:队列的名称// 参数2:队列中的数据是否持久化// 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)// 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)// 参数5:队列参数(没有参数为null)channel.queueDeclare("queue7",false,false,false,null);// 绑定路由// 参数1:队列名// 参数2:交换器名称// 参数3:路由key(暂时无用,""即可)channel.queueBind("queue7","topic","*.dn");DefaultConsumer consumer = new DefaultConsumer(channel) {@Override// 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("收到消息" + new String(body));}};channel.basicConsume("queue7",true,consumer);}
}

11.1.2、事务缺点

RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量

Using standard AMQP 0-9-1, the only way to guarantee that a message isn’t lost is by using transactions – make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.

使用标准AMQP 0-9-1,确保消息不会丢失的唯一方法是使用事务–使通道具有事务性,然后针对每条消息或一组消息发布、提交。在这种情况下,事务是不必要的重量级事务,吞吐量最多会减少了250倍。为了解决这个问题,引入了确认机制。它模仿协议中已经存在的消费者确认机制。

11.2、发布确认机制

对于事务而言,10条消息,前9条成功,如果第10条失败,那么9条消息要全部撤销回滚,太太太浪费

confirm模式则采用补发第10条的措施来完成10条消息的送达

11.2.1、修改配置文件

  • connectionFactory启动生产者确认机制:publisher-returns=“true”
  • RabbitTemplate模板添加确认回调处理:confirm-callback=“mqConfirm”
  • 自定义确认回调处理类:

发生clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0),这里有个大坑!publisher-returns="true"在新版本被替换为了confirm-type=“CORRELATED”,低版本忽略即可

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto"confirm-type="CORRELATED"publisher-returns="true" /><!-- 配置队列 --><rabbit:queue name="spring_queue" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置topic类型exchange;队列绑定到交换机--><rabbit:topic-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding queue="spring_queue" pattern="msg.#" /></rabbit:bindings></rabbit:topic-exchange><!--配置消息对象json转换类--><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange"message-converter="jsonMessageConverter"confirm-callback="mqConfirm" /><!-- 确认机制处理类 --><bean id="mqConfirm" class="MQConfirm"/>
</beans>

11.2.2、MQConfirm

public class MQConfirm implements RabbitTemplate.ConfirmCallback {public void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(correlationData);System.out.println(s);if (b){System.out.println("消息已经确认!");}else{System.out.println("消息未确认,记得补发哦!");}}
}

11.2.3、生产者

public class Publisher {public static void main(String[] args) throws InterruptedException {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("name","winkto");hashMap.put("password","blingbling");rabbitTemplate.convertAndSend("msg.user",hashMap);Thread.sleep(30000);classPathXmlApplicationContext.close();}
}

12、消费端限流

Rabbitmq 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,就会出现这样的情况: 巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法同时处理这么多数据, 就会被压垮崩溃

RabbitMQ 提供了一种 Qos (Quality of Service,服务质量)服务质量保证功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认前,不再进行消费新的消息

12.1、生产者

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto"confirm-type="CORRELATED"publisher-returns="true" /><!-- 配置队列 --><rabbit:queue name="spring_queue" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置topic类型exchange;队列绑定到交换机--><rabbit:topic-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding queue="spring_queue" pattern="msg.#" /></rabbit:bindings></rabbit:topic-exchange><!--配置消息对象json转换类--><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange"message-converter="jsonMessageConverter"confirm-callback="mqConfirm" /><!-- 确认机制处理类 --><bean id="mqConfirm" class="MQConfirm"/>
</beans>

生产者

public class Publisher {public static void main(String[] args) throws InterruptedException {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("name","winkto");hashMap.put("password","blingbling");for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend("msg.user",hashMap);}Thread.sleep(1000);classPathXmlApplicationContext.close();}
}

消息确认回调

public class MQConfirm implements RabbitTemplate.ConfirmCallback {public void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("消息已经确认!");}else{System.out.println("消息未确认,记得补发哦!");System.out.println(correlationData);System.out.println(s);}}
}

12.2、消费者

  • prefetch=“3” :一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,一旦有 N 个消息还没有ack,则该 consumer 将阻塞,直到消息被ack

  • acknowledge=“manual”:配置消息确认为手动

  • 此时ConsumerListener不在实现MessageListener,而是继承AbstractAdaptableMessageListener(用于在spring容器接收到消息后用于处理消息的抽象基类)

12.2.1、不确认消息

此时我们修改完毕,不去确认消息,可以发现,消费者仅会收到3条消息

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto" /><!-- 配置队列 --><rabbit:queue name="spring_queue" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!-- 配置扫描 --><context:component-scan base-package="winkto"/><!-- 配置监听 --><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="3"><rabbit:listener ref="consumerListener" queue-names="spring_queue"/></rabbit:listener-container><bean id="jackson" class="com.fasterxml.jackson.databind.ObjectMapper" />
</beans>

ConsumerListener

@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {@Autowiredprivate ObjectMapper objectMapper;public void onMessage(Message message, Channel channel) throws Exception {System.out.println("收到消息");try {JsonNode jsonNode = objectMapper.readTree(message.getBody());System.out.println("消息解析中。。。。。。");System.out.println(jsonNode.get("name").asText());System.out.println(jsonNode.get("password").asText());Thread.sleep(3000);} catch (IOException e) {e.printStackTrace();}}public void onMessage(Message message) {}public void onMessageBatch(List<Message> messages, Channel channel) {}public void onMessageBatch(List<Message> messages) {}
}

测试

public class RabbitMQTest {public static void main(String[] args) throws IOException {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-rabbitmq-consumer.xml");System.in.read();}
}

12.2.2、确认消息

修改ConsumerListener

@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {@Autowiredprivate ObjectMapper objectMapper;public void onMessage(Message message, Channel channel) throws Exception {System.out.println("收到消息");try {JsonNode jsonNode = objectMapper.readTree(message.getBody());System.out.println("消息解析中。。。。。。");System.out.println(jsonNode.get("name").asText());System.out.println(jsonNode.get("password").asText());Thread.sleep(3000);// message.getMessageProperties().getDeliveryTag():RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID// true:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);} catch (IOException e) {e.printStackTrace();}}public void onMessage(Message message) {}public void onMessageBatch(List<Message> messages, Channel channel) {}public void onMessageBatch(List<Message> messages) {}
}

注意观察控制台

13、过期时间TTL

在过期时间这个周期内,消息可以被消费者正常消费,超过这个时间,则自动删除(其实是被称为dead message并投入到死信队列,无法消费该消息)

RabbitMQ可以对消息和队列设置TTL

  • 通过队列设置,队列中所有消息都有相同的过期时间(常用)
  • 对消息单独设置,每条消息的TTL可以不同

如果同时设置了queue和message的TTL值,则二者中较小的才会起作用

13.1、队列过期时间

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto"confirm-type="CORRELATED"publisher-returns="true" /><!-- 配置队列 --><!--<rabbit:queue name="spring_queue" />--><rabbit:queue name="spring_queue" auto-declare="true"><rabbit:queue-arguments><entry key="x-message-ttl" value-type="long" value="30000" /></rabbit:queue-arguments></rabbit:queue><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置topic类型exchange;队列绑定到交换机--><rabbit:topic-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding queue="spring_queue" pattern="msg.#" /></rabbit:bindings></rabbit:topic-exchange><!--配置消息对象json转换类--><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange"message-converter="jsonMessageConverter"confirm-callback="mqConfirm" /><!-- 确认机制处理类 --><bean id="mqConfirm" class="MQConfirm"/>
</beans>
public class Publisher {public static void main(String[] args) throws InterruptedException {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("name","winkto");hashMap.put("password","blingbling");rabbitTemplate.convertAndSend("msg.user",hashMap);Thread.sleep(1000);classPathXmlApplicationContext.close();}
}
public class MQConfirm implements RabbitTemplate.ConfirmCallback {public void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("消息已经确认!");}else{System.out.println("消息未确认,记得补发哦!");System.out.println(correlationData);System.out.println(s);}}
}

注意查看控制台,30秒后消息消失

13.2、消息过期时间

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto"confirm-type="CORRELATED"publisher-returns="true" /><!-- 配置队列 --><rabbit:queue name="spring_queue" /><!--<rabbit:queue name="spring_queue" auto-declare="true">--><!--    <rabbit:queue-arguments>--><!--        <entry key="x-message-ttl" value-type="long" value="30000" />--><!--    </rabbit:queue-arguments>--><!--</rabbit:queue>--><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置topic类型exchange;队列绑定到交换机--><rabbit:topic-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding queue="spring_queue" pattern="msg.#" /></rabbit:bindings></rabbit:topic-exchange><!--配置消息对象json转换类--><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange"message-converter="jsonMessageConverter"confirm-callback="mqConfirm" /><!-- 确认机制处理类 --><bean id="mqConfirm" class="MQConfirm"/>
</beans>

生产者

public class Publisher {public static void main(String[] args) throws InterruptedException {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);// 创建消息配置对象MessageProperties messageProperties = new MessageProperties();// 设置消息过期时间messageProperties.setExpiration("30000");// 创建消息Message message = new Message("30秒后自动删除".getBytes(), messageProperties);// 发送消息rabbitTemplate.convertAndSend("msg.user",message);Thread.sleep(1000);classPathXmlApplicationContext.close();}
}

消息确认

public class MQConfirm implements RabbitTemplate.ConfirmCallback {public void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("消息已经确认!");}else{System.out.println("消息未确认,记得补发哦!");System.out.println(correlationData);System.out.println(s);}}
}

14、死信队列

DLX(Dead Letter Exchanges)死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中,而绑定DLX交换机 的队列,称之为:“死信队列”

消息没有被及时消费的原因

  • 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
  • 消息超时未消费
  • 达到最大队列长度

14.1、配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling"virtual-host="/winkto" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange" /><!--死信队列--><rabbit:queue name="dlx_queue" /><!--死信交换机--><rabbit:direct-exchange name="dlx_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="dlx_queue" /><rabbit:binding key="dlx_max" queue="dlx_queue" /></rabbit:bindings></rabbit:direct-exchange><!--超时队列--><rabbit:queue name="ttl_queue"><rabbit:queue-arguments><!--队列消息最多保存30秒--><entry key="x-message-ttl" value-type="long" value="30000" /><entry key="x-dead-letter-exchange" value="dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!--超长队列--><rabbit:queue name="max_queue"><rabbit:queue-arguments><!--队列最长2个--><entry key="x-max-length" value-type="long" value="2" /><entry key="x-dead-letter-exchange" value="dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!--交换机--><rabbit:direct-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="ttl_queue" /><rabbit:binding key="dlx_max" queue="max_queue" /></rabbit:bindings></rabbit:direct-exchange>
</beans>

14.2、生产者

public class Publisher {public static void main(String[] args) {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-mq-publisher.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);// rabbitTemplate.convertAndSend("dlx_ttl","超时测试".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试1".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试2".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试3".getBytes());System.out.println("发送成功!");classPathXmlApplicationContext.close();}
}

14.3、控制台

超时

超长

死信队列查看

其实被挤出去的是超长测试1

15、延时队列

死信队列只是一种特殊的队列,里面的消息仍然可以消费

延迟队列:TTL + 死信队列的合体

参考死信队列即可,区别在于消费者监听死信队列

16、RabbitMQ集群

  • 单一模式:即单机情况不做集群,就单独运行一个rabbitmq
  • 普通模式:默认模式,以两个节点(A、B)为例来进行说明
    • 当消息进入A节点的Queue后,consumer从B节点消费时,RabbitMQ会在A和B之间创建临 时通道进行消息传输,把A中的消息实体取出并经过通过交给B发送给consumer
    • 当A故障后,B就无法取到A节点中未消费的消息实体,如果做了消息持久化,那么得等A节点恢复,然后才可被消费,如果没有持久化的话,就会产生消息丢失的现象
  • 镜像模式:非常经典的 mirror 镜像模式,保证 100% 数据不丢失
    • 高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步
    • 对于 100% 数据可靠性解决方案,一般是采用 3 个节点
  • 还有主备模式,远程模式,多活模式等,这里不再一一介绍

我们这里只介绍镜像模式的搭建,前提是三台机器都安装了rabbitMQ

16.1、修改 /etc/hosts 映射文件

vim /etc/hosts
127.0.0.1 A  VM-0-3-centos VM-0-3-centos
127.0.0.1 localhost.localdomain localhost
127.0.0.1 localhost4.localdomain4 localhost4::1 A VM-0-3-centos VM-0-3-centos
::1 localhost.localdomain localhost
::1 localhost6.localdomain6 localhost6101.34.116.9 A
106.54.85.216 B
81.70.1.65 C

16.2、配置 Erlang Cookie

将 A 上的.erlang.cookie文件(该文件是一个隐藏文件,需要使用 ls -al 命令查看)拷贝到其他两台主机上,该 cookie 文件相当于密钥令牌,集群中的 RabbitMQ 节点需要通过交换密钥令牌以获得相互认证

跨服务器拷贝

scp /var/lib/rabbitmq/.erlang.cookie 81.70.1.65:/var/lib/rabbitmq
[root@VM-0-3-centos ~]# scp /var/lib/rabbitmq/.erlang.cookie 81.70.1.65:/var/lib/rabbitmq
The authenticity of host '81.70.1.65 (81.70.1.65)' can't be established.
ECDSA key fingerprint is SHA256:+lXe47HOACvDcdGppV1nivA+A4lX6wfDgPd9fIl3AhY.
ECDSA key fingerprint is MD5:f1:c4:dc:33:8e:69:89:21:c8:99:0c:f2:96:a2:01:40.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '81.70.1.65' (ECDSA) to the list of known hosts.
root@81.70.1.65's password:
.erlang.cookie                                                                                                                                                                                                 100%   20     0.7KB/s   00:00

查看文件是否一致

cat /var/lib/rabbitmq/.erlang.cookie

修改完一定要重启服务器

16.3、启动

关闭防火墙,启动rabbitMQ服务

systemctl stop firewalld
systemctl start rabbitmq-server

16.4、集群搭建

有时候虽然修改了/etc/hosts,但是主机名没有变化,这时候可以使用下面命令来主动改变一下主机名(我试的时候,他好像忽略了我的大小写,导致我的服务器名字变成了小写。。。。)

hostnamectl set-hostname A

a

什么也不用做

b

# 加入集群前要停止服务
[root@b ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@b ...
# 重置服务状态
[root@b ~]# rabbitmqctl reset
Resetting node rabbit@b ...
# 加入集群a
[root@b ~]# rabbitmqctl join_cluster rabbit@a
Clustering node rabbitbc with rabbit@b
# 启动服务
[root@b ~]# rabbitmqctl start_app
Starting node rabbit@b ...

c

# 加入集群前要停止服务
[root@c ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@c ...
# 重置服务状态
[root@c ~]# rabbitmqctl reset
Resetting node rabbit@c ...
# 加入集群b
[root@c ~]# rabbitmqctl join_cluster rabbit@b
Clustering node rabbit@c with rabbit@b
# 启动服务
[root@c ~]# rabbitmqctl start_app
Starting node rabbit@c ...

查看集群状态

[root@a ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@a ...
BasicsCluster name: rabbit@ADisk Nodesrabbit@a
rabbit@b
rabbit@cRunning Nodesrabbit@a
rabbit@b
rabbit@cVersionsrabbit@a: RabbitMQ 3.8.6 on Erlang 21.3.8.16
rabbit@b: RabbitMQ 3.8.6 on Erlang 21.3.8.16
rabbit@c: RabbitMQ 3.8.6 on Erlang 21.3.8.16Alarms(none)Network Partitions(none)ListenersNode: rabbit@a, interface: [::], port: 15672, protocol: http, purpose: HTTP API
Node: rabbit@a, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@a, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@b, interface: [::], port: 15672, protocol: http, purpose: HTTP API
Node: rabbit@b, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@b, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@c, interface: [::], port: 15672, protocol: http, purpose: HTTP API
Node: rabbit@c, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@c, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0Feature flagsFlag: drop_unroutable_metric, state: enabled
Flag: empty_basic_get_metric, state: enabled
Flag: implicit_default_bindings, state: enabled
Flag: quorum_queue, state: enabled
Flag: virtual_host_metadata, state: enabled

16.5、集群后的用户问题

搭建集群结构之后,之前创建的交换机、队列、用户都属于单一结构,在新的集群环境中是不能用的,需要在新的集群中重新手动添加用户(任意节点添加,所有节点共享)

[root@a ~]# rabbitmqctl add_user winkto blingbling123.
Adding user "winkto" ...
[root@a ~]# rabbitmqctl set_user_tags winkto administrator
Setting tags for user "winkto" to [administrator] ...
[root@a ~]# rabbitmqctl set_permissions -p "/" winkto ".*" ".*" ".*"
Setting permissions for user "winkto" in vhost "/" ...

注意:当节点脱离集群还原成单一结构后,交换机,队列和用户等数据都会重新回来

16.6、集群模式问题

集群搭建完毕,但默认采用的模式“普通模式”,可靠性不高

set_policy {name} {pattern} {definition}
  • name:策略名,可自定义
  • pattern:队列的匹配模式(正则表达式)
    • “^” 可以使用正则表达式,比如"^queue_" 表示对队列名称以“queue_”开头的所有 队列进行镜像,而"^"表示匹配所有的队列
  • definition:镜像定义
    • ha-mode:(High Available,高可用)模式,指明镜像队列的模式,有效值为 all/exactly/nodes,当前策略模式为 all,即复制到所有节点,包含新增节点

      • all:表示在集群中所有的节点上进行镜像
      • exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
      • nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
    • ha-params:ha-mode模式需要用到的参数
    • ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
[root@a ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}'
Setting policy "xall" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...

16.7、集群关闭

没有直接的命令可以关闭整个集群,需要逐一进行关闭,但是需要保证在重启时,最后关闭的节点最先被启动。如果第一个启动的不是最后关闭的节点,那么这个节点会等待最后关闭的那个节点启动,默认进行 10 次连接尝试,超时时间为 30 秒,如果依然没有等到,则该节点启动失败

假设在一个三节点的集群当中,关闭的顺序为 a,b,c,如果 a 因为故障暂时没法恢复,此时 b 和 b 就无法启动。想要解决这个问题,可以先将 a 节点进行剔除

rabbitmqctl forget_cluster_node rabbit@a --offline

解除集群

# 停止服务
rabbitmqctl stop_app
# 重置状态
rabbitmqctl reset
# 重启服务
rabbitmqctl start_app

17、 HAProxy负载均衡

HA(High Available,高可用),Proxy(代理)

HAProxy是一款提供高可用性,负载均衡,并且基于TCP和HTTP应用的代理软件,可以支持数以万计的并发连接,可以简单又安全的整合进架构中,同时还保护web服务器不被暴露到网络上

17.1、HAProxy与Nginx

17.1.1、Nginx

  • 工作在OSI第7层,可以针对http应用做一些分流的策略
  • Nginx对网络的依赖非常小,理论上能ping通就就能进行负载功能
  • Nginx安装和配置比较简单,测试起来比较方便;
  • Nginx不仅仅是一款优秀的负载均衡器/反向代理软件,它同时也是功能强大的Web应用服务器

17.1.2、HAProxy

  • 工作在OSI第4层和第7层,支持TCP与Http协议
  • 仅仅就只是一款负载均衡软件;单纯从效率上来讲HAProxy更会比Nginx有更出色的负载均 衡速度,在并发处理上也是优于Nginx的
  • 支持8种负载均衡策略 ,支持心跳检测

17.2、安装配置

官网:https://www.haproxy.org

HAProxy2.4.2 百度云:https://pan.baidu.com/s/1jIfrsegaQyU0H3pzevIJQA提取码:f5fo

17.2.1、解压

tar -zxvf haproxy-2.4.2.tar.gz

17.2.2、进入目录编译

[root@a haproxy]# cd haproxy-2.4.2/
[root@a haproxy-2.4.2]# make TARGET=linux-glibc  PREFIX=/usr/local/haproxy-2.4.2
[root@a haproxy]# make install PREFIX=/usr/local/haproxy-2.4.2

17.2.3、配置环境变量

vim /etc/profileexport HAPROXY_HOME=/usr/local/haproxy-2.4.2
export PATH=$PATH:$HAPROXY_HOME/sbin

17.2.4、使得配置的环境变量立即生效

source /etc/profile

17.2.5、测试

[root@a haproxy-2.4.2]# haproxy -v
HAProxy version 2.4.2-553dee3 2021/07/07 - https://haproxy.org/
Status: long-term supported branch - will stop receiving fixes around Q2 2026.
Known bugs: http://www.haproxy.org/bugs/bugs-2.4.2.html
Running on: Linux 3.10.0-1160.11.1.el7.x86_64 #1 SMP Fri Dec 18 16:34:56 UTC 2020 x86_64

17.2.6、书写配置文件

# 创建目录
mkdir /etc/haproxy
# 编辑文件内容
vim /etc/haproxy/haproxy.cfg
# 全局配置
global# 日志输出配置、所有日志都记录在本机,通过 local0 进行输出log 127.0.0.1 local0 info# 最大连接数maxconn 4096# 改变当前的工作目录chroot /usr/local/haproxy-2.4.2# 以指定的 UID 运行 haproxy 进程uid 99# 以指定的 GID 运行 haproxy 进程gid 99# 以守护进行的方式运行daemon# 当前进程的 pid 文件存放位置pidfile /usr/local/haproxy-2.4.2/haproxy.pid# 默认配置
defaults# 应用全局的日志配置log global# 使用4层代理模式,7层代理模式则为"http"mode tcp# 日志类别option tcplog# 不记录健康检查的日志信息option dontlognull# 3次失败则认为服务不可用retries 3# 每个进程可用的最大连接数maxconn 2000# 连接超时timeout connect 5s# 客户端超时timeout client 120s# 服务端超时timeout server 120s# 绑定配置
listen rabbitmq_clusterbind :5671# 配置TCP模式mode tcp# 采用加权轮询的机制进行负载均衡balance roundrobin# RabbitMQ 集群节点配置,每隔5秒对mq集群做检查,2次正确证明服务可用,3次失败证明服务不可用server A  101.34.116.9:5672 check inter 5000 rise 2 fall 3 weight 1server B  106.54.85.216:5672 check inter 5000 rise 2 fall 3 weight 1server C  c:5672 check inter 5000 rise 2 fall 3 weight 1# 配置监控页面
listen monitorbind :8100mode httpoption httplogstats enablestats uri /statsstats refresh 5s

17.2.7、启动

[root@a haproxy]# haproxy -f /etc/haproxy/haproxy.cfg

17.2.8、访问

http://101.34.116.9:8100/stats

所有的请求在101.34.116.9都会交给HAProxy,其负载均衡给每个rabbitmq服务器

17.3、测试Haproxy

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="101.34.116.9"port="5672"username="winkto"password="blingbling123."virtual-host="/winkto" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange" /><!--死信队列--><rabbit:queue name="dlx_queue" /><!--死信交换机--><rabbit:direct-exchange name="dlx_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="dlx_queue" /><rabbit:binding key="dlx_max" queue="dlx_queue" /></rabbit:bindings></rabbit:direct-exchange><!--超时队列--><rabbit:queue name="ttl_queue"><rabbit:queue-arguments><!--队列消息最多保存30秒--><entry key="x-message-ttl" value-type="long" value="30000" /><entry key="x-dead-letter-exchange" value="dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!--超长队列--><rabbit:queue name="max_queue"><rabbit:queue-arguments><!--队列最长2个--><entry key="x-max-length" value-type="long" value="2" /><entry key="x-dead-letter-exchange" value="dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!--交换机--><rabbit:direct-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="ttl_queue" /><rabbit:binding key="dlx_max" queue="max_queue" /></rabbit:bindings></rabbit:direct-exchange>
</beans>

生产者

public class Publisher {public static void main(String[] args) {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-mq-publisher.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);// rabbitTemplate.convertAndSend("dlx_ttl","超时测试".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试1".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试2".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试3".getBytes());System.out.println("发送成功!");classPathXmlApplicationContext.close();}
}

18、KeepAlived

解决 HAProxy 故障转移,实现HAProxy 高可用

Keepalived的作用是检测服务器的状态,它根据TCP/IP参考模型的第三、第四层、第五层交换机 制检测每个服务节点的状态,如果有一台web服务器宕机,或工作出现故障,Keepalived将检测 到,并将有故障的服务器从系统中剔除,同时使用其他服务器代替该服务器的工作,当服务器工作 正常后Keepalived自动将服务器加入到服务器群中,这些工作全部自动完成,不需要人工干涉, 需要人工做的只是修复故障的服务器

keepalived基于vrrp(Virtual Router Redundancy Protocol,虚拟路由冗余协议)协议,vrrp它 是一种主备(主机和备用机)模式的协议,通过VRRP可以在网络发生故障时透明的进行设备切换 而不影响主机之间的数据通信

两台主机之间生成一个虚拟的ip,我们称漂移ip,漂移ip由主服务器承担,一但主服务器宕机,备 份服务器就会抢夺漂移ip,继续工作,有效的解决了群集中的单点故障

18.1、安装

yum -y install keepalived

18.2、配置 MASTER 和 BACKUP

18.2.1、MASTER

确认网卡

[root@a haproxy]# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000link/ether 52:54:00:0b:14:5b brd ff:ff:ff:ff:ff:ffinet 172.17.0.3/20 brd 172.17.15.255 scope global eth0valid_lft forever preferred_lft foreverinet6 fe80::5054:ff:fe0b:145b/64 scope link valid_lft forever preferred_lft forever

删除默认文件

rm -rf /etc/keepalived/keepalived.conf
vim /etc/keepalived/keepalived.conf
global_defs {# 路由id,主备节点不能相同router_id kh1
}# 自定义监控脚本
vrrp_script chk_haproxy {# 脚本位置script "/etc/keepalived/haproxy_check.sh" # 脚本执行的时间间隔interval 5 weight 10
}vrrp_instance VI_1 {# Keepalived的角色,MASTER 表示主节点,BACKUP 表示备份节点state MASTER # 指定监测的网卡,可以使用 ifconfig 进行查看interface eth0# 虚拟路由的id,主备节点需要设置为相同virtual_router_id 99# 优先级,主节点的优先级需要设置比备份节点高priority 100 # 设置主备之间的检查时间,单位为秒 advert_int 1 # 定义验证类型和密码authentication { auth_type PASSauth_pass 123456}# 调用上面自定义的监控脚本track_script {chk_haproxy}virtual_ipaddress {# 虚拟IP地址,可以设置多个101:102:103:104}
}
# 虚拟ip的详细配置
virtual_server 192.168.204.66 5672 {# 健康检查间隔,单位为秒delay_loop 6# lvs调度算法rr|wrr|lc|wlc|lblc|sh|dhlb_algo rr # 负载均衡转发规则。一般包括DR,NAT,TUN 3种lb_kind NAT# 转发协议,有TCP和UDP两种,一般用TCPprotocol TCP ## 本机的真实ipreal_server 101.34.116.9 5672 {# 默认为1,0为失效weight 1 }
}

配置 HAProxy 检查(用于判断 HAProxy 服务是否正常,如果不正常且无法启动,此时就需要将本机 Keepalived 关闭,从而让虚拟 IP 漂移到备份节点)

#!/bin/bash# 判断haproxy是否已经启动
if [ ${ps -C haproxy --no-header |wc -l} -eq 0 ] ; then#如果没有启动,则启动haproxy -f /etc/haproxy/haproxy.cfg
fi#睡眠3秒以便haproxy完全启动
sleep 3#如果haproxy还是没有启动,此时需要将本机的keepalived服务停掉,以便让VIP自动漂移到另外一台haproxy
if [ ${ps -C haproxy --no-header |wc -l} -eq 0 ] ; thensystemctl stop keepalived
fi

创建后为其赋予执行权限

chmod +x /etc/keepalived/haproxy_check.sh

配置 IP 转发(要在root下执行)

# 文件配置
echo "net.ipv4.ip_nonlocal_bind = 1" >> /etc/sysctl.conf
# 生效
sysctl -p

18.2.2、BACKUP

除配置文件,其余都一样

global_defs {# 路由id,主备节点不能相同router_id kh2
}# 自定义监控脚本
vrrp_script chk_haproxy {# 脚本位置script "/etc/keepalived/haproxy_check.sh" # 脚本执行的时间间隔interval 5 weight 10
}vrrp_instance VI_1 {# Keepalived的角色,MASTER 表示主节点,BACKUP 表示备份节点state BACKUP# 指定监测的网卡,可以使用 ifconfig 进行查看interface eth0# 虚拟路由的id,主备节点需要设置为相同virtual_router_id 99# 优先级,主节点的优先级需要设置比备份节点高priority 50# 设置主备之间的检查时间,单位为秒 advert_int 1 # 定义验证类型和密码authentication { auth_type PASSauth_pass 123456}# 调用上面自定义的监控脚本track_script {chk_haproxy}virtual_ipaddress {# 虚拟IP地址,可以设置多个101:102:103:104}
}
# 虚拟ip的详细配置
virtual_server 192.168.204.66 5672 {# 健康检查间隔,单位为秒delay_loop 6# lvs调度算法rr|wrr|lc|wlc|lblc|sh|dhlb_algo rr # 负载均衡转发规则。一般包括DR,NAT,TUN 3种lb_kind NAT# 转发协议,有TCP和UDP两种,一般用TCPprotocol TCP ## 本机的真实ipreal_server 106.54.85.216 5672 {# 默认为1,0为失效weight 1 }
}

18.2.3、启动

systemctl start keepalived
systemctl stop keepalived
#开机自启
systemctl enable keepalived
[root@a haproxy]# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope host valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000link/ether 52:54:00:0b:14:5b brd ff:ff:ff:ff:ff:ffinet 172.17.0.3/20 brd 172.17.15.255 scope global eth0valid_lft forever preferred_lft foreverinet 192.168.0.200/32 scope global eth0valid_lft forever preferred_lft foreverinet6 fe80::5054:ff:fe0b:145b/64 scope link valid_lft forever preferred_lft forever

18.3、AMQP测试

[root@a haproxy]#  curl 192.168.0.200:5672
curl: (56) Recv failure: Connection reset by peer
AMQP

18.4、漂移规则

默认使用 MASTER 服务器(101.34.116.9),虚拟 IP 为 192.168.0.200,此时 MASTER 服务器会有 2 个IP
当 MASTER 出问题时,IP 会漂移到 BACKUP 服务器(106.54.85.216),此时 BACKUP 服务器会有 2 个IP
当 MASTER 重新启动后,虚拟 IP 又会漂移回 MASTER 服务器

18.5、API测试

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 配置连接工厂 --><rabbit:connection-factoryid="connectionFactory"host="192.168.0.200"port="5672"username="winkto"password="blingbling123."virtual-host="/winkto" /><!-- 配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --><rabbit:admin connection-factory="connectionFactory" /><!--配置RabbitTemplate(消息生产者)--><rabbit:templateid="rabbitTemplate"connection-factory="connectionFactory"exchange="spring_exchange" /><!--死信队列--><rabbit:queue name="dlx_queue" /><!--死信交换机--><rabbit:direct-exchange name="dlx_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="dlx_queue" /><rabbit:binding key="dlx_max" queue="dlx_queue" /></rabbit:bindings></rabbit:direct-exchange><!--超时队列--><rabbit:queue name="ttl_queue"><rabbit:queue-arguments><!--队列消息最多保存30秒--><entry key="x-message-ttl" value-type="long" value="30000" /><entry key="x-dead-letter-exchange" value="dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!--超长队列--><rabbit:queue name="max_queue"><rabbit:queue-arguments><!--队列最长2个--><entry key="x-max-length" value-type="long" value="2" /><entry key="x-dead-letter-exchange" value="dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!--交换机--><rabbit:direct-exchange name="spring_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="ttl_queue" /><rabbit:binding key="dlx_max" queue="max_queue" /></rabbit:bindings></rabbit:direct-exchange>
</beans>

生产者

public class Publisher {public static void main(String[] args) {ClassPathXmlApplicationContext classPathXmlApplicationContext =new ClassPathXmlApplicationContext("spring-mq-publisher.xml");RabbitTemplate rabbitTemplate = classPathXmlApplicationContext.getBean(RabbitTemplate.class);// rabbitTemplate.convertAndSend("dlx_ttl","超时测试".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试1".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试2".getBytes());rabbitTemplate.convertAndSend("dlx_max","超长测试3".getBytes());System.out.println("发送成功!");classPathXmlApplicationContext.close();}
}

Java分布式篇6——RabbitMQ相关推荐

  1. Java分布式篇5——FastDFS

    Java分布式篇5--FastDFS 分布式文件系统 1.主流的分布式文件系统 1.1. HDFS (Hadoop Distributed File System)Hadoop 分布式文件系统 高容错 ...

  2. Java分布式篇4——Redis

    Java分布式篇4--Redis 1.互联网架构的演变历程 1.1.第一阶段 数据访问量不大,简单的架构即可搞定! 1.2.第二阶段 数据访问量大,使用缓存技术来缓解数据库的压力 不同的业务访问不同的 ...

  3. JAVA分布式篇3——Dubbo

    JAVA分布式篇3--Dubbo 1.架构演变 1.1.单应用架构 当网站流量很小时,只需要一个应用,将所有的功能部署到一起(所有业务都放在一个tomcat 里),从而减少部署节点和成本 用于简化 增 ...

  4. JAVA分布式篇2——Zookeeper

    JAVA分布式篇2--Zookeeper 1.简介 Zookeeper是一个开源的分布式(多台服务器干一件事)的,为分布式应用提供协调服务的 Apache项目 2.工作机制 Zookeeper从设计模 ...

  5. JAVA分布式篇1——Linux

    JAVA分布式篇1--Linux 1.linux命令 1.1.常用指令 ls 显示文件或目录 -l 列出文件详细信息l(list)-a 列出当前目录下所有文件及目录,包括隐藏的a(all) mkdir ...

  6. Java分布式锁看这篇就够了,java基础面试笔试题

    我总结出了很多互联网公司的面试题及答案,并整理成了文档,以及各种学习的进阶学习资料,免费分享给大家. 扫描二维码或搜索下图红色VX号,加VX好友,拉你进[程序员面试学习交流群]免费领取.也欢迎各位一起 ...

  7. 53.大数据之旅——java分布式项目14-信息检索技术之Lucene,Solr

    信息检索技术 概念介绍 全文检索是一种将文件中所有文本与检索项匹配的文字资料检索方法.全文检索系统是按照全文检索理论建立起来的用于提供全文检索服务的软件系统. 全文检索主要对非结构化数据的数据检索. ...

  8. Java核心篇之Redis--day4

    Java核心篇之Redis–day4 Redis有哪些数据结构? 字符串String.字典Hash.列表List.集合Set.有序集合SortedSet. 1.String:字符串,常用命令:get, ...

  9. Java分布式中文分词组件 - word分词(转自 https //github com/ysc/word)

    首先给大家分享一个巨牛巨牛的人工智能教程,是我无意中发现的.教程不仅零基础,通俗易懂,而且非常风趣幽默,还时不时有内涵段子,像看小说一样,哈哈-我正在学习中,觉得太牛了,所以分享给大家!点这里可以跳转 ...

最新文章

  1. 分布式版本控制系统 Git 教程
  2. python写出的程序如何给别人使用-涨姿势!这些小技巧让小白也可以写出更优雅的Python代码!...
  3. 常用数据结构及复杂度
  4. 征稿 | MIUA 2022 医学影像理解与分析会议
  5. 求生欲强烈!HTC官方回应并未退出中国市场
  6. linux postgre服务名,linux中service配置之postgresql
  7. 高性能滚动 scroll 及页面渲染优化
  8. 【讨论】对技术的掌握到底应该又多深?
  9. Oracle DataGuard 之--Physical DG转换Logical DG
  10. 华为交换机MSTP常见问题定位
  11. java 构建_Java入门环境构建
  12. BZOJ 2006 NOI2010 超级钢琴 划分树+堆
  13. Matlab实现均值滤波与FPGA进行对比,并采用modelsim波形仿真
  14. C语言定时器按键消抖程序,按键消抖方法(中断与定时器配合使用)
  15. 计算机原理 做实验报告,微机原理实验报告心得体会
  16. DDR SDRAM内存发展历程
  17. day03 python基础
  18. bing输入法linux,必应Bing输入法特殊符号输入方法图文教程
  19. 鸿蒙初开三界未立,太子长琴
  20. 评分卡实例:一步一步实现评分卡(详细长文)

热门文章

  1. obj[]与obj._Ruby中带有示例的Array.include?(obj)方法
  2. 指针在c语言中的运用,怎么理解C语言中的指针,如何运用?
  3. observable_Java Observable notifyObservers()方法与示例
  4. ipv6寻址_有类和无类寻址:IPV4寻址| 计算机网络
  5. Python日历模块| weekheader()方法与示例
  6. windows自定义快速启动(运行)命令
  7. Python openpyxl打开有公式的excel表取值错误的解决办法,Python openpyxl获取excel有公式的单元格的数值错误,Python操作excel(.xlsx)封装类
  8. 精通ASP.NET MVC ——模型绑定
  9. python django 表单_Django ModelForm与Form
  10. linux 查看neihe版本_linux查看系统内核版本号