用redis实现消息队列(实时消费+ack机制)

java queue 消息队列 redis

消息队列
首先做简单的引入。

MQ主要是用来:

解耦应用、

异步化消息

流量削峰填谷

目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

网上的资源对各种情况都有详细的解释,在此不做过多赘述。

本文仅介绍如何使用Redis实现轻量级MQ的过程。

为什么要用Redis实现轻量级MQ?
在业务的实现过程中,就算没有大量的流量,解耦和异步化几乎也是处处可用,此时MQ就显得尤为重要。但与此同时MQ也是一个蛮重的组件,例如我们如果用RabbitMQ就必须为它搭建一个服务器,同时如果要考虑可用性,就要为服务端建立一个集群,而且在生产如果有问题也需要查找功能。在中小型业务的开发过程中,可能业务的其他整个实现都没这个重。过重的组件服务会成倍增加工作量。
所幸的是,Redis提供的list数据结构非常适合做消息队列。
但是如何实现即时消费?如何实现ack机制?这些是实现的关键所在。

如何实现即时消费?
网上所流传的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)弹出。
让我们来看看阻塞式弹出的使用方式:

BRPOP key [key …] timeout
此命令的说明是:

1、当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。

2、当给定多个key参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。
另外,BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。

以此来看,列表的阻塞式弹出有两个特点:

1、如果list中没有任务的时候,该连接将会被阻塞

2、连接的阻塞有一个超时时间,当超时时间设置为0时,即可无限等待,直到弹出消息
由此看来,此方式是可行的,但此为传统的观察者模式,业务简单则可使用,如A的任务只由B去执行。但如果A和Z的任务,B和C都能执行,那使用这种方式就相形见肘。这个时候就应该使用订阅/发布模式,使业务系统更加清晰。
好在Redis也支持Pub/Sub(发布/订阅)。在消息A入队list的同时发布(PUBLISH)消息B到频道channel,此时已经订阅channel的worker就接收到了消息B,知道了list中有消息A进入,即可循环lpop或rpop来消费list中的消息。流程如下:

其中的worker可以是单独的线程,也可以是独立的服务,其充当了Consumer和业务处理者角色。下面做实例说明。

即时消费实例
示例场景为:worker要做同步文件功能,等到有文件生成时立马同步。

首先开启一个线程代表worker,来订阅频道channel:

@Service
public class SubscribeService {

@Resource
private RedisService redisService;
@Resource
private SynListener synListener;//订阅者@PostConstruct
public void subscribe() {new Thread(new Runnable() {@Overridepublic void run() {LogCvt.info("服务已订阅频道:{}", channel);redisService.subscribe(synListener, channel);}}).start();}

}
代码中的SynListener即为所声明的订阅者,channel为订阅的频道名称,具体的订阅逻辑如下:

@Service
public class SynListener extends JedisPubSub {

@Resource
private DispatchMessageHandler dispatchMessageHandler;@Override
public void onMessage(String channel, String message) {LogCvt.info("channel:{},receives message:{}",channel,message);try {//处理业务(同步文件)dispatchMessageHandler.synFile();} catch (Exception e) {LogCvt.error(e.getMessage(),e);}
}

}
处理业务的时候,就去list中去消费消息:

@Service
public class DispatchMessageHandler {

@Resource
private RedisService redisService;
@Resource
private MessageHandler messageHandler;public void synFile(){while(true){try {String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());if (null == message){break;}Thread.currentThread().setName(Tools.uuid());// 队列数据处理messageHandler.synfile(message);} catch (Exception e) {LogCvt.error(e.getMessage(),e);}}
}

}
这样我们就达到了消息的实时消费的目的。

如何实现ack机制?
ack,即消息确认机制(Acknowledge)。

首先来看RabbitMQ的ack机制:

Publisher把消息通知给Consumer,如果Consumer已处理完任务,那么它将向Broker发送ACK消息,告知某条消息已被成功处理,可以从队列中移除。如果Consumer没有发送回ACK消息,那么Broker会认为消息处理失败,会将此消息及后续消息分发给其他Consumer进行处理(redeliver flag置为true)。
这种确认机制和TCP/IP协议确立连接类似。不同的是,TCP/IP确立连接需要经过三次握手,而RabbitMQ只需要一次ACK。
值的注意的是,RabbitMQ当且仅当检测到ACK消息未发出且Consumer的连接终止时才会将消息重新分发给其他Consumer,因此不需要担心消息处理时间过长而被重新分发的情况。
那么在我们用Redis实现消息队列的ack机制的时候该怎么做呢?
需要注意两点:

work处理失败后,要回滚消息到原始pending队列
假如worker挂掉,也要回滚消息到原始pending队列
上面第一点可以在业务中完成,即失败后执行回滚消息。

实现方案
(该方案主要解决worker挂掉的情况)

维护两个队列:pending队列和doing表(hash表)。
workers定义为ThreadPool。
由pending队列出队后,workers分配一个线程(单个worker)去处理消息——给目标消息append一个当前时间戳和当前线程名称,将其写入doing表,然后该worker去消费消息,完成后自行在doing表擦除信息。
启用一个定时任务,每隔一段时间去扫描doing队列,检查每隔元素的时间戳,如果超时,则由worker的ThreadPoolExecutor去检查线程是否存在,如果存在则取消当前任务执行,并把事务rollback。最后把该任务从doing队列中pop出,再重新push进pending队列。
在worker的某线程中,如果处理业务失败,则主动回滚,并把任务从doing队列中移除,重新push进pending队列。
总结
Redis作为消息队列是有很大局限性的。因为其主要特性及用途决定它只能实现轻量级的消息队列。写在最后:没有绝对好的技术,只有对业务最友好的技术,谨此献给所有developer。

转自https://segmentfault.com/a/1190000012244418#articleHeader0

用redis实现消息队列(实时消费+ack机制)【转】相关推荐

  1. 用redis解决消息队列重复消费问题

    重复消费问题: 为了解决消费端因为种种原因而造成的消息丢失问题,我们都知道根源在于因为RabbitMQ的自动ack机制,所以为了避免以上问题,我们会选中手动ack,以确保消息不会因为某些原因而丢失. ...

  2. redis list 实现消息队列 多线程消费

    redis list 实现消息队列 多线程消费 redis list 实现消息队列 多线程消费 redis list 实现消息队列 多线程消费 利用redis list 命令: Redis Brpop ...

  3. Python Redis Stream 消息队列 消费组

    项目有用到消息队列来消费不断新增的任务,本来看到Redis有Pub Sub就没准备用kafka了,后来看了下Redis 5.0新加的Stream,感觉刚好符合项目要求,看下文档就直接用上了,类似一个简 ...

  4. 使用redis做消息队列mq的总结

    总结 目前使用redis做消息队列的的方式有3中,list,      publish/subscribe,       stream list做mq的总结 使用方法 1. 生产者可以 lpush 写 ...

  5. redis实现消息队列的几种方式及其优劣

    概述 常用的消息队列有,rabbitMq.kafka.RocketMq.ActiveMq等.这些消息队列需要独立安装部署,作为一个中间件来提供服务,虽然有着高性能.高可靠的优点,但是额外部署这些中间件 ...

  6. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  7. 使用Redis 实现消息队列

    一 .为什么要用Redis实现轻量级MQ? MQ的主要作用: 应用解耦 异步化消息 流量削峰填谷 目前使用比较多的是ActiveMQ . RabbitMQ . ZeroMQ . Kafka . Met ...

  8. 【BCVP】实现基于 Redis 的消息队列

    聆听自己的声音 如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯. 话说上回书我们说到了,Redis的使用修改<[BCVP更新]Sta ...

  9. 程序员过关斩将--redis做消息队列,香吗?

    菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...

最新文章

  1. 《研磨设计模式》chap24 桥接模式bridge(1)基本概念
  2. Cocoa的MVC架构分析 cocoa的mvc实现
  3. tpshop防止sql注入补丁
  4. 75-商品服务-品牌分类关联与级联更新
  5. ASP.NET Core 2.0身份和角色管理入门
  6. Java基础-面向接口(interface)编程
  7. 网页嵌套:一个html嵌套到另一个html中
  8. ajax更换内容执行函数,在ajax成功调用另一个ajax函数
  9. LitePal的简单使用
  10. 编程福利:50本C语言电子书,你还怕没书看吗!
  11. 《广义动量定理与系统思考——战争、…
  12. 计算机信息检索 02139
  13. 搭建frida+木木模拟器运行环境
  14. CHIP-seq流程学习笔记(9)-使用IDR 软件对生物学重复样本间的差异peak进行提取
  15. android 全景拼接软件,DetuStitch(全景拼接软件)
  16. OPPO A96和oppo Reno 7 哪个好
  17. 第一章:costmap_2d代价地图生成原理
  18. Dubbo-05 20190317
  19. SQL提高查询效率知识拾忆
  20. 高人指路!如何借力国家标准让企业项目管理体制建设更容易?

热门文章

  1. python 重载id函数_Python函数重载实例
  2. 查看显卡显存_3d渲染需要多大显存比较合适?显存在渲染中的作用
  3. java arraylist线程安全_ArrayList升级为线程安全的List
  4. span标签style的优先级_css样式引入优先级?
  5. 华为vrrp默认优先级_华为的VRRP怎么配置
  6. 二十三、Python数据建模(上),禁止转载
  7. Python爬虫加密
  8. 三、IntellijIDEA开发工具,学习Java好利器
  9. 七、股票中的布朗运动和pandas.dataframe.pct_change()
  10. SVM支持向量机(下)