摘要: 使用RabbitMQ的消息队列,可以有效提高系统的峰值处理能力。

RabbitMQ简介

RabbitMQ是消息代理(Message Broker),它支持多种异步消息处理方式,最常见的有:

  • Work Queue:将消息缓存到一个队列,默认情况下,多个worker按照Round Robin的方式处理队列中的消息。每个消息只会分配给单个worker。
  • Publish/Subscribe:每个订阅消息的消费者都会收到消息,因此每个消息通常会分配给多个worker,每个worker对消息进行不同的处理。

RabbitMQ还支持Routing、Topics、以及Remote procedure calls (RPC)等方式。

对于不同的消息处理方式,有一点是相同的,RabbitMQ是介于消息的生产者和消费者的中间节点,负责缓存和分发消息。RabbitMQ接收来自生产者的消息,缓存到内存中,按照不同的方式分发给消费者。RabbitMQ还可以将消息写入磁盘,保证持久化,这样即使RabbitMQ意外崩溃了,消息数据不至于完全丢失。

为什么使用RabbitMQ?

最简单的一点在于,它支持Work Queue等不同的消息处理方式,可以用于不同的业务场景。对于我们Fundebug来说,目前只用过RabbitMQ的Work Queue,即消息队列。

使用消息队列,可以将不算紧急、但是非常消耗资源的计算任务,以消息的方式插入到RabbitMQ的队列中,然后使用多个处理模块处理这些消息。

这样做最大的好处在于:提高了系统峰值处理能力。因为,来不及处理的消息缓存在RabbitMQ中,避免了同时进行大量计算导致系统因超负荷运行而崩溃。而那些来不及处理的消息,会在峰值过去之后慢慢处理掉。

另一个好处在于解耦。消息的生产者只需要将消息发送给RabbitMQ,这些消息什么时候处理完,不会影响生产者的响应性能。

广告:欢迎免费试用Fundebug,为您监控线上代码的BUG,提高用户体验~

安装并运行RabbitMQ

使用Docker运行RabbitMQ非常简单,只需要执行一条简单的命令:

sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7

对于不熟悉Docker的朋友,我解释一下docker的命令选项:

  • -d : 后台运行容器
  • --name rabbitmq : 将容器的名字设为rabbitmq
  • -h rabbitmq : 将容器的主机名设为rabbitmq,希望RabbitMQ消息数据持久化保存到本地磁盘是需要设置主机名,因为RabbitMQ保存数据的目录为主机名
  • -p 5672:5672 : 将容器的5672端口映射为本地主机的5672端口,这样可以通过本地的5672端口访问rabbitmq
  • -v /var/lib/rabbitmq:/var/lib/rabbitmq:将容器的/var/lib/rabbitmq目录映射为本地主机的/var/lib/rabbitmq目录,这样可以将RabbitMQ消息数据持久化保存到本地磁盘,即使RabbitMQ容器被删除,数据依然还在。

Docker为官方镜像提供了加速服务,因此命令中Rabbit的Docker镜像名为registry.docker-cn.com/library/rabbitmq:3.7

如果你不会Docker,建议你学习一下。如果你不想学,Ubuntu 14.04下安装RabbitMQ的命令是这样的:

sudo echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

启动RabbitMQ:

sudo service rabbitmq-server start

消息队列代码示例

下面,我们使用Node.js实现一个简单消息队列。

消息的生产者:sender.js

const amqp = require("amqplib");const queue = "demo";async function sendMessage(message)
{const connection = await amqp.connect("amqp://localhost");const channel = await connection.createChannel();await channel.assertQueue(queue);await channel.sendToQueue(queue, new Buffer(message),{// RabbitMQ关闭时,消息会被保存到磁盘persistent: true});
}setInterval(function()
{sendMessage("Hello, Fundebug!");
}, 1000)
  • 在sender中,不断地往消息队列中发送"Hello, Fundebug!"。

消息的消费者:receiver.js

const amqp = require("amqplib");const queue = "demo";async function receiveMessage()
{const connection = await amqp.connect("amqp://localhost");const channel = await connection.createChannel();await channel.assertQueue(queue);await channel.consume(queue, function(message){console.log(message.content.toString());channel.ack(message);});
}receiveMessage();
  • 在receiver中,从消息队列中读出message并打印。

我们用到了amqplib模块,用于与RabbitMQ进行通信,对于具体接口的细节,可以查看文档。

在调用sendToQueue时,将persistent属性设为true,这样RabbitMQ关闭时,消息会被保存到磁盘。测试这一点很简单:

  • 关闭receiver
  • 启动sender,发送消息给RabbitMQ
  • 重启RabbitMQ(sudo docker restart rabbitmq)
  • 启动receiver,会发现它可以接收sender在RabbitMQ重启之前发送的消息

由于RabbitMQ容器将保存数据的目录(/var/lib/rabbitmq)以数据卷的形式保存在本地主机,因此即使将RabbitMQ容器删除(sudo docker rm -f rabbitmq)后重新运行,效果也是一样的。

另外,这段代码采用了Node.js最新的异步代码编写方式:Async/Await,因此非常简洁,感兴趣的同学可以了解一下。

这个Demo的运行方式非常简单:

  • 运行RabbitMQ容器
sudo ./start_rabbitmq.sh
  • 发送消息
node ./sender.js
  • 接收消息
node ./receiver.js

在receiver端,可以看到不停地打印"Hello, Fundebug!"。

代码仓库地址为:Fundebug/rabbitmq-demo

自动重连代码示例

在生产环境中,RabbitMQ难免会出现重启的情况,比如更换磁盘或者服务器、负载过高导致崩溃。因为RabbitMQ可以将消息写入磁盘,所以数据是"安全"的。但是,代码中必须实现自动重连机制,否则RabbitMQ停止时会导致Node.js应用崩溃。这里提供一个自动重连的代码示例,给大家参考:

消息生产者:sender_reconnect.js

const amqp = require("amqplib");const queue = "demo";var connection;// 连接RabbitMQ
async function connectRabbitMQ()
{try{connection = await amqp.connect("amqp://localhost");console.info("connect to RabbitMQ success");const channel = await connection.createChannel();await channel.assertQueue(queue);await channel.sendToQueue(queue, new Buffer("Hello, Fundebug!"),{// RabbitMQ重启时,消息会被保存到磁盘persistent: true});connection.on("error", function(err){console.log(err);setTimeout(connectRabbitMQ, 10000);});connection.on("close", function(){console.error("connection to RabbitQM closed!");setTimeout(connectRabbitMQ, 10000);});}catch (err){console.error(err);setTimeout(connectRabbitMQ, 10000);}
}connectRabbitMQ();

消息消费者:receiver_reconnect.js

const amqp = require("amqplib");const queue = "demo";var connection;// 连接RabbitMQ
async function connectRabbitMQ()
{try{connection = await amqp.connect("amqp://localhost");console.info("connect to RabbitMQ success");const channel = await connection.createChannel();await channel.assertQueue(queue);await channel.consume(queue, async function(message){console.log(message.content.toString());channel.ack(message);});connection.on("error", function(err){console.log(err);setTimeout(connectRabbitMQ, 10000);});connection.on("close", function(){console.error("connection to RabbitQM closed!");setTimeout(connectRabbitMQ, 10000);});}catch (err){console.error(err);setTimeout(connectRabbitMQ, 10000);}
}connectRabbitMQ();

这样的话,即使RabbitMQ重启,sender和receiver也可以自动重新连接RabbitMQ。如果你希望监控RabbitMQ是否出错,不妨使用我们Fundebug的Node.js错误监控服务,在连接触发"error"或者"close"事件时,第一时间发送报警,这样开发者可以及时定位和处理BUG。

参考

  • AMQP library (RabbitMQ) - async/await
  • RbbitMQ文档:Work Queue(JavaScript)
  • Won't persist data
  • How to build reconnect logic for amqplib

RabbitMQ入门教程相关推荐

  1. RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)

    RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等) 1. RabbitMQ简介及AMQP协议 开源的消息代理和队列 ...

  2. RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

    发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...

  3. 干货!消息队列RabbitMQ入门教程

    ​写在前面:全文12000多字,从为什么需要用消息队列,到rabbitMQ安装使用,如何使用JavaAPI生产消费消息,以及使用消息队列带来的一些常见问题.绝对很适合新手入门学习. 为什么需要消息队列 ...

  4. RabbitMQ入门教程——.NET客户端使用

    众所周知RabbitMQ使用的是AMQP协议.我们知道AMQP是一种网络协议,能够支持符合要求的客户端应用和消息中间件代理之间进行通信. 其中消息代理扮演的角色就是从生产者那儿接受消息,并根据既定的路 ...

  5. RabbitMQ入门教程——发布/订阅

    什么是发布订阅 发布订阅是一种设计模式定义了一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象.这个主题对象在自身状态变化时,会通知所有的订阅者对象,使他们能够自动更新自己的状态. 为了描述这种 ...

  6. php写入rabbit速度,RabbitMQ 入门教程(PHP) 实现延迟功能

    php 使用rabbitmq-delayed-message-exchange插件实现延迟功能 1.安装 3.6.x下载地址 3.7.x下载地址 下载后解压,并将其拷贝至(使用Linux Debian ...

  7. RabbitMQ入门教程(十一):消息属性Properties

    分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 发送消息可以为消息指定一些参数 Delivery mode: 是否持久化,1 - Non-persistent,2 ...

  8. RabbitMQ入门教程(四):工作队列(Work Queues)

    分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 工作队列 使用工作队列实现任务分发的功能,一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们 ...

  9. RabbitMQ入门教程 1

    一.MQ的基本概念 1.MQ的概述 2.MQ的优势 应用解耦 异步提速 削峰填谷 3.MQ的劣势 系统可用性降低 系统复杂度提高 一致性问题 4.MQ的使用条件 5.常见的MQ的产品 二.什么是Rab ...

最新文章

  1. java如何确保单线程_java – 任何单线程程序如何成为有效的多线程程序?
  2. POJ3345 Bribing FIPA 【背包类树形dp】
  3. 中央纪委网站:​深度关注 | 元宇宙如何改写人类社会生活
  4. 给Anaconda安装国内镜像,加快下载速度
  5. js中的location的href和pathname,search
  6. Maven学习笔记(2) --mvn archetype:create 说明
  7. javaone_替代JavaOne 2013
  8. Python常用的字符串操作
  9. 设计模式C++(Strategy策略模式)
  10. displaytag用法一
  11. 三次hermite插值matlab,三次hermite插值
  12. Tcl 语言 ——变量篇
  13. 搭建一个vue项目完整步骤及详细讲解
  14. msm 8953 camera 流程
  15. NOIP2018(普及组 ) 赛后感想 题解
  16. C++:实现量化Forward option远期合约期权测试实例
  17. Mac 编译OpenSSL 静态库、动态链接库
  18. tea系列加密算法学习笔记
  19. 微信小程序使用mqtt mpvue mosquito
  20. 电脑蓝屏、经常用一会后蓝屏问题检查修复

热门文章

  1. c语言课程案例设计报告,C语言课程设计报告—范例解读.doc
  2. 教资支付显示找不到服务器,教师资格证报名支付的问题,点了支付总是找不到服..._教师资格考试_帮考网...
  3. 20191010:希尔排序代码详解
  4. 20190825:(leetcode习题)最长公共前缀
  5. 2019 live tex 发行版_TeX Live 2019安装指南
  6. 三维点云体素滤波python_三维重建9:点云图像的滤波方法小结
  7. 如何在VB6.0里动态使用具有事件的对象
  8. Windows网络命令行程序
  9. 从零学ELK系列(一):为什么要跟我学从零学ELK系列
  10. 微软小冰学会画画了,堪称复活近代画家,还能命题作画