IT实战联盟博客:http://blog.100boot.cn/

消息中间件

消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统的集成。 通过提供消息传递和消息排队模型 ,它可以在分布式环境下扩展进程间的通信。一般有两种传递模式:点对 点 ( P2P, Point-to-Point )模式和发布/订阅( Pub/Sub )模式

MQ的作用
1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
3)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
4)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
5)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
6)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
7)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。

RabbitMQ

概念

RabbitMQ 整体上是一个生产者与消费者模型, 主要负责接收、存储和转发消息。 可以把消 息传递的过程想象成: 当你将一个包裹送到邮局, 邮局会暂存并最终将邮件通过邮递员送到收 件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。 从计算机术语层面来说, RabbitMQ 模型更像是一种交换机模型。

1
2
3
4
5
6
7
8
9
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每个连接里,可建立多个channel.

Broker:消息中间件的服务节点

对于 RabbitMQ 来说, 一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点, 或者 RabbitMQ 服务实例。 大多数情况下也可以将一个 RabbitMQ Broker 看作一 台 RabbitMQ 服务器 。

Exchange:交换机

生产者将消息发送到 Exchange ,由交换器将消息路由到一个或者多个队列中。

RoutingKey 与 Binding Key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

  1. fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
  2. direct:把消息投递到那些binding key与routing key完全匹配的队列中。
  3. topic:将消息路由到binding key与routing key模式匹配的队列中。

示例代码

由官方教程翻译而来,用ts实现,因为觉得ts好使
https://www.rabbitmq.com/tutorials

能搜索rabbitmq的一般都会了解一些概念,就直接上代码吧。

$ node -v
v8.9.3
# 为了执行ts代码
$ ts-node -v
v9.0.0
$ brew install rabbitmq
$ brew services start rabbitmq

访问 http://127.0.0.1:15672, 进入rabbitmq后台管理界面。

访问 http://tryrabbitmq.com/ 进入模拟 生产-交换机-队列-消费者

以上为准备阶段与可视化阶段

direct

以下代码主要实现生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// emit_log_direct.ts 生产者
import * as amqp from 'amqplib'const url = `amqp://localhost`;(async function publish(){const exchange = 'direct_logs';const msg = 'hello world';const routingKeys = ['info', 'error', 'warning'];// 1. 创建链接const connect = await amqp.connect(url);// 2. 创建channelconst channel = await connect.createChannel();// 3. 创建or连上 交换机// 3.1 直连方式;await channel.assertExchange(exchange, 'direct', { durable: false });let i = 0;while(i<1){const index = random(3);// 4. 消息发给交换机channel.publish(exchange, routingKeys[index], Buffer.from(msg));console.log(`[x] Sent ${msg}-- ${routingKeys[index]}`);i++;}await sleep(1);await connect.close();process.exit(0);
})();function sleep(time: number) {return new Promise((resolve) => setTimeout(resolve, time*1000));
}function random(max: number){return Math.floor(Math.random() * Math.floor(max));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// receive_logs_direct.ts 消费者
import * as amqp from 'amqplib'
const url = `amqp://localhost`;(async function receive(){const connect = await amqp.connect(url);const channel = await connect.createChannel();const exchange = 'direct_logs';await channel.assertExchange(exchange, 'direct', { durable: false });const routingKeys = ['info', 'error', 'warning'];const queueA = await channel.assertQueue('queueA');await channel.bindQueue(queueA.queue, exchange, routingKeys[0]);await channel.bindQueue(queueA.queue, exchange, routingKeys[1]);await channel.bindQueue(queueA.queue, exchange, routingKeys[2]);const queueB = await channel.assertQueue('queueB');await channel.bindQueue(queueB.queue, exchange, routingKeys[1]);channel.consume(queueA.queue, msg => {console.log("队列AAAAA:", msg.content.toString())}, { noAck: true });channel.consume(queueB.queue, msg => {console.log("队列BBBBBBBB:", msg.content.toString())}, { noAck: true });// await connect.close();
})();
1
2
3
4
5
6
7
8
9
10
# 执行
$ ts-node emit_log_direct.ts
[x] Sent hello world-- error
[x] Sent hello world-- info$ ts-node receive_logs_direct.ts
队列AAAAA: hello world
队列BBBBBBBB: hello world队列AAAAA: hello world

topic

发送到主题交换机的消息 必须是单词列表,以.分割, routing_key无效。通常它们指定与消息相关的某些功能。 一些有效的路由关键示例:“stock.usd.nyse”,“ nyse.vmw”,“ quick.orange.rabbit”。 路由关键字中可以包含任意多个单词,最多255个字节。

1
2
* (star) : 匹配一个单词
# (hash) : 匹配 0 或 更多单词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// emit_log_topic.ts
import * as amqp from 'amqplib'const url = `amqp://localhost`;(async function publish(){const exchange = 'topic_logs';const msg = 'hello world';// bothqueue、firstqueue、secondqueueconst keys = ['quick.orange.rabbit', 'quick.orange.fox', 'lazy.brown.fox'];// 1. 创建链接const connect = await amqp.connect(url);// 2. 创建channelconst channel = await connect.createChannel();// 3. 创建or连上 交换机// 3.1 直连方式;await channel.assertExchange(exchange, 'topic', { durable: false });let i = 0;while(i<1){const index = random(3);// 4. 消息发给交换机channel.publish(exchange, keys[index], Buffer.from(msg));console.log(`[x] Sent ${msg}-- ${keys[index]}`);i++;}await sleep(1);await connect.close();process.exit(0);
})();function sleep(time: number) {return new Promise((resolve) => setTimeout(resolve, time*1000));
}function random(max: number){return Math.floor(Math.random() * Math.floor(max));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// receive_logs_topic.ts
import * as amqp from 'amqplib'
const url = `amqp://localhost`;(async function receive(){const connect = await amqp.connect(url);const channel = await connect.createChannel();const exchange = 'topic_logs';await channel.assertExchange(exchange, 'topic', { durable: false });const queueA = await channel.assertQueue('queue_topic_A');// # 匹配多个单词await channel.bindQueue(queueA.queue, exchange, '*.orange.*');const queueB = await channel.assertQueue('queue_topic_B');// * 可以替代一个单词await channel.bindQueue(queueB.queue, exchange, '#');channel.consume(queueA.queue, msg => {console.log("队列AAAAA:", msg.content.toString())}, { noAck: true });channel.consume(queueB.queue, msg => {console.log("队列BBBBBBBB:", msg.content.toString())}, { noAck: true });// await connect.close();
})();
1
2
3
4
5
6
7
8
9
10
# 执行
$ ts-node emit_log_topic.ts
[x] Sent hello world-- error
[x] Sent hello world-- info$ ts-node receive_logs_topic.ts
队列AAAAA: hello world
队列BBBBBBBB: hello world队列AAAAA: hello world

GitHub地址--[https://github.com/simuty/Node_Demo](https://github.com/simuty/Node_Demo)

参考

rabbitmq官网教程

Node rabbitmq 入门就够了相关推荐

  1. RabbitMQ入门到精通

    RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...

  2. RabbitMQ入门到掌握

    RabbitMQ入门到掌握 一.消息队列 1.MQ 的相关概念 1.2 什么是MQ 1.2 为什么要用MQ ①流量消峰 ②应用解耦 ③异步处理 1.3 MQ 的分类 ①ActiveMQ ②Kafka ...

  3. 超详细的RabbitMQ入门

    转载:超详细的RabbitMQ入门,看这篇就够了!-阿里云开发者社区 思维导图 一.什么是消息队列 消息指的是两个应用间传递的数据.数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象. ...

  4. RabbitMQ 入门系列(10)— RabbitMQ 消息持久化、不丢失消息

    消息要保持"持久化",即不丢失,必须要使得消息.交换器.队列,必须全部 "持久化". 1. 生产者怎么确认 RabbitMQ 已经收到了消息? # 打开通道的确 ...

  5. rabbitMQ入门程序

    1.生产者 /*** rabbitMQ入门程序消费者** @author xiaoss* @date 2020年10月27日 22:02*/ public class Producer01 {//队列 ...

  6. Node初学者入门,一本全面的NodeJS教程,微小的web框架,能实现文件上传功能以及数据解析功能...

    Node初学者入门,一本全面的NodeJS教程 转载于:https://www.cnblogs.com/hfultrastrong/p/8036672.html

  7. node.js入门 - 9.api:http

    node一个重要任务是用来创建web服务,接下来我们就学习与此相关的一个重要的api -- http.我们使用http.createServer()创建一个http服务的实例,用来处理来自客户的请求. ...

  8. 《Node.js入门》Windows 7下Node.js Web开发环境搭建笔记

    最近想尝试一下在IBM Bluemix上使用Node.js创建Web应用程序,所以需要在本地搭建Node.js Web的开发测试环境. 这里讲的是Windows下的搭建方法,使用CentOS 的小伙伴 ...

  9. rabbitmq 入门demo

    rabbitmq 入门demo http://www.cnblogs.com/jimmy-muyuan/p/5428715.html http://www.cnblogs.com/shanyou/p/ ...

  10. RabbitMQ 入门:2. Exchange 和 Queue

    上文RabbitMQ 入门:1. Message Broker(消息代理)提到过 RabbitMQ 实现了 AMQP 这个协议(RabbitMQ 所支持的 AMQP 的版本是 0.9.1),这个协议的 ...

最新文章

  1. Blazeface 人脸检测器
  2. Python中循环(列表循环)的学习笔记~
  3. 如何在Android文本视图周围添加边框?
  4. HDU 漫步校园 (记忆化搜索)
  5. iOS开发中的错误整理,再一次整理通过通知中心来处理键盘,一定记得最后关闭通知中心...
  6. Python数据分析、挖掘常用工具
  7. kubernetes 容器持久化存储PV、PVC、StorageClass
  8. HTML方式显示邮件无法打开,HTML格式的电子邮件不能正确显示
  9. wget 命令下载网络文件到指定目录
  10. 第三届“传智杯”全国大学生IT技能大赛(初赛B组)
  11. 《CSS世界》--张鑫旭 : 前端样式高手进阶CSS
  12. Mac下解压.bin文件
  13. 添加或删除程序 0x00310030指令引用的0x00310030内存。该内存不能为written。
  14. 畅游或将私有化退市股价涨近50%,搜狐“吃饱”后能重回巅峰吗?
  15. 安装华为运动健康beta版本安装失败
  16. 饥饿游戏2:星火燎原[The Hunger Games:Catching Fire]
  17. XCode使用googletest(包括googlemock)
  18. eap协议 c语言,CCNP无线技术知识点-EAP和EAPOL协议报文详解
  19. 80款FLASH小游戏合辑,附Flash播放器 | Yongd's Blognbsp;nbsp;分享网络好资源
  20. android 电池检测软件,电池寿命检测软件下载-电池寿命检测 安卓版v2.7.0-PC6安卓网...

热门文章

  1. Linux死锁检测-Lockdep
  2. bagging boosting 随机森林 GBDT对比
  3. Apache Parquet 与Apache ORC简介
  4. redis 实战系列二:用python操作redis集群
  5. PipeMapRed.waitOutputThreads(): subprocess failed with code N
  6. 思科网院Packet Tracer实验(三)调查运行中的 TCP/IP和OSI模型
  7. 51nod1118--简单DP
  8. python以下导入包的格式错误的是_Python结合Tableau,万字长文搞定传统线下连锁店数据分析...
  9. 计算机专业毕设外文翻译springboot_计算机毕业设计之SpringBoot物流管理系统
  10. centos7上mycat安装_Mysql+Mycat实现数据库主从同步与读写分离