Stream是Redis 5.0引入的一种新数据类型,它以一种抽象的方式来构建日志结构的数据。本文主要介绍Redis Streams的消费者组相关的信息。

1 什么是消费者组

在某些问题中,我们想要做的是从同一流中向许多客户端提供不同的消息子集。一个明显有用的例子是处理缓慢的消息:让N个不同的客户端接收流的不同部分来加快消息的处理。例如:如果有三个消费者A1、A2、A3和一个包含消息1、2、3、4、5、6、7的流,那么我们想要达到的是像下面这样分配消息。

为了实现这一点,Redis使用了一个叫做消费者组的概念。从实现的角度来看,Redis消费者组与Kafka (TM)消费者组没有任何关系,只是在功能上是相似的:允许一组客户端合作消费同一消息流的不同部分。

我们可以将一个消费者组简单理解为一个从流中获取数据的特殊的消费者。它从流中获取数据,然后再服务于多个消费者,同时提供了如下的保证:

1)每条消息都提供给不同的消费者,不会将相同的消息传递给多个消费者。

2)在消费者组中,消费者通过客户端的名称(区分大小写的字符串)进行区分,当断开连接重新连通后,消费者客户端还是提供相同的名字,会被当做同一个消费者。这意味着在消费者组中由客户端提供唯一标识符。

3)每个消费者组都有未被消费的第一个ID的概念,这样当消费者请求新消息时,它可以只提供以前未传递的消息。

4)消费消息需要使用特定命令进行显式确认。Redis将该确认解释为:此消息已正确处理,可以从消费者组中移除。

5)消费者组跟踪所有当前挂起的消息,即已传递给消费者组的某个消费者但尚未确认为已处理的消息。由于此功能,当访问流的消息历史记录时,每个使用者将只看到传递给它的消息。

下面一起来看下消费者组相关的命令。

2 消费者组命令 XGROUP

使用XGROUP可以:

- 创建与流关联的新消费者组。

- 设置要传递的下一条消息。

- 销毁一个消费者组。

- 往消费者组中添加指定的消费者。

- 从消费者组中移除指定的消费者。

2.1 创建消费者组

在创建命令中我们必须指定一个ID,在示例中是$。这是必需的,因为消费者组必须知道在第一个消费者连接时接下来要提供哪条消息。$表示从现在开始到达流中的新消息才会提供给组中的消费者。我们也可以指定一个有效ID,会提供给消费者大于指定ID的消息。

XGROUP CREATE还支持自动创建流,如果流不存在,使用可选的MKSTREAM子命令作为最后一个参数可以自动创建对应的流:

2.2 设置要传递的下一条消息

使用这种命令,可以修改消费者组要获取的下一个ID,而无需再次删除和创建使用者组。

2.3 销毁一个消费者组

可以使用以下形式完全销毁一个消费者组:

即使存在活动的消费者和待处理消息,消费者组也将被销毁,因此请确保仅在真正需要时才调用此命令。

2.4 消费者的添加与移除

3 从流中读取数据 XREADGROUP

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

在从流中读取之前,让我们将一些消息放入其中:

使用如下命令通过消费者组读取流中的数据

XREADGROUP命令是XREAD命令的特殊版本,支持消费者组。从语法的角度来看,这两个命令几乎是相同的,但是XREADGROUP需要一个特殊和强制的选项:GROUP<group-name><consumer-name>。

3.1 GROUP <group-name> <consumer-name>

group-name是关联到流的消费者组的名称。consumer-name是客户端用于在消费者组内标识自己的字符串。对应消费者不存在时会自动创建,不同的消费者应该选择不同的消费者名称。

使用XREADGROUP时在STREAMS选项中指定的ID可以是以下两种之一:

1)特殊ID>,意味着消费者希望只接收从未发送给任何其他消费者的消息。这意思是说,请给我新的消息。

2)任意其他的ID,即0或任意其他有效ID或不完整的ID(只有毫秒时间部分),将返回发送命令的消费者的待处理条目信息。所以,基本上如果ID不是>,该命令将返回消费者的待处理条目信息(已发送给它,但尚未确认的条目)。

对于第2点的测试结果如下:

我们可以创建多个消费者来消费这个流中的消息。

就像XREAD一样,XREADGROUP命令也可以以阻塞的方式使用。在这方面两者没有区别。具体请参考Redis Stream介绍(一)。

4 消费确认命令 XACK

XACK key group ID [ID …]

当消费者将消息正确处理后,需要调用确认命令来确认消费。只有当调用确认命令后,才会将该消息从待处理的返回中移除。

5 总结

以上就是关于Redis Streams的消费者组相关的介绍和使用命令,对于是否使用消费者组:

1)如果你有一个流和多个客户端,并且你希望所有的客户端都获取到完整的信息,那么你不需要使用消费者组。

2)如果你有一个流和多个客户端,并且你希望在你的客户端上对流进行分区或共享,能获得一个流消息的子集,那么你需要使用消费者组。

Redis流中的消费者组在某些方面可能类似于基于Kafka(TM)分区的消费者组,但是请注意,实际上它们是不同的。Redis流的分区只是概念上的,消息都放在一个Redis键中,不同客户端获取的消息内容取决于谁准备好处理新消息,而不是客户端在处理哪个分区。例如,上述示例中如果消费者 li 在某个时刻永久失败,Redis将继续为zhang和wang提供所有到达的新消息,跟之前相比现在相当于只有两个分区,而Kafka的分区不会自动更改。类似地,如果给定的消费者处理消息的速度比其他消费者快得多,则该消费者将接收更多的消息,而Kafka中消费者只能获取对应分区的消息。

在Redis中,如果你真的想将同一个流中的消息分割成多个Redis实例,你必须使用多个key和一些分片系统,比如Redis Cluster或其他一些特定于应用程序的分片系统。单个Redis流不会自动分区到多个实例。

Redis Stream的消费者组介绍相关推荐

  1. Redis Stream

    Redis Stream Redis Stream流是 Redis5引入的一种数据结构,它的功能类似于只追加日志.开发者可以用Redis Stream 流实时记录.跟踪事件,Redis流用例示例包括: ...

  2. redis 流 stream的使用总结 - 消费者组

    本博客讲述如何使用redis中流stream的组 简言 1. 消费者组(consumer group)允许用户将一个流从逻辑上分成多个不同的流,并让消费者组组下的消费者去处理组中的消息 2. 多个消费 ...

  3. redis stream持久化_Beetlex.Redis之Stream功能详解

    原标题:Beetlex.Redis之Stream功能详解 有一段时间没有写文章,techempower的测试规则评分竟然发生了变化,只能忘着补充一下占比权重最多的数据更新示例了和深入设计一下组件模块化 ...

  4. 使用Redis Stream来做消息队列和在Asp.Net Core中的实现

    Redis - Wikipedia 写在前面 我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消 ...

  5. 基于 Redis Stream 的消息队列

    文章目录 基于 Redis Stream 的消息队列 消息队列相关命令 消费者组相关命令 如何使用Stream消息队列 生产者写入消息 - XADD 消费者读取消息 - XGROUP 创建消费者组 - ...

  6. redis stream学习总结

    文章目录 stream Stream基本概念 消息id 消息内容 增删查改 消息生产 添加消息 xadd 查看消息长度 xlen 限制stream最大长度 1.xadd 中添加**maxlen**: ...

  7. redis stream 实现消息队列

    redis stream 实现消息队列 Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于r ...

  8. Redis Stream 简明使用教程

    Redis Stream 特性是Redis 5.0之后才有的.Redis Stream的主要应用就是时间序列的消息流分发.PUB/SUB也可以做消息流分发,但是PUB/SUB不记录历史消息,而Redi ...

  9. Redis深度历险-Redis Stream

    本文大部分内容引自<Redis深度历险:核心原理和应用实践>,感谢作者!!! Redis Stream Redis5.0多出了新的数据结构Stream,它是一个新的强大的支持多播的可持久化 ...

最新文章

  1. java开发webservice的几种方式
  2. CENTOS MINI版安装tomcat9.0
  3. redmine1.3.x 插件安装
  4. Nginx 之防盗链配置
  5. java 反编译class文件_用Java实现JVM第三章《解析class文件》
  6. python的zipfile压缩文件夹_python zipfile压缩使用说明
  7. HTML data-* 属性
  8. JS调试设置断点却无法中断的解决
  9. 基于数组的一个简单增删改查
  10. 高程拟合MATLAB二次曲面,GPS高程二次曲面拟合及其程序
  11. 阿里图标库iconfont下载和在旧有的iconfont中添加新的图标
  12. 常用EDA软件的license文件结构分析
  13. 百度与谷歌地图坐标转换
  14. python爬虫岗位招聘_Python爬虫系列2-抓取拉钩网2020年最新互联网岗位招聘信息
  15. AQS——CLH队列维护方法详解
  16. Android字符设备驱动开发基于高通msm8916【原创 】
  17. ABclonal再添一员“蛋白~DNA互作研究”大将—CUTTag
  18. Cesium|xt3d视频融合
  19. 103000大写加零吗_103000怎样大写不写零
  20. 计算机usb口设置方法,如何控制电脑USB接口 常见的屏蔽电脑USB接口方法

热门文章

  1. 原生 JS 实现飘雪效果
  2. Git上传文件时报错:The authenticity of host xxx can‘t be established.
  3. DeepChem教程4:分子指纹
  4. 生产力、生产工具和生产关系
  5. 共享打印机提示“0x00000709”错误的解决方法
  6. Android Q版本应用兼容性适配指导
  7. 2015年12月英语总结
  8. [POI2014]DOO-Around the world
  9. Arduino 项目笔记 | 基于 Arduino 单片机的 A4988 和 L298N电机驱动模块实验记录
  10. 常用的国际物流运输方式有哪些