使用NODEJS+REDIS开发一个消息队列以及定时任务处理
作者:RobanLee
原创文章,转载请注明: 萝卜李 http://www.robanlee.com
源码在这里: https://github.com/robanlee123/RobCron
时间有限,就不详细注释,有问题或者意见欢迎@我,也欢迎大家批评指正.
本文所必须的一些资料如下:
1. NODEJS ==> 可以去NODEJS.ORG下载最新的源码.
2. Redis ==> Redis.io
3. KUE ==> Nodejs的一个开源队列系统
4. NODE-SCHEDULE ==> NODEJS 一个开源调度系统
废话不多说,先来介绍任务的流程:
1. NODEJS或者PHP或者其他语言 写入REDIS 一个计划任务, 比如每分钟做某件事,这里就用SAYHELLO来代替好了
2. 使用NODEJS读取这个任务,并将它转换为NODE的调度任务(node-schedule 来完成)
3. 调度器[node-schedule]根据设定的规则来分发任务.
4. KUE接受任务,并且加入列队,执行.
5. DONE
STEP1: 创建一个任务
/*** Add task* @author Robanlee@gmail.com*///加载函数,集中加载一些LIB,这个源码请参照最后的附属文件 var loader = require('./loader'); function addTask(opts){new loader(this); //默认设置this.opts = {keyIDs:'schedule:job:ids',keyLists:'schedule:job:list',keyJob:'schedule:job:'}//合并配置,类似JQUERY: extendthis.mergeParams(opts); };//Merge options addTask.prototype.mergeParams = function ( param ){if(undefined === this.opts ) {return false;}for(var x in param) { if(param[x] != undefined && '' != param[x]) {this.opts[x] = param[x];}} };//添加数据方法 addTask.prototype.pushData = function ( data ){ if(undefined == data ) {console.log('--ERROR:data is null');return false;}this.getIncr.call(this,function(response,obj){var id = response;obj.redisClient.rpush(obj.opts.keyLists,id,function(err,response){if(err) throw err;});data.id = id;var m = obj.redisClient.multi();for(var x in data) {m.hset( obj.opts.keyJob+id,x,data[x] );}m.exec(function(err,response){if(err) throw err; console.log('[info] Task: ['+data.name+'] has been set successful!');}); }); };//获取REDIS目前的自增ID addTask.prototype.getIncr = function (callBack){var obj = this; this.redisClient.incr(this.opts.keyIDs,function(err,response){console.log("[info] Current id is : " + response);callBack(response, obj);}); };
加载这个lib 写入一个DEMO:
var data = { 'name':'taskDemo','created':Date.now(),'state':1,'type':'untitled','rule':'*/1 * * * *' //这个任务规则可以为CRONTAB的规则,这个表示每分钟执行一次 };var job = new addTask(); job.pushData(data);
执行这个脚本,如果一切正常,你会看到如下信息:
NODEJS 输出:
REDIS:
接下来就是获取数据,并且转换为调度任务了,
源码:
var loader = require('./loader'); var taskLog = require("./TaskLog");function scheduleTask(){new loader(this); this.opts = {keyIDs:'schedule:job:ids',keyLists:'schedule:job:list',keyJob:'schedule:job:'}this.task = {taskDemo:undefined};//监听取消任务操作this.listenCancel(); };scheduleTask.prototype.setScheduleTask = function (data,obj){ this.task[data.name] = this.libs['node-schedule'].scheduleJob(data.rule,function(){obj.setQueue(data);console.log('[info] Task :' + data.name + ' has been set in queue!');});};scheduleTask.prototype.setQueue = function (datas){var jobs = this.libs.kue.createQueue(); jobs.create(datas.name,{'name:':datas.name,'state':1}).save();console.log("[info] Task ["+datas.name+"] has been queued!");this.setLog(datas); };scheduleTask.prototype.setLog = function (responseData){var logData = {jobid:responseData.id,name:responseData.name,result:1};new taskLog(logData);console.log("[info] Task has been loged"); };scheduleTask.prototype.getJob = function (){this.getJobIndex.call(this,function(response,obj){for(var x in response ) {obj.redisClient.hgetall(obj.opts.keyJob+response[x],function(err,data){console.log("[info] Task:["+data.name+"] has been loaded!");obj.setScheduleTask(data, obj);});}}); };scheduleTask.prototype.getJobIndex = function(callBack){//Read tasks from <list schedule:job:list>var o = this;this.redisClient.lrange(this.opts.keyLists,0,-1,function(err,response){if(err) throw err;callBack(response, o);}); };scheduleTask.prototype.listenCancel = function (){var job = this.libs.kue.createQueue();var that = this;job.process('cancelJob',function(data,done){ that.task[data.data.data].cancel();console.log('[info] Task: '+data.data.data + ' has been canceled') ;done();}); }
执行代码:
var x = new scheduleTask(); x.getJob();
等待一分钟后,NODEJS控制台会输出(这个任务在没有取消的情况下,每分钟都会执行):
第二分钟:
REDIS 现在的数据:
这个数据中增加了KUE的一些任务, q:job:[]:inactive 这个就标识任务还未被执行,执行后的任务状态有
complete active failed delay 四种
至此,就只剩下执行任务的步骤了
var loader = require('./loader');function execTask(){new loader(this); var job = this.libs.kue.createQueue();job.process('taskDemo',function(data,done){console.log('[info] Task:'+data.type+'#'+data.id+' has been executed successful!');//DONE之前可以做你想要做的事情done(); //千万别忘记调用此方法});}//添加一个取消定时任务的KUE任务 execTask.prototype.addCancelJob = function (){var job =this.libs.kue.createQueue();job.create('cancelJob', {data:'taskDemo'}).save();console.log('[info] Task: cancelJob has been send!'); }
执行这个脚本:
var et = new execTask();//取消定时任务 et.addCancelJob();
执行后会有2个结果
1. 程序会执行当前列队里的任务.
2. 定时任务会被取消,下一分钟后任务不会再由SCHEDULE分配
任务执行结果:
取消任务的回应:
注意最后一行…
转载于:https://www.cnblogs.com/jkll/p/4550108.html
使用NODEJS+REDIS开发一个消息队列以及定时任务处理相关推荐
- 什么鬼,面试官竟然让我用Redis实现一个消息队列!!?
GitHub 9.4k Star 的Java工程师成神之路 ,不来了解一下吗? GitHub 9.4k Star 的Java工程师成神之路 ,真的不来了解一下吗? GitHub 9.4k Star 的 ...
- 面试官竟让我用Redis实现一个消息队列!
来自公众号:Hollis >>>千人线上直播活动报名倒计时(今晚20:00): 从Oracle出发,走进GaussDB的世界 众所周知,redis是一个高性能的分布式key-valu ...
- redis 消息队列 过段时间不能下发_以Redis来谈消息队列
首先 我先引入一个大家熟知的观点:Reids可以作为消息队列来使用 redis提供了两种方式来做消息队列,一种是生产者消费者模式,一种是发布订阅模式. 本篇文章将从 异步,解耦,分布式,可靠四部分来探 ...
- redis延迟消息队列不准时php,Redis实现延迟消息队列
消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ. 如果只需要实现简单的消息队列,那么借助Redis即可. 如果对消息有着严格的可靠性等 ...
- 使用 Redis Stream 实现消息队列
使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...
- redis简单队列java_使用Redis的简单消息队列
redis简单队列java 在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medi ...
- 使用Redis的简单消息队列
在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medium和Large. 在第一个 ...
- 【redis源码学习】redis 中的“消息队列” Stream
文章目录 关于redis Stream Stream 结构 Stream 操作 添加消息 新增消费组 删除消息 裁剪信息流 释放消费组 查找元素 关于redis Stream redis stream ...
- redis stream 实现消息队列
redis stream 实现消息队列 Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于r ...
最新文章
- 代码优化实战:我又优化了一百个if else!
- 三星电子网络营销重拳出击芯片制造力求“差异化”取胜智能手机市场
- Hashtable元素的删除
- 性能测试—前端性能1
- Wondows环境下配置Tomat
- Vue.js 牛刀小试(持续更新~~~)
- 马斯克:我上大学时就想创立电动汽车公司
- 线性模型第1讲:最小二乘法
- 小麦(Wheat)-玉米(Maize)-水稻(Rice) 数粒软件
- html图片显示不出来
- Java命令行开关_java命令行操作
- office word安装mathtype报错,找不到mathpage.WLL文件
- Unreal Engine 4(虚幻UE4)GameplayAbilities 插件入门教程(三)技能标签(Ability Tags)
- 汇率查询接口,免费实时货币汇率查询换算
- js:网页中的高和宽(document)
- v4l2框架-开启视频流(stream on)
- ROUGE和pyrouge的安装
- APP项目软件开发流程
- *.lwp文件如何打开
- android 输入法如何启动流程_Android输入法显示流程
热门文章
- python复数的实部和虚部都是整数嘛_Python数字(Number)
- Team Foundation Power Tools 1.2发布
- touch——创建文件
- lua小技巧——lua全局变量的检测
- HTML简单的网页制作期末作业【NBA勒布朗詹姆斯篮球明星】HTML+CSS+JavaScript
- 电信运营商基于 MQTT 协议 构建千万级 IoT 设备管理平台
- 51单片机C语言波特率十六进制,8051单片机波特率计算公式(配套C语言例程)
- 一篇文章学会eggjs做后端服务及各种问题处理
- vue中安装和使用Dplayer视频播放器
- HDU - 4082 Hou Yi's secret