publisher代码

const amqp = require('amqp');let option = {host: 'server-ip',port: 5672,login: 'guest',password: 'guest',connectionTimeout: 10000,authMechanism: 'AMQPLAIN',vhost: '/',noDelay: true,ssl: {enabled: false}
}
const connection = amqp.createConnection(option);connection.on('error',function(e){console.log("Error from amqp: ", e);
})
let default_exchange = {};
connection.on('ready', function(){default_exchange = connection.exchange('fans',{type:'fanout'}); //创建 fanout 类型的交换机let q = connection.queue('my-queue');q.bind(default_exchange,'my-queue');let qq = connection.queue('qqq');qq.bind(default_exchange, 'qqq');setInterval(publish_message, 2000);
})let count= 0;
publish_message = function() {let message = {hello: 'world',time: Date.now(),count: count++};default_exchange.publish('', message);return console.log("my-queue message published: " + (JSON.stringify(message)) + " to queue: my-queue");};

  receiver1 代码

const amqp = require('amqp');let option = {host: 'server-ip',port: 5672,login: 'guest',password: 'guest',connectionTimeout: 10000,authMechanism: 'AMQPLAIN',vhost: '/',noDelay: true,ssl: {enabled: false}
}
const connection = amqp.createConnection(option);connection.on('error',function(e){console.log("Error from amqp: ", e);
})connection.on('ready', function(){connection.queue('my-queue', function(q){console.log('my-queue is already subscribing');q.bind('logs','my-queue', function(){q.subscribe(function(message){console.log('----receiveMessage: ',message);})});})
})

receiver2 代码

const amqp = require('amqp');let option = {host: 'server-ip',port: 5672,login: 'guest',password: 'guest',connectionTimeout: 10000,authMechanism: 'AMQPLAIN',vhost: '/',noDelay: true,ssl: {enabled: false}
}
const connection = amqp.createConnection(option);connection.on('error',function(e){console.log("Error from amqp: ", e);
})
connection.on('ready', function(){connection.queue('qqq', function(q){console.log('my-queue is already subscribing');q.bind('fans','qqq',function(){q.subscribe(function(message){console.log('----receiveMessage: ',message);})});})
})

初学,简单测试,理解不深,可能有潜在问题

转载于:https://www.cnblogs.com/lc-ant/p/9259240.html

node-amqp 使用fanout发布订阅rabbitmq消息相关推荐

  1. kafka redis vs 发布订阅_发布订阅的消息系统 Kafka的深度解析

    背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐 ...

  2. 发布订阅的消息系统 Kafka的深度解析

    发布&订阅的消息系统 Kafka的深度解析 2015-01-27 10:25 Jason Guo Jason Guo的博客 字号: T | T 一个典型的kafka集群中包含若干produce ...

  3. python使用pika订阅rabbitmq消息链接被重置问题

    最近在做一个运维监控系统的时候,使用python的pika插件订阅rabbitmq消息,程序在运行一段时间后,总是会报链接被充值的错误,具体报错如下: Traceback (most recent c ...

  4. RabbitMQ实例教程:发布/订阅者消息队列

    消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exc ...

  5. Redis的发布订阅(消息队列,比如ActiveMQ,一方得到数据后,多方得到信息)

    什么是发布订阅? 发布和订阅是进程间的一种消息通信模式:发送者(publisher)将消息发送给一个第三方,订阅者(subscriber)从第三方那里接收消息. 这个第三方我们通常称之为 消息中间件, ...

  6. redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池

    参考资料 :分布式中间件实战:java版 (书籍), 多线程视频教程(视频)- 项目启动环境 导入依赖 <parent><groupId>org.springframework ...

  7. 14. Redis 发布订阅-实现消息队列

    Redis 除了做缓存, 也可以做消息队列, 实现简单的消息的发布和订阅.Redis 消息订阅支持精确订阅和模糊订阅两种模式! 1. Redis 消息队列 1.1 Redis 消息队列特点 当有新消息 ...

  8. ROS发布订阅的消息的种类及使用

    #1.消息(std_msgs)的种类 在/opt/ros/melodic/include/std_msgs文件夹中查询 或参考: https://www.itdaan.com/tw/b30f2309f ...

  9. 【夏目鬼鬼分享】RabbitMQ发布/订阅广播模式

    消息发送流程说明 可以有多个消费者 每个消费者都有自己的队列(queue) 每个队列都要绑定到交换机(Exchange)(都是一些临时队列) 生产者发送的消息只能发送到交换机,交换机来决定要发给那个队 ...

  10. RabbitMQ:订阅模型-消息订阅模式

    订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息. RabbitMQ 单 ...

最新文章

  1. 未定义与 double 类型的输入参数相对应的函数 eval_点评一下鸿蒙os的时钟计算函数...
  2. 5行脚本代码完美破解99%的过期软件
  3. SQL Server 2005 镜像构建说明(转载)
  4. img打 webpack_webpack打包html里面img后src为“[object Module]”问题
  5. Linux系统优化脚本
  6. Error和Exception有什么区别?(还在总结)
  7. 因为你的电脑安装了即点即用_即你所爱
  8. 7-7 汉密尔顿回路 (25 分)(C语言实现)
  9. SS不能在Win7中打开,出现停止运行
  10. error C3859: 超过了PCH的虚拟内存范围;请使用“-Zm137”或更大的命令行选项重新编译
  11. ios键盘横屏_平板电脑就只能追剧玩游戏?看这款外设键盘如何让iPad爱上办公...
  12. 时间序列深度学习:seq2seq 模型预测太阳黑子
  13. oracle审计功能启动关闭
  14. python判断火车票座位_利用Python实现命令行版的火车票查看器
  15. MATLAB学习笔记(注释超详细)
  16. AlertManager 告警信息
  17. GB28380台式计算机,微型计算机能效限定值及能效等级 GB28380-2012
  18. 解决Request processing failed; nested exception is com.sun.jersey.api.client.UniformInterfaceException
  19. 这些坑别踩!游戏随机地图生成开发经验分享
  20. 为什么android手机内存不够,安卓手机内存不足怎么办?安卓手机内存越来越小解决方法汇总...

热门文章

  1. 注解的定义与反射调用
  2. HTML相对路径相对目录--上级目录及下级目录的写法
  3. 递归求解斐波那契fib(10)一共调用了多少次fib()函数
  4. c++(/clr)非托管类型和托管类型互转
  5. Java 开源报表制作
  6. Customizing AxWebBrowser, make it powerful
  7. 数据库新手常犯的5个错误
  8. oracle跨库连接查询
  9. 117 Populating Next Right Pointers in Each Node II
  10. 20190909 SpringBoot集成Swagger