List队列

如果想把Redis当作消息队列来使用,最先想到的就是List这个数据类型,List的底层是一个双向链表,在头部和尾部操作元素的时间复杂度是O(1),所以List符合消息队列的模型

生产者使用lpush发布消息

127.0.0.1:6379> lpush queue msg1
(integer) 1
127.0.0.1:6379> lpush queue msg2
(integer) 2

消费者使用rpop拉取消息

127.0.0.1:6379> rpop queue
"msg1"
127.0.0.1:6379> rpop queue
"msg2"

List作为消息队列的模型:

但是这样会有一个问题,如果队列中已经没有消息了,就会返回null

127.0.0.1:6379> rpop queue
(nil)

我们在编写消费者逻辑时一般是一个死循环,这个循环会不断地从队列里面拉取消息,伪代码如下:

while(true){msg = redis.rpop("queue");if(msg == null)continue;//处理消息handle(msg);
}

如果此时队列为空,消费者就会一直拉取消息这会造成CPU空转,浪费CPU资源,还会增加Redis的压力

那怎么解决呢?

当队列为空的时候,我们让消费者休眠一段时间,再去拉取消息,代码修改为:

while(true){msg = redis.rpop("queue");if(msg == null){//没有消息就睡眠3秒sleep(3);continue;}//处理消息handle(msg);
}

虽然CPU空转的问题解决了,但是又有了一个新的问题,如果在休眠的过程中有新的消息来了,消费者不能第一时间获取新的消息,那么就会存在延迟

Redis提供了blpop和brpop,这里的b指的是block

使用blpop和brpop,当队列为空的时候,消费者拉取消息时就阻塞等待,有了消息才返回

现在可以这样修改代码

while(true){//0表示不设置超时时间msg = redis.brpop("queue", 0);if(msg == null)continue;//处理消息handle(msg);
}

如果设置了超时时间,那么在指定的时间后会返回null

注意:如果设置的超时时间太长,这个连接太久没有活跃,可能会被Redis Server判定为无效连接,然后会把这个客户端踢下线,所以如果使用这种方案,客户端需要有重连机制

总结一下List消息队列模型的缺点:

  • 不支持重复消费:当其中一个消费者拉取消息后,这条消息就从List中删除了,其他的消费者就不能再次消费了
  • 消息丢失:消费者拉取消息后,如果发生了宕机,那么这条消息就丢失了

发布/订阅模型:Pub/Sub

在分析List消息队列的时候有两个问题,其中一个就是不支持重复消费,Pub/Sub正好可以解决这个问题

启动两个消费者,订阅同一个队列

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1

此时两个消费者都会被阻塞,等待新消息的到来

启动一个生产者,发布消息

127.0.0.1:6379> publish queue msg1
(integer) 2

此时,两个消费者就会解除阻塞,收到生产者发来的消息

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "msg1"

Pub/Sub的底层就是维护了一个字典,key就是频道,value是一个链表,当一个消费者订阅一个频道的时候,就会把该消费者添加到key对应的链表中,生产者发布消息就是将消息发送给链表中所有的消费者

Pub/Sub支持阻塞式拉取消息,还支持重复消费,但是他的缺点是:

  • 消息丢失

消费者宕机,Redis宕机,消息堆积都有可能造成消息丢失,原因如下:

Pub/Sub没有基于任何数据类型,也没有做任何的数据存储,它只是单纯的为生产者和消费者建立数据转发通道,把符合规则的数据从一端转发到另一端,在这个过程中没有任何的数据存储,一切都是实时转发的

如果消费者宕机,新的消息会因为找不到消费者被丢弃,当消费者重新上线,就只能接收新的消息,所以在使用Pub/Sub时,必须先订阅频道,生产者才能发布消息,否则消息会丢失

此外,因为没有持久化,所以Pub/Sub的操作不会写入到RDB或者AOF中,当redis宕机重启,消息也会全部丢失

最后就是消息积压的时候,每一个消费者订阅一个队列时,Redis都会这个给消费者分配一个缓冲区,当生产者发布消息时,Redis就会把消息写入到对应消费者的缓冲区里,然后消费者再从缓冲区中读取消息

如果消费者读取消息慢,会造成消息积压,缓冲区内存持续增长,如果超过了配置的上线,Redis会强制把这个消费者踢下线,这样消费者就消费失败,造成数据丢失

总结一下Pub/Sub的优缺点:

  • 优点:阻塞式拉取消息,支持订阅发布
  • 缺点:不支持数据持久化,消费者宕机,Redis宕机,消息堆积都会造成消息丢失

还有一点,如果消费者处理消息失败,也无法再次重新消费,因为消息已经从缓冲区里删除了

Pub/Sub有点鸡肋,目前只有哨兵集群和Redis实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景

趋于成熟的队列:Stream

Stream是Redis的一个项目disque集成到Redis里的,通过xadd和xread完成最简单的生产、消费模型

生产者发布消息

127.0.0.1:6379> xadd queue * name zhangsan
"1664712059246-0"
127.0.0.1:6379> xadd queue * name lisi
"1664712066901-0"

[*]代表自动生成唯一消息id,格式是时间戳+自增序号

消费者拉取消息

127.0.0.1:6379> xread streams queue 0-0
1) 1) "queue"2) 1) 1) "1664712059246-0"2) 1) "name"2) "zhangsan"2) 1) "1664712066901-0"2) 1) "name"2) "lisi"

0-0表示拉取所有消息

如果想继续拉取消息,需要传入上一条消息的id

127.0.0.1:6379> xread streams queue 1664712066901-0
(nil)

没有消息会返回null

现在来看一下Stream是怎么解决消息队列的要求的

  • 阻塞式拉取消息

只需要增加block参数即可,这时消费者会阻塞,直到接收到新的消息

127.0.0.1:6379> xread block 0 streams queue 1664712066901-0
1) 1) "queue"2) 1) 1) "1664712542730-0"2) 1) "name"2) "erha"
  • 订阅/发布模式

xgroup:创建消费者组

xreadgroup:在指定消费组下,开启消费者拉取消息

生产者发布两个消息

127.0.0.1:6379> xadd queue * name zhangsan
"1664712939101-0"
127.0.0.1:6379> xadd queue * name lisi
"1664712943974-0"

创建两个消费组

# group1从头拉取消息
127.0.0.1:6379> xgroup create queue group1 0-0
OK
# group2从头拉取消息
127.0.0.1:6379> xgroup create queue group2 0-0
OK

第一个消费者消费第一个消费组

127.0.0.1:6379> xreadgroup group group1 consumer streams queue >
1) 1) "queue"2) 1) 1) "1664712939101-0"2) 1) "name"2) "zhangsan"2) 1) "1664712943974-0"2) 1) "name"2) "lisi"

第二个消费者消费第二个消费组

127.0.0.1:6379> xreadgroup group group2 consumer streams queue >
1) 1) "queue"2) 1) 1) "1664712939101-0"2) 1) "name"2) "zhangsan"2) 1) "1664712943974-0"2) 1) "name"2) "lisi"

  • 消息处理异常时,Stream保证消息不丢失

除了拉取消息时用到了消息ID,这里为了保证重新消费,也要用到这个消息ID,当一个消费者处理完消息后,需要执行xack命令告知Redis,这时Redis就会把这条消息标记为处理完成

如果消费者异常或者宕机,就不会发送xack,那么Redis就会依旧保留这条消息,等到消费者重新上线后,Redis就会把之前没有处理成功的数据,重新发给这个消费者,这样一来,即使消费者异常,也不会丢失数据了

  • Stream数据会写入到RDB和AOF做持久化

Stream是新增加的数据类型,它与其它数据类型一样,会持久化到RDB或者AOF中,这样就算 Redis宕机重启,Stream中的数据也可以从RDB或AOF中恢复回来

  • Stream处理消息堆积

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

生产者限流:避免消费者处理不及时,导致持续积压

丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸

127.0.0.1:6379> xadd queue maxlen 1000 * name zhangsan
"1664715484247-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息,这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的

既然它的功能这么强大,这是不是意味着,Redis真的可以作为专业的消息队列中间件来使用呢?

不行,就算Redis能做到以上这些,也只是趋近于专业的消息队列

与专业的消息队列对比

一个专业的消息队列,必须要做到两大块:

  • 消息不丢
  • 消息可堆积

我们从一个消息队列的使用模型来分析一下

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者

先从消息不丢分析

消息是否会发生丢失,其重点也就在于以下 3 个环节:

生产者会不会丢消息?

消费者会不会丢消息?

队列中间件会不会丢消息?

  • 生产者会不会丢消息?

当生产者在发布消息时,可能发生以下异常情况:

1.消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败

2.不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了

如果是情况 1,消息根本没发出去,那么重新发一次就好了

如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止

生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理

也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理,这也意味着消息可能会重复发送,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃

那消费者这边,就需要多做一些逻辑了,对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理

所以,无论是Redis还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的

  • 消费者会不会丢消息

这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了或者处理消息异常,那消费者还能否重新消费失败的消息?

要解决这个问题,消费者在处理完消息后,必须告知队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢

无论是Redis的Stream,还是专业的队列中间件,例如RabbitMQ、Kafka,其实都是这么做的

所以,从这个角度来看,Redis 也是合格的

  • 队列中间件会不会丢消息

Redis 在以下 2 个场景下,都会导致数据丢失

1.AOF持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能

2.主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

所以,如果把Redis当做消息队列,在这方面是有可能导致数据丢失的

再来看那些专业的消息队列中间件是如何解决这个问题的

像RabbitMQ或Kafka这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写多个节点,以此保证消息的完整性,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失也正因为如此,RabbitMQ、Kafka在设计时也更复杂,毕竟,它们是专门针对队列场景设计的,但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的

  • 消息积压怎么办

因为Redis的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致Redis的内存持续增长,如果超过机器内存上限,就会面临被OOM的风险

但Kafka、RabbitMQ这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加坦然

综上,我们可以看到,把Redis当作队列来使用时,始终面临的2个问题:

1.Redis本身可能会丢数据

2.面对消息积压,Redis内存资源紧张

如果业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把Redis 当作队列是完全可以的

而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量

如果业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么建议使用专业的消息队列中间件

总结

T-3.2-把Redis当作消息队列合不合适相关推荐

  1. c#进阶(4)—— Redis 用于消息队列的存储

    1.参考的博文 a : http://www.cnblogs.com/lori/archive/2012/04/12/2443708.html -- 主要的实现思路 b:  http://www.cn ...

  2. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  3. 【springboot】【redis】springboot+redis实现发布订阅功能,实现redis的消息队列的功能...

    springboot+redis实现发布订阅功能,实现redis的消息队列的功能 参考:https://www.cnblogs.com/cx987514451/p/9529611.html 思考一个问 ...

  4. PHP + Redis 实现消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者 ...

  5. ​redis实现消息队列

    redis是一个开源的key-value存储系统.与Memcached类似,Redis将大部分数据存储在内存中,支持的数据类型包括:字符串.哈希表.链表.集合.有序集合以及基于这些数据类型的相关操作. ...

  6. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  7. 使用Redis 实现消息队列

    一 .为什么要用Redis实现轻量级MQ? MQ的主要作用: 应用解耦 异步化消息 流量削峰填谷 目前使用比较多的是ActiveMQ . RabbitMQ . ZeroMQ . Kafka . Met ...

  8. 【BCVP】实现基于 Redis 的消息队列

    聆听自己的声音 如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯. 话说上回书我们说到了,Redis的使用修改<[BCVP更新]Sta ...

  9. 程序员过关斩将--redis做消息队列,香吗?

    菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...

最新文章

  1. linux终端密码星星,如何在Ubuntu终端中显示密码星号
  2. 海量大数据处理最新面试题-1
  3. 2014年各种编程语言的薪资和市场需求
  4. 使用PowerMock模拟静态方法
  5. 《计算机网络》第二章:物理层(The Physical Layer)
  6. MongoDB,分组,聚合
  7. 从0开始学习 GitHub 系列之「团队合作利器 Branch」
  8. c++ string字符串翻转
  9. Python写一个服务
  10. 第13章 集成学习和随机森林 学习笔记上
  11. 又延伸到socket去了。
  12. 【笔记】mac os命令行编译objective-c
  13. 拉扎维模拟CMOS集成电路设计python建模工程——利用matplotlib绘制NMOS与PMOS转移特性曲线
  14. 计算一个三位数的个十百c语言,“任意输入一个三位数,输出这个三位数的百位、十位和个位,并且计算十位百位个位的和.”c语言程序...
  15. 解决电脑低俗弹窗广告
  16. 各个版本JDK官方下载地址
  17. 数理统计之 置信区间2
  18. 渗透测试之信息收集漏洞库篇
  19. Python之ascii转中文
  20. hololens拍照

热门文章

  1. Android ToastUtil
  2. python excel计算_怎么用python导入excel计算方差
  3. Python Excel操作模块XlsxWriter之写入数组公式worksheet.write_array_formula()
  4. 适合穷人挣钱最快的方法
  5. 时空大数据与智慧城市
  6. 【泛函分析】Thomae function
  7. 画图软件visio安装
  8. Aiml智能标记语言规范(20201216)
  9. py-faster-rcnn 中 shell脚本解读:./experiments/scripts/faster_rcnn_alt_opt.sh
  10. 白鹭安装node_Mac OS X 系统下安装和部署Egret引擎开发环境