Redis之消息队列的实现

消息队列一直是中间件三剑客(Redis、MQ、MySQL)中的重要一环,它能够实现异步、削峰、解耦等功能,特别在一些分布式系统架构中优势发挥的淋漓尽致,目前比较成熟的消息中间件种类很多如RabbitMQ、RocketMQ、ActiveMQ、Kafka等,而我们的缓存利器Redis也有对于消息队列的实现,简单概括为一种模式两种数据类型,一种模式指的是发布订阅模式(pub/sub),两种数据类型指的是List和Streams 。

消息队列的需求

一个消息队列在生产上的使用一般需要满足三个需求,保证消息的有序性、保证消息不会重复消费、保证消息的可靠性。

消息有序性

消费者虽然是异步执行,但如果消费顺序和生产顺序不一致可能造成业务错乱,如存在生产者按顺序生产了三个事件,事件A更新商品库存为10,事件B读取商品库存,事件C更新商品库存为5,按照顺序事件B读取的是事件A的更新结果,如果事件A更新后事件B还未读取商品库存事件C先更新了,那么事件B读取的商品库存就是事件C的更新结果,这样就会造成业务错乱。

消息的重复消费

消费者从消息队列中读取消息时,可能因为网络阻塞导致消息重复发送,这时消费者会收到多条重复的消息,如果没有做消息的重复性校验那么将执行多次重复逻辑,如果是插入数据库那么将存在多条重复记录,影响业务正常流转。

消息的可靠性

消息的可靠性主要指消费者在处理消息时,因为服务器宕机导致消息未处理完毕,而下次重启时因为未处理完毕的消息在宕机前是从消息队列中读取了,所以宕机后不会重新消费,这就导致了未处理完毕消息丢失。

发布订阅模式Pub/Sub

发布订阅模式是一种消息通信模式,生产者(pub)发布消息,消费者(sub)消费消息,结构图如下所示。

消息订阅分两种类型普通订阅以及模式订阅,普通订阅需要指定频道的名字(名字可以是多个),而模式订阅支持频道名字模糊匹配,命令演示如下。

普通订阅,消费者在订阅频道时必须指定频道的名称,不然生产者发布非指定频道的名称消费者无法接收到数据

模式订阅,不需要消费者指定到底是什么频道,可以采用模糊匹配的形式如PSUBSCRIBE pattern*

发布订阅模式Pub/Sub使用简单,但是有个很大的缺陷就是无法持久化消息,消费者下线或者Redis宕机会导致消息丢失,这也导致了很多公司在技术选型上不会考虑RedisPub/Sub发布订阅模式的主要原因。

List实现消息队列

既然无法采用发布订阅模式实现消息队列的需求,那么在redis5.0之前我们可以采用List来做补偿方案。

实现消息有序性

List自身就是有序的,如果List类型入队采用Lpush出队采用Rpop这就是一个完整的队列,可以实现先进先出的效果,流程如下所示。

List用自身的特性保证了有序性,但是有一个问题在出队时消费者需要轮询消息队列,无论消息队列是否有值都需要轮询显然这是对资源的消耗不太合理,所以Redis中可以使用阻塞式读取命令BRPOP当消息队列是空时会阻塞等待,当消息队列有后会重新出队。

消息的重复消费

生产者可以创建唯一性ID来标识这个消息,消费者需要存储已经消费的消息ID列表,每处理一个消息需要对比消息ID在已消费列表中是否存在,如果ID已存在那么该消息不会再处理,这也是幂等性的保证。

消息的可靠性

消息的可靠性保证可以在消费者出队一个消息后将这个消息保存到未完成的队列中,只有当消费者处理完毕才从未完成的队列中移除,其核心思想就是采用BRPOPLPUSH备份。

对于List可以简单满足消息队列的需求,但需要注意的是List处理消息队列肯定不能多个消费者一起消费,不然消息的有序性得不到保证,如果生产者的速度过快,而消费者的速度慢就会造成消息堆积,在Redis内存带来巨大的压力,所以在这种场景下需要考虑多个消费者共同分担压力的问题,这就需要依靠5.0 版本开始提供的Stream解决。

Streams实现消息队列

Stream是Redis5.0新增的数据结构,它提供了消息的持久化以及主备复制功能,可以让任何客户端访问任何时刻的数据,它有一个消息链表可以将所有加入的消息串起来,每个消息都存在一个唯一ID,并且这个ID是递增的。

last_delivered_id:游标,记录的是消息的id,游标到哪里就表示消费到哪里了。

pending_ids :消费者的状态变量,表示已经被消费者读取但是还没被消费者确认(ACK)的事件id。

简单消费

简单消费只针对单一的消费者而言,不需要创建消费组,命令演示如下

## 创建一个消息  *表自动创建id
127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3
"1650985928177-0"
## 创建消息时手动指定id id必须比现有的大,不然会报错
127.0.0.1:6379> xadd mystream 1 f4 v4
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item127.0.0.1:6379> xadd mystream 1650985928177-1 f4 v4
"1650985928177-1"## 查找id范围内的消息  -代表最小id  +代表最大id
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1650985928177-0"2) 1) "f1"2) "v1"3) "f2"4) "v2"5) "f3"6) "v3"
2) 1) "1650985928177-1"2) 1) "f4"2) "v4"## 消息个数
127.0.0.1:6379> XLEN mystream
(integer) 2## 根据消息id 删除消息
127.0.0.1:6379> xdel mystream 1650985928177-1
(integer) 1### 从ID是0-0开始消费消息
### XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
### COUNT 表示消费消息的个数,默认全部
### BLOCK 表示阻塞获取的时间,单位毫秒,设置0表示永不过期
### ID 表示消息id,为0表示从0-0的下一个消息开始读取
127.0.0.1:6379> XREAD streams mystream 0
1) 1) "mystream"2) 1) 1) "1650985928177-0"2) 1) "f1"2) "v1"3) "f2"4) "v2"5) "f3"6) "v3"2) 1) "1650985928177-2"2) 1) "f4"2) "v4"### 从阻塞队列的尾部读取
### 最后的ID为$表示使用mystream中最大的id为值读取(不会读取到本身的值,只有比自己大的)
### 在[XADD mystream * key value]没有执行时都会阻塞等待
127.0.0.1:6379> XREAD block 0  streams mystream $1) 1) "mystream"2) 1) 1) "1650986368128-0"2) 1) "f5"2) "v5"
(116.92s)

消费者组

### 创建消费组mygroup,消费组消费的队列是mystream  $表示从最新的开始消费(为0表示从头开始消费)
127.0.0.1:6379> XGROUP create mystream mygroup $
OK### 查看队列信息
127.0.0.1:6379> xinfo stream mystream1) "length"2) (integer) 3 ### 存在的元素个数3) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "last-generated-id"8) "1650986368128-0"9) "groups"
10) (integer) 1   #### 一个消费组
11) "first-entry"  ### 第一个消息
12) 1) "1650985928177-0"2) 1) "f1"2) "v1"3) "f2"4) "v2"5) "f3"6) "v3"
13) "last-entry"  ### 最后一个消息
14) 1) "1650986368128-0"2) 1) "f5"2) "v5"### 消费者组mygroup,c1消费者从streams消费最新的消息
### > 号表示从当前消费组的 last_delivered_id 后面开始读
### 消费者组中的一个消费者读取一个消息后,该消息就不会被消费组的其它消费者读取,但是不同的消费组可以重复消费消息
127.0.0.1:6379> XREADGROUP group mygroup c1 count 1 streams mystream >
(nil)### 支持阻塞获取,在XADD后
127.0.0.1:6379> XREADGROUP group mygroup c1 count 1 block 0  streams mystream >
1) 1) "mystream"2) 1) 1) "1650988435423-0"2) 1) "f6"2) "v6"
(43.91s)## 查看消费组信息
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"2) "mygroup" ### 消费组名字3) "consumers"4) (integer) 1 ### 消费组成员15) "pending" 6) (integer) 1  ### 正在处理没有确认的个数ACK7) "last-delivered-id"  ### 游标位置也就是最大的消息ID8) "1650988435423-0"## 查看PENDING数组信息
127.0.0.1:6379> XPENDING mystream mygroup
1) (integer) 1
2) "1650988435423-0"  ### 读取的最小的ID
3) "1650988435423-0"  ### 读取的最大的ID
4) 1) 1) "c1"  ### 消费者2) "1"### 确认消息ACK
127.0.0.1:6379> XACK mystream mygroup 1650988435423-0
(integer) 1127.0.0.1:6379> xinfo groups mystream
1) 1) "name"2) "mygroup"3) "consumers"4) (integer) 15) "pending"  ### 确认后删除pending中的元素6) (integer) 07) "last-delivered-id"8) "1650988435423-0"

总结

Redis消息队列的实现真正能运用到生产环境的是List和Streams,两者区别如下所示

Redis实现消息队列在生产上的运用其实很多人都有争议,认为生产上应该尽量采用专业的消息中间件工具,其实在技术选型的时候应该从业务数据量出发,如果业务数据量不大而且不复杂那么运用Redis做消息中间件完全能解决,但如果业务数据量大这时Redis完全可以抛弃因为Redis本身就是将数据存储到内存,内存资源是非常有限的,这个应该交由专业的消息中间件完成,所以一个技术点存在必定有它的历史原因,只是我们在技术选型的时候从业务角度进行判断即可。

Redis之消息队列的实现相关推荐

  1. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  2. 【springboot】【redis】springboot+redis实现发布订阅功能,实现redis的消息队列的功能...

    springboot+redis实现发布订阅功能,实现redis的消息队列的功能 参考:https://www.cnblogs.com/cx987514451/p/9529611.html 思考一个问 ...

  3. PHP + Redis 实现消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者 ...

  4. ​redis实现消息队列

    redis是一个开源的key-value存储系统.与Memcached类似,Redis将大部分数据存储在内存中,支持的数据类型包括:字符串.哈希表.链表.集合.有序集合以及基于这些数据类型的相关操作. ...

  5. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  6. 使用Redis 实现消息队列

    一 .为什么要用Redis实现轻量级MQ? MQ的主要作用: 应用解耦 异步化消息 流量削峰填谷 目前使用比较多的是ActiveMQ . RabbitMQ . ZeroMQ . Kafka . Met ...

  7. 【BCVP】实现基于 Redis 的消息队列

    聆听自己的声音 如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯. 话说上回书我们说到了,Redis的使用修改<[BCVP更新]Sta ...

  8. 程序员过关斩将--redis做消息队列,香吗?

    菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...

  9. c#进阶(4)—— Redis 用于消息队列的存储

    1.参考的博文 a : http://www.cnblogs.com/lori/archive/2012/04/12/2443708.html -- 主要的实现思路 b:  http://www.cn ...

  10. Redis异步消息队列

    一.异步消息队列介绍 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦.所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列.同时由于使用了消 ...

最新文章

  1. 【前沿技术】严重事故!实习生删除字节跳动所有轻量级机器学习模型
  2. C语言中malloc为字符型指针分配内存引起的缓冲区泄露
  3. 注册不到两年半Github标星39k+,吴恩达、李航老师的作品的笔记和代码实现
  4. activeMQ高并发发送消息异常解决方法
  5. lsof deleted java,lsof__强大的系统监控、诊断命令
  6. c语言中代码中的作用,C语言中#的神奇作用
  7. 建立项目接口文档_一个 SpringBoot 项目该包含哪些?
  8. idea git提交代码步骤
  9. Alsa是Linux高级音频接口(百度文库无下载券抄来的)
  10. java获取本地真实ip
  11. 网络游戏植入营销的成功案例
  12. uml活动图 各个功能的操作流程和分支_uml活动图
  13. oracle 波浪号不识别,键盘波浪号“~”打不出,一直打成±,但安全模式却正常打出...
  14. 【Python 基础教程】Python语言中的数据类型(二)
  15. pycharm使用问题:鼠标光标变成黑(白)色粗方块
  16. 微信文件下载内容如何调整存储位置?
  17. 专科学校查重严格还是不严格?
  18. 分享:金融短信接口应用场景详解
  19. C++ 硬件信息 获取CPU序列号
  20. LaTeX:定义字号命令

热门文章

  1. linux 6.5 xen,centos7或者centos6.5安装xen教程(折腾了几天的心血)
  2. 进程间通信(IPC):管道(Pipe)
  3. linux删除用户名命令,linux删除用户的命令是什么?
  4. 【SQL刷题】Day5----SQL分组查询专项练习
  5. 前端优化——预加载篇
  6. 钟胜辉谈PHP发展的现状和前景
  7. 我想打老板,作为Java后端程序员,他让我开发电商微信小程序
  8. vue-发布评论(星星打分,快捷输入)
  9. OGG-01028:归档漂移
  10. 微信小程序中跳转到其他小程序