Redis Stream

Redis Stream流是 Redis5引入的一种数据结构,它的功能类似于只追加日志。开发者可以用Redis Stream 流实时记录、跟踪事件,Redis流用例示例包括:

  • 事件跟踪 - 追踪用户浏览行为,点击事件
  • 传感器监控 - 记录IOT设备监控数据
  • 事件通知 - 使用redis stream流记录用户通知事件

Redis为每一条流数据生成唯一的ID。开发者可以使用这些ID进行检索。Redis流支持多种策略(以防止流无限增长)和多种消费策略 - 如集群消费、广播消费。

Stream 基本操作

添加 stream

127.0.0.1:6379> XADD temperatures:us-ny:10007 * temp_f 87.2 pressure 29.69 humidity 46
"1670293701825-0"

需要注意:

  • 命令行中的 * 号表示 自动生成stream ID

  • console 控制台输出 ID 数据

  • 如下 ID格式由两部分组成 毫秒级别的时间戳 - 序列号。相同时间戳下序列号自增

    <millisecondsTime>-<sequenceNumber>
    

可以添加指定的ID

127.0.0.1:6379> XADD temperatures:us-ny:10006 1-1 temp_f 87.2 pressure 29.69 humidity 46
"1-1"

添加重复ID的stream信息、或者小于最后一个ID值,会报错

127.0.0.1:6379> XADD temperatures:us-ny:10006 1-1 temp_f 87.2 pressure 29.69 humidity 46
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

读取 stream

redis stream 支持按范围查询数据,只需要指定两个ID,即start和end。返回数据包括ID为start或end的元素,数据闭区间返回。两个特殊ID-和+分别表示可能的最小和最大ID。语法格式包含:

# 读取 mystream 里面所有元素
XRANGE mystream - +# 读取 mystream 指定区间元素
XRANGE mystream 1518951480106 1518951480107# 读取 mystream 里面所有元素中的前2个XRANGE mystream - + COUNT 2

读取信息需要指定redis stream 流的ID,即从哪一个ID开始读取

# 从指定偏移量读取 1 条信息
127.0.0.1:6379> XRANGE temperatures:us-ny:10007 1658354934941-0 + COUNT 1
1) 1) "1669946854066-0"2) 1) "temp_f"2) "87.2"3) "pressure"4) "29.69"5) "humidity"6) "46"

XRANGE顺序读取的命令相反, XREVRANGE命令逆序读取stream中的数据,语法如下

# 注意 end start的 顺序性
XREVRANGE key end start [COUNT count]# 如下命令逆序读取stream中的一个元素
XREVRANGE mystream + - COUNT 1

监听 stream

Redis Stream除了支持按照顺序读取元素外,还支持监听新增加的元素。从使用上有点类似Pub/Sub发布订阅的方式,但是存在差异:

  • Stream 允许多个客户端监听,默认情况下新添加的元素,会被每一个客户端消费
  • 与Pub/Sub数据存储方式不同,Stream 中的数据被追加流中,除非明确删除,否则数据还可以被重新读取
  • Stream 支持消费者组的概念 类似于kafka中的集群消费

语法如下

# [COUNT count] 监听数据条数
# [BLOCK milliseconds] 阻塞读取时间
# [key ...] stream 流的名称
# [id ...]  stream 流ID 区间
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

条数读取

# 从ID为0开始读取 mystream 流中的数据,
XREAD COUNT 2 STREAMS mystream 0
(nil)# 同时读取mystream mystream1两个流中的数据
XREAD COUNT 2 STREAMS mystream mystream1 0 0
(nil)

阻塞读取

# 阻塞100秒 从最后的ID开始 读取最新的数据 注意:只要有新的数据 立即终止读取过程
XREAD block 100000 STREAMS mystream $
(nil)# 阻塞读100秒 从ID为0 开始读取数据
XREAD block 100000 STREAMS mystream 0# 同时读取多个stream数据 只要有一个流有数据,立即返回
XREAD block 100000 STREAMS mystream mystream1 $ $
# block 0 表示未读取数据之前 永不超时
XREAD BLOCK 0 STREAMS mystream $

消费者组监听

设想有三个消费者C1、C2、C3,以及一个包含消息1、2、3、4、5、6、7的流,那么我们想要的是根据下图提供消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这个目标,Redis 引入消费组的概念,同一个消费者组的消费者共同消费stream中的数据。

创建消费者组

> XGROUP CREATE mystream mygroup $
OK

当stream不存在时,Redis 允许在创建消费者组的同时 创建stream

> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

读取数据

XREADGROUP 命令,使用消费者组从流中返回新条目,或访问给定消费者的待处理条目的历史记录

# consumer 消费者名称 不允许重复
# [COUNT count] 读取数量
# [BLOCK milliseconds] 阻塞时间
# [NOACK] ACK确认机制,noack选项用于允许偶尔消息丢失的场景
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds][NOACK] STREAMS key [key ...] id [id ...]
# > 符号表示 该消费者最后记录的ID值,需要跟之前的 $ 相区别
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS newstream >

消息确认

如果使用XACK机制对消息进行处理确认,那么它将不在作为历史消息的一部分,进行重新消费

> XACK mystream mygroup 1526569495631-0
(integer) 1
# 1526569495631-0的数据 不会重新消费
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"2) (empty list or set)

故障恢复

当消费者出现故障、宕机时,之前分配给他的消息,需要被重新挂起重新消费。具体步骤如下:

故障数据

使用XPENDING命令 观察失败者列表

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"2) "2"
  • 以上知道挂起消息的起止ID,可以重新读取数据

    XRANGE mystream 1526569498055-0 1526569506935-0
    

重新分配

更改(或获取)消费者组中消息的所有权,将故障消息传递给指定的消费者一样

# 将指定key 中的消息重新分配给 新的消费者
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N># 如 1526569498055-0 消息重新你分配给 alice消费者
XCLAIM mystream mygroup Alice 3600000 1526569498055-0

自动分配

Redis 6.2中添加的XAUTOLAIM命令来自动实现故障数据的分配过程,XAUTOLAIM标识空闲挂起消息,并将其所有权转移给消费者。语法如下:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
# 自动分配一条挂起的数据给消费者
> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-02) 1) "message"2) "orange"

其他功能

Stream 监控

Redis Stream 提供相关的命令来监控 Stream流的运行情况。XINFO命令是一个可观察性接口,可以与子命令一起使用,以获取有关流或消费者组的信息。

  • Stream 信息

    127.0.0.1:6379> xinfo stream newstream1) "length"2) (integer) 03) "radix-tree-keys"4) (integer) 05) "radix-tree-nodes"6) (integer) 17) "last-generated-id"8) "0-0"9) "max-deleted-entry-id"
    10) "0-0"
    11) "entries-added"
    12) (integer) 0
    13) "recorded-first-entry-id"
    14) "0-0"
    15) "groups"
    16) (integer) 1
    17) "first-entry"
    18) (nil)
    19) "last-entry"
    20) (nil)
    
  • 消费者信息

    127.0.0.1:6379> xinfo groups newstream
    1)  1) "name"2) "mygroup"3) "consumers"4) (integer) 15) "pending"6) (integer) 07) "last-delivered-id"8) "0-0"9) "entries-read"10) (nil)11) "lag"12) (integer) 0
    

删除元素

Redis Stream流提供删除命令从流中删除元素,如根据ID删除。

# 删除流中的多个ID 元素
XDEL key id [id ...]

清空Stream

Redis Stream 流与其他Redis数据结构的区别在于,当其他数据结构不再具有任何元素时,作为调用删除元素的命令,键本身将被删除。

但是由于Stream可能存在关联的消费者组,因此不能直接删除stream的键值,更多的是期望清空stream里面的元素值。

# key -> stream 名称
# maxlen -> 最大阈值,即超过阈值删除
# MINID  -> 小于该ID值的将被删除
XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
# stream 元素数量 > 0 将删除,即清空stream
127.0.0.1:6379> xtrim newstream maxlen 0
(integer) 1
127.0.0.1:6379> xadd newstream * hello world
"1671028603049-0"
# 小于 1671028603049-1 的ID元素将被删除
127.0.0.1:6379> xtrim newstream minid 1671028603049-1
(integer) 1
127.0.0.1:6379> xlen newstream
(integer) 0

Redis Stream相关推荐

  1. 使用Redis Stream来做消息队列和在Asp.Net Core中的实现

    Redis - Wikipedia 写在前面 我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消 ...

  2. 控制 Redis stream 的消息数量

    控制 Redis stream 的消息数量 Intro Redis Stream 是 Redis 5.0 引入的一个新的类型,之前我们介绍过使用 Redis Stream 来实现消息队列,可以参考之前 ...

  3. 使用 Redis Stream 实现消息队列

    使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...

  4. redis stream学习总结

    文章目录 stream Stream基本概念 消息id 消息内容 增删查改 消息生产 添加消息 xadd 查看消息长度 xlen 限制stream最大长度 1.xadd 中添加**maxlen**: ...

  5. Redis Stream的消费者组介绍

    Stream是Redis 5.0引入的一种新数据类型,它以一种抽象的方式来构建日志结构的数据.本文主要介绍Redis Streams的消费者组相关的信息. 1 什么是消费者组 在某些问题中,我们想要做 ...

  6. redis stream 实现消息队列

    redis stream 实现消息队列 Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于r ...

  7. Redis Stream 简明使用教程

    Redis Stream 特性是Redis 5.0之后才有的.Redis Stream的主要应用就是时间序列的消息流分发.PUB/SUB也可以做消息流分发,但是PUB/SUB不记录历史消息,而Redi ...

  8. Redis深度历险-Redis Stream

    本文大部分内容引自<Redis深度历险:核心原理和应用实践>,感谢作者!!! Redis Stream Redis5.0多出了新的数据结构Stream,它是一个新的强大的支持多播的可持久化 ...

  9. 基于 Redis Stream 的消息队列

    文章目录 基于 Redis Stream 的消息队列 消息队列相关命令 消费者组相关命令 如何使用Stream消息队列 生产者写入消息 - XADD 消费者读取消息 - XGROUP 创建消费者组 - ...

最新文章

  1. mysql被格式化恢复数据_三种常见数据库文件恢复方法介绍
  2. java和C++的区别
  3. 计算机视觉之一:特征检测
  4. MVCC在MySQL的InnoDB中的实现
  5. Nginx服务的信号控制
  6. php html标签自定义属性,详解H5的自定义属性data-*
  7. 春晚之后的采访和豆瓣投名状
  8. linux小红伞安装黑屏,在linux下安装Avria(小红伞)
  9. Pure Pursuit轨迹跟踪matlab程序
  10. Qt网络编程-TcpClient入门Demo(1)
  11. 代号夏娃在电脑上怎么玩 代号夏娃PC版玩法教程
  12. (扩展)BSGS与高次同余方程
  13. 有了AI,程序猿再也不用担心有Bug了
  14. OAuth2 logout
  15. there is no statement named xxx in this SqlMap
  16. 高通平台开发系列讲解(系统篇)coredump
  17. 系统定制开发,微商来----专业做分销商城
  18. recyclerView横条指示器——仿淘宝菜单模块
  19. 易语言 判断网络是否连接
  20. win2008文件储存服务器,win server 2008 文件服务器

热门文章

  1. 8分频verilog线_七、八分频电路Verilog源代码
  2. 我发现不少培训班的就业辅导老师,简直是面试官的卧底——再论培训班学员的就业方式(java方向)
  3. 电信专家王煜全:手机监管面临三大困境
  4. 如何玩好微信十亿流量?微趋道教你小程序推广最全攻略!
  5. App预览制作,看我就够了
  6. mysql 主从1146_mysql 主从复制1146错误处理办法
  7. stm32跑web服务器和协议栈的区别,STM32与LAN9252构建EtherCAT从站(二):使用SSC生成协议栈和XML文件——丁丁的个人网站...
  8. buu-[Zer0pts2020]Can you guess it?
  9. mac 安装typescript
  10. Oracle数据库有哪些应用结构?