1.背景

了解过RabbitMQ的Fanout模式,应该知道它原本的Fanout模式就是用来做广播的。但是它的广播有一点区别,来回顾下它的含义:Fanout类型没有路由键的概念,只要队列绑定到了改exchange上面,就会接收到所有的消息。

使用过程一般就是先new 出一个Fanout类型的交换机,然后往这个交换机上绑定多个队列queue,不同的消费者各自监听不同的队列,这就实现了广播效果,因为同一个消息,会分发到所有队列中。

举个例子:

应用A监听了队列A,应用B监听了队列B,Fanout类型交换机同时绑定了队列A和B.假设生产者端发送了一条消息到Fanout类型交换机,交换机就会把消息分发到所有队列,这时应用A和应用B会收到同一条消息,这就是广播。

说了上面一大堆,只是为了强调,对于RabbitMQ的原本Fanout模式,它的设计就是多个消费者必须监听不同的队列,多个消费者之间才会形成广播关系。

那么问题来了,假如在Fanout工作模式下,多个消费者同时监听的是同一个队列,会怎样?实践过的同学应该都知道,这种情况下,这些消费者会形成竞争关系,现象是同一个消息只会被其中一个消费者接收,达不到广播的效果。。

2.需求

假如现在有一个需求,要做到对同一个应用的多个节点进行广播,怎么实现?

注意,这里所说的同一个应用多个节点,通俗点理解就是一个war包,布在多个服务器节点上。

在实际部署集群时,为了高可用,同一个应用可能会部署多个节点,那假如工程里已经通过配置定义某个队列,那多个节点它们定义的队列就会是相同的,那按照上面的背景,那这些节点间肯定就会存在竞争关系,即便是Fanout模式的交换机,一条消息也只能被其中一个节点接收,其他节点收不到,达不到广播的效果。那该如何做?

相信看到这里,有人会问,为何会有 对同一个应用的多个节点进行广播的需求场景?为什么要有这个需求。生产中的业务系统很多,自然而然场景就很多。

举两个经典的例子:

1.想要同时刷新所有节点的缓存

业务系统离不开缓存,有时会用内存缓存,假如我要刷新所有节点的内存缓存,多个节点前可能有负载均衡例如nginx之类的,我只需要访问其中一个节点,然后让这个节点做广播通知所有其他节点刷缓存。(广播刷缓存)

2.websocket会话寻找

websocket是比较受欢迎的实时消息推送方案。用过websocket应该知道,websocket只能与多个节点中的其中一个节点做长连接会话保持,也就是说用户的会话只会存在于一个节点上,假设服务端要主动向用户推一条消息,必须要知道用户的会话在哪个节点上,怎么得知?可以通过广播,通过消息广播,把消息发到多个节点上,然后节点收到消息只需要判断用户会话是否就在本节点上,假如在则主动推消息,不在,则丢弃这条消息。

类似上面这两种需求,就需要用到广播,并且是对同一个应用的多个节点进行广播。当然不用广播肯定也有其他通知方案,本文我们只讨论用MQ怎么做到。

3.思路

假如继续用RabbitMQ的Fanout模式,怎么做到对同一个应用的多个节点进行广播?

要起到广播效果,关键就是让多个应用节点间不要存在竞争关系或者存在竞争关系时它们的消息怎么共享?可以从这两个方向解决这个问题。

方法可能很多种,在这里,我只描述两种比较容易实现的方案。

方案1

过程大致如下

  1. 应用启动,多个节点监听同一个队列(此时多个节点是竞争关系,一条消息只会发到其中一个节点上)
  2. 消息生产者发送消息,同一条消息只被其中一个节点收到
  3. 收到消息的节点通过redis的发布订阅模式来通知其他兄弟节点

这种方案是最容易想到的,思路就是依赖其他组件来做消息共享,例如redis这种可以替换成其他方案,只要能做到消息共享就行,那么最终的效果就肯定是广播效果了。

方案2

过程大致如下

  1. 应用启动,利用监听器生成唯一ID
  2. 生成的唯一ID,通过文件写入的方式写到配置文件中
  3. spring启动,把这个唯一ID加载为全局属性(为何要用唯一ID,就是为了用这个ID作为该节点的监听队列名,当然前缀可以用相同的,后缀用唯一ID区分即可,举个例子就是:节点1监听队列 kunghsu-123 节点2监听队列 kunghsu-456.必须保证它们的唯一ID是唯一的,不然还是会存在竞争关系)
  4. 多个节点监听了多个队列(让每个队列名都不同,目的就是让他们不存在竞争关系,没有竞争关系就不用做消息共享,只管由MQ分发即可,这时同一条消息就会发到多个节点上)
  5. 到MQ控制台,将所有节点生成的队列手动绑定到指定的Fanout交换机上(这一步是手动的,当然也可以通过API做到,下面会说到)
  6. 生产者发送消息指定的Fanout交换机,交换机将同一条消息被分发到多个节点上
  7. 广播效果达成!

这种方案,也比较容易。这样做,就是为了让多个节点间是广播关系。总的来说不麻烦,其中第五步手动操作其实有点挫,这种手动操作步骤其实是应该转成自动化,让应用程序来完成,方便以后自动化建设。

这种方案的spring配置也比较简单,参考Fanout模式的配置即可。本文重点在这个思路的实现过程。

只列举部分代码如下:

消息生产者

 

消息消费者

另外,RabbitMQ的客户端API支持让我们 将队列绑定到指定的交换机上。具体可参考我的工具类代码。

代码如下:

package com.lunch.foo.rabbitmq;   import com.rabbitmq.client.*;   import java.io.IOException; import java.util.concurrent.TimeoutException;   /*** Created by xuyaokun On 2019/3/10 2:26* @desc:*/ public class RabbitMQUtil { private static final String HOST = "192.168.3.128"; private static final int PORT = AMQP.PROTOCOL.PORT; private static final String USERNAME = "kunghsu"; private static final String PASSWORD = "123456"; private static final String VIRTUALHOST = "/";   public static void main(String[] args) {   String QUEUE_NAME = "queueOneX"; String EXCHANGE_NAME = "exchangeFour";   try { queueBind(EXCHANGE_NAME, QUEUE_NAME); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }   }   /*** 获取会话链接** @return* @throws IOException* @throws TimeoutException*/ private static Connection getConnection() throws IOException, TimeoutException {   ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost(VIRTUALHOST); return factory.newConnection(); }   /*** 绑定队列到指定交换机** @param exchangeName* @param queueName* @throws IOException* @throws TimeoutException*/ public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException {   Channel channel = null; try{ channel = getConnection().createChannel(); } catch(Exception e){ System.out.println("获取RabbitMQ会话连接失败!取消做队列绑定。"); return ; } //默认持久化 channel.queueDeclare(queueName, true, false, false, null); // 声明交换机:指定交换机的名称和类型(广播:fanout) channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true); // 在消费者端队列绑定 channel.queueBind(queueName, exchangeName, "");     channel.close(); }     }

总结

RabbitMQ的Fanout模式相关的文章,网上一抓一大把,但是几乎没有人讲到 如何实现 对同一个应用的多个节点进行广播。。希望通过这篇文章,能帮助到有需要的同学。另外,假如大家有更好的方案,欢迎交流。感谢阅读!

欢迎工作一到五年的Java工程师朋友们加入Java程序员开发: 721575865

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播...相关推荐

  1. rabbitmq中的消息有id吗_RabbitMQ 如何实现对同一个应用的多个节点进行广播

    作者:xyk_1021 (本文来自作者投稿) 1.背景 了解过RabbitMQ的Fanout模式,应该知道它原本的Fanout模式就是用来做广播的.但是它的广播有一点区别,来回顾下它的含义:Fanou ...

  2. 如何通过跟踪客户端程序发出的sql的方法来优化SQL

    简要说来,跟踪一个客户程序发出的SQL主要分成下面几步: 1) 识别要跟踪的客户端程序到数据库的连接(后面都用session代替),主要找出能唯一识别一个session的sid与serial#. 2) ...

  3. 13001.udp广播接收程序(python)

    文章目录 1 udp 广播接收程序 1 udp 广播接收程序 struct.unpack 函数默认解析的网络字节序数据 import io from socket import * import sy ...

  4. python搭建微信小程序卖货要收费用吗_个人的微信小程序做店铺收费吗?要收多少...

    微信小程序受到了广大用户的使用和喜爱,这种不用下载的应用,让人们能更快的开启和关闭应用,不用担心自己的内存不够.那么今天我们来了解下,个人的微信小程序做店铺收费吗?要收多少? 现在许多用了许多小程序了 ...

  5. 小程序“扫码购”的自助收银模式可以为商家带来什么?

    自助收银模式在零售方面的应用,主要有自助收银台自助结算.小程序"扫码购".APP自助购物等自助收银方式.不同的方式会带给商家和用户不同的体验,那么小程序"扫码购" ...

  6. 我们的系统检测到您的计算机网络中存在异常流量。此网页用于确认这些请求是由您而不是自动程序发出的。

    我们的系统检测到您的计算机网络中存在异常流量.此网页用于确认这些请求是由您而不是自动程序发出的. 用google搜索的时候遇到了这个问题. 具体的原因google也有说明 归纳成下面两点. 可能存在恶 ...

  7. 券商如何借助企业微信、小程序、视频号提高营收转化?

    近年来,小程序.视频号.企业微信联合打造了一个新的商业通信圈,越来越多的品牌及券商企业都入驻其中. 如今,具有直连12+亿微信客户优势的企业微信.商品交易总额近3万亿的小程序和日活跃用户数量超5亿的视 ...

  8. 【收藏】C#面试题整理笔试篇(最全1000+道带答案)300道填空 + 300道选择 + 300道判断 + 70道读程序写结果和看程序填空 + 100道简答题

    <程序员>曾陪伴了无数开发者成长.<新程序员>全新归来,推荐给大家! <新程序员> 一.填空: 1.操作符( && )被用来说明两个条件同为真的情况 ...

  9. python网络爬虫程序技术,Python网络爬虫程序技术

    spContent=该课程是2018年广东省精品在线开放课程.课程主要以爬取学生信息.爬取城市天气预报.爬取网站图像.爬起图书网站图书.爬取商城网站商品等5个项目为依托,讲解Web.正则表达式.Bea ...

最新文章

  1. 青少年python编程课_青少年之Python编程课程安排第一季
  2. linux命令join与paste
  3. 第四章 对象的类型和动态绑定
  4. ui-grid下拉过滤
  5. ucos 消息队列代码详解_用python实现 多进程队的列数据处理详解,零基础记得都收藏哦
  6. 标准I/O小程序-文件拷贝
  7. @Profile注解与@Conditional注解
  8. 连续汗蒸一星期有什么好处?
  9. 你有多温柔,就有多强大
  10. textView 属性总结
  11. 涡扇发动机的预测性维护
  12. moments音标_moments是什么意思_moments的翻译_音标_读音_用法_例句_爱词霸在线词典...
  13. 生命在于答疑——git推送本地到库鉴权失败
  14. JDK1.6安装_BouncyCastle JCE扩展加密算法解决JDK1.6 sftp连接openssh8.6Algorithm negotiation fail问题
  15. python公众号接口_用Python实现微信公众号API素材库图文消息抓取
  16. Windows7:修改系统注册表工具
  17. BFM:总线功能模型 zz
  18. xp升级到win7傻瓜教程_最简单xp一键升级win7重装
  19. visual studio code打开预览.md文件
  20. 对于试衣网的一点看法

热门文章

  1. 计算机网络学习笔记-目录(更新日期:2020.4.8)
  2. linux的驱动开发——交叉编译器
  3. 牛客16732 序列(排列组合)
  4. Leetcode:892. 三维形体的表面积(Java)
  5. 牛客网--2019校招--丰收
  6. response.end后抛了异常_(七)异常处理
  7. wps临时文件不自动删除_电脑:让 Windows 10 系统自动清理临时文件
  8. python快速入门课堂笔记_Python 快速入门笔记(9):模块和包
  9. python idle运行anaconda_在Python IDLE 下调用anaconda中的库教程
  10. C++ multimap查找某一个键的所有键值对