redis streams_初步了解Redis Streams以及如何在Java中使用它们
redis streams
自今年年初以来,Redis Streams已进入Redis的unstable
分支,并且第一个客户开始采用Redis Streams API。 因此,这是一个绝佳的时间,可以从客户端的角度看一下Redis Streams提供的功能以及如何使用它们。
免责声明:Redis Streams作为初稿提供,尚未成为稳定版本的一部分。 API可能会更改。
什么是Redis Stream?
Redis流是一种类似于日志/日志的数据结构,它按顺序表示事件日志。 消息(事件)可以附加到流中。 然后可以以独立方式或通过在消费者组中阅读来使用这些消息。 使用者组是一个概念,其中可以将多个使用者(例如应用程序实例)分组为一个使用者组,其流偏移量(读取进度)保留在服务器端。 由于不需要在用户端保留流偏移,因此此概念简化了构建客户端的过程。
流消息由Redis在提交时生成的消息ID和表示为哈希(映射)的正文组成-基本上是一组键和值。
流本身由一个密钥标识,并保存零到许多流消息以及一些元数据,例如消费者组。
Redis Stream API
到目前为止,所有流命令都以X
为前缀。 流允许使用添加,读取,自省和维护命令。 在下一部分中,您将看到最常见的命令:
XADD key * field1 value1 [field2 value2] [fieldN valueN]
:将消息附加(提交)到Redis流。XREAD [BLOCK timeout] [COUNT n] STREAMS key1 [keyN] offset1 [offsetN]
:从Redis流中读取消息。XRANGE key from to [COUNT n]
:扫描(XRANGE key from to [COUNT n]
)Redis流中的消息
此外,在使用使用者组时,还有其他命令在起作用:
XREADGROUP GROUP name consumer [BLOCK timeout] [COUNT n] [NOACK] STREAMS key1 [keyN] offset1 [offsetN]
:在使用者及其组的上下文中从Redis流中读取消息。XACK key group messageId1 [messageId2] [messageIdN]
:在使用者的上下文中读取后确认消息。XPENDING key group [from to COUNT n]
:枚举未决(未确认的消息)。XGROUP
和子命令:用于创建和删除使用者组的API。
注意:为简洁起见,以上命令被截断了。 有关所有可能选项和组合的说明,请参见Redis Streams文档 。
使用Redis流
让我们来看一下如何通过redis-cli
应用我们之前看到的命令来使用Redis Stream。 让我们向新流添加(并最初创建流)消息。
127.0.0.1:6379> XADD my-stream * key value
1527062149743-0
我们正在使用XADD
通过键值元组向流my-stream
添加新消息。 注意*
(星号)? 这是用于控制ID生成的字段。 如果要由服务器生成消息ID(在99.5%的用例中都是如此,除非您是要复制的Redis服务器),请始终在此放置*
。 Redis回复消息ID 1527062149743-0
。
现在,我们的信息流包含一条消息。 让我们用XREAD
阅读它。
127.0.0.1:6379> XREAD COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"2) 1) 1) 1527062149743-02) 1) "key"2) "value"
我们现在已经阅读了该消息,并沿着读取的内容检索了正文。 读取消息会将消息保留在流中。 我们可以使用XRANGE
验证这XRANGE
:
127.0.0.1:6379> XRANGE my-stream - +
1) 1) 1527068644230-02) 1) "key"2) "value"
发出具有相同流偏移量的后续读取将返回相同的消息。 您有不同的选择来避免此行为:
- 在客户端跟踪消息ID
- 阻止读取
- 从信息流中删除消息
- 限制流大小
- 使用消费者群体
让我们仔细看看这些选项。
MessageId追踪
每个读取操作都将返回消息ID和流消息。 如果您只有一个客户端(没有并发读取),则可以在应用程序中保留最新消息ID的引用,并在后续的读取调用中重用该消息ID。 让我们针对我们之前看到的1527068644230-0
的消息ID进行此1527068644230-0
:
127.0.0.1:6379> XADD my-stream * key value
1527069672240-0
127.0.0.1:6379> XREAD COUNT 1 STREAMS my-stream 1527068644230-0
1) 1) "my-stream"2) 1) 1) 1527069672240-02) 1) "key"2) "value"
我们使用1527068644230-0
作为流偏移并接收下一条添加的消息。 这种方法允许恢复读取较旧的(可能已经消耗的消息),但是需要在客户端进行一些协调,以免读取重复的消息。
如果您不想跟踪消息ID,而仅对最新消息感兴趣,则可以使用阻塞读取。
阻止读取
通过XREAD
读取允许以阻塞方式从流读取。 XREAD
与BLPOP
和BRPOP
操作的行为类似,在BLPOP
和BRPOP
操作中,您指定超时,并且如果消息可用或读取超时,则调用将返回。 但是,Stream API允许更多选项。 对于此示例,我们需要两个单独的参与方:生产者和消费者。 如果您从头开始阅读,您将看到使用单个客户端执行的示例。 我们首先从消费者开始,否则产生的消息将到达流中而没有机会通知正在等待的消费者。
消费者
我们正在将XREAD
与BLOCK 10000
一起使用以等待10000毫秒(10秒)。 请注意,我们使用的符号流偏移量$
指向流的开头。
127.0.0.1:6379> XREAD COUNT 1 BLOCK 10000 STREAMS my-stream $
使用者现在被阻止,等待消息到达。
制片人
127.0.0.1:6379> XADD my-stream * key value
1527070630698-0
Redis将消息写入流中。 现在,让我们切换回消费者。
消费者
消息写入我们的流之后,消费者收到一条消息并再次被解除阻止。 您可以开始处理该消息,并可能发出其他读取。
1) 1) "my-stream"2) 1) 1) 1527070630698-02) 1) "key"2) "value"
(1.88s)
使用流偏移量$
发出另一个读取将再次等待到达该流的下一条消息。 但是,使用$
会给我们提供一段时间,在此期间我们不会消耗其他消息。 为了避免这些漏洞,您应该跟踪上一次阅读的消息ID,并将其重新用于下一个XREAD
调用。
还要注意ist并发性。 我们已经看到一个单个消费者的例子。 如果增加消费者数量怎么办?
在这种情况下,例如,如果您有两个使用方发出阻塞读取,那么两个使用方都会收到同一条消息,这又使我们不得不承担协调读取的任务,因此流消息不会被多次处理。
从流中删除消息
可以从流中删除消息,但是不建议这样做。 我们还没有看到XDEL
,但是从名称上可以明显看出我们可以从流中删除消息:
127.0.0.1:6379> XDEL my-stream 1527070789716-0
(integer) 1
该消息现在消失了。 不建议删除,因为操作成本很高:流使用带有宏节点的基数树。 删除是一种安全的操作,但是在与多个使用者一起使用一条消息时,您需要同步访问权限,因为删除不会阻止多次读取消息。
限制流大小
将消息追加到流时,可以指定最大流大小。 发出XADD
命令时,使用MAXLEN
选项会发生这种情况。
127.0.0.1:6379> XADD my-stream MAXLEN 4 * key value
1527071269045-0
消息将添加到流中,并且将尽最大努力将流修剪到大小限制。 这也意味着较旧的消息将被修剪并且不再可读。
消费群体
解决重复消息处理的最后一种方法是利用使用者组。 消费者群体的想法是跟踪确认。 确认允许将消息标记为消费者确认。 XACK
命令返回是否已确认该消息或先前的使用者是否已确认该消息。
要使用消费者组,我们需要首先创建一个消费者组。 请注意,自撰写本文时起,必须先存在一个流,然后才能创建消费者组。 这个问题可能将通过https://github.com/antirez/redis/issues/4824解决。
到目前为止,如果您遵循前面的示例,我们可以重用我们的流my-stream
。
我们正在创建一个名为my-group
的使用者组,它仅对流my-stream
有效。 请注意,最后一个参数是用于跟踪读取进度的流偏移量。 我们使用$
指向流头。
127.0.0.1:6379> XGROUP CREATE my-stream my-group $
OK
现在,向流中添加一条消息:
127.0.0.1:6379> XADD my-stream * key value
1527072009813-0
并通过XREADGROUP
发出非阻塞读取:
127.0.0.1:6379> XREADGROUP GROUP my-group c1 COUNT 1 STREAMS my-stream >
1) 1) "my-stream"2) 1) 1) 1527072009813-02) 1) "key"2) "value"
XREADGROUP
接受组名和使用者名来跟踪阅读进度。 另请注意,流偏移量>
。 此符号流偏移量指向使用者组my-group
读取的最新消息ID。
您可能已经注意到该组中有一个消费者名称。 消费者群体旨在跟踪消息传递并区分消费者。 如果您还记得上面的阻塞阅读示例,您已经看到两个使用者同时收到一条消息。 要更改(或保留)此行为,可以指定使用者名称:
- 具有相同使用者名称的读取可以多次接收相同的消息。
- 具有不同使用者名称的读取将阻止多次接收同一条消息。
根据您使用消息的方式,您可能想重新启动处理或使用多个客户端使用消息,而无需建立自己的同步机制。 Redis流允许您通过确认消息来做到这一点。 默认情况下, XREADGROUP
确认消息,表明该消息已被处理并且可以被逐出。 您可以指定NOACK
在阅读消息时不确认消息。 处理完消息后,确认消息发出XACK
。 根据返回的命令,您可以查看您是确认消息的对象还是其他客户端已经确认的消息。
现在让我们在这里暂停,不要再讨论恢复和更高级的主题。 Redis网站在https://redis.io/topics/streams-intro提供了有关Redis Streams的完整文档。
使用Java消耗Redis流
注意:在编写本文时,唯一支持Redis Streams的Java客户端是Lettuce预览版本5.1.0.M1。
Redis Streams带有新的服务器端API,该API也需要在客户端采用。 让我们使用Java客户端重播以上示例。
首先,我们需要一个客户端实例来准备连接。 我们将使用同步API。 但是,异步和React式API也支持Redis Stream API。
RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStreamCommands<String, String> streamCommands = connection.sync();
Lettuce引入了一个新的命令接口RedisStreamCommands
,该接口声明Redis Stream API方法及其各种类型(例如StreamOffset
, Consumer
和命令参数对象)。
我们要做的第一件事是向流中添加新消息:
Map<String, String> body = Collections.singletonMap("key", "value");
String messageId = streamCommands.xadd("my-stream", body);
本示例使用UTF-8编码的字符串表示键和值。 主体本身作为Map
传输,并发出命令XADD my-stream * key value
。
现在,让我们使用与XREAD COUNT 1 STREAMS my-stream 0
相对应的命令从流中读取一条消息:
List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1), StreamOffset.from("my-stream", "0"));if(messages.size() == 1) { // a message was read} else { // no message was read}
所述xread(…)
方法接受XReadArgs
和StreamOffset
并返回的列表StreamMessage<K, V>
包含消息ID与主体一起对象。 现在可以处理消息了,随后的读取将包括最后一个messageId以读取新消息:
StreamMessage<String, String> message = …;
List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1), StreamOffset.from("my-stream", message.getId()));if(messages.size() == 1) { // a message was read} else { // no message was read}
阻塞读取需要将额外的持续时间传递到参数对象中。 添加BLOCK
选项会将非阻塞调用(从Redis的角度来看)变成阻塞调用:
List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1).block(Duration.ofSeconds(10)), StreamOffset.from("my-stream", "0"));
在最后一个示例中,让我们看一下消费者群体。 RedisStreamCommands
提供了用于创建使用者的方法-截至撰写本文时,Redis中尚未实现删除使用者和使用者组的方法。
streamCommands.xadd("my-stream", Collections.singletonMap("key", "value")); // add a message to create the stream data structurestreamCommands.xgroupCreate("my-stream", "my-group", "$"); // add a group pointing to the stream headList<StreamMessage<String, String>> messages = streamCommands.xreadgroup(Consumer.from("my-group", "c1"),StreamOffset.lastConsumed("my-stream"));
使用使用者组my-group
和使用者c1
从my-stream
中读取消息。 使用者组和使用者名称是字节安全编码的,因此在使用ASCII或UTF-8字符串时区分大小写。
结论
这篇博客文章概述了Redis 5附带的Redis Streams的初步外观,以及如何在Lettuce Redis客户端上使用Stream API。 该API尚未完全实现,因此我们应该期待更改。
翻译自: https://www.javacodegeeks.com/2018/05/a-first-look-at-redis-streams-and-how-to-use-them-with-java.html
redis streams
redis streams_初步了解Redis Streams以及如何在Java中使用它们相关推荐
- 初步了解Redis Streams以及如何在Java中使用它们
自今年年初以来,Redis Streams已进入Redis的unstable分支,并且第一个客户端始于采用Redis Streams API. 因此,这是一个绝佳的时机,可以从客户端角度看一下Redi ...
- java redis 缓存_如何在 Java 中实现一个 redis 缓存服务
缓存服务的意义 为什么要使用缓存?说到底是为了提高系统的运行速度.将用户频繁访问的内容存放在离用户最近,访问速度最快的地方,提高用户的响应速度.一个 web 应用的简单结构如下图. web 应用典型架 ...
- 【redis】二、redis数据类型
文章目录 数据存储类型介绍 业务数据的特殊性 作为缓存使用 附加功能 Redis 数据类型(5种常用) Redis 数据存储格式 string string类型数据的基本操作 单数据操作与多数据操作的 ...
- Redis学习记录之Java中的初步使用
[html] view plain copy redis下载地址:<span style="font-family: Arial, Helvetica, sans-serif;&qu ...
- [redis]知识回顾之redis主从+哨兵搭建简要记录
1.先准备环境 家里的万年老爷机(120G的SSD,300G硬盘 ),手机热点下载,所以只能少利用空间资源,用virtualBox装centos7纯净版 其中要点: 1.virtualbox创建新机要 ...
- Redis万字笔记 深入浅出redis
笔记里有大量图片帮助你快速理解Redis 因为上传图片很麻烦这是无图片版的 想要有图片版的压缩包的可以在这里下载:后端+Redis+Redis笔记+加深记忆-Java文档类资源-CSDN文库 也可以关 ...
- c#获取对象的唯一标识_在 Java 中利用 redis 实现分布式全局唯一标识服务
作者: 杨高超 juejin.im/post/5a4984265188252b145b643e 获取全局唯一标识的方法介绍 在一个IT系统中,获取一个对象的唯一标识符是一个普遍的需求.在以前的单体应用 ...
- java实现redis缓存_java实现redis缓存功能
一.安装redis 1.mac安装,如果有安装brew 可以直接快捷安装:brew install redis 2.linux下载安装wget http://download.redis.io/rel ...
- 一文深入了解 Redis 内存模型,Redis 的快是有原因的!
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 来源:编程迷思 cnblogs.com/kismetv/p/865 ...
最新文章
- unix环境高级编程-进程间通信
- 计算机应用基础课程B,计算机应用基础B卷_百度文库.DOC
- 【译】Easily Build Android APKs on Device in Termux
- 案例代码:springboot+springsecurity+redis设置新登录后踢出前一个登录用户
- Spring Cloud Gateway介绍(二)
- 51.com庞升东:其实我认为自己比较像唐僧
- history模式监听_面试题:VueRouter中的 hash 模式和 history 模式有什么区别
- 孔夫子告诉你:编程到底能教会你什么!
- 编程猫海龟编辑器python_编程猫海龟编辑器
- Mobile game forensics
- YUV格式详解【全】
- 企业财务报表分析【3】
- 机器学习算法(8)之多元线性回归分析理论详解
- 利用多进程/多线程实现多个客户端同时访问同一服务器
- hdl_graph_slam的论文阅读
- Fiddler 抓包工具使用详解
- 漏洞扫描的原理与设计
- 3D模型欣赏:黑色的巴斯泰托女神【3D游戏建模教程】
- 李航统计学习感知机算法实现
- mmdetection报错 TypeError: vars() argument must have __dict__ attribute
热门文章
- 牛客网【每日一题】4月16日题目精讲 逆序对
- [AtCoder Beginner Contest 215] A-G题解
- 【CF1199 D,E, F】Welfare State // Matching vs Independent Set // Rectangle Painting 1
- 欢乐纪中某A组赛【2019.7.10】
- P3365,jzoj3894-改造二叉树【LIS,BST】
- 2021牛客暑期多校训练营5 D-Double Strings(dp+组合数)
- 2021牛客暑期多校训练营3 B-Black and white(思维+最小生成树)
- 转圈游戏(luogu 1965)
- Codeforces1019C
- Hadoop生态hive(六)Hive QL表