redis stream 实现消息队列
redis stream 实现消息队列
Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
基于redis实现消息队列的方式有很多:
- PUB/SUB,订阅/发布模式
- 基于List的 LPUSH+BRPOP 的实现
redis 实现消息对列4中方法
发布订阅
发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。
发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。
使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。
list 队列
生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。
**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。
**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。
zset 队列
生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。
zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。
zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。
zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue
Stream 队列
Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。
提供消息ack机制。
基本命令
xadd 生产消息
往 stream 内创建消息 语法为:
XADD key ID field string [field string …]
# * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id
xadd stream1 * name zs age 23
读取消息
读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id)
xread count 1 streams stream1 0
#表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息
xread count 1 streams msg 1649143363972-0#表示一直阻塞读取最新的消息($表示获取下一个生成的消息)
xread count 1 block 0 streams stream1 $ xrange stream - + 10
XRANGE key startID endID count
#表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID)
xrange stream1 - + 10
xgroup 消费者组
redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。
每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9
创建消费者组:
#消费消息首先得创建消费者组
# 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息
xgroup create stream1 group1 0#查询stream1内的所有消费者组信息
xinfo groups stream1
xreadgroup 消费消息
通过xreadgroup可以在消费者组内创建消费者消费消息
XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#创建消费者读取消息
#在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息)
xreadgrup group group1 consumer1 count 1 streams stream1 >
Pending 等待列表
通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id
每个Pending的消息有4个属性:
- 消息ID
- 所属消费者
- IDLE,已读取时长
- delivery counter,消息被读取次数
XPENDING key group [start end count] [consumer]
#查看pending列表
# 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
xpending stream1 group1 - + 10 consumer1
# 查看group1组内的所有消费者pending类表
xpending stream1 group1 - + 10
消息确认
当消费者消费了消息,需要通过 xack
命令确认消息,xack后的消息会从pending列表移除
XACK key gruopName ID
xack stream1 group1 xxx
消息转移
当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM
将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。
通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。
# 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费
XCLAIM stream1 group1 consumer1 3600000 1553585533795-1
信息监控
redis提供了xinfo来查看stream的信息
#查看sream信息
xinfo stream steam1
#查询消费者组信息
xinfo groups group1 #查询消费者信息
xinfo consumers consumer1
SpringBoot 整合
1 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2 编写消费者
@Slf4j
@Component
public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {public final String streamName = "emailStream";public final String groupName = "emailGroup";public final String consumerName = "emailConsumer";@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {//log.info("stream名称-->{}",message.getStream());//log.info("消息ID-->{}",message.getId());log.info("消息内容-->{}",message.getValue());Map<String, String> msgMap = message.getValue();if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){//消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务log.info("消费异常-->"+message);return;}StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();//消息应答streamOperations.acknowledge( streamName,groupName,message.getId() );}//我们可以启动定时任务不断监听pending列表,处理死信消息
}
3 配置redis
序列化配置
@EnableCaching
@Configuration
public class RedisConfig {/*** 设置redis序列化规则*/@Beanpublic Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);jackson2JsonRedisSerializer.setObjectMapper(om);return jackson2JsonRedisSerializer;}/*** RedisTemplate配置*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {// 配置redisTemplateRedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);RedisSerializer<?> stringSerializer = new StringRedisSerializer();// key序列化redisTemplate.setKeySerializer(stringSerializer);// value序列化redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// Hash key序列化redisTemplate.setHashKeySerializer(stringSerializer);// Hash value序列化redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);redisTemplate.afterPropertiesSet();return redisTemplate;}}
消费者组和消费者配置
@Slf4j
@Configuration
public class RedisStreamConfig {@Autowiredprivate EmailConsumer emailConsumer;@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Beanpublic StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();return StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()//block读取超时时间.pollTimeout(Duration.ofSeconds(3))//count 数量(一次只获取一条消息).batchSize(1)//序列化规则.serializer( stringRedisSerializer ).build();}/*** 开启监听器接收消息*/@Beanpublic StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,streamMessageListenerContainerOptions);//如果 流不存在 创建 stream 流if( !redisTemplate.hasKey(emailConsumer.streamName)){redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));log.info("初始化stream {} success",emailConsumer.streamName);}//创建消费者组try {redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);} catch (Exception e) {log.info("消费者组 {} 已存在",emailConsumer.groupName);}//注册消费者 消费者名称,从哪条消息开始消费,消费者类// > 表示没消费过的消息// $ 表示最新的消息listenerContainer.receive(Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),emailConsumer);listenerContainer.start();return listenerContainer;}}
4.生产者生产消息
@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){StreamOperations streamOperations = redisTemplate.opsForStream();for (int i = 0; i < count; i++) {AtomicInteger num = new AtomicInteger(i);Map msgMap = new HashMap();msgMap.put("count", i);msgMap.put("sID", num);//新增消息streamOperations.add("emailStream",msgMap);}return "success";
}
参考文档:
redis Stream 消息队列
SpringBoot整合redis stream 实现消息队列
redis stream 实现消息队列相关推荐
- 使用 Redis Stream 实现消息队列
使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...
- 基于 Redis Stream 的消息队列
文章目录 基于 Redis Stream 的消息队列 消息队列相关命令 消费者组相关命令 如何使用Stream消息队列 生产者写入消息 - XADD 消费者读取消息 - XGROUP 创建消费者组 - ...
- redis stream java消息队列_Redis-消息队列的两种实现方式
索引: 基于list的实现方式 基于publish/subscribe 实战 消息队列简介 消息队列:是消息的顺序集合. 比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给P ...
- redis stream java消息队列_Redis 异步消息队列与延时队列
消息中间件,大家都会想到 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能.这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力.但是这种属 ...
- 控制 Redis stream 的消息数量
控制 Redis stream 的消息数量 Intro Redis Stream 是 Redis 5.0 引入的一个新的类型,之前我们介绍过使用 Redis Stream 来实现消息队列,可以参考之前 ...
- redis灵魂拷问:如何使用stream实现消息队列
redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的.PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了. redis5. ...
- springboot使用redis实现消息队列功能,redis使用list和stream实现消息队列功能,redis实现消息队列的风险点分析
文章目录 写在前面 基于list的消息队列解决方案 使用list基本实现消息队列 阻塞式消费,避免性能损失 替换while(true) 实现消息幂等 保证消息可靠性 基于stream的消息队列解决方案 ...
- redis延迟消息队列不准时php,Redis实现延迟消息队列
消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ. 如果只需要实现简单的消息队列,那么借助Redis即可. 如果对消息有着严格的可靠性等 ...
- redis简单队列java_使用Redis的简单消息队列
redis简单队列java 在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medi ...
最新文章
- 世界名画 | 陌上花开,可缓缓归矣
- 20189208 2018-2019-2 《移动平台开发实践》分析小组项目代码
- WebGL Shader 环境搭建
- 结合使用slf4j和Logback教程
- 『震惊』秘密报告披露转基因食品危害
- 神州数码携手IBM与红帽共商“新基建”机遇与挑战
- CAD转换图片的小窍门
- 药师帮完成1.33亿美元D轮融资,投资方为老虎环球基金、H Capital和DCM...
- 使用kibana可视化报表实时监控你的应用程序,从日志中找出问题,解决问题
- PCB CS架构(工程系统)实现单点登入方法
- This iPhone is running iOS 12.2(16E227),which may be supported by this version of Xcode
- 如何解决时间在前端显示的问题,使用jsel解决,仅供初步接触servlet新手
- 安装PostgreSQL客户端
- 矩形波导中TE波和TM波的截止波数截止波长和截止频率
- 阿里云ddns过程记录
- PM42L-048 步进电机
- 2021智能零售领域最具商业合作价值企业盘点
- 去面试却被问的哑口无言,是不是踏入了机器学习误区
- Android开发之摇一摇
- app中的长连接与实现方式