redis stream 实现消息队列

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

基于redis实现消息队列的方式有很多:

  • PUB/SUB,订阅/发布模式
  • 基于List的 LPUSH+BRPOP 的实现

redis 实现消息对列4中方法

发布订阅

发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。

发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。

使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。

list 队列

生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。

**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。

**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。

zset 队列

生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。

zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。

zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。

zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue

Stream 队列

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。

提供消息ack机制。

基本命令

xadd 生产消息

往 stream 内创建消息 语法为:

XADD key ID field string [field string …]

# * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id
xadd stream1 * name zs age 23

读取消息

读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id)
xread count 1 streams stream1 0
#表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息
xread count 1 streams msg 1649143363972-0#表示一直阻塞读取最新的消息($表示获取下一个生成的消息)
xread count 1 block 0 streams stream1 $ xrange stream - + 10

XRANGE key startID endID count

#表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID)
xrange stream1 - + 10

xgroup 消费者组

redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。

每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

创建消费者组:

#消费消息首先得创建消费者组
# 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息
xgroup create stream1 group1 0#查询stream1内的所有消费者组信息
xinfo groups stream1

xreadgroup 消费消息

通过xreadgroup可以在消费者组内创建消费者消费消息

XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#创建消费者读取消息
#在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息)
xreadgrup group group1 consumer1 count 1 streams stream1 >

Pending 等待列表

通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id

每个Pending的消息有4个属性:

  1. 消息ID
  2. 所属消费者
  3. IDLE,已读取时长
  4. delivery counter,消息被读取次数

XPENDING key group [start end count] [consumer]

#查看pending列表
# 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
xpending stream1 group1 - + 10 consumer1
# 查看group1组内的所有消费者pending类表
xpending stream1 group1 - + 10

消息确认

当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除

XACK key gruopName ID

xack stream1 group1 xxx

消息转移

当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。

通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。

# 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费
XCLAIM stream1 group1 consumer1 3600000 1553585533795-1

信息监控

redis提供了xinfo来查看stream的信息

#查看sream信息
xinfo stream steam1
#查询消费者组信息
xinfo groups group1 #查询消费者信息
xinfo consumers consumer1

SpringBoot 整合

1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2 编写消费者

@Slf4j
@Component
public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {public final String streamName      = "emailStream";public final String groupName       = "emailGroup";public final String consumerName    = "emailConsumer";@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {//log.info("stream名称-->{}",message.getStream());//log.info("消息ID-->{}",message.getId());log.info("消息内容-->{}",message.getValue());Map<String, String> msgMap = message.getValue();if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){//消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务log.info("消费异常-->"+message);return;}StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();//消息应答streamOperations.acknowledge( streamName,groupName,message.getId() );}//我们可以启动定时任务不断监听pending列表,处理死信消息
}

3 配置redis

序列化配置

@EnableCaching
@Configuration
public class RedisConfig {/*** 设置redis序列化规则*/@Beanpublic Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);jackson2JsonRedisSerializer.setObjectMapper(om);return jackson2JsonRedisSerializer;}/*** RedisTemplate配置*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {// 配置redisTemplateRedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);RedisSerializer<?> stringSerializer = new StringRedisSerializer();// key序列化redisTemplate.setKeySerializer(stringSerializer);// value序列化redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// Hash key序列化redisTemplate.setHashKeySerializer(stringSerializer);// Hash value序列化redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);redisTemplate.afterPropertiesSet();return redisTemplate;}}

消费者组和消费者配置

@Slf4j
@Configuration
public class RedisStreamConfig {@Autowiredprivate EmailConsumer emailConsumer;@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Beanpublic StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();return StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()//block读取超时时间.pollTimeout(Duration.ofSeconds(3))//count 数量(一次只获取一条消息).batchSize(1)//序列化规则.serializer( stringRedisSerializer ).build();}/*** 开启监听器接收消息*/@Beanpublic StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,streamMessageListenerContainerOptions);//如果 流不存在 创建 stream 流if( !redisTemplate.hasKey(emailConsumer.streamName)){redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));log.info("初始化stream {} success",emailConsumer.streamName);}//创建消费者组try {redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);} catch (Exception e) {log.info("消费者组 {} 已存在",emailConsumer.groupName);}//注册消费者 消费者名称,从哪条消息开始消费,消费者类// > 表示没消费过的消息// $ 表示最新的消息listenerContainer.receive(Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),emailConsumer);listenerContainer.start();return listenerContainer;}}

4.生产者生产消息

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){StreamOperations streamOperations = redisTemplate.opsForStream();for (int i = 0; i < count; i++) {AtomicInteger num = new AtomicInteger(i);Map msgMap = new HashMap();msgMap.put("count", i);msgMap.put("sID", num);//新增消息streamOperations.add("emailStream",msgMap);}return "success";
}

参考文档:

redis Stream 消息队列

SpringBoot整合redis stream 实现消息队列

redis stream 实现消息队列相关推荐

  1. 使用 Redis Stream 实现消息队列

    使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...

  2. 基于 Redis Stream 的消息队列

    文章目录 基于 Redis Stream 的消息队列 消息队列相关命令 消费者组相关命令 如何使用Stream消息队列 生产者写入消息 - XADD 消费者读取消息 - XGROUP 创建消费者组 - ...

  3. redis stream java消息队列_Redis-消息队列的两种实现方式

    索引: 基于list的实现方式 基于publish/subscribe 实战 消息队列简介 消息队列:是消息的顺序集合. 比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给P ...

  4. redis stream java消息队列_Redis 异步消息队列与延时队列

    消息中间件,大家都会想到 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能.这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力.但是这种属 ...

  5. 控制 Redis stream 的消息数量

    控制 Redis stream 的消息数量 Intro Redis Stream 是 Redis 5.0 引入的一个新的类型,之前我们介绍过使用 Redis Stream 来实现消息队列,可以参考之前 ...

  6. redis灵魂拷问:如何使用stream实现消息队列

    redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的.PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了. redis5. ...

  7. springboot使用redis实现消息队列功能,redis使用list和stream实现消息队列功能,redis实现消息队列的风险点分析

    文章目录 写在前面 基于list的消息队列解决方案 使用list基本实现消息队列 阻塞式消费,避免性能损失 替换while(true) 实现消息幂等 保证消息可靠性 基于stream的消息队列解决方案 ...

  8. redis延迟消息队列不准时php,Redis实现延迟消息队列

    消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ. 如果只需要实现简单的消息队列,那么借助Redis即可. 如果对消息有着严格的可靠性等 ...

  9. redis简单队列java_使用Redis的简单消息队列

    redis简单队列java 在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medi ...

最新文章

  1. 世界名画 | 陌上花开,可缓缓归矣
  2. 20189208 2018-2019-2 《移动平台开发实践》分析小组项目代码
  3. WebGL Shader 环境搭建
  4. 结合使用slf4j和Logback教程
  5. 『震惊』秘密报告披露转基因食品危害
  6. 神州数码携手IBM与红帽共商“新基建”机遇与挑战
  7. CAD转换图片的小窍门
  8. 药师帮完成1.33亿美元D轮融资,投资方为老虎环球基金、H Capital和DCM...
  9. 使用kibana可视化报表实时监控你的应用程序,从日志中找出问题,解决问题
  10. PCB CS架构(工程系统)实现单点登入方法
  11. This iPhone is running iOS 12.2(16E227),which may be supported by this version of Xcode
  12. 如何解决时间在前端显示的问题,使用jsel解决,仅供初步接触servlet新手
  13. 安装PostgreSQL客户端
  14. 矩形波导中TE波和TM波的截止波数截止波长和截止频率
  15. 阿里云ddns过程记录
  16. PM42L-048 步进电机
  17. 2021智能零售领域最具商业合作价值企业盘点
  18. 去面试却被问的哑口无言,是不是踏入了机器学习误区
  19. Android开发之摇一摇
  20. app中的长连接与实现方式

热门文章

  1. 《STL源码剖析》笔记——allocator
  2. InteliJ IDEA社区版 两款插件变身旗舰版
  3. c语言教材课后题答案6,C语言谭浩强版6章课后练习题答案.doc
  4. 【测试】 抓包工具 Charles 使用教程
  5. SOLIDWORKS如何正确使用焊件及材料切割清单
  6. 平面设计培训需要学习多久
  7. reflow(重排、回流)和repaint(重绘)
  8. java校园一卡通管理系统
  9. Matlab基础学习笔记(五)—— Simulink仿真
  10. C语言实现的超详细的冒泡排序(附有详细代码)