一 、为什么要用Redis实现轻量级MQ?

  1. MQ的主要作用:

    1. 应用解耦
    2. 异步化消息
    3. 流量削峰填谷
  2. 目前使用比较多的是ActiveMQ 、 RabbitMQ 、 ZeroMQ 、 Kafka 、 MetaMQ 、 RocketMQ等
  3. 在业务实现过程中 , 就算没有大量的流量 , 解耦和异步化也是处处可用 , 此时MQ就显得尤为重要 。 但与此同时MQ也是一个蛮重的组件,例如我们如果用RabbitMQ就必须为它搭建一个服务器,同时如果要考虑可用性,就要为服务端建立一个集群,而且在生产如果有问题也需要查找功能。在中小型业务的开发过程中,可能业务的其他整个实现都没这个重。过重的组件服务会成倍增加工作量。所幸的是,Redis提供的list数据结构非常适合做消息队列。但是如何实现即时消费?如何实现ack机制?这些是实现的关键所在。

二、 如何实现即使消费

  1. 网上所流传的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)弹出。让我们来看看阻塞式弹出的使用方式:

    BRPOP key [key ...] timeout
    此命令的说明是:1、当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
    2、当给定多个key参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。
    另外,BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。
    
  2. 以此看来 , 列表的阻塞式弹出有两个特点:
    1. 如果list中没有任务时, 该连接将会被阻塞
    2. 连接的阻塞有一个超时时间 , 当超时时间设置为0时 , 即可无线等待, 直到弹出消息
  3. 由此看来此方式是可行的的 , 但是此方式为传统的观察者模式 , 业务简单可用 , 如果A的任务由B去执行 没有问题 , 但是如果A 、 B 发布的任务要C 、 D 去都能执行 , 这个方法就相形见绌了 , 这时就要用到发布\订阅模式 , 使业务系统更加清晰 。
  4. 好在Redis也支持Pub/Sub(发布 / 订阅) 。 在消息A入队的同时发布通知到频道Channel , 此时已经订阅channel的worker就收到了通知 , 知道了list中有消息A ,就可以获取并消费了 。

三、 及时消费实例

  • 示例场景为:worker要做同步文件功能,等到有文件生成时立马同步。

    首先开启一个线程代表worker,来订阅频道channel:@Service
    public class SubscribeService {@Resourceprivate RedisService redisService;@Resourceprivate SynListener synListener;//订阅者@PostConstructpublic void subscribe() {new Thread(new Runnable() {@Overridepublic void run() {LogCvt.info("服务已订阅频道:{}", channel);redisService.subscribe(synListener, channel);}}).start();}
    }
    代码中的SynListener即为所声明的订阅者,channel为订阅的频道名称,具体的订阅逻辑如下:@Service
    public class SynListener extends JedisPubSub {@Resourceprivate DispatchMessageHandler dispatchMessageHandler;@Overridepublic void onMessage(String channel, String message) {LogCvt.info("channel:{},receives message:{}",channel,message);try {//处理业务(同步文件)dispatchMessageHandler.synFile();} catch (Exception e) {LogCvt.error(e.getMessage(),e);}}
    }
    处理业务的时候,就去list中去消费消息:@Service
    public class DispatchMessageHandler {@Resourceprivate RedisService redisService;@Resourceprivate MessageHandler messageHandler;public void synFile(){while(true){try {String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());if (null == message){break;}Thread.currentThread().setName(Tools.uuid());// 队列数据处理messageHandler.synfile(message);} catch (Exception e) {LogCvt.error(e.getMessage(),e);}}}
    

四、 如何实现ack机制?

  1. ack , 即小气确认机制
  2. 首先看看rabbitMQ 的ack机制:
    1. publistener把消息通知给Consumer , 如果在consumer已处理完任务 , 那么他将向Broker发送ack消息 , 告知某条消息已经被成功处理 , 可以从队列中移除 。 如果consunmer么有发送回ack消息 , 那么Broker会认为消息处理失败 , 会将此消息及后续消息分发给其他consumer进行处理(redeliver flag 设置为true )
    2. 这种机制和TCP/IP协议确认连接类似 , 不同的是TCP/IP确立连接需要经过三次握手,而RabbitMQ只需要一次ACK。
    3. 值得注意的是RabbitMQ当且仅当检测到ACK消息未发出且Consumer的连接终止时才会将消息重新分发给其他Consumer,因此不需要担心消息处理时间过长而被重新分发的情况。
  3. 那么在我们用Redis实现消息队列的ack机制的时候该怎么做呢?
    1. work处理失败之后 , 要回滚消息到原始pending队列
    2. 加入worker挂掉 , 也要回滚消息到原始pending队列

五 、 实现方案(主要解决worker挂掉的情况)

  1. 维护两个队列: pending队列和doing表(hash表)。
  2. workers定义为ThreadPool。
  3. 由pending队列出队后,workers分配一个线程(单个worker)去处理消息——给目标消息append一个当前时间戳和当前线程名称,将其写入doing表,然后该worker去消费消息,完成后自行在doing表擦除信息。
  4. 启用一个定时任务,每隔一段时间去扫描doing队列,检查每隔元素的时间戳,如果超时,则由worker的ThreadPoolExecutor去检查线程是否存在,如果存在则取消当前任务执行,并把事务rollback。最后把该任务从doing队列中pop出,再重新push进pending队列。
  5. 在worker的某线程中,如果处理业务失败,则主动回滚,并把任务从doing队列中移除,重新push进pending队列。

六 、 总结

  1. Redis作为消息队列是有很大局限性的。因为其主要特性及用途决定它只能实现轻量级的消息队列。写在最后:没有绝对好的技术,只有对业务最友好的技术,谨此献给所有developer。

使用Redis 实现消息队列相关推荐

  1. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  2. 【springboot】【redis】springboot+redis实现发布订阅功能,实现redis的消息队列的功能...

    springboot+redis实现发布订阅功能,实现redis的消息队列的功能 参考:https://www.cnblogs.com/cx987514451/p/9529611.html 思考一个问 ...

  3. PHP + Redis 实现消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者 ...

  4. ​redis实现消息队列

    redis是一个开源的key-value存储系统.与Memcached类似,Redis将大部分数据存储在内存中,支持的数据类型包括:字符串.哈希表.链表.集合.有序集合以及基于这些数据类型的相关操作. ...

  5. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  6. 【BCVP】实现基于 Redis 的消息队列

    聆听自己的声音 如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯. 话说上回书我们说到了,Redis的使用修改<[BCVP更新]Sta ...

  7. 程序员过关斩将--redis做消息队列,香吗?

    菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...

  8. c#进阶(4)—— Redis 用于消息队列的存储

    1.参考的博文 a : http://www.cnblogs.com/lori/archive/2012/04/12/2443708.html -- 主要的实现思路 b:  http://www.cn ...

  9. Redis异步消息队列

    一.异步消息队列介绍 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦.所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列.同时由于使用了消 ...

最新文章

  1. ubuntu9.10上安装jdk5
  2. JSON数据从OSS迁移到MaxCompute最佳实践
  3. HMM——前向后向算法
  4. 创办私营企业必做的16件事
  5. Python Debug调试技巧
  6. 机器学习——LightGBM
  7. EditPlus注册码 亲测最新版可用
  8. Lua,Lua API,配置文件
  9. 云服务器Tomcat版本升级(Tomcat6升级至Tomcat7和Tomcat8)问题总结
  10. 新版电力系统决策支持系统开发告一段落
  11. 静态路由原理及配置(8)
  12. 【生活感想】期末考试
  13. 中职生计算机系自我推荐作文,中职生自我鉴定
  14. 【DBeaver】安装与使用教程
  15. 3t硬盘哪个服务器系统识别,3T硬盘为何不能被系统完全识别
  16. MEMS传感器的6大种类简介-传感器专题
  17. 区块链凭什么改变这个世界?从它的工作原理谈起
  18. GJM : 各大开发游戏引擎
  19. java发送邮件报错554_java发送邮件报错554
  20. GB28181对接摄像机/NVR视频流

热门文章

  1. Spring和Struts2整合
  2. 最大熵学习笔记(六)优缺点分析
  3. 【转】托管函数的挂钩(完美版)
  4. 给定一个年份,判断这一年是不是闰年。
  5. css不常用,不常用的 CSS
  6. ptp精准时间协议_PTP协议时间同步精度测试
  7. Java黑皮书课后题第10章:*10.1(Time类)设计一个名为Time的类。编写一个测试程序,创建两个Time对象(使用new Time()和new Time(555550000))
  8. Java黑皮书课后题第9章:9.7(Account类)设计一个名为Account的类,它包含……。编写一个测试程序,创建一个账户ID为1122、余额为20000美元、年利率为4.5%的Account…
  9. 网页html好学嘛,javascript好学么?
  10. java数据分析平台源码_DataGear数据可视化分析平台 v2.0.0