控制 Redis stream 的消息数量

Intro

Redis Stream 是 Redis 5.0 引入的一个新的类型,之前我们介绍过使用 Redis Stream 来实现消息队列,可以参考之前的文章 使用 Redis Stream 实现消息队列,而 Stream 的消息会持久化地内存中,如果我们不控制消息数量的话,可能会出现大量的消息存在内存里导致过大的内存占用,Redis Stream 5.0 开始支持根据 Max Length 来控制 Stream 的长度(消息数量),从 6.2 开始支持根据消息 Id 来控制 Stream 的长度,默认地消息 Id 是一个时间戳,所以使用默认地 Id 也可以理解为按时间来控制 Stream 长度,下面我们来看使用示例吧

Redis 语法

控制 Stream 消息长度有两个 Redis 命令,一个是 XTRIM 只做 Trim 操作,把不满足要求的消息去除,另外一个是 XADD 在添加 Stream 消息的同时做 Trim 操作,简化还要多一步 Trim 的操作,将添加消息和控制消息长度可以合并为一个操作

XTRIM 语法:

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

使用示例:

XTRIM mystream MAXLEN 1000
XTRIM mystream MINID 649085820XTRIM mystream MAXLEN ~ 1000(Nearly trim,不准确,可能有些消息该删掉的会保留下来,但是执行效率会比 `=`(Exactly Trim) 高一些)
XTRIM mystream MINID = 649085820

Trimming the stream can be done using one of these strategies:

  • MAXLEN: Evicts entries as long as the stream's length exceeds the specified threshold, where threshold is a positive integer.

  • MINID: Evicts entries with IDs lower than threshold, where threshold is a stream ID.

XADD 语法:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

使用示例:

redis> XADD mystream * name Sara surname OConnor
"1631546114612-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1631546114612-1"redis> XADD mystream MAXLEN ~ 1000 field1 value1
redis> XADD mystream MINID ~ 1631546460687 field1 value1

XADD 允许用户在向 Stream 里添加消息的时候控制消息的长度返回值是消息ID,默认是一个时间戳,语法如上,可以 Trim 也可以 不Trim,可以根据需要选择

Prepare

先来准备一些帮助类和公共方法,下面的示例是基于 StackExchange.Redis 来实现的

RedisHelper,获取 Redis 连接

internal static class RedisHelper
{private static readonly IConnectionMultiplexer ConnectionMultiplexer = StackExchange.Redis.ConnectionMultiplexer.Connect("127.0.0.1:6379");public static IDatabase GetRedisDb(int dbIndex = 0){return ConnectionMultiplexer.GetDatabase(dbIndex);}
}

AddStreamMessage,向指定 stream 中添加若干条消息

private static async Task AddStreamMessage(string key, int msgCount, Action action=null)
{var redis = RedisHelper.GetRedisDb();for (var i = 0; i < msgCount; i++){await redis.StreamAddAsync(key, "messages", $"val-{i}");action?.Invoke();}
}

Max-Length

根据 MaxLength 来控制 Stream 长度示例

var streamKey = $"stream-{nameof(MaxLengthTrim)}";
await AddStreamMessage(streamKey, 10);
var redis = RedisHelper.GetRedisDb();
Console.WriteLine(await redis.StreamLengthAsync(streamKey));// trim directly
await redis.StreamTrimAsync(streamKey, 5);
Console.WriteLine(await redis.StreamLengthAsync(streamKey));// add with trim
await redis.StreamAddAsync(streamKey, StreamMessageField, "Test", maxLength: 3);
Console.WriteLine(await redis.StreamLengthAsync(streamKey));await redis.KeyDeleteAsync(streamKey);

输出结果如下:

Min-ID

根据 Min-ID 来控制 Stream 消息长度,是 Redis 6.2 新引入的功能,目前 StackExchange.Redis 还没有专门的 API 来支持这个功能,不过我们可以通过 Execute 来执行 Redis 命令,通常这些 Redis 客户端库都会支持直接调用 Redis 命令,根据 MinID 控制 Stream 长度示例如下:

private const string StreamAddCommandName = "XADD";
private const string StreamTrimCommandName = "XTRIM";private const string StreamAddAutoMsgId = "*";private const string StreamTrimByMinIdName = "MINID";private const string StreamTrimOperator = "=";private const string StreamMessageField = "message";private static async Task MinMsgIdTrim()
{var streamKey = $"stream-{nameof(MaxLengthTrim)}";await AddStreamMessage(streamKey, 10, () => Thread.Sleep(1000));var redis = RedisHelper.GetRedisDb();var minId = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(5)).ToUnixTimeMilliseconds();Console.WriteLine(await redis.StreamLengthAsync(streamKey));// https://redis.io/commands/xtrim// trim directlyawait redis.ExecuteAsync(StreamTrimCommandName, streamKey,StreamTrimByMinIdName,StreamTrimOperator, // optionalminId);Console.WriteLine(await redis.StreamLengthAsync(streamKey));minId = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(2)).ToUnixTimeMilliseconds();// https://redis.io/commands/xadd// add with trimvar result = redis.Execute(StreamAddCommandName, streamKey, StreamTrimByMinIdName, StreamTrimOperator, // optionalminId,StreamAddAutoMsgId, StreamMessageField, "Test");Console.WriteLine(await redis.StreamLengthAsync(streamKey));await redis.KeyDeleteAsync(streamKey);
}

上述代码输出结果如下:

More

本文主要介绍了控制 Redis Stream 的消息长度,除了介绍 Redis 本身的命令之外,也是介绍一下如何使用 StackExchange.Redis 实现调用没有 API 支持的 Redis 命令,Redis 6.2 之后支持了很多新的特性,但是很多库都还太支持,了解如何原生调用 Redis 命令有些时候会很有帮助

References

  • https://redis.io/commands/xadd

  • https://redis.io/commands/xtrim

  • https://github.com/WeihanLi/SamplesInPractice/blob/master/RedisSample/StreamTrimSample.cs

控制 Redis stream 的消息数量相关推荐

  1. redis stream 实现消息队列

    redis stream 实现消息队列 Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于r ...

  2. 基于 Redis Stream 的消息队列

    文章目录 基于 Redis Stream 的消息队列 消息队列相关命令 消费者组相关命令 如何使用Stream消息队列 生产者写入消息 - XADD 消费者读取消息 - XGROUP 创建消费者组 - ...

  3. 使用 Redis Stream 实现消息队列

    使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...

  4. redis stream java消息队列_Redis-消息队列的两种实现方式

    索引: 基于list的实现方式 基于publish/subscribe 实战 消息队列简介 消息队列:是消息的顺序集合. 比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给P ...

  5. redis stream java消息队列_Redis 异步消息队列与延时队列

    消息中间件,大家都会想到 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能.这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力.但是这种属 ...

  6. Redis消息队列——Redis Stream

    文章目录 消息队列 为什么不使用Redis 发布订阅 (pub/sub) 来实现消息队列 Stream 消息队列相关命令: 消费者组相关命令: Stream最简单的生产.消费模型 Stream 优点/ ...

  7. Python一些可能用的到的函数系列81 基于Redis Stream的简单消息队列对象

    说明 一个实现消息队列简单管理的对象 内容 代码 import redis # 基于Redis Stream的消息队列 class LittleRQ:def __init__(self, host, ...

  8. redis灵魂拷问:如何使用stream实现消息队列

    redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的.PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了. redis5. ...

  9. 使用Redis Stream来做消息队列和在Asp.Net Core中的实现

    Redis - Wikipedia 写在前面 我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消 ...

最新文章

  1. linux RTX2080显卡驱动
  2. 概率统计笔记:高斯威沙特分布
  3. linux进程map,linux下unordered_map和map在小数据下性能差异
  4. 基于ncat的简易web服务器
  5. 使用CSS sprites减少HTTP请求
  6. hdu 1698(线段树区间更新)
  7. Check failed: weights_.Size() == num_row_ (38997 vs. 383852) : Size of weights must equal to number.
  8. C#基础_值类型引用类型(十一)
  9. 浅谈WebKit之Port
  10. 微信小程序最新开发资源汇总,对学习微信小程序的新手有一定帮助
  11. Ubuntu下如何正确安装FFmpeg
  12. oracle 如何查看当前用户的表空间名称
  13. android 拨打电话 发送短信 权限,Android开发实现拨打电话与发送信息的方法分析...
  14. 02:MongoDB操作
  15. 如何在SQL Server Reporting Services中使用表达式来创建有效的报告
  16. 电子版证件照怎么制作并改大小
  17. 南邮——计算机图像学——光照、冯氏光照模型
  18. Snipaste——一款强大又实用的截图工具
  19. ESP32 模拟键盘的简单操作 (ESP32 for Arduino)
  20. 安卓开发实战讲解!斗鱼直播Android开发二面被刷,社招面试心得

热门文章

  1. 简单音乐播放实例的实现,Android Service AIDL 远程调用服务
  2. 看了《OCP/OCA认证考试指南全册:Oracle Database 11g(1Z0-051,...
  3. 关于java连接sqlserver2000 和sqlserver2005的初识
  4. MySQL使用裸设备
  5. 为团委出书写:《打造社团品牌:请给我一个理由,让我记住你!》
  6. mysql 存储 事务_MYSQL 可以在存储过程里实现事务控制吗
  7. ElasticSearch教程——自定义分词器(转学习使用)
  8. AMD and CMD are dead之KMD.js依赖可视化工具发布
  9. iOS6.0以上版本,关于NSDateFormatter的问题
  10. UVA 10518 How Many Calls?