1. pending数据的产生

在消费者组模式下,当一个消息被消费者取出,为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读(XREADGROUP)取但并未处理完毕(未ACK)的消息。

2. 对pending数据的几种处理方式

下面的讨论基于几点:

  1. 面向的场景为多个无差别消费者(每个消费者名子相同,功能相同)在同一group下消费任务。
  2. 我们要保证的是,每个任务至多只做一次。
  3. 代码实现是在使用redis stream实现队列服务一文的封装基础上实现的。

2.1 无需处理

如果你的处理逻辑是:

getTask()
delTask()
yourProcessFuc();

即不太关注任务的丢失,此时无需做什么特别处理。但一定记得delTask(),不然pending队列会越积越多,占用大量存储空间。

2.2 从pending中按条件读取,放回原队列

    /** 将pending队列中超时的数据重新放回队列* * $idleTime: 超时时间, 毫秒* $perPage:每次从pending队列中取的任务数, 之所以分页是为防止队列太长,一下取出内存不够** 注意:只能有一个进程执行pendingRestore** 优点: consumer不需要做任何改动* 缺点: * 先del再add, 成本上不划算,* 如果del和add中间断掉任务就丢了* 无法保留任务被重复投递的次数,不过如果你的任务只想重做一次,或者不关注此数据那就无所谓了。* * return: restore的数量* */public function pendingRestore($idleTime = 5000, $perPage = 20){/*** 比较简单粗暴的取pending数据方式* 依赖* 1.每次从pending取走/删除超时数据* 2.id是按时间排序,小id未超时,大id一定未超时**/$restoreNum = 0;while(1){$thisNum = 0;$data = $this->getPending($perPage);foreach($data as $one){$id = $one[0];$duration = $one[2];if ($duration > $idleTime){$data = $this->getRange($id, $id);$task = $data[$id];$this->delTask($id);$this->addTask($task);$thisNum++;}}$restoreNum += $thisNum;if ($thisNum < $perPage){break;}}return $restoreNum;}/* 从pending队列中取任务*/protected function getPending($count = 1, $start='-', $end='+', $consumer = null){if (!$consumer){return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count);}return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count, $consumer);}/** 取[$start, $end]范围内的数据, 注意是闭区间** $count:条数,null时表示取全部* */protected function getRange($start = '-', $end = '+', $count = null){if(is_null($count)){return $this->_mRedis->xRange($this->_mStream, $start, $end);}else{return $this->_mRedis->xRange($this->_mStream, $start, $end, $count);}}

2.3 使用claim

将超时任务放入另一个名子的消费者pending队列中,然后从新的消费者历史数据中取出数据并处理。

    /** 另一种恢复超时任务的方法* 思路:将超时任务放入newConsumer的pending中,后续可以从newConsume的历史中取出数据并处理** 优点:* 恢复数据没有重复读,删,插,效率高* 任务投递次数会保留在新的pending中 ** 缺点:* consumer需要做改动,至少要改变consumer的名子* 只能用单进程从历史数据中读数据,然后处理。*** $idleTime: 超时时间, 毫秒* $newConsumer: 之后处理pending任务的消费者名称* $perPage: 每次取pending任务的条数** return: 满足条件且成功claim的条数* */public function pendingClaim($idleTime = 5000, $newConsumer=null, $perPage = 20){if (!$newConsumer){return false;}$info = $this->getPendingInfo();$startID = $info[1];$endID = $info[2];$claimNum = 0;/** 使用startid, endid遍历pending列表* 因为getpending取的是[startid, endid]* 所以边界处的id可能被重复取出,但不影响结果的正确性* perPage越大/符合xclaim条件的id越多,重复的可能性越小* */while($startID != $endID){//var_dump([$startID, $endID]);$data = $this->getPending($perPage, $startID, $endID, $this->_mConsumer);foreach($data as $one){$ids[] = $one[0];$startID = $one[0];}//xClaim会根据条件自动过滤任务$res = $this->_mRedis->xClaim($this->_mStream, $this->_mGroup, $newConsumer, $idleTime, $ids, ['JUSTID']);$thisNum = count($res);   $claimNum += $thisNum;//id是按时间排列,小id未超时,则后面不会超时//在所有id都有相同的投递次数的基础上if ($thisNum < $perPage){break;}}return $claimNum;}

使用pendingClaim后,可以使用一个单独进程,通过下面方式获取超时任务并处理。

$config = ['server' => '10.10.10.1:6379','stream' => 'balltube',    'consumer' => 'pendingProcessor',//pendingClaim中的newConsumer
];$q = new RedisQueue($this->_config);
$block = 1000;
$num = 1;while(1){$d = $q->getTask($block, $num, 0);if (empty($d)){break;}$id = key($d);$data = $d[$id];$q->delTask($id);//处理任务逻辑yourTaskProcessFunc($data);
}

3. git代码库

https://github.com/qmhball/redisQueue

  • RedisQueue.php 队列实现
  • RedisQueueTest.php 对应测试

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=26tnhtub18qsc

redis stream中pending数据的处理相关推荐

  1. redis stream学习总结

    文章目录 stream Stream基本概念 消息id 消息内容 增删查改 消息生产 添加消息 xadd 查看消息长度 xlen 限制stream最大长度 1.xadd 中添加**maxlen**: ...

  2. Redis Stream

    Redis Stream Redis Stream流是 Redis5引入的一种数据结构,它的功能类似于只追加日志.开发者可以用Redis Stream 流实时记录.跟踪事件,Redis流用例示例包括: ...

  3. Redis Stream 简明使用教程

    Redis Stream 特性是Redis 5.0之后才有的.Redis Stream的主要应用就是时间序列的消息流分发.PUB/SUB也可以做消息流分发,但是PUB/SUB不记录历史消息,而Redi ...

  4. 老司机带你玩转面试(1):缓存中间件 Redis 基础知识以及数据持久化

    引言 今天周末,我在家坐着掐指一算,马上又要到一年一度的金九银十招聘季了,国内今年上半年受到 YQ 冲击,金三银四泡汤了,这就直接导致很多今年毕业的同学会和明年毕业的同学一起参加今年下半年的秋招,这个 ...

  5. Redis消息队列——Redis Stream

    文章目录 消息队列 为什么不使用Redis 发布订阅 (pub/sub) 来实现消息队列 Stream 消息队列相关命令: 消费者组相关命令: Stream最简单的生产.消费模型 Stream 优点/ ...

  6. redis一般缓存什么样数据_门户数据展示_Redis缓存数据

    学习主题:门户数据展示_Redis缓存数据 一.Redis_3主3从集群环境搭建 谈单你对读写分离和主从同步的理解 读写分离:Master负责写数据的操作,salve负责读数据的操作 主从同步:sal ...

  7. Redis持久化和备份数据

    一.持久化 实现持久化的方式有两种RDB.AOF 基于RDB方式做持久化 RDB是基于快照模式实现的,所保存的数据文件默认dump.rdb,具体产生这个数据文件的方式有两种: 方式1:客户端执行sav ...

  8. 使用 Redis Stream 实现消息队列

    使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...

  9. redis 通过aof日志恢复_Redis从入门到精通(四、Redis的持久化和数据备份与恢复)

    本文将对Redis的两种持久化方式做详细的介绍,从配置,机制,优缺点几方面讲起 Redis持久化简介 Redis提供了两种持久化的选项,一种是快照文件(snapshotting,RDB),它会基于某个 ...

最新文章

  1. 【Qt】error: undefined reference to `vtable for MainWindow‘
  2. Android 重新编译资源文件
  3. 利用MultipartFile实现文件上传
  4. Linux中ifcfg-eth0配置参数解释
  5. Csharp迭代循环
  6. 源文件与模块文件生成时的文件不同,仍要调试器使用它吗
  7. git如何查看和切换账号
  8. 第二课 每天努力一点点【Linux培训实录】
  9. 关于Memcached反射型DRDoS攻击分析
  10. C#/.NET 上位机快速入门1——界面渐变设计、实现无边框拖动、关闭的淡出效果、基于socket实现与PLC服务器通信
  11. 3D元素周期表源码(已加注释)及分析
  12. android5.0刷机,一加手机怎么升级安卓5.0 一加手机刷Android 5.0教程
  13. 欧莱雅眉笔banner个人设计
  14. 【前端框架学习】第一次实验 跑马灯效果的制作
  15. Java synchronized偏向锁、轻量级锁、重量级锁
  16. python正则 两边固定 中间任意字符
  17. 记录Android开发过程中遇到的疑难问题
  18. 宝塔mysql管理员初始密码_宝塔面板忘记管理员用户名密码简单有效解决方法
  19. 《人生百忌》之出门有忌
  20. ERPbuilder:企业电商精细化运营管理专家

热门文章

  1. .Net6 WebAPI使用log4net记录日志
  2. Linux设备驱动中的阻塞和非阻塞IO
  3. Apache服务器配置
  4. fopen matlab,整理:matlab的fopen
  5. fopen和fclose的用法
  6. 缓存+db 该怎么设计?
  7. 中文摩斯密码 - JavaScript库
  8. MyBatis-Plus - 字段类型处理器
  9. Spring Cloud Streams Messaging消息驱动微服务实践
  10. ACCP 8.0 jQuery 第八章 上机练习