消息中间件---RabbitMQ
rabbitmq和spring是同一个公司,支持性最好
1.什么是中间件
中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来
1.为什么需要使用消息中间件
具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担,中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。
2.中间件应具有如下的一些特点:
(1)满足大量应用的需要
(2)运行于多种硬件和 OS平台
(3)支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互
(4)支持标准的协议
(5)支持标准的接口
3.在项目中什么时候使用中间件技术
在项目的架构和重构中,使用任何技术和架构的改变我们都需要谨慎斟酌和思考,因为任何技术的融入和变化都可能人员,技术,和成本的增加,中间件的技术一般现在一些互联网公司或者项目中使用比较多,如果你仅仅还只是一个初创公司建议还是使用单体架构,最多加个缓存中间件即可,不要盲目追求新或者所谓的高性能,而追求的背后一定是业务的驱动和项目的驱动,因为一旦追求就意味着你的学习成本,公司的人员结构以及服务器成本,维护和运维的成本都会增加,所以需要谨慎选择和考虑。
但是作为一个开发人员,一定要有学习中间件技术的能力和思维,否则很容易当项目发展到一个阶段在去掌握估计或者在面试中提及,就会给自己带来不小的困扰,在当今这个时代这些技术也并不是什么新鲜的东西,如果去掌握和挖掘最关键的还是自己花时间和经历去探讨和研究。
2.中间件技术及架构的概述
什么是消息中间件
在实际的项目中,大部分的企业项目开发中,在早起都采用的是单体的架构模式
单体架构
在企业开发当中,大部分的初期架构都采用的是单体架构的模式进行架构,而这种架构的典型的特点:就是把所有的业务和模块,源代码,静态资源文件等都放在一个工程中,如果其中的一个模块升级或者迭代发生一个很小的变动都会重新编译和重新部署项目。这种这狗存在的问题是:
- 耦合度太高
- 不易维护
- 服务器的成本高
- 以及升级架构的复杂度也会增大
这样就有后续的分布式架构系统。如下
分布式架构
何谓分布式系统:
通俗一点:就是一个请求由服务器端的多个服务(服务或者系统)协同处理完成
和单体架构不同的是,单体架构是一个请求发起 jvm调度线程(确切的是 tomcat线程池)分配线程 Thread来处理请求直到释放,而分布式系统是:一个请求时由多个系统共同来协同完成,jvm和环境都可能是独立。如果生活中的比喻的话,单体架构就像建设一个小房子很快就能够搞定,如果你要建设一个鸟巢或者大型的建筑,你就必须是各个环节的协同和分布,这样目的也是项目发展到后期的时候要去部署和思考的问题。我们也不难看出来:分布式架构系统存在的特点和问题如下:
存在问题:
- 学习成本高,技术栈过多
- 运维成本和服务器成本增高
- 人员的成本也会增高
- 项目的负载度也会上升
- 面临的错误和容错性也会成倍增加
- 占用的服务器端口和通讯的选择的成本高
- 安全性的考虑和因素逼迫可能选择 RMI/MQ相关的服务器端通讯
好处:
- 服务系统的独立,占用的服务器资源减少和占用的硬件成本减少,确切的说是:可以合理的分配服务资源,不造成服务器资源的浪费
- 系统的独立维护和部署,耦合度降低,可插拔性
- 系统的架构和技术栈的选择可以变的灵活(而不是单纯地选择 java)
- 弹性的部署,不会造成平台因部署造成的瘫痪和停服的状态
3.基于消息中间件的分布式系统的架构
从上图中可以看出来,消息中间件的是
- 利用可靠的消息传递机制进行系统和系统直接的通讯
- 通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯
消息中间件应用的场景(解耦,异步,削峰)
- 跨系统数据传递
- 高并发的流量削峰
- 数据的并发和异步处理
- 大数据分析与传递
- 分布式事务
比如你有一个数据要进行迁移或者请求并发过多的时候,比如你有10 W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行
常见的消息中间件
ActiveMQ、RabbitMQ、Kafka、RocketMQ等
消息中间件的本质及设计
它是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务
MQ消息队列:负责数据的传接受,存储和传递,所以性能要高于普通服务和技术
谁来生产消息,存储消息和消费消息呢?
4.消息队列协议:
什么是协议
所谓协议是指:
- 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流
- 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高
- 协议对数据格式和计算机之间交换数据都必须严格遵守规范
网络协议的三要素
- 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
- 语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
- 时序:时序是对事件发生顺序的详细说明
比如我 MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的相应结构和反馈是什么,然后按照对应的执行顺序进行处理。如果你还是不理解:大家每天都在接触的 http请求协议:
- 语法:http规定了请求报文和响应报文的格式
- 语义:客户端主动发起请求称之为请求(这是一种定义,同时你发起的是 post/get请求)
- 时序:一个请求对应一个响应(一定先有请求在有响应,这个是时序)
而消息中间件采用的并不是 http协议,而常见的消息中间件协议有有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议
面试题:为什么消息中间件不直接使用 http协议
- 因为 http请求报文头和响应报文头是比较复杂的,包含了Cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速
- 大部分情况下 http大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行
AMQP协议
AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现由 RabbitMQ等
特性:
- 分布式事务支持
- 消息的持久化支持
- 高性能和高可靠的消息处理优势
MQTT协议
MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分
特点:
- 轻量
- 结构简单
- 传输快,不支持事务
- 没有持久化设计
应用场景:
- 适用于计算能力有限
- 低带宽
- 网络不稳定的场景
支持者:
OpenMessage协议
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准
特点:
- 结构简单
- 解析速度快
- 支持事务和持久化设计
支持者:
Kafka协议
Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成
特点:
- 结构简单
- 解析速度快
- 无事务支持
- 有持久化设计
支持者:
小结
协议:实在 tcp/ip协议基础之上构建的一种约定俗称的规范和机制、它的主要目的可以让客户端(应用程序 java,go)进行沟通和通讯。并且这种写一下规范必须具有持久性,高可用,高可靠的性能
5.消息队列持久化
持久化
简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存
常见的持久化方式
6.消息的分发策略
消息的分发策略
MQ消息 队列有如下几个角色
- 生产者
- 存储消息
- 消费者
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的 git就有推拉机制,我们发送的 http请求就是一种典型的拉取数据库数据返回的过程。而消息队列 MQ是一种推送的过程,而这些推机制会使用到很多的业务场景也有很多对应推机制策略
场景分析一
比如我在 APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被哪个系统或者哪些服务器或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论
场景分析二 重发
在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费 MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发
消息分发策略的机制和对比
7.消息队列高可用和高可靠
什么是高可用机制
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的
集群模式1 - Master-slave主从共享数据的部署方式
解说:生产者将消费发送到 Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦 Master挂掉,slave节点继续服务。从而形成高可用
集群模式2 - Master-slave主从同步部署方式 (单写多读)
解释:这种模式写入消息同样在 Master主节点上,但是主节点会同步数据到 slave节点形成副本,和 zookeeper或者 redis主从机制很雷同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点进行消费,以为消息的拷贝和同步会占用很大的带宽和网络资源。在后去的 rabbitmq中会有使用
集群模式3 - 多主集群同步部署模式 (多写多读)
解释:和上面的区别不是特别的大,但是它的写入可以往任意节点去写入
集群模式4 - 多主集群转发部署模式 (对比上一种,可以减少数据空间的存储)
解释:如果你插入的数据是 broker-1中国,元数据信息会存储数据的相关描述和记录存放的位置(队列)。它会对描述信息也就是元数据信息进行同步,如果消费者在 broker-2中进行消费,发现自己节点没有对应的信息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他回去联系其他的黄牛询问,如果有就返回
集群模式5 Master-slave与 Broker-cluster组合的方案(大型)
解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高
小结:
总归三句话:
- 要么消息共享
- 要么消息同步
- 要么元数据共享
什么是高可靠机制
所谓高可靠是指:系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的
如何保证中间件消息的可靠性呢,可以从两个方面考虑:
- 消息的传输:通过协议来保证系统间数据解析的正确性
- 消息的存储区可靠:通过持久化来保证消息的可靠性
8.入门及安装
https://www.bilibili.com/video/BV1dX4y1V73G?p=27
简单概述:
RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,支持多种客户端(语言),用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征
下载RabbitMQ
- 下载地址:https://www.rabbitmq.com/download.html
- 环境准备:CentOS7.x + /Erlang
RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,第一步就是安装 Erlang
版本匹配:
安装Erlang
linux环境下:
9.RabbitMQWeb管理界面及授权操作
RabbitMQ管理界面
说明:rabbitmq有一个默认账号和密码是:guest
默认情况只能在 localhost本计下访问,所以需要添加一个远程登录的用户
一定要记住,在对应服务器(阿里云,腾讯云等)的安全组中开放15672
端口
在浏览器访问
授权:
10.RabbitMQ的角色分类
操作的页面:
11.RabbitMQ入门案例 - Simple 简单模式
实现步骤
- jdk1.8
- 构建一个 maven工程
- 导入 rabbitmq的 maven依赖
- 启动 rabbitmq-server服务
- 定义生产者
- 定义消费者
- 观察消息的在 rabbitmq-server服务中的进程
构建一个maven工程
导入依赖
java原生依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>
生产者:
package com.ggqq.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//简单模式
public class Producer {//AMQP协议public static void main(String[] args) {//1.创建连接工程ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");//2.创建连接ConnectionConnection connection =null;Channel channel =null;try {connection = connectionFactory.newConnection("生产者");//3.创建连接获取通道Channelchannel = connection.createChannel();//4.通过通道创建交换机,声明队列,绑定关系,路由KEY,发送消息,接收消息String queueName ="queue1";/** 四个参数:* 队列的名称* 是否要持久化* 排他性,是否独立占有* 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除* 携带一些附属参数* */channel.queueDeclare(queueName,false,false,false,null);//5.准备发送消息String message ="Hello,ggqq!";//6.发送消息给队列queuechannel.basicPublish("",queueName,null,message.getBytes());System.out.println("发送消息成功");} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}finally {//7.关闭连接if(channel != null && channel.isOpen()){try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}//8.关闭通道if(connection != null && connection.isOpen()){try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
消费者:
package com.ggqq.rabbitmq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {//AMQP协议public static void main(String[] args) {//1.创建连接工程ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");//2.创建连接ConnectionConnection connection =null;Channel channel =null;try {connection = connectionFactory.newConnection("消费者");//3.创建连接获取通道Channelchannel = connection.createChannel();//4.通过通道创建交换机,声明队列,绑定关系,路由KEY,发送消息,接收消息/** 四个参数:* 队列的名称* 是否要持久化* 排他性,是否独立占有* 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除* 携带一些附属参数* */channel.basicConsume("queue1", true, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {System.out.println("收到的消息是:" + new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {public void handle(String consumerTag) throws IOException {System.out.println("接受失败了....");}});System.out.println("开始接受消息");System.in.read();//阻断的作用} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}finally {//7.关闭连接if(channel != null && channel.isOpen()){try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}//8.关闭通道if(connection != null && connection.isOpen()){try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
12.什么是AMQP
什么是AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计
AMQP生产者流转过程
AMQP消费者流转过程
13.RabbitMQ的核心组成部分
RabbitMQ的核心组成部分
RabbitMQ整体架构是什么样子的?
RabbitMQ的运行流程
14.RabbitMQ支持的消息模型
上面的topic模式中:
*(星号)表示一个单词,#(井号)表示零个或多个单词
非常重要的一点:
消息都必须经过交换机!!!
- 简单模式 Simple
- 工作模式 Work
- 发布订阅模式
- 路由模式
- 主题 Topic模式
- 参数模式
RabbitMQ入门案例 - fanout 模式(发布订阅)
图解
发布订阅模式的具体实现
- web操作查看视频
- 类型:fanout
- 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
生产者
//简单模式
public class Producer{//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("10.15.0.9");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection("生产者");//2.创建通道Channel channel = connection.createChannel();//3.通过创建交换机,声明队列,绑定关系,路由key,发送消息和接受消息/*参数1: 是否持久化,非持久化消息会存盘吗?会存盘,但是会随着重启服务器而丢失参数2:是否独占队列 参数3:是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除参数4:携带附属属性*/String queueName = "queue1";channel.queueDeclare(queueName,false,false,false,null);//4.发送消息给队列queue/*参数1: 交换机参数2:队列、路由key参数3:消息的状态控制参数4:消息主题*///面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机String message = "Hello";//5.准备交换机String exchangeName = "fanout-exchange";//6.定义路由keyString routeKey = "";//7.指定交换机的类型String type = "fanout";channel.basicPublish(exchangeName,routeKey, null,message.getBytes());//8.关闭channel.close();connection.close();
}
消费者
代码一样,使用线程启动测试而已!
此处没有通过代码去绑定交换机和队列,而是通过可视化界面去绑定的!
RabbitMQ入门案例 - Direct 模式
//6.定义路由key
String routeKey = "email";
//7.指定交换机的类型
String type = "direct";
channel.basicPublish(exchangeName,routeKey, null,message.getBytes());
RabbitMQ入门案例 - Topic 模式
//6.定义路由key
String routeKey = "com.order.test.xxx";
//7.指定交换机的类型
String type = "direct";
channel.basicPublish(exchangeName,routeKey, null,message.getBytes());
完整的代码创建及绑定
//5.准备交换机
String exchangeName = "direct_message_exchange";
String exchangeType = "direct";
//如果你用界面把queue和exchange的关系先绑定话,代码就不需要在编写这些声明代码可以让代码变得更简洁
//如果用代码的方式去声明,我们要学习一下
//6.声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失
channel.exchangeDeclare(exchangeName,exchangeType,true);//7.声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);//8.绑定队列和交换机的关系 # 路由的名字
channel.queueBind("queue5",exchangeName,"order");
channel.queueBind("queue6",exchangeName,"order");
channel.queueBind("queue7",exchangeName,"course");channel.basicPublish(exchangeName,course, null,message.getBytes());
RabbitMQ入门案例 - Work模式
Work模式轮询模式(Round-Robin)
图解
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
- 轮询模式的分发:一个消费者一条,按均分配
- 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
生产者
跟简单模式一样!
消费者
创建两个一样的!
Work模式公平分发模式(必须手动应答:设置autoAck=false)
生产者
跟简单模式一样!
消费者
//简单模式
public class Consumer{//3.接受内容//指标定义出来channel.basicQos(1);//值不要设置的太大channel.basicConsume("queue1",false,new DefaultConsumer(){public void handle(String consumerTag, Delivery message) throws IOException {System.out.println(new String("收到消息是" + new String(meassage.getBody()),"UTF-8"));//改成手动应答channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);},new CancelCallback(){public void handle(String consumerTag) throws IOException {System.out.println("接受失败了");}});//4.关闭channel.close();connection.close();
}
创建两个一样的!
15.RabbitMQ使用场景
解耦、削峰、异步
同步异步的问题(串行)
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
并行方式 异步线程池
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
public void test(){//异步theadpool.submit(new Callable<Object>{//1.发送短信服务})//异步theadpool.submit(new Callable<Object>{//2.})//异步theadpool.submit(new Callable<Object>{//3.})//异步theadpool.submit(new Callable<Object>{//4.})
}
存在问题
- 耦合度高
- 需要自己写线程池自己维护成本太高
- 出现了消息可能会丢失,需要你自己做消息补偿
- 如何保证消息的可靠性你自己写
- 如果服务器承载不了,你需要自己去写高可用
异步消息队列的方式
好处:
- 完全解耦,用 MQ建立桥接
- 有独立的线程池和运行模型
- 出现了消息可能会丢失,MQ有持久化功能
- 如何保证消息的可靠性,死信队列和消息转移等
- 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
高内聚,低耦合
16.Springboot整合RabbitMQ
Fanout 模式
生产者
application.yml
# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 127.0.0.1port: 5672
OrderService.java
public class OrderService{@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder(String userid,String productid,int num){//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();sout("订单生产成功:"+orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "fanout_order_exchange";String routingKey = "";rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);}
}
RabbitMqConfiguration.java
@Configuration
public class RabbitMqConfiguration{//1.声明注册fanout模式的交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout_order_exchange",true,false);}//2.声明队列@Beanpublic Queue smsQueue(){return new Queue("sms.fanout.queue",true);}@Beanpublic Queue duanxinQueue(){return new Queue("duanxin.fanout.queue",true);}@Beanpublic Queue emailQueue(){return new Queue("email.fanout.queue",true);}//3.完成绑定关系@Beanpublic Binding smsBingding(){return BindingBuilder.bin(smsQueue()).to(fanoutExchange());}@Beanpublic Binding duanxinBingding(){return BindingBuilder.bin(duanxinQueue()).to(fanoutExchange());}@Beanpublic Binding emailBingding(){return BindingBuilder.bin(emailQueue()).to(fanoutExchange());}
}
消费者
application.yml
# 服务端口
server:port: 8081
# 配置rabbitmq服务
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 127.0.0.1port: 5672
FanoutSmsConsumer.java
@Component
@RabbitListener(queue = {"sms.direct.queue"})
public class FanoutSmsConsumer{@RabbitHandlerpublic void reviceMessage(String message){sout("sms接收到了的订单信息是:"+message);}
}
FanoutDuanxinConsumer.java
@Component
@RabbitListener(queue = {"duanxin.direct.queue"})
public class FanoutDuanxinConsumer{@RabbitHandlerpublic void reviceMessage(String message){sout("duanxin接收到了的订单信息是:"+message);}
}
FanoutEmailConsumer.java
@Component
@RabbitListener(queue = {"duanxin.direct.queue"})
public class FanoutEmailConsumer{@RabbitHandlerpublic void reviceMessage(String message){sout("email接收到了的订单信息是:"+message);}
}
Direct 模式
生产者
OrderService.java
public class OrderService{@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder(String userid,String productid,int num){//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();sout("订单生产成功:"+orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "direct_order_exchange";String routingKey = "";rabbitTemplate.convertAndSend(exchangeName,"email",orderId);rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderId);}
}
消费者
RabbitMqConfiguration.java
@Configuration
public class RabbitMqConfiguration{//1.声明注册fanout模式的交换机@Beanpublic DirectExchange directExchange(){return new DirectExchange("direct_order_exchange",true,false);}//2.声明队列@Beanpublic Queue smsQueue(){return new Queue("sms.direct.queue",true);}@Beanpublic Queue duanxinQueue(){return new Queue("duanxin.direct.queue",true);}@Beanpublic Queue emailQueue(){return new Queue("email.direct.queue",true);}//3.完成绑定关系@Beanpublic Binding smsBingding(){return BindingBuilder.bin(smsQueue()).to(fanoutExchange()).with("sms");}@Beanpublic Binding duanxinBingding(){return BindingBuilder.bin(duanxinQueue()).to(fanoutExchange()).with("duanxin");}@Beanpublic Binding emailBingding(){return BindingBuilder.bin(emailQueue()).to(fanoutExchange()).with("email");}
}
Topic 模式
生产者
public class OrderService{@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder(String userid,String productid,int num){//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();sout("订单生产成功:"+orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "direct_order_exchange";String routingKey = "com.duanxin";rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);}
}
消费者(采用注解)
FanoutSmsConsumer.java
@Component
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "sms.topic.queue",durable = "true",antoDelete = "false"),exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")key = "#.sms.#"
))
public class TopicSmsConsumer{@RabbitHandlerpublic void reviceMessage(String message){sout("sms接收到了的订单信息是:"+message);}
}
FanoutDuanxinConsumer.java
@Component
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "duanxin.topic.queue",durable = "true",antoDelete = "false"),exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")key = "#.duanxin.#"
))
public classTopicDuanxinConsumer{@RabbitHandlerpublic void reviceMessage(String message){sout("duanxin接收到了的订单信息是:"+message);}
}
FanoutEmailConsumer.java
@Component
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "email.topic.queue",durable = "true",antoDelete = "false"),exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")key = "#.email.#"
))
public class TopicEmailConsumer{@RabbitHandlerpublic void reviceMessage(String message){sout("email接收到了的订单信息是:"+message);}
}
注意:
配置类最好写在消费者一边,因为消费者最先启动,如果没有创建好配置,或发生报错,但是写在生产者一边也是可以的
17.RabbitMQ高级 - 过期时间TTL
概述
过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
- 第二种方法是对消息进行单独设置,每条消息 TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息
方法1:设置队列TTL
RabbitMqConfiguration.java
@Configuration
public class TTLRabbitMQConfiguration{//1.声明注册direct模式的交换机@Beanpublic DirectExchange ttldirectExchange(){return new DirectExchange("ttl_direct_exchange",true,false);}//2.队列的过期时间@Beanpublic Queue directttlQueue(){//设置过期时间Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);//这里一定是int类型return new Queue("ttl.direct.queue",true,false,false,args);}@Beanpublic Binding ttlBingding(){return BindingBuilder.bin(directttlQueue()).to(ttldirectExchange()).with("ttl");}
}
设置成功后图形化界面会有一个TTL标志:
方法2:设置消息TTL
public class OrderService{@Autowiredprivate RabbitTemplate rabbitTemplate;//模拟用户下单public void makeOrder(String userid,String productid,int num){//1.根据商品id查询库存是否足够//2.保存订单String orderId = UUID.randomUUID().toString();sout("订单生产成功:"+orderId);//3.通过MQ来完成消息的分发//参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容String exchangeName = "ttl_order_exchange";String routingKey = "ttlmessage";//给消息设置过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){public Message postProcessMessage(Message message){//这里就是字符串message.getMessageProperties().setExpiration("5000");message.getMessageProperties().setContentEncoding("UTF-8");return message;}}rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);}
}
RabbitMqConfiguration.java
@Configuration
public class TTLRabbitMQConfiguration{//1.声明注册direct模式的交换机@Beanpublic DirectExchange ttldirectExchange(){return new DirectExchange("ttl_direct_exchange",true,false);}//2.队列的过期时间@Beanpublic Queue directttlQueue(){//设置过期时间Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);//这里一定是int类型return new Queue("ttl.direct.queue",true,false,false,args);}@Beanpublic Queue directttlMessageQueue(){return new Queue("ttlMessage.direct.queue",true,false,false,args);}@Beanpublic Binding ttlBingding(){return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");}
}
RabbitMQ高级 - 死信队列(接盘侠)
概述
DLX,全称 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息再一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可
代码
DeadRabbitMqConfiguration.java
@Configuration
public class DeadRabbitMqConfiguration{//1.声明注册direct模式的交换机@Beanpublic DirectExchange deadDirect(){return new DirectExchange("dead_direct_exchange",true,false);}//2.队列的过期时间@Beanpublic Queue deadQueue(){return new Queue("dead.direct.queue",true);}@Beanpublic Binding deadbinds(){return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");}
}
RabbitMqConfiguration.java
@Configuration
public class TTLRabbitMQConfiguration{//1.声明注册direct模式的交换机@Beanpublic DirectExchange ttldirectExchange(){return new DirectExchange("ttl_direct_exchange",true,false);}//2.队列的过期时间@Beanpublic Queue directttlQueue(){//设置过期时间Map<String,Object> args = new HashMap<>();//args.put("x-max-length",5);args.put("x-message-ttl",5000);//这里一定是int类型args.put("x-dead-letter-exchange","dead_direct_exchange");//两个交换机关联args.put("x-dead-letter-routing-key","dead");//fanout不需要配置return new Queue("ttl.direct.queue",true,false,false,args);}@Beanpublic Queue directttlMessageQueue(){return new Queue("ttlMessage.direct.queue",true,false,false,args);}@Beanpublic Binding ttlBingding(){return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");}
}
18.内存磁盘的监控
RabbitMQ内存警告
RabbitMQ的内存控制
参考帮助文档:http://www.rabbbitmq.com/configure.html
命令的方式 :二者选其一
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效
配置文件方式 rabbitmq.conf
RabbitMQ的内存换页
RabbitMQ的磁盘预警
19.RabbitMQ高级 - 集群
RabbitMQ集群
集群搭建
配置的前提是你的 rabbitmq可以运行起来,比如ps aix|grep rebbitmq
你能看到相关进程,又比如运行rabbitmqct status
你可以看到类似如下信息而不报错:
单机多实例搭建
启动第二个节点
验证启动
ps aux|grep rabbitmq
rabbit-1操作作为主节点
rabbit-2操作作为从节点
验证集群状态
Web监控
rabbitmq-plugins enable rabbitmq_management
小结
20.RabbitMQ高级 -分布式事务
美团外卖架构:
基于MQ的分布式事务整体设计思路
消息中间件---RabbitMQ相关推荐
- 面试必过之消息中间件RabbitMQ面试总结大全!
本篇基于海量的学员大厂面试,面试中关于消息中间件RabbitMQ的问题专题整理,希望对即将入行的同学有所帮助,祝一切顺利! 1.使用RabbitMQ有什么好处? 抢购活动,削峰填谷,防止系统崩塌. 延 ...
- 分布式系统消息中间件——RabbitMQ的使用基础篇
分布式系统消息中间件--RabbitMQ的使用基础篇 转载于:https://www.cnblogs.com/zhehan54/p/9679101.html
- 基于消息中间件RabbitMQ实现简单的RPC服务
转载自 基于消息中间件RabbitMQ实现简单的RPC服务 RPC(Remote Procedure Call,远程过程调用),是一种计算机通信协议.对于两台机器而言,就是A服务器上的应用程序调用B ...
- 分布式日志sleuth+分布式追踪系统zipkin+消息中间件rabbitMQ+MySQL存储跟踪数据
一.了解分布式架构下系统的监控问题 接口监控问题 监测性能瓶颈 解决方案:Sleuth 日志监控问题 日志分散 解决方案:ELK+Kafka 二.使用Sleuth实现大觅网微服务跟踪 1.打开一个分布 ...
- 分布式系统消息中间件-RabbitMQ介绍及其应用
分布式系统消息中间件-RabbitMQ 一.消息中间件 1.1 中间件 1.1.1 什么是中间件? 中间件(Middleware)是处于操作系统和应用程序之间的软件.人们在使用中间件时,往往是一组中间 ...
- 消息中间件Rabbitmq核心概念讲解
概述 Rabbitmq是消息中间件的一种落地开源实现,使用Erlang语言编写,基于AMQP消息协议. 核心概念 Message:消息是不具名的,由消息头和消息体组成,消息体是不透明的,也就是可以设置 ...
- 【MQ】MQ消息中间件RabbitMQ
第一部分:RabbitMQ 一.MQ 概念 MQ,Message Queue,消息队列.本质是队列,遵循FIFO先进先出原则.只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于 ...
- springboot(十)SpringBoot消息中间件RabbitMQ
github地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo/spb-brian-query-service 1. ...
- 又拍网架构 -- 前端PHP后台Python +消息中间件 RabbitMQ + 分库步骤
又拍网架构 (http://www.bopor.com/?p=652), 很有价值. 又拍网的服务器端开发语言主要是PHP和Python,其中PHP用于编写Web逻辑(通过HTTP和用户直接打交道) ...
最新文章
- R语言使用ggplot2包geom_jitter()函数绘制分组(strip plot,一维散点图)带状图(编写自定义函数添加均值、标准偏差)实战
- Mysql中大表添加索引的办法
- libevent 获取多线程结构体变量加锁方法
- python真的那么火吗-Python语言为什么这么火?
- 第18章 类加载机制与反射
- 《幽灵行者》:近期最酷炫的赛博朋克游戏之一
- ios 直播点赞_微信新版本更新:为视频号直播虚拟礼物,还提供连麦、美颜等功能...
- oracle 测试试题,oracle试题
- 漫话:如何给女朋友解释String对象是不可变的?
- html文件实践总结,html,css学习实践总结
- HDU2203 亲和串【字符串】
- 【源码阅读】Java集合之一 - ArrayList源码深度解读
- 利用 TFLearn 快速搭建经典深度学习模型
- How to create a DXL attribute using a DXL script
- Glide在github上的jar包下载方法
- java 有哪些框架_Java常用框架有哪些?这些框架有什么用?
- windows 2000 密钥
- 学Python中道崩殂的人,大抵逃不过这3个原因!其中有你吗?
- 简单概括 文明进化的各个阶段 (39)
- 生活当中调整时钟时间背后发生的事情
热门文章
- 【调剂】中国科学技术大学生物医学工程学院(中科院苏州医工所)2023年硕士研究生(专业学位硕士)调剂简章...
- 组播Lab RPF TTL
- 物联网操作系统有哪些特点
- USB PC to PC通讯、USB互联线API简介.doc
- 分析国家统计局行政区划代码(省市区数据)生成SQL
- ESI.ProCAST.2019详细安装指南
- 葛文德之医生三部曲《医生的修炼》、《医生的精进》和《最好的告别》
- QGIS如何将高程DEM统一增加数值
- c语言中数组的变量j是什么,C语言中x[i][j]的意思是?
- InternetCrackUrl