Redis Stream 特性是Redis 5.0之后才有的。Redis Stream的主要应用就是时间序列的消息流分发。PUB/SUB也可以做消息流分发,但是PUB/SUB不记录历史消息,而Redis Stream可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis stream也是有长度大小限制的,超过设置的最大长度,最旧的消息会被丢失

用法

向队列添加消息,同时也创建一个队列

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
  • mystream是队列的名字,如果没有会创建
  • * 是消息id,使用*表示由redis生成(推荐),可以自定义,但是要自己保证递增性。
  • sensor-id 1234 temperature 19.8 是一条记录
  • 1518951480106-0,返回的消息ID,是毫秒+一个从0开始的数字

使用XADD创建一个stream的时候,可以设置一个最大长度MAXLEN,由于Redis Sream的内部实现,精确的设置一个长度上限消耗比较大,所以XADD提供了一个模糊设置方式:

> XADD mystream MAXLEN ~ 1000 * sensor-id 1234 temperature 19.8

意思是长度可以超过1000一些,由Redis自己觉得什么时候截断,但是截断以后一定不要小于1000。

下边是对于Stream的一些基本操作:
获取消息队列长度:

> XLEN mystream
(integer) 2

通过ID范围获取消息:

> XRANGE mystream - +
1) 1) 1518951480106-02) 1) "sensor-id"2) "1234"3) "temperature"4) "19.8"
  • - + 表示获取所有, -表示最小ID, +表示最大ID
  • 如果使用的是redis默认的ID,就可以通过时间范围来查询消息了,这就是为什么推荐使用默* 认ID的原因
  • 这个命令之后还可以跟一个count参数,如果ID范围间隔过大,可以使用count做分页
  • XREVRANGE命令是对应的逆序查找

通过XREAD获取消息:

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"2) 1) 1) 1519073278252-02) 1) "foo"2) "value_1"2) 1) 1519073279157-02) 1) "foo"2) "value_2"
  • count 参数表示需要获取多少个,还是分页用,如果用0,表示任意多
  • STREAMS后边可以跟多个队列,也就是说一个客户端可以同时监听多个消息队列
  • 最后的0表示从那个ID开始读,0表示从头开始读,如果分页,就指定上次读取的最后一个消息ID
  • 这种读取方式比较开放,所有客户端都可以读任意时刻的消息。

阻塞的读消息:

> XREAD BLOCK 0 STREAMS mystream $
  • BLOCK后边的数字表示timeout,0的意思是不超时
  • $表示等待新消息,如果指定当前时间之前的ID,则返回已有消息而不会阻塞。所以在使用*BLOCK的时候,这个值总是$
  • 如果有多个客户端等待同一个队列,当队列添加一个新消息的时候,所有客户端都会收到这个消息,策略为FIFO。

上述的基本操作都比较自由,可以根据实际需要自己灵活组合使用,但是Redis Stream最重要的也是最常用的一种使用模式是Consumer Group。

使用Consumer groups消费时间序列

在使用consumer group之前应该先了解一些概念:

  • stream是一个序列,使用XADD向其中添加的消息不会被自动删除(在stream长度限制之内),所有消息始终在那里。
  • consumer group是服务端的一个结构,它维护了属于这个group的consumer的状态,即这个consumer处理了那些消息,正在处理哪些消息。并保证同一个消息不会分发给多个consumer。
  • consumer通过唯一标识认定,即使换了客户端,只要ID相同,group仍然认为是同一个consumer。

创建一个Group:

> XGROUP CREATE mystream mygroup $
OK
  • stream必须已经存在
  • $意思仍旧是从新消息开始分发,注意因为group是在服务端的结构,所以这里用的词是分发,而不是接收。

向队列添加新消息:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

客户端读取消息:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"2) 1) 1) 1526569495631-02) 1) "message"2) "apple"
  • 这个客户端将自己标识为Alice
  • > 的意思是,只把没有分发给别人的消息发给我,这也是最常用的方式。
  • 如果使用其他符号,意思是获取自己的pending状态的消息,也就是没有ACK的消息,如果没有pending状态的消息,返回空列表。这个功能很重要,一个consumer启动后,应该先读自己的pending消息(因为之前可能Crash过),如果没有pending消息了才开始处理新消息。
  • 可以同时读多个stream,但是这些stream都需要有相同名字的group

XREADGROUP是一个写操作,只能在master节点上执行

读自己的pending消息:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0

确认消息:

> XACK mystream mygroup 1526569495631-0
(integer) 1
  • XACK表示这个消息处理完了
  • XACK这个命令并不需要指定消费者的名字,也就是说可以强行结束他人正处理的任务,但是不要这样做。
  • 这个命令可以重复执行,没有副作用。第二次以后执行返回0

错误处理

假如一个consumer有pending的消息,然后crash了,然后再也不启动了,怎么办呢?Redis Stream提供了查询消息处理状态的命令XPENDING和转移消息所有者的命令XCLAIM

使用一个或者多个客户端,获取消息处于pending状态,指定group返回一个Summary:

> XPENDING mystream mygroup
1) (integer) 5
2) "1564401080073-0"
3) "1564401244891-0"
4) 1) 1) "Alice"2) "1"2) 1) "Bob"2) "2"3) 1) "Lily"2) "2"
  • 5表示个数
  • 然后是最小ID和最大ID(中间的消息可以不都是pending状态的)
  • 接下来显示每个consumer都有几个pending的消息

然后使用XPENDING命令获取每个Pending消息的信息

> XPENDING mystream mygroup 1564401080073-0 1564401244891-0 10
1) 1) "1564401080073-0"2) "Alice"3) (integer) 18185024) (integer) 4
2) 1) "1564401116987-0"2) "Bob"3) (integer) 5903284) (integer) 1
3) 1) "1564401230270-0"2) "Bob"3) (integer) 5903284) (integer) 1
4) 1) "1564401237686-0"2) "Lily"3) (integer) 4162164) (integer) 1
5) 1) "1564401244891-0"2) "Lily"3) (integer) 4162164) (integer) 1

使用范围和个数查询可以看到pending状态的消息在谁那里,停留了多少毫秒。对于停留时间过长的消息,可以使用XCLAIM将消息交给其他consumer处理。

> XCLAIM mystream mygroup Alice 50000 1564401116987-0
  • 这句的意思是将1564401116987-0这个任务交由Alice处理
  • 50000表示的是最小闲置时间,为什么要设置这个呢,这个任务闲置了多长时间不是在XPEDING命令中得到了么?因为可能有多个客户端发现了这个闲置的任务,都要这个任务的处理权,而在XCLAIM命令中,并没有说从谁那里拿来任务,所以可能造成一个客户端刚刚拿到任务,就被另外一个客户端拿走了。指定了一个最小时间,紧接着的第二个CLAIM命令就无法拿到任务了。

如果一个任务反复失败怎么办?

使用XPENDING命令可以查看一个任务被调度的次数,使用XREADGROUP和CLAIM都会增加一个任务被调度的次数。当这个次数达到一个设定的上限的时候,最好的处理方式是将这个任务放到另外一个Stream中,并通知任务失败。

这种任务也叫死信

Redis Stream中的消息和Group状态都会从主节点复制到从节点,并保持到AOF和RDB文件中。因此重启Redis会保存Stream中的状态。但是主节点崩溃仍可能有一些数据没有同步过去,如果应用程序对数据要求较高,可以使用WAIT命令等待数据主从同步完成。即使这样,在主库Crash的时候,还是有可能会丢失数据。

使用XINFO观察Stream

XINFO命令有很多子命令,比如查看Stream本身的状态使用XINFO STREAM

> XINFO STREAM mystream1) length2) (integer) 133) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) groups8) (integer) 29) first-entry
10) 1) 1524494395530-02) 1) "a"2) "1"3) "b"4) "2"
11) last-entry
12) 1) 1526569544280-02) 1) "message"2) "banana"

使用XINFO GROUPS查看组信息

> XINFO GROUPS mystream
1) 1) name2) "mygroup"3) consumers4) (integer) 25) pending6) (integer) 2
2) 1) name2) "some-other-group"3) consumers4) (integer) 15) pending6) (integer) 0

查看某一个Group中的Consumer:

> XINFO CONSUMERS mystream mygroup
1) 1) name2) "Alice"3) pending4) (integer) 15) idle6) (integer) 9104628
2) 1) name2) "Bob"3) pending4) (integer) 15) idle6) (integer) 83841983

参考

Introduction to Redis Streams

Redis Stream 简明使用教程相关推荐

  1. Android简明开发教程二十一:访问Internet 绘制在线地图

    在例子Android简明开发教程十七:Dialog 显示图像 中我们留了一个例子DrawMap()没有实现,这个例子显示在线地图,目前大部分地图服务器都是将地图以图片存储以提高响应速度. 一般大小为2 ...

  2. OsharpNS轻量级.net core快速开发框架简明入门教程-代码生成器的使用

    OsharpNS轻量级.net core快速开发框架简明入门教程 教程目录 从零开始启动Osharp 1.1. 使用OsharpNS项目模板创建项目 1.2. 配置数据库连接串并启动项目 1.3. O ...

  3. OsharpNS轻量级.net core快速开发框架简明入门教程-基于Osharp实现自己的业务功能...

    OsharpNS轻量级.net core快速开发框架简明入门教程 教程目录 从零开始启动Osharp 1.1. 使用OsharpNS项目模板创建项目 1.2. 配置数据库连接串并启动项目 1.3. O ...

  4. Visual SourceSafe简明培训教程

      名称 Visual SourceSafe简明培训教程 (Visual SourceSafe Training Short Course) 作者 晨光(Morning) 简介 对于采用Visual ...

  5. 简明docker教程

    简明docker教程 一.什么是docker 二.docker与虚拟机比较 三.安装docker 四.基本概念 1.镜像 2.容器 3.数据卷 4.挂载 五.参考资料 有收获的话请加颗小星星,没有收获 ...

  6. Redis Lua脚本中学教程(下)

    在中学教程的上半部分我们介绍了Redis Lua相关的命令,没有看过或者忘记的同学可以步行前往直接使用机票Redis Lua脚本中学教程(上).今天我们来简单学习一下Lua的语法. 在介绍Lua语法之 ...

  7. python简单入门_Python简明入门教程

    本文实例讲述了Python简明入门教程.分享给大家供大家参考.具体如下: 一.基本概念 1.数 在Python中有4种类型的数--整数.长整数.浮点数和复数. (1)2是一个整数的例子. (2)长整数 ...

  8. 简明python教程在线-简明python教程

    广告关闭 2017年12月,云+社区对外发布,从最开始的技术博客到现在拥有多个社区产品.未来,我们一起乘风破浪,创造无限可能. usrbinpython#hello_world.pyprinthell ...

  9. 简明python教程购买-自学Python买什么书?

    简单地总结一下: <父与子的编程之旅> /> 如果是零基础,不懂编程,甚至计算机基础都比较薄弱的.推荐一本叫做<父与子的编程之旅>,老版本叫<与孩子一起学编程> ...

最新文章

  1. Java项目:资源下载工具(java+swing)
  2. 进程通信学习笔记(Posix消息队列)
  3. python 连续比较_For循环比较python中以前的值
  4. Dapper源码学习和源码修改
  5. javafx窗体程序_JavaFX实际应用程序:AISO HRC-Matic
  6. 给定数字的b+树创建_在C ++中找到给定数字中的两个的下一个和上一个幂
  7. Flink的ConGroup算子介绍
  8. 利用JAVA计算TFIDF和Cosine相似度-学习版本
  9. 自己整理实现的python小工具
  10. php是日元吗,PHP to JPY
  11. Julia: eval的一些用法
  12. suse linux11安装 dhcp,Suse Linux DHCP的设定过程
  13. 【转】如何向App Sotre提交应用
  14. 深漂一年,一位程序员的2016年终告白
  15. Zynga 在韩国发布游戏《Harry Potter: Puzzles Spells》
  16. 中科院毕业去向(硕士+博士)
  17. 计算机如何安装cpu风扇,新手装电脑入门二:手把手学习如何安装CPU及散热风扇...
  18. 软考学习:吐血整理——自学软考的终极干货
  19. Linux之磁盘配额
  20. 微软官方的 Power Apps 介绍和视频 来自于youtub 网站

热门文章

  1. Python爬取煎蛋网的妹子图
  2. 不使用foreach遍历一个Dictionary
  3. 【机器学习中的矩阵求导】(七)矩阵向量化复习
  4. 龙珠激斗服务器一直维护,龙珠激斗初始角色哪个好 六大新手角色评析
  5. java毕业生设计法律知识分享平台计算机源码+系统+mysql+调试部署+lw
  6. 佳能ir1133硒鼓替换方案
  7. 程序开发中,\t \r \n \f 什么意思
  8. js 中split分割字符串组成数组
  9. Layui回调函数改变不同数据的颜色
  10. 【锁】Redis锁 处理并发 原子性