Ruby使用RabbitMQ(基础)

RabbitMQ documentation

rabbitmq-tutorials

rabbitmq-configure

bunny

前提

最近刚刚接触到mq, 就在极客时间上买了一门关于mq的课程.
学习了一些基础加上在 RabbitMQ 官网上的例子.

总结了一下.

为什么要使用mq

消息队列(mq)可以帮助我们去处理系统之间的消息传递.
帮助我们去解决消息在传递过程中可能出现的数据丢失问题.

同时,消息队列还可以起到缓存的作用.
消息队列可以暂存一些消息. 平衡消息上下游之间速度不平衡.

mq是必须的吗

mq 不是必须的, 很多系统没有mq 也可以正常运行.
但是, 有mq 可以帮助我们很好地处理系统之间的交互.

mq的应用场景

  • 异步处理
  • 流量控制
  • 日志
  • 服务解耦

不适合mq的场景

  • 数据一致性要求很高
  • 第三方支付
  • 银行转账等

术语 jargon

RabbitMQ 就像一个邮局一样, 有发送信件的人, 也有接受人.
人们不去关心到底通过什么样子的方式, 把信交到收信人那里的.
人们只要把信交给邮局就好.

RabbitMQ 在系统间就起到邮局的作用.
帮助系统之间进行通信. 并且要确保信息不丢失.

Producer(生产者)

Producer 就是消息的生产者.

也就是消息的上游, 一个系统A 产生了一些信息.
可能需要另外一个系统B 去处理.
那么, 系统A 就是一个 Producer

Consumer(消费者)

一个系统B, 需要处理另一个系统A 的消息.
那么 系统B就是一个 Consumer

注意: 一个系统可以是Producer,也可以是Consumer.

Broker

Broker 就是mq, 消息队列中间件.
它去协调在两个系统之间的消息. 尽可能地让消息不丢失,不重复.

安装

  • deepin操作系统
  • 使用docker安装
# 安装docker
sudo apt-get install docker-ce
# 使用docker运行
sudo docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# 关闭
docker stop rabbitmq
# 启动
docker start -a rabbitmq

docker 运行完成后,本地会启动一个server,
也有一个web端的控制台

http://localhost:15672/#/

使用bunny

刚刚安装了RabbitMQ的server.

那么,现在就应该安装RabbitMQ的client了;
RabbitMQ 支持的语言有很多中.
这里因为工作原因, 选择使用了Ruby的bunny

安装 bunny 用于和rabbitmq 进行交互

# 通过gem来安装
gem install bunny
# 直接require 就可以使用了
require 'bunny'

简单使用

Producer

代码中 hostname 是我自己在hosts设置的;
如果 Producer 和 Consumer 在同一个主机上.
就不用设置 hostname

# 引入 bunny
require 'bunny'
# hostname 是我自己在hosts设置的;
connection = Bunny.new(hostname: 'dev.local')
connection.startchannel = connection.create_channel# 获取queue是幂等的
queue = channel.queue('simple')100.times do |n|message = "第#{n}条消息"queue.publish(message)
end
# 关闭连接
connection.close

Consumer

require 'bunny'connection = Bunny.new(hostname: 'dev.local')
connection.startchannel = connection.create_channel# 获取queue是幂等的
queue = channel.queue('simple')# block: true 表明会一直监听mq, 等待消息的传入
# 真实的环境不要使用
queue.subscribe(block: true) do |_delivery_info, _properties, body|# 模拟延时任务, 延时1ssleep 1puts body
end

消息队列可以帮助我们处理消息;
无论是先启动 Consumer 还是先启动 Producer
消息是不会丢失的;

我们分别先执行Consumer,Producer;
我们收到的信息都是一样的;

Consumer 可扩展性

当一个Consumer的处理速度已经满足不了 Producer的生产速度时,
我们可以同时运行多个Consumer.

我们可以执行多个上面 Consumer 的代码;
我们可以看到消息被mq一个个地均分给了一个个的Consumer.

但是, 注意: 这里需要先启动多个 Consumer, 再启动 Producer.
不然, 只能有一个Consumer收到消息.

消息确认

在上面的 Consumer 代码中, 如果一个Consumer 挂了;
或者在 Consumer处理问题发生了问题;
导致消息没有正常处理.

那么这条消息就是丢失了;

RabbitMQ 中如何解决这个问题;
在client上加入 manual_ack: true 就可以了;

queue.subscribe(block: true, manual_ack: true) do |_delivery_info, _properties, body|# 模拟延时任务, 延时1ssleep 1puts body# 发送ack, 通知mq 消息已处理完成;# 必须发送ack, 不然mq 会一直requeuechannel.ack(_delivery_info.delivery_tag)
end

修改完了, 我们再看一下结果;
就可以发现, 如果一个Consumer挂掉了;

消息会转移到其它存活的Consumer上.

注意: 如果使用了manual_ack, 一定要发送 ack. 不然RabbitMQ会不停占用内存, 最后导致系统崩了;

消息持久化(Message durability)

通过上面的代码, 我们解决了当Consumer挂掉的时候, mq 会帮助我们把消息发送给别的Consumer;
确保了, 在Consumer端处理得当;

但是, 我们也要考虑到当 RabbitMQ server 挂了的时候.
我们如何处理呢?

  1. 确保 queue 是可持久化的
  2. 确保 messages 是可持久化的

我们在代码中可以添加

同时在Consumer和Producer的代码中修改
注意: 我们使用了不同的queue

# 加入 durable: true
queue = channel.queue('simple-durable', durable: true)

在Producer 中加入

queue.publish(message, persistent: true)

执行docker kill 可以在试试看效果

不过要注意:

这样并不会完全保证消息不丢失; 尽管RabbitMQ知道了要把数据保存到硬盘中.
但是仍然有一个短暂的窗口时间: 当mq 接受到消息, 但是还未将消息保存到硬盘.

同样, MQ也不会收到一条消息就保存到内存中去;
这个持久化并不strong. 当然,一般情况下够用.

如果需要一个更strong的保证. 可以使用 publisher confirms

公平分配(Fair dispatch)

我们运行上面的Consumer 可以发现.
消息其实是公平分配给每个Consumer的.

那么,这可能会出现问题;
假如某个节点处理消息的速度快, 但是因为公平分配.
这个节点的利用率就比较低了;

为了解决这个问题:
我们可以使用 channel.prefetch(n)

n = 1
channel.prefetch(n)

这将告诉RabbitMQ 不要一次发布超过 n 条消息给Consumer;
直到Consumer处理完消息, 并返回ack.

注意: 如果所有的Consumer都处于繁忙状态, 消息可能会堆积.

总结消息丢失

确认消息丢失

首先, 我们需要确定消息丢失.

我们可以利用消息队列的有序性来验证是否有消息丢失

一般MQ会有 拦截器机制, 让我们在Producer和Consumer代码之前,
进行消息丢失的验证;

图中的三个系统 Produce, Broker, Consumer

三个阶段 生产阶段, 存储阶段, 消费阶段

四个流程 send, pull, ack_to_broker, ack_to_producer

都有可能因为系统,网络等原因导致消息的丢失;

Producer

如何在生产者这一阶段确保消息不丢失呢?

生产者发送给MQ消息后,
MQ会给生产者一个ack. 如果没有接受到ack.
说明MQ 挂掉了,或者网络没能达到等等.

这时候, 需要生产者这一端进行数据重发.

只要 Producer 收到了 Broker 的确认响应,
那说明在生产者这一阶段是确保了消息不丢失;

但是, 这也有可能导致消息重复;

Broker

消息传递到了MQ这里. 一般情况下,消息可以正常发送给Consumer.
但是, 如果MQ挂了; 怎么处理?

我们在之前的代码中提到了消息的持久化;
消息的持久化, 可以确保消息在MQ挂了的情况下, 还能够保存下来;

等到下次重启就可以继续把消息发送给Consumer.

Consumer

在Consumer阶段. 很有可能在处理消息时, 系统重启或者挂掉;
那么, 处理消息的业务逻辑还没有完成.

这条消息就不能算是被成功处理了;
所以需要MQ 再次重发消息;

不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认.

消息重复

消息在网络传输过程中发送错误,由于发送方收不到确认,会通过重发来保证消息不丢失。
但是,如果确认响应在网络传输时丢失,也会导致重发消息。

也就是说,无论是 Broker 还是 Consumer 都是有可能收到重复消息的

质量标准

传递消息时能够提供的服务质量标准

  • at more once 消息最多发送一次, 不管不顾. 发完就完事了;

  • at least once 至少一次. 消息会有确认机制. 保证消息被消费者消费了; 但是肯定会有重复;

  • exactly once 仅仅一次; 这个MQ没有实现; 也很难实现; 因为系统层面上的问题, 不能全让MQ 处理;

exactly once 的实现需要 消息队列 + 系统处理

从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。
但是, 很多代码不是幂等的;

需要我们处理成幂等;

消费幂等

消费者一段的代码, 如果是幂等的;
那么, 遇到多余的消息. 直接执行逻辑也无妨;

例如:

给Tony 性别设置为 男;

这个逻辑执行1次, 和执行 n次效果一样;

所以,如果业务处理逻辑本身就是幂等的,
那么,就不用考虑消息重复的问题;

幂等: 多次执行和执行一次结果相同;

消息查重

更新的数据设置前置条件

消息先去重,根据业务ID(或者其它能够标识消息唯一性的就行),
去查询是否消费过此消息了,
消费了,则抛弃,否则就消费.

如果大家看完了上面的内容, 可以看看链接中, RabbitMQ更为常用的介绍.

Ruby使用RabbitMQ(进阶)

Ruby使用RabbitMQ(基础)相关推荐

  1. rabbitmq基础1——消息中间件概念、Rabbitmq的发展起源和基本组件的作用流程

    文章目录 一.消息中间件 1.1 概念 1.2 作用 1.2.1 消息队列持久化 1.2.2 消息队列分发策略 1.2.3 消息队列的高可用和高可靠 1.2.3.1 一主多从共享集群 1.2.3.2 ...

  2. (2)RabbitMQ基础概念及工作流程详解

    上一节中我们对MQ做了一个概要介绍,这一节开始我们选取RabbitMQ开始进行学习,本节将会RabbitMQ做个简单介绍,并且会对其常见的基础概念做个讲解,最后会简单介绍一下RabbitMQ的工作流程 ...

  3. 转 RabbitMQ 基础概念及 Spring 的配置和使用 推荐好文 举例讲解

    从不知道到了解-RabbitMQ 基础概念及 Spring 的配置和使用 转: sumile.cn  »  从不知道到了解-RabbitMQ 基础概念及 Spring 的配置和使用 序言 你在系统中是 ...

  4. java B2B2C Springcloud电子商务平台源码-RabbitMQ基础概念...

    RabbitMQ是一个由erlang开发的AMQP的开源实现. 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六 AMQP,即Adva ...

  5. RabbitMQ基础知识详解

    RabbitMQ基础知识详解 2017年08月28日 20:42:57 dreamchasering 阅读数:41890 标签: RabbitMQ 什么是MQ? MQ全称为Message Queue, ...

  6. Rabbitmq 基础

    Rabbitmq基础 1 MQ 基本概念 1.1 MQ概述 MQ全称Message Queue,是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信. 分布式系统通信两种方式:直接远程调用 ...

  7. rabbitmq基础2——rabbitmq二进制安装和docker安装、基础命令

    文章目录 一.RabbitMQ安装 1.1 二进制安装 1.2 rabbitmqctl工具 1.3 docker安装 二.rabbitmq基础命令 2.1 多租户与权限类 2.1.1 创建虚拟主机 2 ...

  8. RabbitMQ基础知识介绍、RabbitMQ的安装

    RabbitMQ基础知识介绍 官方解释:MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过                   读写出入队列的消息 ...

  9. RabbitMQ基础概念详细介绍

    转至:http://www.ostest.cn/archives/497 引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼. ...

最新文章

  1. json qbytearray 串 转_如何通过QByteArray在JSON中存储QPixmap?
  2. [转]七大.NET开源框架
  3. openssl 生成 cert.key cert.pem
  4. js indexof用法indexOf()定义和用法
  5. linux命令的导入,[导入]Linux基本命令
  6. 集群系统与事务处理需要注意的一点
  7. MVC POST请求后执行javascript代码
  8. Jquery 获取 radio选中值
  9. 大道至简伪代码(第一个博客)
  10. 51单片机——LCD1602
  11. 分治应用--万里挑一 找假硬币
  12. mysql语法错误文件_使用logstash同步MySQL的数据时,在jdbc查询sql文件时报sql语法错误,sql文件是navicat生成的...
  13. LeetCode OJ - Longest Substring Without Repeating Characters
  14. 为什么会找不到D层文件?
  15. U盘等无法弹出的解决办法
  16. 秒杀项目总结及面试常见问题
  17. 两种方法:在 PowerPoint 中插入视频
  18. 哥去微软面试,第一句话就被…
  19. COLA开发流程总结
  20. 洛谷T156530 儒略历详解

热门文章

  1. 牛客网入门题--最大公约数与最小公倍数
  2. 免费苹果CMS影视站采集器影视站SEO优化教程
  3. Python 有趣的囚犯问题
  4. 太厉害了,终于有人能把云计算、大数据和人工智能一次性讲明白了
  5. 看上去很美--次世代游戏平台XBOX360测评
  6. 在使用集合中的contains(),要根据实际情况改写集合中对象的equals(Object obj)方法------改写List集合中equals(Object obj)的方法
  7. 制作表白墙,给TA一个惊喜吧
  8. QQ浏览器性能提升之路-windows性能分析工具篇
  9. 东南亚电商Shopee爆款打造小技巧,一定要收藏!
  10. 升级Mountain Lion系统后因为 “来自身份不明开发者” 不能打开某些软件的解决方法