RabbitMQ学习(二)-Rabbit的使用
title: RabbitMQ学习(二)-Rabbit的使用
date: 2020-12-14
tags:
- 微服务
- RabbitMQ学习(二)-Rabbit的使用
- RabbitMQ
- spring
- springboot
categories: - 微服务
- RabbitMQ
- RabbitMQ学习(二)-Rabbit的使用
一、Rabbit常见的六种通信方式
二、Java连接RabbitMQ
2.1 创建一个Maven项目
2.2 导入RabbitMQ相关依赖
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>
</dependencies>
2.3 创建工具类连接RabbitMQ
public class RabbitMQClient {public static Connection getConnection(){//1. 创建connection工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.199.138");//设置ipfactory.setPort(5672); //设置端口号factory.setUsername("test"); //设置用户名factory.setPassword("test"); //设置密码factory.setVirtualHost("/test"); //设置VirtualHost//2. 通过工厂创建connectionConnection connection = null;try {connection = factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}return connection;}
}
2.4 测试
public class Demo1 {@Testpublic void testConnection() throws IOException {Connection connection = RabbitMQClient.getConnection();System.out.println(connection);connection.close();}
}
三、Hello-world 基本消息模型
最简单的消息模型:
一个生产者、一个默认交换机、一个队列和一个消费者。
3.1 创建生产者
步骤:
- 通过getConnection静态方法获取连接对象
- 通过连接对象获取channel管道
- 通过channel管道的basicPublish()将消息发布到管道中,此方法需要四个参数:
- 参数1 指定exchange,这里使用“”表示使用默认的交换机
- 参数2 指定要发布到哪个队列(在使用默认exchange时);
- 参数3 指定传递的消息所携带的properties,这里使用null
- 参数4 指定发布的具体消息,byte[]类型
- 释放资源
public class Publisher {@Testpublic void publish() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelChannel channel = connection.createChannel();//3. 发布消息到exchange,同时指定路由规则String msg = "Hello-Word"+(new Date());//basicPublish需要的四个参数://参数1 指定exchange,这里使用“”表示使用默认的交换机//参数2 指定要发布到哪个队列//参数3 指定传递的消息所携带的properties,这里使用null//参数4 指定发布的具体消息,byte[]类型channel.basicPublish("","HelloWord",null,msg.getBytes());//ps:exchange是不会帮你将你的消息持久化本地的,Queue才能帮你持久化消息System.out.println("发布消息成功!");//4.释放资源channel.close();connection.close();}
}
3.2 创建消费者
步骤:
- 通过getConnection静态方法获取连接对象
- 通过连接对象获取channel管道
- 通过channel的queueDeclare()方法创建一个队列,此方法需要五个参数:
- 参数1 String queue 指定要创建的队列的名称
- 参数2 boolean durable 指定当前队列是否需要持久化(指定为true后消息会自动存储到本地)
- 参数3 boolean exclusive 指定是否排外(当connection.close,后当前队列会被自动删除,并且当前队列只允许一个消费者进行消费)
- 参数4 boolean autoDelete 如果此队列没有消费者在消费,则自动删除
- 参数5 Map<String, Object> arguments 指定当前队列的其他信息
- 回调方法:通过重写DefaultConsumer对象中的handleDelivery方法来接收管道中的消息
- 通过channel的basicConsume方法来消费管道中的消息,此方法需要三个参数:
- 参数1 String queue 指定要消费哪个队列
- 参数2 DeliverCallback deliverCallback 指定是否自动ACK(当设置为true时,消费者接受到消息,会自动告诉RabbitMQ)
- 参数3 CancelCallback cancelCallback 指定回调方法
- 释放资源
public class Customer {@Testpublic void customer() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelChannel channel = connection.createChannel();//3.声明队列(HelloWord)//queueDeclare所需要的五个参数://参数1 String queue 指定要创建的队列的名称//参数2 boolean durable 指定当前队列是否需要持久化(指定为true后消息会自动存储到本地)//参数3 boolean exclusive 指定是否排外(当connection.close,后当前队列会被自动删除,并且当前队列只允许一个消费者进行消费)//参数4 boolean autoDelete 如果此队列没有消费者在消费,则自动删除//参数5 Map<String, Object> arguments 指定当前队列的其他信息channel.queueDeclare("HelloWord",false,false,false,null);//4.开启监听指定Queue//回调方法DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到的消息:"+new String(body,"UTF-8"));}};//basicConsume所需要的三个参数://参数1 String queue 指定要消费哪个队列//参数2 DeliverCallback deliverCallback 指定是否自动ACK(当设置为true时,消费者接受到消息,会自动告诉RabbitMQ)//参数3 CancelCallback cancelCallback 指定回调方法channel.basicConsume("HelloWord",true,consumer);System.out.println("正在监听队列!");//避免程序运行完自动停止,方便测试System.in.read();//5.释放资源channel.close();connection.close();}
}
注:
- 队列的声明可以在publish中、可以在customer中也可以都创建(创建队列时,有相同队列则不创建,没有则创建),但需要注意的是,发布消息时或者消费消息前需要存在一个声明好的队列
- exchange是不会帮你将你的消息持久化本地的,Queue才能帮你持久化消息
四、Work Queues工作队列模型
在基本消息模型中,一个生产者对应一个消费者,而实际生产过程中,往往消息生产会发送很多条消息,如果消费者只有一个的话效率就会很低,因此rabbitmq有另外一种消息模型,这种模型下,一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。
一个生产者,一个默认交换机,一个队列,两个消费者。
4.1 创建生产者
public class Publisher {@Testpublic void publish() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelChannel channel = connection.createChannel();//3. 发布消息到exchange,同时指定路由规则for (int i = 0; i < 10; i++) {String msg = "Hello-Word"+i;channel.basicPublish("","work",null,msg.getBytes());}System.out.println("发布消息成功!");//4.释放资源channel.close();connection.close();}}
4.2 创建消费者
ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。
手动ACK:
- channel的basicConsume()方法的第二个参数设为false,表示不使用自动ACK。
- 通过channel的basicQos(int n) 方法指定一次消费多少条消息。
- 在DefaultConsumer重写的handleDelivery方法中进行手动ACK(因为在手动ACK前已经通过body属性获取到了消息,相当于消费了消息,所以可以在他的后面可以进行ACK):通过channel的basicAck()方法进行手动ACK,第二个参数设为false表示不进行批量操作。
消费者1
public class Customer1 {@Testpublic void customer() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelfinal Channel channel = connection.createChannel();//3.声明队列(HelloWord)channel.queueDeclare("work",false,false,false,null);//4.指定当前消费者一此消费多少条消息channel.basicQos(1);//5.开启监听指定Queue//回调方法DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1号接收到的消息:"+new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("work",false,consumer);System.out.println("消费者1号正在监听队列!");//避免程序运行完自动停止,方便测试System.in.read();//6.释放资源channel.close();connection.close();}}
消费者2
public class Customer1 {@Testpublic void customer() throws Exception {........}
}
测试
五、 Publish/Subscribe(FANOUT) 订阅模型
在之前的模型中,一条消息只能被一个消费者获取,而在订阅模式中,可以实现一条消息被多个消费者获取。在这种模型下,消息传递过程中比之前多了一个exchange交换机,生产者不是直接发送消息到队列,而是先发送给交换机,经由交换机分配到不同的队列,而每个消费者都有自己的队列。
一个生产者,一个自己创建的交换机,两个队列,两个消费者:
1个生产者,多个消费者
每一个消费者都有自己的一个队列
生产者没有将消息直接发送到队列,而是发送到了交换机
每个队列都要绑定到交换机
生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
ps:生产者发布消息,所有消费者都可以获取所有消息。
5.1 创建生产者
创建自己的exchange交换机:
- 通过channel的exchangeDeclare()方法声明一个exchange,该方法需要两个参数:
- 参数1 exchange的名称
- 参数2 指定exchange的类型 FANOUT(Publish/Subscribe)、DIRECT(Routing)、TOPIC(Topics)
- 通过channel的queueBind()将声明好的exchange和存在的queue进行绑定(绑定的事情可以在生产者里进行也可以在消费者里进行),改方法需要三个参数:
- 参数1 队列的名称
- 参数2 exchange的名称
- 参数3 规则
- 在basicPublish()中将默认的exchange更改为自己定义的exchange,此时第二个参数routingKey变为路由的规则。
public class Publisher {@Testpublic void publish() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelChannel channel = connection.createChannel();//3. 创建exchange - 绑定一个队列//参数1 exchange的名称//参数2 指定exchange的类型 FANOUT(Publish/Subscribe) DIRECT(Routing) TOPIC(Topics)channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);//将交换机和队列进行绑定channel.queueBind("pubsub-queue1","pubsub-exchange","");channel.queueBind("pubsub-queue2","pubsub-exchange","");//4. 发布消息到exchange,同时指定路由规则for (int i = 0; i < 10; i++) {String msg = "Hello-Word"+i;channel.basicPublish("pubsub-exchange","",null,msg.getBytes());}System.out.println("发布消息成功!");//5.释放资源channel.close();connection.close();}}
5.2 创建消费者
消费者1
public class Customer1 {@Testpublic void customer() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelfinal Channel channel = connection.createChannel();//3.声明队列(HelloWord)channel.queueDeclare("pubsub-queue1",false,false,false,null);//4.指定当前消费者一此消费多少条消息channel.basicQos(1);//5.开启监听指定Queue//回调方法DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1号接收到的消息:"+new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("pubsub-queue1",false,consumer);System.out.println("消费者1号正在监听队列!");//避免程序运行完自动停止,方便测试System.in.read();//6.释放资源channel.close();connection.close();}}
消费者2
public class Customer2 {@Testpublic void customer() throws Exception {..........}}
测试
注意:exchange与队列一样都需要提前声明,如果未声明就使用交换机,则会报错。如果不清楚生产者和消费者谁先声明,为了保证不报错,生产者和消费者都声明交换机,同样的,交换机的创建也会保证幂等性。
六、Routing(Direct) 订阅模型
在fanout模型中,生产者发布消息,所有消费者都可以获取所有消息。在路由模式(Direct)中,可以实现不同的消息被不同的队列消费,在Direct模式下,交换机不再将消息发送给所有绑定的队列,而是根据Routing Key将消息发送到指定的队列,队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息时也需要携带一个Routing Key。
如图所示,消费者C1的队列与交换机绑定时设置的Routing Key是“error”, 而C2的队列与交换机绑定时设置的Routing Key包括三个:“info”,“error”,“warning”,假如生产者发送一条消息到交换机,并设置消息的Routing Key为“info”,那么交换机只会将消息发送给C2的队列。
6.1 创建生产者
使用步骤:
- 将exchangeDeclare()的第二个参数改为“BuiltinExchangeType.DIRECT”,这样路由规则就变为了Routing
- 将queueBind()的三个参数,指定为所需的routingKey
- basicPublish()的第二个参数中指定routingKey
public class Publisher {@Testpublic void publish() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelChannel channel = connection.createChannel();//3. 创建exchange - 绑定一个队列//参数1 exchange的名称//参数2 指定exchange的类型 FANOUT(Publish/Subscribe) DIRECT(Routing) TOPIC(Topics)channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);//将交换机和队列进行绑定channel.queueBind("routing-queue-error","routing-exchange","ERROR");channel.queueBind("routing-queue-info","routing-exchange","INFO");//4. 发布消息到exchange,同时指定路由规则channel.basicPublish("routing-exchange","ERROR",null,"ERROR1".getBytes());channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());System.out.println("发布消息成功!");//5.释放资源channel.close();connection.close();}}
6.2 创建消费者
消费者1
public class Customer1 {@Testpublic void customer() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelfinal Channel channel = connection.createChannel();//3.声明队列(HelloWord)channel.queueDeclare("routing-queue-error",false,false,false,null);//4.指定当前消费者一此消费多少条消息channel.basicQos(1);//5.开启监听指定Queue//回调方法DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者ERROR接收到的消息:"+new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("routing-queue-error",false,consumer);System.out.println("消费者ERROR正在监听队列!");//避免程序运行完自动停止,方便测试System.in.read();//6.释放资源channel.close();connection.close();}}
消费者2
public class Customer2 {@Testpublic void customer() throws Exception {........}}
测试
七、Topics(topic) 发布订阅
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符。
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:fast.red.monkey
routingKey通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词
7.1 创建生产者
public class Publisher {@Testpublic void publish() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelChannel channel = connection.createChannel();//3. 创建exchange - 绑定一个队列//参数1 exchange的名称//参数2 指定exchange的类型 FANOUT(Publish/Subscribe) DIRECT(Routing) TOPIC(Topics)channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);//将交换机和队列进行绑定//动物信息<speed>,<color>,<what>//*.red.* -> *展位符//fast.# -> 通配符//*.*.rabbitchannel.queueBind("topic-queue-1","topic-exchange","*.red.*");channel.queueBind("topic-queue-2","topic-exchange","fast.#");channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");//4. 发布消息到exchange,同时指定路由规则channel.basicPublish("topic-exchange","fast.red.monkey",null,"快红侯".getBytes());channel.basicPublish("topic-exchange","slow.blue.rabbit",null,"慢蓝兔".getBytes());channel.basicPublish("topic-exchange","fast.orange.dog",null,"快橙狗".getBytes());System.out.println("发布消息成功!");//5.释放资源channel.close();connection.close();}}
7.2 创建消费者
消费者1
public class Customer1 {@Testpublic void customer() throws Exception {//1.获取connectionConnection connection = RabbitMQClient.getConnection();//2. 创建channelfinal Channel channel = connection.createChannel();//3.声明队列(HelloWord)channel.queueDeclare("topic-queue-1",false,false,false,null);//4.指定当前消费者一此消费多少条消息channel.basicQos(1);//5.开启监听指定Queue//回调方法DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1接收到的消息:"+new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("topic-queue-1",false,consumer);System.out.println("消费者1正在监听队列!");//避免程序运行完自动停止,方便测试System.in.read();//6.释放资源channel.close();connection.close();}}
消费者2
public class Customer1 {@Testpublic void customer() throws Exception {........}}
测试
RabbitMQ学习(二)-Rabbit的使用相关推荐
- RabbitMQ学习二
RabbitMQ 是一个消息broker.它的主要概念就是接受和转发消息.可以把它当作一个邮局:当向邮箱投递一封邮件时,你确信邮差最终会将这封邮件投递到收件人.使用这个比喻,RabbitMQ就是邮箱, ...
- RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列
上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...
- 官网英文版学习——RabbitMQ学习笔记(二)RabbitMQ安装
一.安装RabbitMQ的依赖Erlang 要进行RabbitMQ学习,首先需要进行RabbitMQ服务的安装,安装我们可以根据官网指导进行http://www.rabbitmq.com/downlo ...
- RabbitMQ 学习笔记
RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...
- 乐行学院RabbitMQ学习教程 第一章 RabbitMQ介绍(可供技术选型时使用)
乐行学院RabbitMQ学习教程 第一章 RabbitMQ介绍 RabbitMQ介绍 1.RabbitMQ技术简介 2.RabbitMQ其他扩展插件 2.1监控工具rabbitmq-managemen ...
- 分布式消息中间件之RabbitMQ学习笔记[一]
写在前面 嗯,陆续的整理一些中间件的笔记 今天和小伙伴们分享RabbitMQ 相关笔记 博文偏理论,内容涉及: RabbitMQ的简单介绍 AMQP协议标准介绍 RabbitMQ Demo 食用方式: ...
- RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发
内容翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...
- RabbitMQ学习
RabbitMQ学习 1.概述 用于进程通信的中间件. 优势: 劣势: 1.应用解耦:提高了系统容错性和可维护性 1.系统依赖越多不能保证MQ的高可用 2.异步提速:提升用户体验和系统吞吐量 2.复杂 ...
- RabbitMQ学习之旅
Author:Eric Version:9.0.0 文章目录 一.引言 二.RabbitMQ介绍 三.RabbitMQ安装 四.RabbitMQ架构[`重点`] 4.1 官方的简单架构图 4.2 Ra ...
- Rabbitmq学习笔记(尚硅谷2021)
Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...
最新文章
- 机器学习_周志华_问题汇总_第1周
- pip19离线_更新pip为20后不显示下载链接无法离线下载回退pip版本
- 平板电脑连接投影仪_交互式触控幼教白板如何与平板进行连接-微幼科技
- 读《我们终将逝去的青春》
- c语言条件编译include,7.1编译与预处理-include-c学习 | 时刻需
- Gson案例:Java对象与JSON字符串相互转换
- 薄板样条插值(Thin plate splines)的实现与使用
- itextpdf添加表格元素_基操勿6第四期:PPT表格美化
- python多进程与多线程_第十五章 Python多进程与多线程
- 打开微信键盘自动弹出_微信一打开就弹出键盘 微信打字键盘怎么恢复
- 往事如烟 - 归去来
- 【历史上的今天】11 月 28 日:中国顶级域名 CN 被注册;上世纪最大的论坛诞生;首个 Fortran 程序开发者逝世
- 统一社会信用代码 php验证
- Asp.Net Core 中_ViewStart.cshtml 及_ViewImports.cshtml 的作用
- 关于Docker Toolbox安装的一点经验(算是吧)
- mysql:[ERR] 32> 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘createTime‘ at ro
- 简单衣物店购买计算系统问题
- AE cc 2018 详细安装教程
- android editext下拉框,android实现下拉框和输入框结合
- EDA软件_Cadence_Allegro 16.6进行尺寸标注
热门文章
- 中文的括号和英文的括号区别_工具推荐 含笔顺及英文的汉字书写练习纸
- 工信部发布八项互联网新通用顶级域名服务技术要求
- 怎样用VR看分频视频?
- Python学员信息管理系统
- 【b站雅思笔记】Simon‘s IELTS Course - 阅读部分
- java中protected_Java中protected方法访问权限的问题
- java protected 构造方法_Java中protected语义解释
- 尼古拉斯.海伦.波特
- Segmentation Fault原因总结
- java调用百度地图的不同显示方式