贼好用,冰河再次升级了这款开源的精准定时任务和延时队列框架!!
写在前面
在实际工作中,很多小伙伴在开发定时任务时,会采取定时扫描数据表的方式实现。然而,这种方式存在着重大的缺陷:如果数据量大的话,频繁的扫描数据表会对数据库造成巨大的压力;难以支撑大规模的分布式定时任务;难以支持精准的定时任务;大量浪费CPU的资源;扫描的数据大部分是不需要执行的任务。那么,既然定时扫描数据表存在这么多的弊端,那么,有没有一种方式来解决这些问题呢?今天,冰河就带着他的开源项目mykit-delay来了!!开源地址:https://github.com/sunshinelyz/mykit-delay 和 https://gitee.com/binghe001/mykit-delay
在使用框架过程中如有任何问题,都可以添加冰河微信【sun_shine_lyz】进行交流。
文章已收录到https://github.com/sunshinelyz/technology-binghe 和 https://gitee.com/binghe001/technology-binghe
项目简述
Mykit体系中提供的简单、稳定、可扩展的延迟消息队列框架,提供精准的定时任务和延迟队列处理功能。
项目模块说明
- mykit-delay-common: mykit-delay 延迟消息队列框架通用工具模块,提供全局通用的工具类
- mykit-delay-config: mykit-delay 延迟消息队列框架通用配置模块,提供全局配置
- mykit-delay-queue: mykit-delay 延迟消息队列框架核心实现模块,目前所有主要的功能都在此模块实现
- mykit-delay-controller: mykit-delay 延迟消息队列框架Restful接口实现模块,对外提供Restful接口访问,兼容各种语言调用
- mykit-delay-core: mykit-delay 延迟消息队列框架的入口,整个框架的启动程序在此模块实现
- mykit-delay-rpc:mykit-delay延时消息队列的RPC模块,支持Dubbo、brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等主流RPC的实现
- mykit-delay-test: mykit-delay 延迟消息队列框架通用测试模块,主要提供Junit单元测试用例
需求背景
- 用户下订单后未支付,30分钟后支付超时
- 在某个时间点通知用户参加系统活动
- 业务执行失败之后隔10分钟重试一次
类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。
队列设计
整体架构设计如下图所示。
开发前需要考虑的问题
- 及时性 消费端能按时收到
- 同一时间消息的消费权重
- 可靠性 消息不能出现没有被消费掉的情况
- 可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复
- 可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费
- 高可用 多实例 这里指HA/主备模式并不是多实例同时一起工作
- 消费端如何消费
当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持map list数据结构。
简单定义一个消息数据结构
private String topic;/***topic**/
private String id;/***自动生成 全局惟一 snowflake**/
private String bizKey;
private long delay;/***延时毫秒数**/
private int priority;//优先级
private long ttl;/**消费端消费的ttl**/
private String body;/***消息体**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();
运行原理
- 用Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
- 将id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
- 使用timer实时监控zset有序列表中top 10的数据 。 如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
- 客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。
客户端
因为涉及到不同程序语言的问题,所以当前默认支持http访问方式。
- 添加延时消息添加成功之后返回消费唯一ID POST /push {……消息体}
- 删除延时消息 需要传递消息ID GET /delete?id=
- 恢复延时消息 GET /reStore?expire=true|false expire是否恢复已过期未执行的消息。
- 恢复单个延时消息 需要传递消息ID GET /reStore/id
- 获取消息 需要长连接 GET /get/topic
用Nginx暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。
目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下 如果是延迟消息就用延迟消息系统处理。
消息可恢复
实现恢复的原理 正常情况下一般都是记录日志,比如mysql的binlog等。
这里我们直接采用mysql数据库作为记录日志。
目前创建以下2张表:
- 消息表 字段包括整个消息体
- 消息流转表 字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip
定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系 这里也可以是zset key。
支持消息恢复
假如redis服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表1也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。
当然恢复单个任务也可以这么干。
数据表设计
这里,我就直接给出创建数据表的SQL语句。
DROP TABLE IF EXISTS `mykit_delay_queue_job`;
CREATE TABLE `mykit_delay_queue_job` (`id` varchar(128) NOT NULL,`bizkey` varchar(128) DEFAULT NULL,`topic` varchar(128) DEFAULT NULL,`subtopic` varchar(250) DEFAULT NULL,`delay` bigint(20) DEFAULT NULL,`create_time` bigint(20) DEFAULT NULL,`body` text,`status` int(11) DEFAULT NULL,`ttl` int(11) DEFAULT NULL,`update_time` datetime(3) DEFAULT NULL,PRIMARY KEY (`id`),KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`),KEY `mykit_delay_queue_job_STATUS` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ----------------------------
-- Table structure for mykit_delay_queue_job_log
-- ----------------------------
DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;
CREATE TABLE `mykit_delay_queue_job_log` (`id` varchar(128) NOT NULL,`status` int(11) DEFAULT NULL,`thread` varchar(60) DEFAULT NULL,`update_time` datetime(3) DEFAULT NULL,`host` varchar(128) DEFAULT NULL,KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
关于高可用
分布式协调还是选用zookeeper。
如果有多个实例最多同时只能有1个实例工作 这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用zookeeper或者redis就能实现分布式锁了。
最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。
运行模式
- 支持 master,slave (HA)需要配置
mykit.delay.registry.serverList
zk集群地址列表 - 支持 cluster 会涉及到分布式锁竞争 效果不是很明显 分布式锁采用
redis
的setNx
实现 - StandAlone
目前,经过测试,推荐使用master slave的模式,并且,在升级版本中,进一步增强了Master Slave模式。后期会优化Cluster模式。
如何接入
为了提供一个统一的精准定时任务和延时队列框架,mykit-delay提供了HTTP Rest接口和RPC方式供其他业务系统调用,接口使用简单方便,只需要简单的调用接口,传递相应的参数即可。
RPC方式调用,后续支持的方式有:
- Dubbo(已实现)
- brpc(预留支持)
- grpc(预留支持)
- Motan(预留支持)
- Sofa(预留支持)
- SpringCloud(预留支持)
- SpringCloud Alibaba(预留支持)
HTTP方式接入
消息体
以JSON数据格式参数 目前提供了http
协议。
- body 业务消息体
- delay 延时毫秒 距
createTime
的间隔毫秒数 - id 任务ID 系统自动生成 任务创建成功返回
- status 状态 默认不填写
- topic 标题
- subtopic 保留字段
- ttl 保留字段
- createTime 创建任务时间 非必填 系统默认
启动HTTP Rest服务
首先,从GitHub Clone项目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后进入mykit-delay框架目录。
cd mykit-delay
执行Maven命令
mvn clean package -Dmaven.test.skip=true
接下来,进入 mykit-delay-core
的 target
目录下,运行如下命令。
java -jar mykit-delay-core-xxx.jar
其中,xxx是版本号,以实际下载的版本号为准。
接下来,就可以调用HTTP Restful接口来使用mykit-delay框架了。
添加任务
/push POST application/json
{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}
删除任务
删除任务 需要记录一个JobId
/delete?jobId=xxxGET
恢复单个任务
用于任务错乱 脑裂情况 根据日志恢复任务
/reStoreJob?JobId=xxxGET
恢复所有未完成的任务
根据日志恢复任务
/reStore?expire=trueGET
参数expire
表示是否需要恢复已过期还未执行的数据
清空队列数据
根据日志中未完成的数据清空队列中全部数据。清空之后 会删除缓存中的所有任务
/clearAllGET
Dubbo方式接入
消息体
以JSON数据格式参数 目前提供了http
协议。
- body 业务消息体
- delay 延时毫秒 距
createTime
的间隔毫秒数 - id 任务ID 系统自动生成 任务创建成功返回
- status 状态 默认不填写
- topic 标题
- subtopic 保留字段
- ttl 保留字段
- createTime 创建任务时间 非必填 系统默认
启动Dubbo服务
首先,从GitHub Clone项目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后进入mykit-delay框架目录。
cd mykit-delay
执行Maven命令
mvn clean package -Dmaven.test.skip=true
接下来,进入 mykit-rpc-dubbo
模块下的 mykit-rpc-dubbo-server
服务 的 target
目录下,运行如下命令。
mykit-rpc-dubbo-server-xxx.jar
其中,xxx是版本号,以实际下载的版本号为准。
引入mykit-delay依赖
以Dubbo方式接入mykit-delay,需要引入mykit-delay的依赖,如下所示。
<dependency><groupId>io.mykit.delay</groupId><artifactId>mykit-rpc-dubbo-common</artifactId><version>1.0-SNAPSHOT</version>
</dependency>
然后,在需要调用Dubbo服务的类中以如下方式注入MykitDelayDubboInterface。
@DubboReference(version = "1.0.0")
private MykitDelayDubboInterface mykitDelayDubboInterface;
其中,MykitDelayDubboInterface接口的定义如下所示。
/*** @author binghe* @version 1.0.0* @description 发布的Dubbo接口*/
public interface MykitDelayDubboInterface {/*** 推送消息*/ResponseMessage push(JobWrapp jobMsg);/*** 删除任务*/ResponseMessage delete(String jobId);/*** 完成任务*/ResponseMessage finish(String jobId);/*** 恢复单个任务*/ResponseMessage reStoreJob(String jobId);/*** 提供一个方法 假设缓存中间件出现异常 以及数据错乱的情况 提供恢复功能* @param expire 过期的数据是否需要重发 true需要, false不需要 默认为true*/ResponseMessage reStore(Boolean expire);/*** 清除所有的任务*/ResponseMessage clearAll();
}
接下来,就可以以Dubbo方式接入mykit-delay框架了。
注意:无论是以HTTP方式,还是以RPC方式启动mykit-delay服务,都需要通过如下方式加载基本配置信息。
StartGetReady.ready(ConsumeQueueProvider.class.getName());
客户端获取队列方式
目前默认实现了RocketMQ
与ActiveMQ
的推送方式。依赖MQ的方式来实现延时框架与具体业务系统的解耦。同时,框架已SPI的形式加载相应的MQ,也就是说,集成MQ的方式是可扩展的。
消息体中消息与RocketMQ
和 ActiveMQ
消息字段对应关系
mykit-delay | RocketMQ | ActiveMQ | 备注 |
---|---|---|---|
topic | topic | topic | 点对点发送队列名称或者主题名称 |
subtopic | subtopic | subtopic | 点对点发送队列子名称或者主题子名称 |
body | 消息内容 | 消息内容 | 消息内容 |
关于系统配置
延迟框架与具体执行业务系统的交互方式通过延迟框架配置实现,具体配置文件位置为mykit-delay-config项目下的resources/properties/starter.properties
文件中。
测试
需要配置好数据库地址和Redis的地址 如果不是单机模式 也需要配置好Zookeeper
运行mykit-delay-test模块下的测试类io.mykit.delay.test.PushTest
添加任务到队列中
启动mykit-delay-test模块下的io.mykit.delay.TestDelayQueue
消费前面添加数据 为了方便查询效果 默认的消费方式是consoleCQ
控制台输出
扩展
支持zset队列个数可配置,避免大数据带来高延迟的问题。进一步增强框架的高可用。
近期规划
- brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等RPC扩展
- 支持RabbitMQ、Kafka等消息中间件
- 分区(buck)支持动态设置
- redis与数据库数据一致性的问题 (
重要
) - 实现自己的推拉机制
- 支持可切换实现方式,目前只是依赖Redis实现,后续待优化,支持更多的可配置选项
- 支持Web控制台管理队列
- 实现消息消费
TTL
机制 - 增加对框架和定时任务的监控
如果这款开源框架对你有帮助,请小伙伴们打开github链接:https://github.com/sunshinelyz/mykit-delay 和 https://gitee.com/binghe001/mykit-delay,给个Star,让更多的小伙伴看到,减轻工作中繁琐的扫描数据表的定时任务开发。也希望能够有越来越多的小伙伴参与这个开源项目,我们一起养肥它!!
好了,不早了,今天就到这儿吧,我是冰河,我们下期见!!
贼好用,冰河再次升级了这款开源的精准定时任务和延时队列框架!!相关推荐
- 企业微信再次升级;滴滴网约车欲加装防护隔离膜;Firefox 73发布 | 极客头条
整理 | 郭芮 快来收听极客头条音频版吧,智能播报由标贝科技提供技术支持. 「极客头条」-- 技术人员的新闻圈! CSDN 的读者朋友们早上好哇,「极客头条」来啦,快来看今天都有哪些值得我们技术人关注 ...
- 华为scp快充协议详解_华为SCP快充技术再次升级!成本大幅降低,有望在未来普及!...
手机的功能多样性的发展,导致手机电量成了查看手机性价比的条件之一.一款电池容量大,续航久的手机远比容量小,续航时间短的手机受欢迎的多.为了续航,电池容量各大厂商都对此有所研发,OPPO之前的充电5分钟 ...
- vue怎么取消按回车下拉框自动下拉_八月更新第二版,小视频自动竖屏全屏播放,失效校验再次升级!...
新方圆小棉袄,传说中的贴心小棉袄,宇宙无敌超级厉害. 记住我们的付费群(大佬众多):978260150,免费群:1101359539 1.方圆和海阔的规则导入和支付宝口令类似,将口令全部复制打开最新版 ...
- asp按钮跳转页面代码_重磅更新!全新Web编辑页面、编辑规则快速跳转、状态栏变色、富文本再次升级!...
新方圆小棉袄,传说中的贴心小棉袄,宇宙无敌超级厉害. 记住我们的付费群(大佬众多):978260150,免费群:1101359539 1.方圆和海阔的规则导入和支付宝口令类似,将口令全部复制打开最新版 ...
- 手环升级鸿蒙设备名单,鸿蒙2.0升级名单已确认-可首批升级的42款机型推荐
原标题:鸿蒙2.0升级名单已确认-可首批升级的42款机型推荐 鸿蒙系统2.0已确认有42款机型可以首批升级,比安卓快60%,那么鸿蒙系统2.0已确认的升级机型都有哪些?不少小伙伴们还不是很了解,小编为 ...
- 高德地图安卓 拖拽选点_行车记录仪当“眼睛” 高德地图手机AR导航再次升级...
来源标题:行车记录仪当"眼睛" 高德地图手机AR导航再次升级 高德地图近日发布新版本,AR驾车导航服务再次升级,支持连接车内行车记录仪.由行车记录仪的摄像头充当"眼睛&q ...
- 中国移动国际mCloud体验再次升级,助力企业远程协同数字化转型
申耀的科技观察 读懂科技,赢取未来! 中国移动国际有限公司(CMI)宣布一站式云网融通平台mCloud 体验再次升级,为企业数字化转型和全球扩张提供更为便捷的平台.升级后的mCloud平台将支持更丰富 ...
- 意迷观看欧冠决赛慌乱踩踏 公共安防再次升级
据报道,6月3日晚,意大利都灵市圣卡尔洛广场发生踩踏事故,造成至少600人受伤.当晚数千名尤文图斯球迷聚集在圣卡洛广场,通过大屏幕观看欧冠决赛直播.目击者称,观赛中人群突然出现叫喊和推搡,场面失控,发 ...
- 面对百度绿萝算法再次升级SEO要做什么呢?
面对百度绿萝算法再次升级SEO要做什么呢? 唉,百度绿萝算法再次升级,很明显加大了对论坛.博客等外链的打击力度,百度这种削弱外链的作法,令我们不得不再次审视我们的淘金创业网安欣生活项目网站,思考网站今 ...
最新文章
- Excel中将一个表格的数据关联到另一个表格
- ASP.NET MVC应用程序展示RDLC报表
- ajax方法完整的事件流
- VTK:可视化之ColorAnActor
- Lucene全文检索过程
- 使用 jQuery Mobile 与 HTML5 开发 Web App (十五) —— jQuery Mobile 与响应式
- 《统计学习方法》代码全解析——第十三部分无监督学习概论
- 怎样设计访谈提纲_访谈提纲设计
- Mysql JSON对象和JSON数组查询
- 科技文献检索与计算机应用,科技文献检索与计算机应用.doc
- 《Redis视频教程》(p9)
- OFDM转向FBMC
- 【WINDOWS / DOS 批处理】添加注释
- NVIDIA Riva中文手册 (五) —— Riva TTS语音合成API的使用
- 怎么把PS界面语言变成英文方法教程
- 计算机主板华擎,华擎主板怎么样靠谱吗
- 时域采样与频域采样实验报告_使用网络分析仪进行时域分析
- 备战APEC 智利公布2019峰会主要议题
- 学习笔记——Dialog回调
- H5打包成app源码
热门文章
- java resources报错解决方案(步骤三亲测)
- module github.com/jinzhu/gorm/dialects/mysql: git ls-remote -q origin in E:\go_gin\pkg\mod\cache\vcs
- 分享几个免费书籍的网站
- anc降噪是什么意思
- 打造自己的MVC框架
- 四象限运行模式_双向可控硅四象限触发方式介 - 双向可控硅四象限触发方式介绍_双向可控硅触发电路的设计...
- python DEA: 基于非径向距离NDDF的Malmquist-Luenberger 指数及其分解
- 中北大学算法分析与设计实验报告一(BF算法)
- 一篇文章通透理解序列号实现原理
- reverse()的使用