消息中间件之RabbitMQ

核心概念

​ JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

​ JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

消息模型

Point-to-Point(P2P) 点对点(普通消息)

Publish/Subscribe(Pub/Sub) 发布订阅(交换器消息)

P2P:点对点发送,一个消息只能被消费一次

​ 涉及的角色

​ 消息队列(Queue)

​ 发送者(Sender)

​ 接收者(Receiver)

​ 每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

​ 特点:

​ 1、不同同时在线

​ 2、一个消息只能被消费1次(在点对点模型中)

Pub/Sub:发布订阅 一个消息可用被消费多次

​ 涉及的角色:

​ 主题(Topic)

​ 发布者(Publisher)

​ 订阅者(Subscriber)

​ 客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者息,直到他们被消费或超时。

​ 特点:

​ 一个消息可用被消费多次


MQ

消息中间件(MOM:Message Orient middleware),别名 消息队列

​ 作为系统间通信的必备技术,低耦合、可靠传输、流量控制、最终一致性实现异步消息通信

消息中间件有很多的用途和优点:

​ 1、将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
​ 2、负责建立网络通信的通道,进行数据的可靠传送。

​ 3、保证数据不重发,不丢失

​ 4、能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

MQ的优缺点

​ 1、解耦:降低系统模块的耦合度

​ 2、提高系统响应时间

​ 3、异步消息

​ 4、过载保护,基于MQ实现削峰填谷(限流,超过服务器承载能力的请求在消息队列中等待处理)

主流MQ对比

1、ActiveMQ

​ Apache 下完全支持Java的JMS协议
​ 消息模式:1、点对点 2、发布订阅

2、RabbitMQ

​ Erlang语言 实现的开源的MQ中间件,支持多种协议

3、Kafka

​ Apache下开源项目
​ 高性能分布式消息队列,一般海量数据传输 大数据部门一般都用
​ 单机吞吐量:10W/S

4、RocketMQ

​ 阿里 贡献给了Apache
​ 参考了Kafka实现的基于Java 消息中间件

5、ZeroMQ

消息传输最快


RabbitMQ

概念

​ RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。

RabbitMQ是由RabbitMQ Technologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市。

涉及核心

​ Exchange(交换机、交换器):

​ 根据绑定的匹配规则对消息进行匹配处理

​ Queue(队列):

​ RabbitMQ的作用是存储消息,队列的特性是先进先出。上图可以清晰地看到Client A和Client B是生产者,生产者生产消息最终被送到RabbitMQ的内部对象Queue中去,而消费者则是从Queue队列中取出数据


RabbitMQ初体验

注意:(可视化和客户端连接的端口号不一样)

​ 15672:网页版 可视化服务器数据
​ 5672:客户端连接点的端口号

涉及的角色

​ 1、MQ服务器(可以基于Doker安装RabbitMQ)

​ 端口号:15672(网页版 可视化服务器数据)

​ 端口号:5672(客户端连接点的端口号)

​ 2、MQ消息发送者

​ 3、MQ消息消费者

MQ消息发送者(点对点也称普通消息)

1、导入依赖的jar包
        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency>
2、代码的编写
public class RabbitMQ_Send1_Main {public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("主机ip地址");//设置主机factory.setPort(5672);//设置端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();/*** 通过通道对象定义一个队列* 定义队列参数说明* 第一个参数:队列名称* 第二个参数:是否持久化 队列消息是否存储到磁盘* 第三个参数:是否独占队列* 第四个参数:是否断开后自动删除消息* 第五个参数:额外设置的数据信息,是一个map集合,一般情况下不用。*/channel.queueDeclare("queue1902",false,false,false,null);//发送消息/* 参数说明:* 1、交换机名称* 2、队列名称* 3、属性参数* 4、发送的消息内容 要求字节*/channel.basicPublish(null,"queue1902",null,"21岁生日快乐".getBytes());//关闭channel.close();//关闭通道对象connection.close();//关闭连接对象}
}
运行消费之后,通过ip:15672进入RabbitMQ

点击对应的队列,可以看到消息

Ready:为已经就绪的消息

Total:为消息的总量

编写消息的消费者(点对点也称普通消息)
public class RabbitMQ_Consumer1_Main {public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("注意的ip");//设置主机factory.setPort(5672);//设置连接端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();/*** 通过通道对象定义一个队列* 定义队列参数说明* 第一个参数:队列名称* 第二个参数:是否持久化 队列消息是否存储到磁盘* 第三个参数:是否独占队列* 第四个参数:是否断开后自动删除消息* 第五个参数:额外设置的数据信息,是一个map集合,一般情况下不用。*/channel.queueDeclare("queue1902",false,false,false,null);//发送消息/* 参数说明:* 1、交换机名称,注意交换机不能为null,可以写成 ""* 2、队列名称* 3、属性参数* 4、发送的消息内容 要求字节*/channel.basicPublish("","queue1902",null,"21岁生日快乐".getBytes());//定义消费者Consumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者:"+new String(body));}};/*** 绑定消费者* 参数说明* 1、队列名称* 2、是否自动应答* 3、消费者对象*/channel.basicConsume("queue1902",true , defaultConsumer);/*** 消费者不用关闭,如果没有可以消费的消息,就会暂时阻塞在这里*///channel.close();//关闭通道对象//connection.close();//关闭连接对象}}
在下面可以看到持续接受的消息。


交换器消息

注:点对点消息不需要使用交换器

RabbitMQ特色就在于Exchange,主要有以下类型:

​ 1、fanout:只要有消息就转发给绑定的队列,不会进行消息的路由判断

​ 2、diect:直接 会根据路由匹配规则,将消息发送到指定的队列中,注:路由规则不支持特殊字符

​ 3、topic:会根据路由匹配规则,将消息发送到指定的队列中,注:这个路由规则支持特殊字符,比如 *(匹配一个单词) #(没有或者多个)

交换器消息常用的有三种:Fanout消息、Direct消息、Topic消息

Fanout消息案例

Fanout类型的Exchange路由规则非常简单, 它会把所有发送到该Exchange的消息路由发到所有与它绑定的Queue(队列)中

Fanout消息提供者

此提供者创建了2个队列 laoxingjingjie

/*** RabbitMQ的fanout消息发布者*/
public class Msg_Provider {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("主机ip");//设置主机factory.setPort(5672);//设置端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();//定义交换器channel.exchangeDeclare("exchange1902", BuiltinExchangeType.FANOUT);//创建2个队列channel.queueDeclare("laoxing",false,false,false,null);channel.queueDeclare("jingjie",false,false,false,null);/*** 绑定队列* 参数说明* 1、交换器名称* 2、路由规则* 3、队列要队列名称*///channel.exchangeBind("exchange1902","",qn);/*** 队列绑定* 参数说明* 1、队列名称* 2、交换器名称* 3、路由规则*/channel.queueBind("laoxing","exchange1902","");channel.queueBind("jingjie","exchange1902","");//发送消息channel.basicPublish("exchange1902","",null,"我是轩轩,我看着代码不想代码了".getBytes() );channel.close();//关闭通道connection.close();//关闭连接对象}

Fanout消息消费者

此消费者消费了2个队列 laoxingjingjie 里的消息

/*** RabbitMQ的fanout消息消费者*/
public class Msg_Cnsumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("主机ip");//设置主机factory.setPort(5672);//设置端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();//定义交换器channel.exchangeDeclare("exchange1902", BuiltinExchangeType.FANOUT);//创建队列channel.queueDeclare("laoxing",false,false,false,null);channel.queueDeclare("jingjie",false,false,false,null);/*** 绑定队列* 参数说明* 1、交换器名称* 2、路由规则* 3、队列要队列名称*///channel.exchangeBind("exchange1902","",qn);/*** 队列绑定* 参数说明* 1、队列名称* 2、交换器名称* 3、路由规则*/channel.queueBind("laoxing","exchange1902","");channel.queueBind("jingjie","exchange1902","");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者:"+new String(body));}};/*** 绑定消费者* 参数说明* 1、队列名称* 2、自动应答* 3、消费者*/channel.basicConsume("laoxing",true,consumer);channel.basicConsume("jingjie",true,consumer);}
}

下面可以看到消费者接受到了消息


Direct消息案例

会根据路由匹配规则,将消息发送到指定的队列中,注:路由规则不支持特殊字符

Driect消息提供者

public class Msg_Send {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("106.12.48.254");//设置主机factory.setPort(5672);//设置端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();//定义交换器名称和交换器类型channel.exchangeDeclare("exc_xuan_direct", BuiltinExchangeType.DIRECT);//创建2个队列channel.queueDeclare("order_xuanxuan",false,false,false,null);channel.queueDeclare("user_xuanxuan",false,false,false,null);/*** 绑定队列到交换器* 1、队列名* 2、交换器名* 3、匹配规则*/channel.queueBind("order_xuanxuan","exc_xuan_direct","order");channel.queueBind("user_xuanxuan","exc_xuan_direct","user");//发布消息channel.basicPublish("exc_xuan_direct", "order",null , "新来订单了".getBytes());channel.basicPublish("exc_xuan_direct", "user",null , "新来订单了user".getBytes());channel.close();;//关闭通道connection.close();//关闭连接}
}

运行只会,可以看到 exc_xuan_direct 路由中绑定了2个队列

Driect消息消费者

public class Msg_Cnsumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("106.12.48.254");//设置主机factory.setPort(5672);//设置端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();/*** 绑定队列* 参数说明* 1、交换器名称* 2、路由规则* 3、队列要队列名称*///channel.exchangeBind("exchange1902","",qn);/*** 队列绑定* 参数说明* 1、队列名称* 2、交换器名称* 3、路由规则*/channel.queueBind("order_xuanxuan","exc_xuan_direct","");channel.queueBind("user_xuanxuan","exc_xuan_direct","");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者:"+new String(body));}};/*** 绑定消费者* 参数说明* 1、队列名称* 2、自动应答* 3、消费者*/channel.basicConsume("order_xuanxuan",true,consumer);channel.basicConsume("user_xuanxuan",true,consumer);}
}

运行消费者,可以看到接受到了内容

注:因为交换器和队列已经存在,所以不需要创建对了和交换器,只需要指定交换器和队列,进行接受即可


topic消息案例

​ 会根据路由匹配规则,这个消息类型和 direct 一样将消息发送到指定的队列中,不同的是:这个路由规则支持特殊字符,比如 *(匹配一个单词) #(没有或者多个)

Topic提供消息者

public class Msg_Send {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("106.12.48.254");//设置主机factory.setPort(5672);//设置端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();//定义交换器名称和交换器类型channel.exchangeDeclare("exc_xuan_topic", BuiltinExchangeType.TOPIC);//创建队列channel.queueDeclare("pay_xuanxuan",false,false,false,null);channel.queueDeclare("msg_xuanxuan00",false,false,false,null);channel.queueDeclare("oss_xuanxuan",false,false,false,null);channel.queueDeclare("msg_xuanxuan01",false,false,false,null);/*** 绑定队列到交换器* 1、队列名* 2、交换器名* 3、匹配规则:匹配规则中的#为pay为前缀,一个或多个字符*///注意:匹配规则中,使用特殊符号需要使用.来分割channel.queueBind("pay_xuanxuan","exc_xuan_topic","pay.#");//channel.queueBind("msg_xuanxuan00","exc_xuan_topic","msg.#");channel.queueBind("oss_xuanxuan","exc_xuan_topic","oss.#");channel.queueBind("msg_xuanxuan01","exc_xuan_topic","msg.#");/***发布消息;使用msg进行模匹配,路径将消息发给符合规则的队列* 参数说明,* 第一个交换器* 第二个:匹配规则* 第三个:目前我还不知道* 第四个:消息内容*/channel.basicPublish("exc_xuan_topic", "msg.asd",null , "订单预支付信息".getBytes());channel.close();;//关闭通道connection.close();//关闭连接}
}

因为 msg. 规则后面不管写和不写,都可以匹配上 msg_xuanxuan00msg_xuanxuan01 两个队列,运行此代代码后,我们可以看到RabbitMQ的这两个队列中,分别多出一条消息

Topic消息消费者

public class Msg_Cnsumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();// 设置连接信息factory.setHost("106.12.48.254");//设置主机factory.setPort(5672);//设置端口factory.setUsername("guest");//设置账户factory.setPassword("guest");//设置密码//获取连接对象  并抛出异常Connection connection = factory.newConnection();//获取通道对象Channel channel = connection.createChannel();/*** 绑定队列* 参数说明* 1、交换器名称* 2、路由规则* 3、队列要队列名称*///channel.exchangeBind("exchange1902","",qn);/*** 队列绑定* 参数说明* 1、队列名称* 2、交换器名称* 3、路由规则*/channel.queueBind("msg_xuanxuan00","exc_xuan_direct","");channel.queueBind("msg_xuanxuan01","exc_xuan_direct","");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-支付:"+new String(body));}};/*** 绑定消费者* 参数说明* 1、队列名称* 2、自动应答* 3、消费者*/channel.basicConsume("msg_xuanxuan00",true,consumer);channel.basicConsume("msg_xuanxuan01",true,consumer);}
}

运行消费信息类只会,控制台拿到两个队列的内容

之后刷新RibbtMQ队列,发现两个队列 msg_xuanxuan00msg_xuanxuan01 的消息已经被消费了

RabbitMQ总结

消息发送者:

1、直接发送给队列
2、将消息发给交换器

消息消费者:

1、监听队列的数据变化
2、自动接收消息

RabbitMQ知识点:

1、MQ服务器 Docker
2、发送消息 basicPublish
3、消费消息 basicConsumer
1、队列必须定义、交换器必须定义 交换器和队列绑定 queueBind

RabbitMQ入门用法及消息模型案例相关推荐

  1. day72 JavaWeb框架阶段——RabbitMQ消息队列【了解常见的MQ产品,了解RabbitMQ的5种消息模型,会使用Spring AMQP】

    文章目录 0.学习目标 1.RabbitMQ 1.1.搜索与商品服务的问题 1.2.消息队列(MQ) 1.2.1.什么是消息队列 1.2.2.AMQP和JMS 1.2.3.常见MQ产品 1.2.4.R ...

  2. SpringBoot整合RabbitMQ 实现五种消息模型

    目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...

  3. Python实现RabbitMQ中6种消息模型(转)

    RabbitMQ与Redis对比 ​ RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些Rab ...

  4. 消息中间件rabbitMQ之第二种消息模型(work quene)

    Work queues,也被称为(Task queues),任务模型.当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度.长此以往,消息就会堆积越来越多,无法及时处理.此时就可以使用 ...

  5. RabbitMQ之五种消息模型

    首先什么是MQ MQ全称是Message Queue,即消息对列!消息队列是典型的:生产者.消费者模型.生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息.因为消息的生产和消费都是异步的,而 ...

  6. RabbitMQ·入门·壹

    文章目录 1 MQ思想 1.1 相关概念:同步.异步通讯 1.1.1 同步通讯 1.1.2 异步通讯 1.2 MQ思想概述 1.2.1 MQ思想优缺点 1.2.2 MQ技术对比 1.2.3 MQ技术选 ...

  7. RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

    发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...

  8. RabbitMQ快速入门--消息模型介绍

    RabbitMQ消息模型 RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:

  9. 带你快速入门RabbitMQ(附思维导图,案例代码)

    RabbitMQ 1.初识MQ 1.1.同步和异步通讯 微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应. 异步通讯:就像发邮件,不需要马上回复. 两种方式各有优劣,打电话可以立 ...

最新文章

  1. 登录和oauth机制
  2. android面向数据库的的编程工具-OrmLite
  3. Math.round()
  4. 在控制台中输出 出现SIGBAT或者EXC_BAD_ACCESS的原因的方法
  5. linux 统计当前目录下文件或者文件夹的数量
  6. JavaScript-初识jQuery及公式
  7. 机器学习实战6-sklearn训练决策树实现分类和回归
  8. 6折入股蚂蚁金服?巨人网络如此回应
  9. 2017级C语言大作业 - 气球塔防
  10. python里的class_Python中的Class的讨论
  11. sudo chown r mysql_Linux 文件基本属性: chown修改所属组 和 chmod修改文件属性命令
  12. Linux安装Nginx1.7.4、php5.5.15和配置
  13. 90后美女学霸传奇人生:出身清华姚班,成斯坦福AI实验室负责人高徒
  14. POJ 2706 Connect
  15. 划分训练集,验证集,测试集
  16. 使用for循环编写反方向正直角三角形
  17. go (golang) DNS域名解析实现
  18. iOS开发 ☞ emoji表情大全
  19. BurpSuite--Proxy详解
  20. 论文领读|基于 VQVAE 的长文本生成

热门文章

  1. What is outlier?
  2. 统计学、深度学习、机器学习、数据挖掘
  3. 单片机I/O常用的驱动与隔离电路设计
  4. linux 下dump的使用
  5. python snmp
  6. 人工智能未来替代的职位,主要有哪些行业?
  7. 小程序设置边框border
  8. win10 修复打印机服务器,Windows Update修复了打印机错误(win10/win7)
  9. 哈佛体系结构 哈佛体系结构
  10. Quartus (Quartus Prime 18.1)的安装及仿真步骤