交换机

Exchanges 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

Exchanges 的类型

  • 直接:direct 路由模式
  • 主题:topic
  • 标题:headers(不常用)
  • 扇出:fanout 广播模式,发布订阅模式
无名exchange

第一个参数是交换机的名称,空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
绑定bindings

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。

Fanout模式介绍

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。

代码示例
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;import java.util.Scanner;public class Producer {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明扇出类型交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//向交换机发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());System.out.printf("消息:%s发送成功!",message);}}
}
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println(msg);};channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});}
}
package com.vmware.rabbit.demo5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"");DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody());System.out.println(msg);};channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});}
}

Direct模式介绍

Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct(直接) 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去

  • 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green
  • 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 black/green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃
多重绑定

如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多

代码实战
package com.vmware.rabbit.demo6;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;
import java.util.UUID;public class Producer {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.232");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明路由模式交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//向交换机发送消息Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){String message = UUID.randomUUID().toString();String routingKey = scanner.next();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());System.out.printf("发送消息:%s成功!RoutingKey:%s\n",message,routingKey);}}
}
package com.vmware.rabbit.demo6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"error");channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});}
}
package com.vmware.rabbit.demo6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "log";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,EXCHANGE_NAME,"info");channel.queueBind(queueName,EXCHANGE_NAME,"warn");channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});}
}

Topics主题模式介绍

存在的问题:尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型

Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节

  • *星号可以代替一个单词

  • #井号可以替代零个或多个单词

  • 当一个队列绑定键是#那么这个队列将接收所有数据,就有点像fanout了

  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了

代码实现
package com.vmware.rabbit.demo7;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;import java.util.HashMap;
import java.util.Map;public class Producer {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception{Connection connection = RabbitUtil.getConnection();System.out.println("连接RabbitMQ服务器成功!");Channel channel = connection.createChannel();//声明主题模式交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);System.out.println("交换机创建成功!");Thread.sleep(15*1000);//发布消息HashMap<String,String> msgMap = new HashMap<>();msgMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");msgMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");msgMap.put("quick.orange.fox","被队列 Q1 接收到");msgMap.put("lazy.brown.fox","被队列 Q2 接收到");msgMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");msgMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");msgMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");msgMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");for (Map.Entry<String, String> entry : msgMap.entrySet()) {channel.basicPublish(EXCHANGE_NAME,entry.getKey(),null,entry.getValue().getBytes("UTF-8"));}}
}
package com.vmware.rabbit.demo7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer1 {private static final String EXCHANGE_NAME = "topic_logs";private static final String QUEUE_NAME = "Q1";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定交换机和队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");DeliverCallback deliverCallback = (tag,msg)->{String message = new String(msg.getBody());System.out.println(message+"\tRouting Key:"+msg.getEnvelope().getRoutingKey());};CancelCallback cancelCallback = (tag)->{};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
package com.vmware.rabbit.demo7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;public class Consumer2 {private static final String EXCHANGE_NAME = "topic_logs";private static final String QUEUE_NAME = "Q2";public static void main(String[] args) throws Exception {Connection connection = RabbitUtil.getConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定交换机和队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");DeliverCallback deliverCallback = (tag,msg)->{String message = new String(msg.getBody());System.out.println(message+"Routing Key:"+msg.getEnvelope().getRoutingKey());};CancelCallback cancelCallback=(tag)->{};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

交换机-Exchanges相关推荐

  1. RabbitMQ 原文译03--发布和订阅

    发布/订阅 在之前的案例中我们创建了一个工作队列,这个工作队列的实现思想就是一个把每一个任务平均分配给每一个执行者,在这个篇文章我们会做一些不一样的东西,把一个消息发送给多个消费者,这种模式就被称作& ...

  2. RabbitMQ实战笔记

    RabbitMQ实战笔记 1 MQ引言 1.1 中间件技术及架构的概述 1.2 什么是MQ 1.3 为什么要用MQ 1.4 MQ的分类 1.5 MQ的选择 2 RabbitMQ 的引言 2.1 Rab ...

  3. RabbitMQ核心模式

    文章目录 工作原理图 环境搭建 Hello World(简单模式) 生产者 消费者 Work Queues(工作队列模式) 消息应答 自动应答 手动应答 RabbitMQ 持久化 队列持久化 消息持久 ...

  4. 【Rabbitmq】☛③

    目录 1. 交换机Exchanges 1.1 基本概念 1.1.1 Exchanges 概念 1.1.2 Exchanges 的类型 1.1.3 无名 exchange 1.2 临时队列 1.3 绑定 ...

  5. rabbitMQ-学习笔记

    RabbitMQ 一.MQ的相关概念 1.1 什么是MQ 1.2 为什么要用MQ 1.3 MQ的分类 1.3.1 ActiveMQ 1.3.2 Kafka 1.3.3 RocketMQ 1.3.4 R ...

  6. RabbitMQ官方文档知识点总结合集+代码注释(中文+Java版)

    全文代码.MD格式文档的github连接(求star~):https://github.com/Ruoyi-Chen/RabbitMQ-demos 文章目录 全文代码.MD格式文档的github连接( ...

  7. RabbitMQ-Java-04-发布订阅模式

    说明 RabbitMQ-Java-04-发布订阅模式 本案例是一个Maven项目 假设你已经实现了上一节工作队列 官方文档已包含绝大多数本案例内容.请移步:https://docs.spring.io ...

  8. Python 操作 Rabbit MQ 发布/订阅 (五)

    Python 操作 Rabbit MQ 发布/订阅 (五) 一.发布.订阅: 我们将一个消息分发给多个消费者,这种模式被称为发布/订阅. 为了更好的理解这个模式,我们将构建一个日志系统,它包括两个程序 ...

  9. RabbitMQ 延迟队列-对于入门来说可以快速上手

    RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ​ ...

最新文章

  1. LeetCode简单题之判断路径是否相交
  2. 区块链隐私:交易还是计算?
  3. 在 Google Go Team 工作是一种怎样的体验?
  4. yii框架学习(五)get、post请求如何接收请求参数
  5. 不积跬步无以至千里[转]
  6. ppk on javascript 笔记(五)
  7. 海信计算机辅助统,海信计算机辅助手术系统将覆盖山东三级医院
  8. java jdbc连接db2数据库_Java连接db2数据库(常用数据库连接五)
  9. git 怎么提交忽略文件夹_git 设置忽略文件提交的几种方式
  10. cornerstone the working copy is locked due to a previous文件lock解决办法
  11. 为什么需要消息队列(MQ)
  12. jquery开关灯案例_JS/jQuery实现简单的开关灯效果【案例】_輕微_前端开发者
  13. matlab矩阵运算rank,Matlab矩阵运算
  14. Java之T分布计算数据的双侧置信区间
  15. python 会议室预约系统解决方案_会议预约系统_智能会议预约管理系统_轻松实现会议管理解决方案...
  16. 计算机网络和internet选项,详细教你电脑ie的internet选项在哪
  17. 青云科技成为开源 GitOps 产业联盟会员
  18. 完全卸载Android Studio的方法
  19. VS提示无可用源,此模块的调试信息…
  20. 大数据之Stream流

热门文章

  1. 苹果发布 iOS14 系统 Beta7,升级了这些内容
  2. s3c6410 framebuffer分析
  3. aba会导致问题_ABA问题的本质及其解决办法
  4. 大专生可以当程序员吗?
  5. 本地git代码推送到远程git步骤
  6. 推特 我们目前不能注册此邮箱地址_试玩手游版LOL,媲美端游的质量,这份注册攻略须收藏|moba|英雄联盟|lol|端游|手游...
  7. LR(0)项目集规范族的构造及LR(0)分析表的构造
  8. 智合同丨智能合同审查·赋能合同智能应用
  9. Kubernetes基础:可以用作示例演示的tornado镜像
  10. allegro 元件封装设计学习