zoukankan      html  css  js  c++  java
  • 使用NODEJS+REDIS开发一个消息队列以及定时任务处理

    作者:RobanLee

    原创文章,转载请注明: 萝卜李 http://www.robanlee.com

    源码在这里: https://github.com/robanlee123/RobCron




    时间有限,就不详细注释,有问题或者意见欢迎@我,也欢迎大家批评指正.

    本文所必须的一些资料如下:

    1. NODEJS ==> 可以去NODEJS.ORG下载最新的源码.
    2. Redis ==> Redis.io
    3. KUE ==> Nodejs的一个开源队列系统
    4. NODE-SCHEDULE ==> NODEJS 一个开源调度系统

    废话不多说,先来介绍任务的流程:

    1. NODEJS或者PHP或者其他语言 写入REDIS 一个计划任务, 比如每分钟做某件事,这里就用SAYHELLO来代替好了
    2. 使用NODEJS读取这个任务,并将它转换为NODE的调度任务(node-schedule 来完成)
    3. 调度器[node-schedule]根据设定的规则来分发任务.
    4. KUE接受任务,并且加入列队,执行.
    5. DONE

    STEP1: 创建一个任务

    /**
     * Add task
     * @author Robanlee@gmail.com
     */
    
    //加载函数,集中加载一些LIB,这个源码请参照最后的附属文件
    var loader = require('./loader');  
    
    function addTask(opts){
            new loader(this); 
            
            //默认设置
            this.opts = {
                    keyIDs:'schedule:job:ids',
                    keyLists:'schedule:job:list',
                    keyJob:'schedule:job:'
            }
            
            //合并配置,类似JQUERY: extend
            this.mergeParams(opts);  
    };
    
    //Merge options
    addTask.prototype.mergeParams = function ( param ){
            if(undefined === this.opts ) {
                    return false;
            }
            
            for(var x in param) { 
                    if(param[x] != undefined && '' != param[x]) {
                            this.opts[x] = param[x];
                    }
            }
    };
    
    //添加数据方法
    addTask.prototype.pushData = function ( data ){ 
            if(undefined == data ) {
                    console.log('--ERROR:data is null');
                    return false;
            }
            this.getIncr.call(this,function(response,obj){
                    var id = response;
                    obj.redisClient.rpush(obj.opts.keyLists,id,function(err,response){
                            if(err) throw err;
                    });
             
                    data.id = id;
                    var m = obj.redisClient.multi();
                    for(var x in data) {
                            m.hset( obj.opts.keyJob+id,x,data[x] );
                    }
                    
                    m.exec(function(err,response){
                            if(err) throw err; 
                            console.log('[info] Task: ['+data.name+'] has been set successful!');
                    });   
                   
            }); 
    };
    
    //获取REDIS目前的自增ID
    addTask.prototype.getIncr = function (callBack){
            var obj = this; 
            this.redisClient.incr(this.opts.keyIDs,function(err,response){
                    console.log("[info] Current id is : " + response);
                    callBack(response, obj);
            });
    };

    加载这个lib 写入一个DEMO:

    var data = { 
            'name':'taskDemo',
            'created':Date.now(),
            'state':1,
            'type':'untitled',
            'rule':'*/1 * * * *'  //这个任务规则可以为CRONTAB的规则,这个表示每分钟执行一次
    };
    
    var job = new addTask();
    job.pushData(data);

    执行这个脚本,如果一切正常,你会看到如下信息:

    NODEJS 输出:

    REDIS:

    接下来就是获取数据,并且转换为调度任务了,

    源码:

    var loader = require('./loader');
    var taskLog = require("./TaskLog");
    
    function scheduleTask(){
            new loader(this); 
            this.opts = {
                    keyIDs:'schedule:job:ids',
                    keyLists:'schedule:job:list',
                    keyJob:'schedule:job:'
            }
            
            this.task = {
                    taskDemo:undefined
            };
            
            //监听取消任务操作
            this.listenCancel();
    };
    
    scheduleTask.prototype.setScheduleTask = function (data,obj){ 
            this.task[data.name] =  this.libs['node-schedule'].scheduleJob(data.rule,function(){
                    obj.setQueue(data);
                    console.log('[info] Task :' + data.name + ' has been set in queue!');
            });
             
    };
    
    scheduleTask.prototype.setQueue = function (datas){
            
            var jobs = this.libs.kue.createQueue(); 
            jobs.create(datas.name,{
                    'name:':datas.name,
                    'state':1
            }).save();
             
            console.log("[info] Task ["+datas.name+"] has been queued!");
            
            this.setLog(datas);
    };
    
    scheduleTask.prototype.setLog = function (responseData){
            var logData = {
                    jobid:responseData.id,
                    name:responseData.name,
                    result:1
            };
                                            
            new taskLog(logData);
            console.log("[info] Task has been loged");
    };
    
    
    scheduleTask.prototype.getJob = function (){
            this.getJobIndex.call(this,function(response,obj){
                    for(var x in response ) {
                            obj.redisClient.hgetall(obj.opts.keyJob+response[x],function(err,data){
                                    console.log("[info] Task:["+data.name+"] has been loaded!");
                                    obj.setScheduleTask(data, obj);
                            });
                    }
            });
    };
    
    
    scheduleTask.prototype.getJobIndex = function(callBack){
            //Read tasks from <list schedule:job:list>
            var o = this;
            this.redisClient.lrange(this.opts.keyLists,0,-1,function(err,response){
                    if(err) throw err;
                    callBack(response, o);
            });
    };
    
    scheduleTask.prototype.listenCancel = function (){
            var job = this.libs.kue.createQueue();
            var that = this;
    
            job.process('cancelJob',function(data,done){  
                    that.task[data.data.data].cancel();
                    console.log('[info] Task: '+data.data.data + ' has been canceled') ;
                    done();
            });
    }

    执行代码:

    var x = new scheduleTask();
    x.getJob();

    等待一分钟后,NODEJS控制台会输出(这个任务在没有取消的情况下,每分钟都会执行):

    第二分钟:

    REDIS 现在的数据:

    这个数据中增加了KUE的一些任务, q:job:[]:inactive 这个就标识任务还未被执行,执行后的任务状态有

    complete active failed delay 四种

    至此,就只剩下执行任务的步骤了

    var loader = require('./loader');
    
    function execTask(){
            new loader(this); 
            
            var job = this.libs.kue.createQueue();
            job.process('taskDemo',function(data,done){
                    console.log('[info] Task:'+data.type+'#'+data.id+' has been executed successful!');
                    
                    
                    //DONE之前可以做你想要做的事情
                    
                    done(); //千万别忘记调用此方法
            });
            
            
    }
    
    //添加一个取消定时任务的KUE任务
    execTask.prototype.addCancelJob = function (){
            var job =this.libs.kue.createQueue();
            job.create('cancelJob', {data:'taskDemo'}).save();
            console.log('[info] Task: cancelJob has been send!');
    }

    执行这个脚本:

    var et = new execTask();
    
    //取消定时任务
    et.addCancelJob();

    执行后会有2个结果

    1. 程序会执行当前列队里的任务.

    2. 定时任务会被取消,下一分钟后任务不会再由SCHEDULE分配

    任务执行结果:

    取消任务的回应:

    注意最后一行…

  • 相关阅读:
    解决Shiro在Tomcat重启之后丢失登录信息
    代码安全审计大全
    解决Spring Boot打包war部署到Tomcat出现Could not open ServletContext resource
    weblogic12 重装记录
    【Spring】事物和锁及回滚异常类型
    【Spring】thymeleaf + SpringMVC局部刷新
    【Spring】@ModelAttribute三种使用场景
    Java Optional
    JAVA lambda
    最短时间(最短路+思维)
  • 原文地址:https://www.cnblogs.com/jkll/p/4550108.html
Copyright © 2011-2022 走看看