Redis Stream
Redis Stream
Redis Stream流是 Redis5引入的一种数据结构,它的功能类似于只追加日志。开发者可以用Redis Stream 流实时记录、跟踪事件,Redis流用例示例包括:
- 事件跟踪 - 追踪用户浏览行为,点击事件
- 传感器监控 - 记录IOT设备监控数据
- 事件通知 - 使用redis stream流记录用户通知事件
Redis为每一条流数据生成唯一的ID。开发者可以使用这些ID进行检索。Redis流支持多种策略(以防止流无限增长)和多种消费策略 - 如集群消费、广播消费。
Stream 基本操作
添加 stream
127.0.0.1:6379> XADD temperatures:us-ny:10007 * temp_f 87.2 pressure 29.69 humidity 46
"1670293701825-0"
需要注意:
命令行中的 * 号表示 自动生成stream ID
console 控制台输出 ID 数据
如下 ID格式由两部分组成 毫秒级别的时间戳 - 序列号。相同时间戳下序列号自增
<millisecondsTime>-<sequenceNumber>
可以添加指定的ID
127.0.0.1:6379> XADD temperatures:us-ny:10006 1-1 temp_f 87.2 pressure 29.69 humidity 46
"1-1"
添加重复ID的stream信息、或者小于最后一个ID值,会报错
127.0.0.1:6379> XADD temperatures:us-ny:10006 1-1 temp_f 87.2 pressure 29.69 humidity 46
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
读取 stream
redis stream 支持按范围查询数据,只需要指定两个ID,即start和end。返回数据包括ID为start或end的元素,数据闭区间返回。两个特殊ID-和+分别表示可能的最小和最大ID。语法格式包含:
# 读取 mystream 里面所有元素
XRANGE mystream - +# 读取 mystream 指定区间元素
XRANGE mystream 1518951480106 1518951480107# 读取 mystream 里面所有元素中的前2个XRANGE mystream - + COUNT 2
读取信息需要指定redis stream 流的ID,即从哪一个ID开始读取
# 从指定偏移量读取 1 条信息
127.0.0.1:6379> XRANGE temperatures:us-ny:10007 1658354934941-0 + COUNT 1
1) 1) "1669946854066-0"2) 1) "temp_f"2) "87.2"3) "pressure"4) "29.69"5) "humidity"6) "46"
与XRANGE顺序读取的命令相反, XREVRANGE命令逆序读取stream中的数据,语法如下
# 注意 end start的 顺序性
XREVRANGE key end start [COUNT count]# 如下命令逆序读取stream中的一个元素
XREVRANGE mystream + - COUNT 1
监听 stream
Redis Stream除了支持按照顺序读取元素外,还支持监听新增加的元素。从使用上有点类似Pub/Sub发布订阅的方式,但是存在差异:
- Stream 允许多个客户端监听,默认情况下新添加的元素,会被每一个客户端消费
- 与Pub/Sub数据存储方式不同,Stream 中的数据被追加流中,除非明确删除,否则数据还可以被重新读取
- Stream 支持消费者组的概念 类似于kafka中的集群消费
语法如下
# [COUNT count] 监听数据条数
# [BLOCK milliseconds] 阻塞读取时间
# [key ...] stream 流的名称
# [id ...] stream 流ID 区间
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
条数读取
# 从ID为0开始读取 mystream 流中的数据,
XREAD COUNT 2 STREAMS mystream 0
(nil)# 同时读取mystream mystream1两个流中的数据
XREAD COUNT 2 STREAMS mystream mystream1 0 0
(nil)
阻塞读取
# 阻塞100秒 从最后的ID开始 读取最新的数据 注意:只要有新的数据 立即终止读取过程
XREAD block 100000 STREAMS mystream $
(nil)# 阻塞读100秒 从ID为0 开始读取数据
XREAD block 100000 STREAMS mystream 0# 同时读取多个stream数据 只要有一个流有数据,立即返回
XREAD block 100000 STREAMS mystream mystream1 $ $
# block 0 表示未读取数据之前 永不超时
XREAD BLOCK 0 STREAMS mystream $
消费者组监听
设想有三个消费者C1、C2、C3,以及一个包含消息1、2、3、4、5、6、7的流,那么我们想要的是根据下图提供消息:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
为了实现这个目标,Redis 引入消费组的概念,同一个消费者组的消费者共同消费stream中的数据。
创建消费者组
> XGROUP CREATE mystream mygroup $
OK
当stream不存在时,Redis 允许在创建消费者组的同时 创建stream
> XGROUP CREATE newstream mygroup $ MKSTREAM
OK
读取数据
XREADGROUP 命令,使用消费者组从流中返回新条目,或访问给定消费者的待处理条目的历史记录
# consumer 消费者名称 不允许重复
# [COUNT count] 读取数量
# [BLOCK milliseconds] 阻塞时间
# [NOACK] ACK确认机制,noack选项用于允许偶尔消息丢失的场景
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds][NOACK] STREAMS key [key ...] id [id ...]
# > 符号表示 该消费者最后记录的ID值,需要跟之前的 $ 相区别
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS newstream >
消息确认
如果使用XACK机制对消息进行处理确认,那么它将不在作为历史消息的一部分,进行重新消费
> XACK mystream mygroup 1526569495631-0
(integer) 1
# 1526569495631-0的数据 不会重新消费
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"2) (empty list or set)
故障恢复
当消费者出现故障、宕机时,之前分配给他的消息,需要被重新挂起重新消费。具体步骤如下:
故障数据
使用XPENDING命令 观察失败者列表
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"2) "2"
以上知道挂起消息的起止ID,可以重新读取数据
XRANGE mystream 1526569498055-0 1526569506935-0
重新分配
更改(或获取)消费者组中消息的所有权,将故障消息传递给指定的消费者一样
# 将指定key 中的消息重新分配给 新的消费者
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N># 如 1526569498055-0 消息重新你分配给 alice消费者
XCLAIM mystream mygroup Alice 3600000 1526569498055-0
自动分配
Redis 6.2中添加的XAUTOLAIM命令来自动实现故障数据的分配过程,XAUTOLAIM标识空闲挂起消息,并将其所有权转移给消费者。语法如下:
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
# 自动分配一条挂起的数据给消费者
> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-02) 1) "message"2) "orange"
其他功能
Stream 监控
Redis Stream 提供相关的命令来监控 Stream流的运行情况。XINFO命令是一个可观察性接口,可以与子命令一起使用,以获取有关流或消费者组的信息。
Stream 信息
127.0.0.1:6379> xinfo stream newstream1) "length"2) (integer) 03) "radix-tree-keys"4) (integer) 05) "radix-tree-nodes"6) (integer) 17) "last-generated-id"8) "0-0"9) "max-deleted-entry-id" 10) "0-0" 11) "entries-added" 12) (integer) 0 13) "recorded-first-entry-id" 14) "0-0" 15) "groups" 16) (integer) 1 17) "first-entry" 18) (nil) 19) "last-entry" 20) (nil)
消费者信息
127.0.0.1:6379> xinfo groups newstream 1) 1) "name"2) "mygroup"3) "consumers"4) (integer) 15) "pending"6) (integer) 07) "last-delivered-id"8) "0-0"9) "entries-read"10) (nil)11) "lag"12) (integer) 0
删除元素
Redis Stream流提供删除命令从流中删除元素,如根据ID删除。
# 删除流中的多个ID 元素
XDEL key id [id ...]
清空Stream
Redis Stream 流与其他Redis数据结构的区别在于,当其他数据结构不再具有任何元素时,作为调用删除元素的命令,键本身将被删除。
但是由于Stream可能存在关联的消费者组,因此不能直接删除stream的键值,更多的是期望清空stream里面的元素值。
# key -> stream 名称
# maxlen -> 最大阈值,即超过阈值删除
# MINID -> 小于该ID值的将被删除
XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
# stream 元素数量 > 0 将删除,即清空stream
127.0.0.1:6379> xtrim newstream maxlen 0
(integer) 1
127.0.0.1:6379> xadd newstream * hello world
"1671028603049-0"
# 小于 1671028603049-1 的ID元素将被删除
127.0.0.1:6379> xtrim newstream minid 1671028603049-1
(integer) 1
127.0.0.1:6379> xlen newstream
(integer) 0
Redis Stream相关推荐
- 使用Redis Stream来做消息队列和在Asp.Net Core中的实现
Redis - Wikipedia 写在前面 我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消 ...
- 控制 Redis stream 的消息数量
控制 Redis stream 的消息数量 Intro Redis Stream 是 Redis 5.0 引入的一个新的类型,之前我们介绍过使用 Redis Stream 来实现消息队列,可以参考之前 ...
- 使用 Redis Stream 实现消息队列
使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...
- redis stream学习总结
文章目录 stream Stream基本概念 消息id 消息内容 增删查改 消息生产 添加消息 xadd 查看消息长度 xlen 限制stream最大长度 1.xadd 中添加**maxlen**: ...
- Redis Stream的消费者组介绍
Stream是Redis 5.0引入的一种新数据类型,它以一种抽象的方式来构建日志结构的数据.本文主要介绍Redis Streams的消费者组相关的信息. 1 什么是消费者组 在某些问题中,我们想要做 ...
- redis stream 实现消息队列
redis stream 实现消息队列 Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于r ...
- Redis Stream 简明使用教程
Redis Stream 特性是Redis 5.0之后才有的.Redis Stream的主要应用就是时间序列的消息流分发.PUB/SUB也可以做消息流分发,但是PUB/SUB不记录历史消息,而Redi ...
- Redis深度历险-Redis Stream
本文大部分内容引自<Redis深度历险:核心原理和应用实践>,感谢作者!!! Redis Stream Redis5.0多出了新的数据结构Stream,它是一个新的强大的支持多播的可持久化 ...
- 基于 Redis Stream 的消息队列
文章目录 基于 Redis Stream 的消息队列 消息队列相关命令 消费者组相关命令 如何使用Stream消息队列 生产者写入消息 - XADD 消费者读取消息 - XGROUP 创建消费者组 - ...
最新文章
- mysql被格式化恢复数据_三种常见数据库文件恢复方法介绍
- java和C++的区别
- 计算机视觉之一:特征检测
- MVCC在MySQL的InnoDB中的实现
- Nginx服务的信号控制
- php html标签自定义属性,详解H5的自定义属性data-*
- 春晚之后的采访和豆瓣投名状
- linux小红伞安装黑屏,在linux下安装Avria(小红伞)
- Pure Pursuit轨迹跟踪matlab程序
- Qt网络编程-TcpClient入门Demo(1)
- 代号夏娃在电脑上怎么玩 代号夏娃PC版玩法教程
- (扩展)BSGS与高次同余方程
- 有了AI,程序猿再也不用担心有Bug了
- OAuth2 logout
- there is no statement named xxx in this SqlMap
- 高通平台开发系列讲解(系统篇)coredump
- 系统定制开发,微商来----专业做分销商城
- recyclerView横条指示器——仿淘宝菜单模块
- 易语言 判断网络是否连接
- win2008文件储存服务器,win server 2008 文件服务器
热门文章
- 8分频verilog线_七、八分频电路Verilog源代码
- 我发现不少培训班的就业辅导老师,简直是面试官的卧底——再论培训班学员的就业方式(java方向)
- 电信专家王煜全:手机监管面临三大困境
- 如何玩好微信十亿流量?微趋道教你小程序推广最全攻略!
- App预览制作,看我就够了
- mysql 主从1146_mysql 主从复制1146错误处理办法
- stm32跑web服务器和协议栈的区别,STM32与LAN9252构建EtherCAT从站(二):使用SSC生成协议栈和XML文件——丁丁的个人网站...
- buu-[Zer0pts2020]Can you guess it?
- mac 安装typescript
- Oracle数据库有哪些应用结构?