1. 概述

gmq是基于redis提供的特性,使用go语言开发的一个简单易用的队列;关于redis使用特性可以参考之前本人写过一篇很简陋的文章Redis 实现队列;

gmq的灵感和设计是基于有赞延迟队列设计,文章内容清晰而且很好理解,但是没有提供源码,在文章的最后也提到了一些未来架构方向; gmq不是简单按照有赞延迟队列的设计实现功能,在它的基础上,做了一些修改和优化,主要如下:

功能上

多种任务模式,不单单只是延迟队列;例如:延迟队列,普通队列,优先级队列

架构上:

添加job由dispatcher调度分配各个bucket,而不是由timer

每个bucket维护一个timer,而不是所有bucket一个timer

timer每次扫描bucket到期job时,会一次性返回多个到期job,而不是每次只返回一个job

timer的扫描时钟由bucket中下个job到期时间决定,而不是每秒扫描一次

2. 应用场景

延迟任务

延迟任务,例如用户下订单一直处于未支付状态,半个小时候自动关闭订单

异步任务

异步任务,一般用于耗时操作,例如群发邮件等批量操作

超时任务

规定时间内(TTR)没有执行完毕或程序被意外中断,则消息重新回到队列再次被消费,一般用于数据比较敏感,不容丢失的

优先级任务

当多个任务同时产生时,按照任务设定等级优先被消费,例如a,b两种类型的job,优先消费a,然后再消费b

3. gmq原理

3.1 核心概念

dispatcher任务调度器,负责将job分配到bucket或直接推送到ready queue

bucket任务桶,用于存放延迟任务;每个bucket会维护一个timer定时器,然后将到期的job推送到ready queue

ready queue存放已准备好的job,等待被consumer消费

3.2 延迟时间delay

当job.delay>0时,job会被分配到bucket中,bucket会有周期性扫描到期job,如果到期,job会被bucket移到ready queue,等待被消费

当job.delay=0时,job会直接加到ready queue,等待被消费

3.3 执行超时时间TTR

参考第一个图的流程,当job被消费者读取后,如果job.TTR>0,即job设置了执行超时时间,那么job会在读取后会被添加到TTRBucket(专门存放设置了超时时间的job),并且设置job.delay = job.TTR,如果在TTR时间内没有得到消费者ack确认然后删除job,job将在TTR时间之后添加到ready queue,然后再次被消费(如果消费者在TTR时间之后才请求ack,会得到失败的响应)

3.3 确认机制

主要和TTR的设置有关系,确认机制可以分为两种:

当job.TTR=0时,消费者pop出job时,即会自动删除job pool中的job元数据

当job.TTR>0时,即job执行超时时间,这个时间是指用户pop出job时开始到用户ack确认删除结束这段时间,如果在这段时间没有ACK,job会被再次加入到ready queue,然后再次被消费,只有用户调用了ACK,才会去删除job pool中job元数据

4. 安装

4.1 源码运行

配置文件位于gmq/conf.ini,可以根据自己项目需求修改配置

go get -v -u github.com/wuzhc/gmq

cd $GOPATH/src/github.com/wuzhc/gmq

go run main.go

5. 客户端

运行

# php

# 生产者

php producer.php

# 消费者

php consumer.php

# python

# 生产者

python producer.py

# 消费者

python consumer.py

一条消息结构

{

"id": "xxxx", # 任务id,这个必须是一个唯一值,将作为redis的缓存键

"topic": "xxx", # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job

"body": "xxx", # 消息内容

"delay": "111", # 延迟时间,单位秒

"TTR": "11111", # 执行超时时间,单位秒

"status": 1, # job执行状态,该字段由gmq生成

"consumeNum":1, # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成

}

延迟任务

$data = [

'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),

'topic' => ["topic_xxx"],

'body' => 'this is a rpc test',

'delay' => '1800', // 单位秒,半个小时后执行

'TTR' => '0'

];

超时任务

$data = [

'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),

'topic' => ["topic_xxx"],

'body' => 'this is a rpc test',

'delay' => '0',

'TTR' => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费

];

异步任务

$data = [

'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),

'topic' => ["topic_xxx"],

'body' => 'this is a rpc test',

'delay' => '0',

'TTR' => '0'

];

优先级任务

$data = [

'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),

'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C

'body' => 'this is a rpc test',

'delay' => '0',

'TTR' => '0'

];

6. web监控

gmq提供了一个简单web监控平台(后期会提供根据job.Id追踪消息的功能),方便查看当前堆积任务数,默认监听端口为8000,例如:http://127.0.0.1:8000, 界面如下:

7. 遇到问题

以下是开发遇到的问题,以及一些粗糙的解决方案

7.1 安全退出

如果强行中止gmq的运行,可能会导致一些数据丢失,例如下面一个例子:

如果发生上面的情况,就会出现job不在bucket中,也不在ready queue,这就出现了job丢失的情况,而且将没有任何机会去删除job pool中已丢失的job,长久之后job pool可能会堆积很多的已丢失job的元数据;所以安全退出需要在接收到退出信号时,应该等待所有goroutine处理完手中的事情,然后再退出

7.1.1 gmq退出流程

首先gmq通过context传递关闭信号给dispatcher,dispatcher接收到信号会关闭dispatcher.closed,每个bucket会收到close信号,然后先退出timer检索,再退出bucket,dispatcher等待所有bucket退出后,然后退出

dispatcher退出顺序流程: timer -> bucket -> dispatcher

7.1.2 注意

不要使用kill -9 pid来强制杀死进程,因为系统无法捕获SIGKILL信号,导致gmq可能执行到一半就被强制中止,应该使用kill -15 pid,kill -1 pid或kill -2 pid,各个数字对应信号如下:

9 对应SIGKILL

15 对应SIGTERM

1 对应SIGHUP

2 对应SIGINT

7.2 智能定时器

每一个bucket都会维护一个timer,不同于有赞设计,timer不是每秒轮询一次,而是根据bucket下一个job到期时间来设置timer的定时时间 ,这样的目的在于如果bucket没有job或job到期时间要很久才会发生,就可以减少不必要的轮询;

timer只有处理完一次业务后才会重置定时器;,这样的目的在于可能出现上一个时间周期还没执行完毕,下一个定时事件又发生了

如果到期的时间很相近,timer就会频繁重置定时器时间,就目前使用来说,还没出现什么性能上的问题

7.3 原子性问题

我们知道redis的命令是排队执行,在一个复杂的业务中可能会多次执行redis命令,如果在大并发的场景下,这个业务有可能中间插入了其他业务的命令,导致出现各种各样的问题;

redis保证整个事务原子性和一致性问题一般用multi/exec或lua脚本,gmq在操作涉及复杂业务时使用的是lua脚本,因为lua脚本除了有multi/exec的功能外,还有Pipepining功能(主要打包命令,减少和redis server通信次数),下面是一个gmq定时器扫描bucket集合到期job的lua脚本:

-- 获取到期的50个job

local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)

local res = {}

for k,jobId in ipairs(jobIds) do

if k%2~=0 then

local jobKey = string.format('%s:%s', ARGV[3], jobId)

local status = redis.call('hget', jobKey, 'status')

-- 检验job状态

if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then

-- 先移除集合中到期的job,然后到期的job返回给timer

local isDel = redis.call('zrem', KEYS[1], jobId)

if isDel == 1 then

table.insert(res, jobId)

end

end

end

end

local nextTime

-- 计算下一个job执行时间,用于设置timer下一个时钟周期

local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')

if next(nextJob) == nil then

nextTime = -1

else

nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])

if nextTime < 0 then

nextTime = 1

end

end

table.insert(res,1,nextTime)

return res

7.4 redis连接池

可能一般phper写业务很少会接触到连接池,其实这是由php本身所决定他应用不大,当然在php的扩展swoole还是很有用处的

gmq的redis连接池是使用gomodule/redigo/redis自带连接池,它带来的好处是限制redis连接数,通过复用redis连接来减少开销,另外可以防止tcp被消耗完,这在生产者大量生成数据时会很有用

// gmq/mq/redis.go

Redis = &RedisDB{

Pool: &redis.Pool{

MaxIdle: 30, // 最大空闲链接

MaxActive: 10000, // 最大链接

IdleTimeout: 240 * time.Second, // 空闲链接超时

Wait: true, // 当连接池耗尽时,是否阻塞等待

Dial: func() (redis.Conn, error) {

c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))

if err != nil {

return nil, err

}

return c, nil

},

TestOnBorrow: func(c redis.Conn, t time.Time) error {

if time.Since(t) < time.Minute {

return nil

}

_, err := c.Do("PING")

return err

},

},

}

8. 注意问题

job.id在job pool是唯一的,它将作为redis的缓存键;gmq不自动为job生成唯一id值是为了用户可以根据自己生成的job.id来追踪job情况,如果job.id是重复的,push时会报重复id的错误

bucket数量不是越多越好,一般来说,添加到ready queue的速度取决与redis性能,而不是bucket数量

9. 使用中可能出现的问题

9.1 客户端出现大量的TIME_WAIT状态,并且新的连接被拒绝

netstat -anp | grep9503 | wc -l

tcp 0 0 10.8.8.188:41482 10.8.8.185:9503 TIME_WAIT -

这个是正常现象,由tcp四次挥手可以知道,当接收到LAST_ACK发出的FIN后会处于TIME_WAIT状态,主动关闭方(客户端)为了确保被动关闭方(服务端)收到ACK,会等待2MSL时间,这个时间是为了再次发送ACK,例如被动关闭方可能因为接收不到ACK而重传FIN;另外也是为了旧数据过期,不影响到下一个链接,; 如果要避免大量TIME_WAIT的连接导致tcp被耗尽;一般方法如下:

使用长连接

配置文件,网上很多教程,就是让系统尽快的回收TIME_WAIT状态的连接

使用连接池,当连接池耗尽时,阻塞等待,直到回收再利用

10. 相关链接

11. 未来计划

支持安全传输层协议(TLS)

除了json外,可支持protobuf序列化

web监控工具提供消息追踪功能

增加分布式部署方案

增加数据统计收集器

可持久化到磁盘

支持http协议

增加调试和分析 pprof

开发分支

python 消息队列 go_gmq: gmq是基于redis提供的特性,使用go语言开发的一个简单易用的消息队列;支持延迟任务,异步任务,超时任务,优先级任务...相关推荐

  1. 基于ForkJoin构建一个简单易用的并发组件

    2019独角兽企业重金招聘Python工程师标准>>> 基于ForkJoin构建一个简单易用的并发组件 在实际的业务开发中,需要用到并发编程的知识,实际使用线程池来异步执行任务的场景 ...

  2. 基于搜狐云景的java语言开发技巧

    基于搜狐云景的java语言开发技巧 坊间盛传国内最牛X的paas公测了,上网搜了下,是搜狐云景,我试用了下,以下是我对搜狐云景的初体验,和辛苦耕耘的码农分享.在云景上开发java项目,和你现在的工作没 ...

  3. python小项目实例流程-Python小项目:快速开发出一个简单的学生管理系统

    原标题:Python小项目:快速开发出一个简单的学生管理系统 本文根据实际项目中的一部分api 设计抽象出来,实例化成一个简单小例子,暂且叫作「学生管理系统」. 这个系统主要完成下面增删改查的功能: ...

  4. python小项目案例-Python小项目:快速开发出一个简单的学生管理系统

    本文根据实际项目中的一部分api 设计抽象出来,实例化成一个简单小例子,暂且叫作「学生管理系统」. 这个系统主要完成下面增删改查的功能: 包括: 学校信息的管理 教师信息的管理 学生信息的管理 根据A ...

  5. python项目开发实例-Python小项目:快速开发出一个简单的学生管理系统

    本文根据实际项目中的一部分api 设计抽象出来,实例化成一个简单小例子,暂且叫作「学生管理系统」. 这个系统主要完成下面增删改查的功能: 包括: 学校信息的管理 教师信息的管理 学生信息的管理 根据A ...

  6. (超多图)基于Android studio开发的一个简单入门小应用(超级详细!!)(建议收藏)

    基于Android studio开发的一个简单入门小应用 一.前言 二.前期准备 三.开发一个小应用 五.运行应用 一.前言 在暑假期间,我学习JAVA基础,为了能早日实现自己用代码写出一个app的& ...

  7. 一个轻便易用的消息队列

    PS:工作的时候遇到一个需要更新几千万关键词的热度等属性的功能,最开始使用普通的for循环,但是这个方法明显的不理想,后来想到使用消息队列,因为之前从来没有使用过消息队列,所以临时网上去找一些常用的消 ...

  8. go语言打印日期_基于 Go 语言开发在线论坛(八):消息、视图及日期时间本地化...

    我们接着上篇在线论坛的进度,由于之前所有页面和消息文本都是英文的,而我们开发的应用基本都是面向中文用户的,所以需要对项目进行本地化,今天正好借着这个入门项目给大家介绍下如何在 Go Web 应用中进行 ...

  9. java 分布式任务_一个简单的基于 Redis 的分布式任务调度器 —— Java 语言实现...

    折腾了一周的 Java Quartz 集群任务调度,很遗憾没能搞定,网上的相关文章也少得可怜,在多节点(多进程)环境下 Quartz 似乎无法动态增减任务,恼火.无奈之下自己撸了一个简单的任务调度器, ...

最新文章

  1. 浙大Java延毕_复旦博导:我有个“拼命三郎”似的博士生,却也面临延毕,做科研得有章法!......
  2. llinux基本操作
  3. springboot基于全局异常处理的简单日志打印
  4. 启明星辰集团DT总部落地杭州 数据绿洲版图驱动未来发展
  5. mysql调试问题_mysql 数据库调试分析
  6. Android通讯录查询篇--ContactsContract.Data 二
  7. DevExpress XtraReports 入门五 创建交叉表报表
  8. MySQL中文存到数据库是,springMVC保存数据到mysql数据库中文乱码问题解决方法
  9. 搭建 Python 开发环境
  10. 潮流计算程序————支路功率计算与输出程序
  11. laravel路由的配置,别名,路由群组
  12. 校园建设的一个实例:校园网规划与设计
  13. 比win7运行快的linux发行版,旧电脑扔了浪费!装SliTaz系统,瞬间运行流畅如新机,比win7更快...
  14. 全球人气果汁机的马达电机辐射超标整改—原汁原味
  15. 【VS2019编辑器第一行默认添加:#define _CRT_SECURE_NO_WARNINGS 1\使用scanf函数报错 解决方案\创建源文件第一行不出现#define _CRT_SECURE】
  16. DSS (LCD+HDMI) OMAP4460 第十章
  17. Discuz大气仿英雄联盟游戏风格论坛模板源码
  18. 自制深度学习推理框架-第七课-构建自己的计算图
  19. 6套好看的404页面模板源码
  20. keepalived版本引发故障解决分析

热门文章

  1. 以史为鉴,编程语言,启示录之系统觉醒
  2. C#.NetWPF实现垃圾桶案例
  3. PTA 哥尼斯堡的“七桥问题”(并查集 + 性质判断)
  4. [转载]Css设置table网格线(无重复)
  5. QQ空间打不开的解决方案
  6. 电脑监控怎样查看回放?
  7. 快手直播弹幕采集-python-protobuf解析
  8. [极简]路由器发送IP至服务器
  9. 当我们遇到问题的时候改如何解决
  10. 鸿蒙大陆鸿蒙战凯,鸿蒙大陆7.1正式版