作者:RobanLee

原创文章,转载请注明: 萝卜李 http://www.robanlee.com

源码在这里: https://github.com/robanlee123/RobCron

时间有限,就不详细注释,有问题或者意见欢迎@我,也欢迎大家批评指正.

本文所必须的一些资料如下:

1. NODEJS ==> 可以去NODEJS.ORG下载最新的源码.
2. Redis ==> Redis.io
3. KUE ==> Nodejs的一个开源队列系统
4. NODE-SCHEDULE ==> NODEJS 一个开源调度系统

废话不多说,先来介绍任务的流程:

1. NODEJS或者PHP或者其他语言 写入REDIS 一个计划任务, 比如每分钟做某件事,这里就用SAYHELLO来代替好了
2. 使用NODEJS读取这个任务,并将它转换为NODE的调度任务(node-schedule 来完成)
3. 调度器[node-schedule]根据设定的规则来分发任务.
4. KUE接受任务,并且加入列队,执行.
5. DONE

STEP1: 创建一个任务

/*** Add task* @author Robanlee@gmail.com*///加载函数,集中加载一些LIB,这个源码请参照最后的附属文件
var loader = require('./loader');  function addTask(opts){new loader(this); //默认设置this.opts = {keyIDs:'schedule:job:ids',keyLists:'schedule:job:list',keyJob:'schedule:job:'}//合并配置,类似JQUERY: extendthis.mergeParams(opts);
};//Merge options
addTask.prototype.mergeParams = function ( param ){if(undefined === this.opts ) {return false;}for(var x in param) { if(param[x] != undefined && '' != param[x]) {this.opts[x] = param[x];}}
};//添加数据方法
addTask.prototype.pushData = function ( data ){ if(undefined == data ) {console.log('--ERROR:data is null');return false;}this.getIncr.call(this,function(response,obj){var id = response;obj.redisClient.rpush(obj.opts.keyLists,id,function(err,response){if(err) throw err;});data.id = id;var m = obj.redisClient.multi();for(var x in data) {m.hset( obj.opts.keyJob+id,x,data[x] );}m.exec(function(err,response){if(err) throw err; console.log('[info] Task: ['+data.name+'] has been set successful!');});   });
};//获取REDIS目前的自增ID
addTask.prototype.getIncr = function (callBack){var obj = this; this.redisClient.incr(this.opts.keyIDs,function(err,response){console.log("[info] Current id is : " + response);callBack(response, obj);});
};

加载这个lib 写入一个DEMO:

var data = { 'name':'taskDemo','created':Date.now(),'state':1,'type':'untitled','rule':'*/1 * * * *'  //这个任务规则可以为CRONTAB的规则,这个表示每分钟执行一次
};var job = new addTask();
job.pushData(data);

执行这个脚本,如果一切正常,你会看到如下信息:

NODEJS 输出:

REDIS:

接下来就是获取数据,并且转换为调度任务了,

源码:

var loader = require('./loader');
var taskLog = require("./TaskLog");function scheduleTask(){new loader(this); this.opts = {keyIDs:'schedule:job:ids',keyLists:'schedule:job:list',keyJob:'schedule:job:'}this.task = {taskDemo:undefined};//监听取消任务操作this.listenCancel();
};scheduleTask.prototype.setScheduleTask = function (data,obj){ this.task[data.name] =  this.libs['node-schedule'].scheduleJob(data.rule,function(){obj.setQueue(data);console.log('[info] Task :' + data.name + ' has been set in queue!');});};scheduleTask.prototype.setQueue = function (datas){var jobs = this.libs.kue.createQueue(); jobs.create(datas.name,{'name:':datas.name,'state':1}).save();console.log("[info] Task ["+datas.name+"] has been queued!");this.setLog(datas);
};scheduleTask.prototype.setLog = function (responseData){var logData = {jobid:responseData.id,name:responseData.name,result:1};new taskLog(logData);console.log("[info] Task has been loged");
};scheduleTask.prototype.getJob = function (){this.getJobIndex.call(this,function(response,obj){for(var x in response ) {obj.redisClient.hgetall(obj.opts.keyJob+response[x],function(err,data){console.log("[info] Task:["+data.name+"] has been loaded!");obj.setScheduleTask(data, obj);});}});
};scheduleTask.prototype.getJobIndex = function(callBack){//Read tasks from <list schedule:job:list>var o = this;this.redisClient.lrange(this.opts.keyLists,0,-1,function(err,response){if(err) throw err;callBack(response, o);});
};scheduleTask.prototype.listenCancel = function (){var job = this.libs.kue.createQueue();var that = this;job.process('cancelJob',function(data,done){  that.task[data.data.data].cancel();console.log('[info] Task: '+data.data.data + ' has been canceled') ;done();});
}

执行代码:

var x = new scheduleTask();
x.getJob();

等待一分钟后,NODEJS控制台会输出(这个任务在没有取消的情况下,每分钟都会执行):

第二分钟:

REDIS 现在的数据:

这个数据中增加了KUE的一些任务, q:job:[]:inactive 这个就标识任务还未被执行,执行后的任务状态有

complete active failed delay 四种

至此,就只剩下执行任务的步骤了

var loader = require('./loader');function execTask(){new loader(this); var job = this.libs.kue.createQueue();job.process('taskDemo',function(data,done){console.log('[info] Task:'+data.type+'#'+data.id+' has been executed successful!');//DONE之前可以做你想要做的事情done(); //千万别忘记调用此方法});}//添加一个取消定时任务的KUE任务
execTask.prototype.addCancelJob = function (){var job =this.libs.kue.createQueue();job.create('cancelJob', {data:'taskDemo'}).save();console.log('[info] Task: cancelJob has been send!');
}

执行这个脚本:

var et = new execTask();//取消定时任务
et.addCancelJob();

执行后会有2个结果

1. 程序会执行当前列队里的任务.

2. 定时任务会被取消,下一分钟后任务不会再由SCHEDULE分配

任务执行结果:

取消任务的回应:

注意最后一行…

转载于:https://www.cnblogs.com/jkll/p/4550108.html

使用NODEJS+REDIS开发一个消息队列以及定时任务处理相关推荐

  1. 什么鬼,面试官竟然让我用Redis实现一个消息队列!!?

    GitHub 9.4k Star 的Java工程师成神之路 ,不来了解一下吗? GitHub 9.4k Star 的Java工程师成神之路 ,真的不来了解一下吗? GitHub 9.4k Star 的 ...

  2. 面试官竟让我用Redis实现一个消息队列!

    来自公众号:Hollis >>>千人线上直播活动报名倒计时(今晚20:00): 从Oracle出发,走进GaussDB的世界 众所周知,redis是一个高性能的分布式key-valu ...

  3. redis 消息队列 过段时间不能下发_以Redis来谈消息队列

    首先 我先引入一个大家熟知的观点:Reids可以作为消息队列来使用 redis提供了两种方式来做消息队列,一种是生产者消费者模式,一种是发布订阅模式. 本篇文章将从 异步,解耦,分布式,可靠四部分来探 ...

  4. redis延迟消息队列不准时php,Redis实现延迟消息队列

    消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ. 如果只需要实现简单的消息队列,那么借助Redis即可. 如果对消息有着严格的可靠性等 ...

  5. 使用 Redis Stream 实现消息队列

    使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...

  6. redis简单队列java_使用Redis的简单消息队列

    redis简单队列java 在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medi ...

  7. 使用Redis的简单消息队列

    在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medium和Large. 在第一个 ...

  8. 【redis源码学习】redis 中的“消息队列” Stream

    文章目录 关于redis Stream Stream 结构 Stream 操作 添加消息 新增消费组 删除消息 裁剪信息流 释放消费组 查找元素 关于redis Stream redis stream ...

  9. redis stream 实现消息队列

    redis stream 实现消息队列 Redis5.0带来了Stream类型.从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现. 基于r ...

最新文章

  1. 代码优化实战:我又优化了一百个if else!
  2. 三星电子网络营销重拳出击芯片制造力求“差异化”取胜智能手机市场
  3. Hashtable元素的删除
  4. 性能测试—前端性能1
  5. Wondows环境下配置Tomat
  6. Vue.js 牛刀小试(持续更新~~~)
  7. 马斯克:我上大学时就想创立电动汽车公司
  8. 线性模型第1讲:最小二乘法
  9. 小麦(Wheat)-玉米(Maize)-水稻(Rice) 数粒软件
  10. html图片显示不出来
  11. Java命令行开关_java命令行操作
  12. office word安装mathtype报错,找不到mathpage.WLL文件
  13. Unreal Engine 4(虚幻UE4)GameplayAbilities 插件入门教程(三)技能标签(Ability Tags)
  14. 汇率查询接口,免费实时货币汇率查询换算
  15. js:网页中的高和宽(document)
  16. v4l2框架-开启视频流(stream on)
  17. ROUGE和pyrouge的安装
  18. APP项目软件开发流程
  19. *.lwp文件如何打开
  20. android 输入法如何启动流程_Android输入法显示流程

热门文章

  1. python复数的实部和虚部都是整数嘛_Python数字(Number)
  2. Team Foundation Power Tools 1.2发布
  3. touch——创建文件
  4. lua小技巧——lua全局变量的检测
  5. HTML简单的网页制作期末作业【NBA勒布朗詹姆斯篮球明星】HTML+CSS+JavaScript
  6. 电信运营商基于 MQTT 协议 构建千万级 IoT 设备管理平台
  7. 51单片机C语言波特率十六进制,8051单片机波特率计算公式(配套C语言例程)
  8. 一篇文章学会eggjs做后端服务及各种问题处理
  9. vue中安装和使用Dplayer视频播放器
  10. HDU - 4082 Hou Yi's secret