使用Redis 实现消息队列
一 、为什么要用Redis实现轻量级MQ?
- MQ的主要作用:
- 应用解耦
- 异步化消息
- 流量削峰填谷
- 目前使用比较多的是ActiveMQ 、 RabbitMQ 、 ZeroMQ 、 Kafka 、 MetaMQ 、 RocketMQ等
- 在业务实现过程中 , 就算没有大量的流量 , 解耦和异步化也是处处可用 , 此时MQ就显得尤为重要 。 但与此同时MQ也是一个蛮重的组件,例如我们如果用RabbitMQ就必须为它搭建一个服务器,同时如果要考虑可用性,就要为服务端建立一个集群,而且在生产如果有问题也需要查找功能。在中小型业务的开发过程中,可能业务的其他整个实现都没这个重。过重的组件服务会成倍增加工作量。所幸的是,Redis提供的list数据结构非常适合做消息队列。但是如何实现即时消费?如何实现ack机制?这些是实现的关键所在。
二、 如何实现即使消费
网上所流传的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)弹出。让我们来看看阻塞式弹出的使用方式:
BRPOP key [key ...] timeout 此命令的说明是:1、当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 2、当给定多个key参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。 另外,BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。
- 以此看来 , 列表的阻塞式弹出有两个特点:
- 如果list中没有任务时, 该连接将会被阻塞
- 连接的阻塞有一个超时时间 , 当超时时间设置为0时 , 即可无线等待, 直到弹出消息
- 由此看来此方式是可行的的 , 但是此方式为传统的观察者模式 , 业务简单可用 , 如果A的任务由B去执行 没有问题 , 但是如果A 、 B 发布的任务要C 、 D 去都能执行 , 这个方法就相形见绌了 , 这时就要用到发布\订阅模式 , 使业务系统更加清晰 。
- 好在Redis也支持Pub/Sub(发布 / 订阅) 。 在消息A入队的同时发布通知到频道Channel , 此时已经订阅channel的worker就收到了通知 , 知道了list中有消息A ,就可以获取并消费了 。
三、 及时消费实例
示例场景为:worker要做同步文件功能,等到有文件生成时立马同步。
首先开启一个线程代表worker,来订阅频道channel:@Service public class SubscribeService {@Resourceprivate RedisService redisService;@Resourceprivate SynListener synListener;//订阅者@PostConstructpublic void subscribe() {new Thread(new Runnable() {@Overridepublic void run() {LogCvt.info("服务已订阅频道:{}", channel);redisService.subscribe(synListener, channel);}}).start();} } 代码中的SynListener即为所声明的订阅者,channel为订阅的频道名称,具体的订阅逻辑如下:@Service public class SynListener extends JedisPubSub {@Resourceprivate DispatchMessageHandler dispatchMessageHandler;@Overridepublic void onMessage(String channel, String message) {LogCvt.info("channel:{},receives message:{}",channel,message);try {//处理业务(同步文件)dispatchMessageHandler.synFile();} catch (Exception e) {LogCvt.error(e.getMessage(),e);}} } 处理业务的时候,就去list中去消费消息:@Service public class DispatchMessageHandler {@Resourceprivate RedisService redisService;@Resourceprivate MessageHandler messageHandler;public void synFile(){while(true){try {String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());if (null == message){break;}Thread.currentThread().setName(Tools.uuid());// 队列数据处理messageHandler.synfile(message);} catch (Exception e) {LogCvt.error(e.getMessage(),e);}}}
四、 如何实现ack机制?
- ack , 即小气确认机制
- 首先看看rabbitMQ 的ack机制:
- publistener把消息通知给Consumer , 如果在consumer已处理完任务 , 那么他将向Broker发送ack消息 , 告知某条消息已经被成功处理 , 可以从队列中移除 。 如果consunmer么有发送回ack消息 , 那么Broker会认为消息处理失败 , 会将此消息及后续消息分发给其他consumer进行处理(redeliver flag 设置为true )
- 这种机制和TCP/IP协议确认连接类似 , 不同的是TCP/IP确立连接需要经过三次握手,而RabbitMQ只需要一次ACK。
- 值得注意的是RabbitMQ当且仅当检测到ACK消息未发出且Consumer的连接终止时才会将消息重新分发给其他Consumer,因此不需要担心消息处理时间过长而被重新分发的情况。
- 那么在我们用Redis实现消息队列的ack机制的时候该怎么做呢?
- work处理失败之后 , 要回滚消息到原始pending队列
- 加入worker挂掉 , 也要回滚消息到原始pending队列
五 、 实现方案(主要解决worker挂掉的情况)
- 维护两个队列: pending队列和doing表(hash表)。
- workers定义为ThreadPool。
- 由pending队列出队后,workers分配一个线程(单个worker)去处理消息——给目标消息append一个当前时间戳和当前线程名称,将其写入doing表,然后该worker去消费消息,完成后自行在doing表擦除信息。
- 启用一个定时任务,每隔一段时间去扫描doing队列,检查每隔元素的时间戳,如果超时,则由worker的ThreadPoolExecutor去检查线程是否存在,如果存在则取消当前任务执行,并把事务rollback。最后把该任务从doing队列中pop出,再重新push进pending队列。
- 在worker的某线程中,如果处理业务失败,则主动回滚,并把任务从doing队列中移除,重新push进pending队列。
六 、 总结
- Redis作为消息队列是有很大局限性的。因为其主要特性及用途决定它只能实现轻量级的消息队列。写在最后:没有绝对好的技术,只有对业务最友好的技术,谨此献给所有developer。
使用Redis 实现消息队列相关推荐
- Redis做消息队列,香吗?
来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...
- 【springboot】【redis】springboot+redis实现发布订阅功能,实现redis的消息队列的功能...
springboot+redis实现发布订阅功能,实现redis的消息队列的功能 参考:https://www.cnblogs.com/cx987514451/p/9529611.html 思考一个问 ...
- PHP + Redis 实现消息队列
Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者 ...
- redis实现消息队列
redis是一个开源的key-value存储系统.与Memcached类似,Redis将大部分数据存储在内存中,支持的数据类型包括:字符串.哈希表.链表.集合.有序集合以及基于这些数据类型的相关操作. ...
- 用redis实现消息队列(实时消费+ack机制)【转】
用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...
- 【BCVP】实现基于 Redis 的消息队列
聆听自己的声音 如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯. 话说上回书我们说到了,Redis的使用修改<[BCVP更新]Sta ...
- 程序员过关斩将--redis做消息队列,香吗?
菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...
- c#进阶(4)—— Redis 用于消息队列的存储
1.参考的博文 a : http://www.cnblogs.com/lori/archive/2012/04/12/2443708.html -- 主要的实现思路 b: http://www.cn ...
- Redis异步消息队列
一.异步消息队列介绍 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦.所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列.同时由于使用了消 ...
最新文章
- ubuntu9.10上安装jdk5
- JSON数据从OSS迁移到MaxCompute最佳实践
- HMM——前向后向算法
- 创办私营企业必做的16件事
- Python Debug调试技巧
- 机器学习——LightGBM
- EditPlus注册码 亲测最新版可用
- Lua,Lua API,配置文件
- 云服务器Tomcat版本升级(Tomcat6升级至Tomcat7和Tomcat8)问题总结
- 新版电力系统决策支持系统开发告一段落
- 静态路由原理及配置(8)
- 【生活感想】期末考试
- 中职生计算机系自我推荐作文,中职生自我鉴定
- 【DBeaver】安装与使用教程
- 3t硬盘哪个服务器系统识别,3T硬盘为何不能被系统完全识别
- MEMS传感器的6大种类简介-传感器专题
- 区块链凭什么改变这个世界?从它的工作原理谈起
- GJM : 各大开发游戏引擎
- java发送邮件报错554_java发送邮件报错554
- GB28181对接摄像机/NVR视频流
热门文章
- Spring和Struts2整合
- 最大熵学习笔记(六)优缺点分析
- 【转】托管函数的挂钩(完美版)
- 给定一个年份,判断这一年是不是闰年。
- css不常用,不常用的 CSS
- ptp精准时间协议_PTP协议时间同步精度测试
- Java黑皮书课后题第10章:*10.1(Time类)设计一个名为Time的类。编写一个测试程序,创建两个Time对象(使用new Time()和new Time(555550000))
- Java黑皮书课后题第9章:9.7(Account类)设计一个名为Account的类,它包含……。编写一个测试程序,创建一个账户ID为1122、余额为20000美元、年利率为4.5%的Account…
- 网页html好学嘛,javascript好学么?
- java数据分析平台源码_DataGear数据可视化分析平台 v2.0.0