上一篇:深夜看了张一鸣的微博,让我越想越后怕

七种模式介绍与应用场景

简单模式(Hello World)

做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B

应用场景: 将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人

工作队列模式(Work queues)

‍‍‍‍

在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理

应用场景: 一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况

订阅模式(Publish/Subscribe)

一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。

应用场景: 更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:

  • 一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列

  • 一个缓存消息队列对应着多个缓存消费者

  • 一个数据库消息队列对应着多个数据库消费者

路由模式(Routing)

有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息

应用场景: 如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息

主题模式(Topics)

根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。

应用场景: 同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13等

远程过程调用(RPC)

如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC,具体流程可以看图。应用场景:需要等待接口返回数据,如订单支付

发布者确认(Publisher Confirms)

与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

应用场景: 对于消息可靠性要求较高,比如钱包扣款

代码演示

代码中没有对后面两种模式演示,有兴趣可以自己研究。

简单模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Sender {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列// queue:队列名// durable:是否持久化// exclusive:是否排外 即只允许该channel访问该队列 一般等于true的话用于一个队列只能有一个消费者来消费的场景// autoDelete:是否自动删除 消费完删除// arguments:其他属性channel.queueDeclare(QUEUE_NAME, false, false, false, null);//消息内容String message = "simplest mode message";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x]Sent '" + message + "'");//最后关闭通关和连接channel.close();connection.close();}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receiver {private final static String QUEUE_NAME = "simplest_queue";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 获取连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

工作队列模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receiver1 {private final static String QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receiver2 {private final static String QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Sender {private final static String QUEUE_NAME = "queue_work";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 100; i++) {String message = "work mode message" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '" + message + "'");Thread.sleep(i * 10);}channel.close();connection.close();}
}

发布订阅模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class Receive1 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 订阅消息的回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 消费者,有消息时出发订阅回调函数channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class Receive2 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 订阅消息的回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received2 '" + message + "'");};// 消费者,有消息时出发订阅回调函数channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Sender {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "publish subscribe message";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}

路由模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receiver1 {private final static String QUEUE_NAME = "queue_routing";private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 指定路由的key,接收key和key2channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receiver2 {private final static String QUEUE_NAME = "queue_routing2";private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 仅接收key2channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Sender {private final static String EXCHANGE_NAME = "exchange_direct";private final static String EXCHANGE_TYPE = "direct";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 交换机声明channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);// 只有routingKey相同的才会消费String message = "routing mode message";channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());System.out.println("[x] Sent '" + message + "'");
// channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());
// System.out.println("[x] Sent '" + message + "'");channel.close();connection.close();}
}

主题模式

public class Receiver1 {private final static String QUEUE_NAME = "queue_topic";private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 可以接收key.1channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
public class Receiver1 {private final static String QUEUE_NAME = "queue_topic";private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 可以接收key.1channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Sender {private final static String EXCHANGE_NAME = "exchange_topic";private final static String EXCHANGE_TYPE = "topic";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);String message = "topics model message with key.1";channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());System.out.println("[x] Sent '" + message + "'");String message2 = "topics model message with key.1.2";channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());System.out.println("[x] Sent '" + message2 + "'");channel.close();connection.close();}
}

四种交换机介绍

1、直连交换机(Direct exchange):

具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列

2、扇形交换机(Fanout exchange):

广播消息到所有队列,没有任何处理,速度最快,

3、主题交换机(Topic exchange):

在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,*代表一个单词,#代表多个单词

4、首部交换机(Headers exchange):

忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则

总结

这么多种队列模式中都有其应用场景,大家可以根据应用场景示例中进行选择。另外,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java、MQ 系列面试题和答案,非常齐全。

原文链接:https://blog.csdn.net/qq_32828253/article/details/110450249

感谢您的阅读,也欢迎您发表关于这篇文章的任何建议,关注我,技术不迷茫!小编到你上高速。

· END ·

最后,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全。

正文结束

推荐阅读 ↓↓↓

1.不认命,从10年流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事

2.如何才能成为优秀的架构师?

3.从零开始搭建创业公司后台技术栈

4.程序员一般可以从什么平台接私活?

5.37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...

6.IntelliJ IDEA 2019.3 首个最新访问版本发布,新特性抢先看

7.漫画:程序员相亲图鉴,笑屎我了~

8.15张图看懂瞎忙和高效的区别!

一个人学习、工作很迷茫?

点击「阅读原文」加入我们的小圈子!

RabbitMQ 中的 7 种队列模式,写得太好了!相关推荐

  1. IEEE754标准中的4种舍入模式

    link 一.前言 最近在写一个基于IEEE754标准的浮点加法器,其中有一项要求就是要满足IEEE754标准的四种舍入模式. 我们在进行对阶或者右规格化的时候,阶数较小的操作数在进行右移的时候,会造 ...

  2. JM8.5中的7种宏块模式问题

    JM8.5中的7种宏块模式问题 收藏 Outline: 1.  CFG文件中有关可变尺寸宏块模式的相关选项 2.  7种宏块模式对应的数值常量 3.  7种宏块模式被分成宏块和亚宏块 4.  如何对宏 ...

  3. JM8.5中的7种宏块模式问题 - zhoujunming的专栏 - CSDN博客

    JM8.5中的7种宏块模式问题 收藏 Outline: 1.  CFG文件中有关可变尺寸宏块模式的相关选项 2.  7种宏块模式对应的数值常量 3.  7种宏块模式被分成宏块和亚宏块 4.  如何对宏 ...

  4. js中的4种函数调用模式:函数调用、方法调用、构造器调用、间接调用

    全栈工程师开发手册 (作者:栾鹏) js系列教程4-函数.函数参数教程全解 js中的4种函数调用模式 javascript一共有4种调用模式:函数调用模式.方法调用模式.构造器调用模式和间接调用模式. ...

  5. RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)

    点击关注公众号,Java干货及时送达 作者:我思知我在 blog.csdn.net/qq_32828253/article/details/110450249 七种模式介绍与应用场景 简单模式(Hel ...

  6. RabbitMQ七种队列模式介绍与应用场景(通俗易懂)

    七种模式介绍与应用场景 简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B 应用场景:将发送的电子邮件放到消息队列 ...

  7. RabbitMQ介绍以及五种工作模式

    早期出现认证系统类似的提供认证服务; 出现了系统间的通信;并发的高需求 每个前端系统与认证系统的通信强耦合 传递消息,获取返回结果的过程,如果出现网络波动,整个传递数据,计算返回结果的流程重走一遍;需 ...

  8. 还不知道 RabbitMQ 常用的几种交换机模式?这篇小白都能看懂的 RabbitMQ 交换机模式

    要了解 RabbitMQ 的交换机发布订阅模型,先来了解下 RabbitMQ 消息传递模型的核心思想:生产者从不直接向队列发送任何消息.实际上,通常情况下,生产者甚至根本不知道消息是否会被传递到任何队 ...

  9. JavaScript中函数四种调用模式

    目录 JS中函数的四种调用模式 函数调用模式 方法调用模式 构造器调用模式 上下文调用模式 JS中函数的四种调用模式 在函数的调用模式中感觉最大的区别就是: this指向 函数调用模式 即通过函数名直 ...

  10. 计算机毕业设计中JAVA 23种开发模式详解(代码举例)

    设计模式(Design Patterns) --可复用面向对象软件的基础 设计模式(Design pattern)是一套被反复使用.多数人知晓的.经过分类编目的.代码设计经验的总结.使用设计模式是为了 ...

最新文章

  1. 用pytorch加载训练模型
  2. java学习路线_java学习路线_我的入坑路
  3. 创建 tls 客户端 凭据时发生严重错误。内部错误状态为 10013_kubectl 创建 Pod 背后到底发生了什么?...
  4. php 时间错误,PHP xdebug调试trace记录时间错误
  5. ASP.NET MVC铵钮Click后下载文件
  6. (46)FPGA对数运算符(V代码实现)
  7. ffmpeg 安装_CentOS7.6安装SRS和ffmpeg实现自建直播服务器
  8. RFID 是什么意思
  9. 李飞飞新动向:创建斯坦福“以人为本AI研究院”,担任共同院长
  10. Java面试题300道
  11. android播放器录制视频,Android播放器的录制实践
  12. Java、JSP小区车辆停车管理系统
  13. 十大硬盘数据恢复软件介绍
  14. jQuery手机版日历插件带农历
  15. 阿里云大数据开发一面面经,已过,面试题已配答案
  16. CSS DIV 滚动(CSS,HTML)
  17. android 时间颜色,android修改状态栏时间和日期颜色.docx
  18. github.io网页无法打开(连接不是私密连接)
  19. 数学之美读书笔记第一章
  20. Python 位运算

热门文章

  1. Guitar Pro教程之如何设置MIDI键盘
  2. CF1067E Random Forest Rank
  3. for循环,while循环,break跳出循环,continue结束本次循环,exit退出整个脚本
  4. Linux服务器上安装node.js
  5. [Unity3D]深度相机 Depth Camera
  6. [SHELL进阶] (转)最牛B的 Linux Shell 命令 (三)
  7. android 获取系统所有安装的应用程序
  8. 【BZOJ】1303: [CQOI2009]中位数图(特殊的技巧)
  9. 求职必看!大厂面试中遇到了发散性问题..... ,怎么办?
  10. 汉化:Blocs for Mac(可视化网页设计工具)4.5.0