为什么需要消息队列

系统中引入消息队列机制是对系统一个非常大的改善。例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中。你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验。

有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作。例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟。这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此反馈用户与否。

另外适用消息队列的情况是那些系统处理能力有限的情况下,先使用队列机制把任务暂时存放起来,系统再一个个轮流处理掉排队的任务。这样在系统吞吐量不足的情况下也能稳定的处理掉高并发的任务。

消息队列可以用来做排队机制,只要系统需要用到排队机制的地方就可以使用消息队列来作。

rabbitmq的优先级做法

目前成熟的消息队列产品有很多,著名的例如rabbitmq。它使用起来相对还是比较简单的,功能也相对比较丰富,一般场合下是完全够用的。但是有个很烦人的就是它不支持优先级。

例如一个发邮件的任务,某些特权用户希望它的邮件能够更加及时的发送出去,至少比普通用户要优先对待。默认情况下rabbitmq是无法处理掉 的,扔给rabbitmq的任务都是FIFO先进先出。但是我们可以使用一些变通的技巧来支持这些优先级。创建多个队列,并为rabbitmq的消费者设 置相应的路由规则。

例如默认情况下有这样一个队列,我们拿list来模拟 [task1, task2, task3],消费者轮流按照FIFO的原则一个个拿出task来处理掉。如果有高优先级的任务进来,它也只能跟在最后被处理[task1, task2, task3, higitask1]. 但是如果使用两个队列,一个高优先级队列,一个普通优先级队列。 普通优先级[task1, task2, task3], 高优先级[hightask1 ] 然后我们设置消费者的路由让消费者随机从任意队列中取数据即可。

并且我们可以定义一个专门处理高优先级队列的消费者,它空闲的时候也不处理低优先级队列的数据。这类似银行的VIP柜台,普通客户在银行取号排队,一个VIP来了他虽然没有从取号机里拿出一个排在普通会员前面的票,但是他还是可以更快地直接走VIP通道。

使用rabbitmq来做支持优先级的消息队列的话,就像是上面所述同银行VIP会员一样,走不同的通道。但是这种方式只是相对的优先级,做不 到绝对的优先级控制,例如我希望某一个优先级高的任务在绝对意义上要比其他普通任务优先处理掉,这样上面的方案是行不通的。因为rabbitmq的消费者 只知道再自己空闲的情况下从自己关心的队列中“随机”取某一个队列里面的第一个数据来处理,它没法控制优先取找哪一个队列。或者更加细粒度的优先级控制。 或者你系统里面设置的优先级有10多种。这样使用rabbitmq也是很难实现的。

但是如果使用redis来做队列的话上面的需求都可以实现。

使用redis怎么做消息队列

首先redis它的设计是用来做缓存的,但是由于它自身的某种特性使得他可以用来做消息队列。它有几个阻塞式的API可以使用,正是这些阻塞式的API让他有做消息队列的能力。

试想一下在”数据库解决所有问题“的思路下,不使用消息队列也是可以完成你的需求的。我们把任务全部存放在数据库然后通过不断的轮询方式来取任 务处理。这种做法虽然可以完成你的任务但是做法很粗劣。但是如果你的数据库接口提供一个阻塞的方法那么就可以避免轮询操作了,你的数据库也可以用来做消息 队列,只不过目前的数据库还没有这样的接口。

另外做消息队列的其他特性例如FIFO也很容易实现,只需要一个List对象从头取数据,从尾部塞数据即可实现。

redis能做消息队列得益于他list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口。他们都是阻塞版的,所以可以用来做消息队列。

redis消息队列优先级的实现

一些基础redis基础知识的说明

redis> blpop tasklist 0

"im task 01"

这个例子使用blpop命令会阻塞方式地从tasklist列表中取头一个数据,最后一个参数就是等待超时的时间。如果设置为0则表示无限等 待。另外redis存放的数据都只能是string类型,所以在任务传递的时候只能是传递字符串。我们只需要简单的将负责数据序列化成json格式的字符 串,然后消费者那边再转换一下即可。

这里我们的示例语言使用python,链接redis的库使用redis-py. 如果你有些编程基础把它切换成自己喜欢的语言应该是没问题的。

1.简单的FIFO队列

复制代码

import redis, time

def handle(task):

print task

time.sleep(4)

def main():

pool = redis.ConnectionPool(host='localhost', port=6379, db=0)

r = redis.Redis(connection_pool=pool)

while 1:

result = r.brpop('tasklist', 0)

handle(result[1])

if __name__ == "__main__":

main()

复制代码

上例子即使一个最简单的消费者,我们通过一个无限循环不断地从redis的队列中取数据。如果队列中没有数据则没有超时的阻塞在那里,有数据则取出往下执行。

一般情况取出来是个复杂的字符串,我们可能需要将其格式化后作为再传给处理函数,但是为了简单我们的例子就是一个普通字符串。另外例子中的处理函数不做任何处理,仅仅sleep 用来模拟耗时的操作。

我们另开一个redis的客户端来模拟生产者,自带的客户端就可以。多往tasklist 队列里面塞上一些数据。

redis> lpush tasklist 'im task 01'

redis> lpush tasklist 'im task 02'

redis> lpush tasklist 'im task 03'

redis> lpush tasklist 'im task 04'

redis> lpush tasklist 'im task 05'

随后在消费者端便会看到这些模拟出来的任务被挨个消费掉。

2.简单优先级的队列

假设一种简单的需求,只需要高优先级的比低优先级的任务率先处理掉。其他任务之间的顺序一概不管,这种我们只需要在在遇到高优先级任务的时候将它塞到队列的前头,而不是push到最后面即可。

因为我们的队列是使用的redis的 list,所以很容易实现。遇到高优先级的使用rpush 遇到低优先级的使用lpush

redis> lpush tasklist 'im task 01'

redis> lpush tasklist 'im task 02'

redis> rpush tasklist 'im high task 01'

redis> rpush tasklist 'im high task 01'

redis> lpush tasklist 'im task 03'

redis> rpush tasklist 'im high task 03'

随后会看到,高优先级的总是比低优先级的率先执行。但是这个方案的缺点是高优先级的任务之间的执行顺序是先进后出的。

3.较为完善的队列

例子2中只是简单的将高优先级的任务塞到队列最前面,低优先级的塞到最后面。这样保证不了高优先级任务之间的顺序。

假设当所有的任务都是高优先级的话,那么他们的执行顺序将是相反的。这样明显违背了队列的FIFO原则。

不过只要稍加改进就可以完善我们的队列。

跟使用rabbitmq一样,我们设置两个队列,一个高优先级一个低优先级的队列。高优先级任务放到高队列中,低的放在低优先队列中。redis和rabbitmq不同的是它可以要求队列消费者从哪个队列里面先读。

def main():

pool = redis.ConnectionPool(host='localhost', port=6379, db=0)

r = redis.Redis(connection_pool=pool)

while 1:

result = r.brpop(['high_task_queue', 'low_task_queue'], 0)

handle(result[1])

上面的代码,会阻塞地从'high_task_queue', 'low_task_queue'这两个队列里面取数据,如果第一个没有再从第二个里面取。

所以只需要将队列消费者做这样的改进便可以达到目的。

复制代码

redis> lpush low_task_queue low001

redis> lpush low_task_queue low002

redis> lpush low_task_queue low003

redis> lpush low_task_queue low004

redis> lpush high_task_queue low001

redis> lpush high_task_queue low002

redis> lpush high_task_queue low003

redis> lpush high_task_queue low004

复制代码

通过上面的测试看到,高优先级的会被率先执行,并且高优先级之间也是保证了FIFO的原则。

这种方案我们可以支持不同阶段的优先级队列,例如高中低三个级别或者更多的级别都可以。

4.优先级级别很多的情况

假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类似0-99999这么多级别。那么我们第三种方案将不太合适了。

虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。于是我们还是只能使用list类型通过其他方式来完成目的。

有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。

例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置然后插入到指定地点即可。

因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是如果说数据量确实很大的话我们也可以通过一些方式来调优。

回想我们第三种方案,把第三种方案结合起来就会很大程度上减少开销。例如数据量十万的队列,它们的优先级也是随机0-十万的区间。我们可以设置 10个或者100个不同的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不同等级拆分后它单个队列的数据 就减少许多,这样二分查找匹配的效率也会高一点。但是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些队列而已。

redis实现轮询算法_用redis实现支持优先级的消息队列相关推荐

  1. redis实现轮询算法_基于zookeeper或redis实现分布式锁

    前言 在分布式系统中,分布式锁是为了解决多实例之间的同步问题.例如master选举,能够获取分布式锁的就是master,获取失败的就是slave.又或者能够获取锁的实例能够完成特定的操作. 目前比较常 ...

  2. redis实现轮询算法_【07期】Redis中是如何实现分布式锁的?

    点击上方"Java面试题精选",关注公众号 面试刷图,查缺补漏 分布式锁常见的三种实现方式: 数据库乐观锁: 基于Redis的分布式锁: 基于ZooKeeper的分布式锁. 本地面 ...

  3. redis实现轮询算法_白话分布式系统中的一致性哈希算法

    本文首发于:白话分布式系统中的一致性哈希算法 微信公众号:后端技术指南针 持续输出干货 欢迎关注! 通过本文将了解到以下内容:分布式系统的概念和作用 分布式系统常用负责均衡策略 普通哈希取模策略优缺点 ...

  4. redis实现轮询算法_Dcron:基于redis与一致性哈希算法的分布式定时任务库

    背景 最近项目中的定时任务越来越多,为了防止任务重复执行曾经使用过的方案: 只启用了一个节点. 固定循环间隔,使用分布式事务锁. 部署一套分布式任务调度系统. 方案一 没有容错机制,当单个节点宕机,所 ...

  5. java轮训算法_负载均衡轮询算法实现疑问

    import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; /* ...

  6. 微信红包业务,为什么采用轮询算法?

    目录 前言 基本的负载算法 平滑加权轮询算法 一致性哈希算法 最小活跃数算法 最优响应算法 总结 前言 负载均衡这个概念,几乎在所有支持高可用的技术栈中都存在,例如微服务.分库分表.各大中间件(MQ. ...

  7. 微信红包业务,为什么采用轮询算法?(荣耀典藏版)

    目录 前言 1.基本的负载算法 1.1.轮询算法 1.2.随机算法 1.3.权重算法 2.平滑加权轮询算法 3.一致性哈希算法 3.1.通过其他分发算法实现缓存 3.2.致性哈希核心-哈希环 3.3. ...

  8. 负载均衡轮询算法和服务器性能,SpringCloud-Ribbon负载均衡机制、手写轮询算法

    Ribbon 内置的负载均衡规则 在 com.netflix.loadbalancer 包下有一个接口 IRule,它可以根据特定的算法从服务列表中选取一个要访问的服务,默认使用的是「轮询机制」 Ro ...

  9. 负载均衡轮询算法和服务器性能,负载均衡算法

    对于要实现高性能集群,选择好负载均衡器很重要,同时针对不同的业务场景选择合适的负载均衡算法也是非常重要的. 一.负载均衡算法分类 任务平分类 负载均衡系统将收到的任务平均分配给服务器进行处理,这里的& ...

最新文章

  1. Mysql字符串字段判断是否包含某个字符串的3种方法
  2. 剑指 Offer 52. 两个链表的第一个公共节点(C语言)
  3. SMACH专题(一)----安装与初探
  4. mysql根据经纬度搜周边_mysql根据经纬度获取附近的商家
  5. mysql判断表存在的sql语句_SQL 语句判断已知表是否存在_MySQL
  6. GPU Shader 程序调试方法
  7. hotspot线程模型_Linux上的HotSpot GC线程CPU占用空间
  8. Ubuntu14.04 Apollo 3.5安装
  9. 每人都有两大炸弹的扎金花2012
  10. ElasticSearch 最全详细使用教程
  11. 消费分期群体-在校大学生和职场白领
  12. 基于MUI框架的影视播放APP的设计与实现毕业设计论文参考【原查重5.1%】
  13. 微信小程序 #项目笔记# | 从0到1实现婚礼邀请函小程序
  14. adf4351 锁相环相关硬件设计
  15. eventFilter能进入dragEnter但没有event::drog
  16. 2022年中国大学排行榜出炉~
  17. 读 Robert C. Solomon 之《哲学导论》
  18. WPF中应用toolkit Chart控件安装
  19. 【前端面试指南】简历上的前端常用单词,你拼写对了吗?
  20. DS SIMULIA Antenna Magus Professional 2021.5

热门文章

  1. python HTTP后台响应服务
  2. OGG维护优化脚本(一)-需求分析篇
  3. 高性能HTTP加速器varnish实践
  4. leetcode day4
  5. UI高级----Images.xcassets
  6. 分享一个针对触摸设备优化的图片幻灯jQuery插件 - touchtouch
  7. Oracle导表语句
  8. select子句顺序
  9. 两道统计题(两次检测呈阳性,连续抛硬币)
  10. 实验4 [BX]和loop指令