作者:xyk_1021 (本文来自作者投稿)

1.背景

了解过RabbitMQ的Fanout模式,应该知道它原本的Fanout模式就是用来做广播的。但是它的广播有一点区别,来回顾下它的含义:Fanout类型没有路由键的概念,只要队列绑定到了改exchange上面,就会接收到所有的消息。

使用过程一般就是先new 出一个Fanout类型的交换机,然后往这个交换机上绑定多个队列queue,不同的消费者各自监听不同的队列,这就实现了广播效果,因为同一个消息,会分发到所有队列中。

举个例子:

应用A监听了队列A,应用B监听了队列B,Fanout类型交换机同时绑定了队列A和B.假设生产者端发送了一条消息到Fanout类型交换机,交换机就会把消息分发到所有队列,这时应用A和应用B会收到同一条消息,这就是广播。

说了上面一大堆,只是为了强调,对于RabbitMQ的原本Fanout模式,它的设计就是多个消费者必须监听不同的队列,多个消费者之间才会形成广播关系。

那么问题来了,假如在Fanout工作模式下,多个消费者同时监听的是同一个队列,会怎样?实践过的同学应该都知道,这种情况下,这些消费者会形成竞争关系,现象是同一个消息只会被其中一个消费者接收,达不到广播的效果。。

2.需求

假如现在有一个需求,要做到对同一个应用的多个节点进行广播,怎么实现?

注意,这里所说的同一个应用多个节点,通俗点理解就是一个war包,布在多个服务器节点上。

在实际部署集群时,为了高可用,同一个应用可能会部署多个节点,那假如工程里已经通过配置定义某个队列,那多个节点它们定义的队列就会是相同的,那按照上面的背景,那这些节点间肯定就会存在竞争关系,即便是Fanout模式的交换机,一条消息也只能被其中一个节点接收,其他节点收不到,达不到广播的效果。那该如何做?

相信看到这里,有人会问,为何会有 对同一个应用的多个节点进行广播的需求场景?为什么要有这个需求。生产中的业务系统很多,自然而然场景就很多。

举两个经典的例子:

1.想要同时刷新所有节点的缓存

业务系统离不开缓存,有时会用内存缓存,假如我要刷新所有节点的内存缓存,多个节点前可能有负载均衡例如nginx之类的,我只需要访问其中一个节点,然后让这个节点做广播通知所有其他节点刷缓存。(广播刷缓存)

2.websocket会话寻找

websocket是比较受欢迎的实时消息推送方案。用过websocket应该知道,websocket只能与多个节点中的其中一个节点做长连接会话保持,也就是说用户的会话只会存在于一个节点上,假设服务端要主动向用户推一条消息,必须要知道用户的会话在哪个节点上,怎么得知?可以通过广播,通过消息广播,把消息发到多个节点上,然后节点收到消息只需要判断用户会话是否就在本节点上,假如在则主动推消息,不在,则丢弃这条消息。

类似上面这两种需求,就需要用到广播,并且是对同一个应用的多个节点进行广播。当然不用广播肯定也有其他通知方案,本文我们只讨论用MQ怎么做到。

3.思路

假如继续用RabbitMQ的Fanout模式,怎么做到对同一个应用的多个节点进行广播?

要起到广播效果,关键就是让多个应用节点间不要存在竞争关系或者存在竞争关系时它们的消息怎么共享?可以从这两个方向解决这个问题。

方法可能很多种,在这里,我只描述两种比较容易实现的方案。

方案1

过程大致如下

  1. 应用启动,多个节点监听同一个队列(此时多个节点是竞争关系,一条消息只会发到其中一个节点上)

  2. 消息生产者发送消息,同一条消息只被其中一个节点收到

  3. 收到消息的节点通过redis的发布订阅模式来通知其他兄弟节点

这种方案是最容易想到的,思路就是依赖其他组件来做消息共享,例如redis这种可以替换成其他方案,只要能做到消息共享就行,那么最终的效果就肯定是广播效果了。

方案2

过程大致如下

  1. 应用启动,利用监听器生成唯一ID

  2. 生成的唯一ID,通过文件写入的方式写到配置文件中

  3. spring启动,把这个唯一ID加载为全局属性(为何要用唯一ID,就是为了用这个ID作为该节点的监听队列名,当然前缀可以用相同的,后缀用唯一ID区分即可,举个例子就是:节点1监听队列 kunghsu-123 节点2监听队列 kunghsu-456.必须保证它们的唯一ID是唯一的,不然还是会存在竞争关系)

  4. 多个节点监听了多个队列(让每个队列名都不同,目的就是让他们不存在竞争关系,没有竞争关系就不用做消息共享,只管由MQ分发即可,这时同一条消息就会发到多个节点上)

  5. 到MQ控制台,将所有节点生成的队列手动绑定到指定的Fanout交换机上(这一步是手动的,当然也可以通过API做到,下面会说到)

  6. 生产者发送消息指定的Fanout交换机,交换机将同一条消息被分发到多个节点上

  7. 广播效果达成!

这种方案,也比较容易。这样做,就是为了让多个节点间是广播关系。总的来说不麻烦,其中第五步手动操作其实有点挫,这种手动操作步骤其实是应该转成自动化,让应用程序来完成,方便以后自动化建设。

这种方案的spring配置也比较简单,参考Fanout模式的配置即可。本文重点在这个思路的实现过程。

只列举部分代码如下:

消息生产者



"exchangeFour" durable="true" auto-delete="false" >

template id="amqpTemplate4" connection-factory="connectionFactory2"exchange="exchangeFour" />

消息消费者

queue name="${queue-name-fanout}" durable="true"auto-delete="false" exclusive="false" declared-by="connectAdmin2" />"fanoutTwoConsumer" class="com.lunch.foo.rabbitmq.FanoutTwoConsumer">connection-factory="connectionFactory2">"${queue-name-fanout}" ref="fanoutOneConsumer" />

另外,RabbitMQ的客户端API支持让我们 将队列绑定到指定的交换机上。具体可参考我的工具类代码。

代码如下:

package com.lunch.foo.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/*** Created by xuyaokun On 2019/3/10 2:26* @desc:*/

public class RabbitMQUtil {

private static final String HOST = "192.168.3.128";

private static final int PORT = AMQP.PROTOCOL.PORT;

private static final String USERNAME = "kunghsu";

private static final String PASSWORD = "123456";

private static final String VIRTUALHOST = "/";

public static void main(String[] args) {

String QUEUE_NAME = "queueOneX";

String EXCHANGE_NAME = "exchangeFour";

try {

queueBind(EXCHANGE_NAME, QUEUE_NAME);

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}

}

/*** 获取会话链接** @return* @throws IOException* @throws TimeoutException*/

private static Connection getConnection() throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(HOST);

factory.setPort(PORT);

factory.setUsername(USERNAME);

factory.setPassword(PASSWORD);

factory.setVirtualHost(VIRTUALHOST);

return factory.newConnection();

}

/*** 绑定队列到指定交换机** @param exchangeName* @param queueName* @throws IOException* @throws TimeoutException*/

public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException {

Channel channel = null;

try{

channel = getConnection().createChannel();

} catch(Exception e){

System.out.println("获取RabbitMQ会话连接失败!取消做队列绑定。");

return ;

}

//默认持久化

channel.queueDeclare(queueName, true, false, false, null);

// 声明交换机:指定交换机的名称和类型(广播:fanout)

channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);

// 在消费者端队列绑定

channel.queueBind(queueName, exchangeName, "");

channel.close();

}

}

总结

RabbitMQ的Fanout模式相关的文章,网上一抓一大把,但是几乎没有人讲到 如何实现 对同一个应用的多个节点进行广播。。希望通过这篇文章,能帮助到有需要的同学。另外,假如大家有更好的方案,欢迎交流。感谢阅读!

【本文作者】

作者网名:xyk_1021

RabbitMQ • 长按关注

在看

rabbitmq中的消息有id吗_RabbitMQ 如何实现对同一个应用的多个节点进行广播相关推荐

  1. 程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播...

    1.背景 了解过RabbitMQ的Fanout模式,应该知道它原本的Fanout模式就是用来做广播的.但是它的广播有一点区别,来回顾下它的含义:Fanout类型没有路由键的概念,只要队列绑定到了改ex ...

  2. RabbitMQ中的消息确认ACK机制

    我们将消息持久化后,假如消费端出现异常,rabbitmq服务器会将消息缓存到内存,当生产者发送一直发送消息而消费者都没有正常消费时消息就会将这些消息全部保存在内存,当我们的消息过多时,就可能导致rab ...

  3. RabbitMQ中的消息不可达returnlistener和mandatory的使用

    return listener 用于处理一些不可路由的消息.     我们的消息生产者,通过指定一个exchange和routingkey,把消息送达到某一个队列中,然后我们的消费者监听队列,进行消费 ...

  4. rabbitmq中默认unack超时时间_RabbitMQ 与 Kafka 的技术差异以及使用注意点

    导言 作为一个有丰富经验的微服务系统架构师,经常有人问我,"应该选择RabbitMQ还是Kafka?".基于某些原因, 许多开发者会把这两种技术当做等价的来看待.的确,在一些案例场 ...

  5. 基于 RabbitMQ 的实时消息推送

    博主新开公众号"不太灵光的程序员" , 关注公众号,每日八点有干货推送 1 实现服务器端推送的几种方式 Web 应用都是基于 HTTP 协议的请求/响应模式,无法像 TCP 协议那 ...

  6. kafka消息消费有延迟_RabbitMQ与Kafka的技术差异以及使用注意点

    导言 作为一个有丰富经验的微服务系统架构师,经常有人问我,"应该选择RabbitMQ还是Kafka?".基于某些原因, 许多开发者会把这两种技术当做等价的来看待.的确,在一些案例场 ...

  7. RabbitMq中的warren模式和shovel模式

    Warren 集群模式: Warren 集群模式,其中文含义为主备集群模式.那么什么是主备呢? 我们先来看一下主备的基础概念,主备这一名词的诞生并不是在我们所熟知的计算机领域,而是首先出现在我们的日常 ...

  8. Rabbitmq中常用的五种连接方式

    目录 前提准备 方式一:  Hello World 服务端(provider)代码 客户端(customer)代码 方式二: work(以下方式都是通过工具类来创建connection对象) 有两种方 ...

  9. java 队列和rabbitmq_java – 如何在不使消息出列的情况下浏览rabbitmq中的队列

    我正在尝试获取具有特定相关ID的消息,如rabbitmq docs中所述.但是我看到无关的消息被出列了.我不希望它发生.在得到消息后,我怎么能告诉rabbitmq没有出队,并且知道这不是我想要的那个. ...

最新文章

  1. cordova sqlite
  2. 错误处理:安装torch-sparse、torch-spline、torch-scatter、torch-cluster
  3. 第10章 评价分类结果
  4. pdf转换为word问题
  5. 2020/Province_C_C++_A/F/成绩分析
  6. nyoj_218_Dinner_201312021434
  7. 伺服驱动器生产文件_直流伺服系统的组成和控制原理详解
  8. Silverlight 3 OOB 原理
  9. 入门机器学习(十三)--支持向量机(SVM)
  10. 一阶电路暂态响应的结果分析。_【2020考研】南京邮电大学813《电路分析》考试大纲...
  11. display 隐藏css,CSS-元素的显示与隐藏
  12. 管理站点复制 【Windows Server 2019】活动目录(Acitve Directory)——在同一区域安装多台域控制器
  13. python参数检查类型_Python类型检查
  14. python快乐数,快乐数 - SegmentFault 思否
  15. 在页面上动态显示实时时间
  16. office 2019 kms
  17. 《游戏系统设计六》一步一步实现王者荣耀等级系统
  18. 笔记本此计算机到网络出现一个叉,笔记本电脑无线网络不可用并显示红叉的解决方...
  19. kubebuilder之一:kubernetes operator工作原理
  20. 案例分析:中介提供的二手房合同不规范催生大量房屋买卖纠纷(转)

热门文章

  1. 社工大师_社工,与弱势者同行 | TED演讲
  2. java动态加载类 框架_java运用RMI框架类的动态加载不成功
  3. shell不允许输入空字符_反弹shell | ncamp;bash
  4. WindowsAPI中W和A的区别
  5. Python爬虫项目--爱拍视频批量下载
  6. Linux协议栈:基于ping流程窥探Linux网络子系统,及常用优化方法
  7. shell高级脚本:“秒”转换为“时-分-秒”;ping
  8. leetcode题库:1.两数之和
  9. 越狱后必装软件_iOS 13全系统越狱详细教程疑难解答
  10. Java的jvm原理和常识