zoukankan      html  css  js  c++  java
  • 工作流源代码分析

    一、runner.js工作流运行者

    // - identifier: Unique identifier for this runner (each runner has its own unique ID).
    // - forks: Max number of child processes to fork at the same time.
    // - run_interval: Check for new jobs every 'run_interval' milliseconds.
    // (By default, every 250 milliseconds).
    // - sandbox: Collection of node modules to pass to the sandboxed tasks
    // execution. Object with the form:
    // {
    // 'module_global_var_name': 'node-module-name'
    // }
    // By default, only the global timeouts are passed to the tasks
    // sandbox.
    var WorkflowRunner = module.exports = function (opts) {
    // Public surface of the workflow runner. Every entry exposes the
    // same-named binding from the enclosing scope. The values are read once,
    // when this literal is evaluated, so a later reassignment of one of the
    // enclosing variables (e.g. `shutting_down`) does not update the
    // corresponding property on this object.
    var runner = {
        // Registers the runner under the given identifier and cancels all
        // jobs associated with this runner.
        init: init,
        quit: quit,
        // The next 5 are properties, possibly should encapsulate into methods:
        // ID of the current runner.
        identifier: identifier,
        backend: backend,
        log: log,
        shutting_down: shutting_down,
        do_fork: do_fork,
        getIdentifier: getIdentifier,
        runNow: runNow,
        nextRun: nextRun,
        kill: kill,
        childUp: childUp,
        childDown: childDown,
        childCount: childCount,
        // Looks up the other runners that are no longer active.
        inactiveRunners: inactiveRunners,
        // Looks up the jobs associated with inactive runners.
        staleJobs: staleJobs,
        getSlot: getSlot,
        releaseSlot: releaseSlot
    };
        // Put the runner to work
        function run() {
            shutting_down = false;
            // Invoked by the queue worker when running a job ends with the
            // special 'retry' error; receives the job and the worker's
            // completion callback.
            // NOTE(review): the body is empty in this excerpt (presumably
            // elided). As shown, `callback` is never invoked, so the queue
            // task that asked for a retry would never complete - confirm
            // against the full source.
            function retryJob(oldJob, callback) {
            }
    
            // Queue worker. Tries to run a job, including "hiding" it from other
            // runners:
            //
            // - uuid: identifier of the job to load from the backend.
            // - callback: f(err), called once the job has been processed or
            //   skipped. It is called without arguments when the job does not
            //   exist, is not due to run yet, or ran without error.
            var worker = function (uuid, callback) {
                return backend.getJob(uuid, function (err, job) {
                    if (err) {
                        return callback(err);
                    }
    
                    // Nothing to do when the backend has no such job.
                    if (!job) {
                        return callback();
                    }
    
                    // Only jobs that runNow() accepts are claimed and executed.
                    if (runNow(job)) {
                        // Claim the job for this runner's identifier.
                        // NOTE(review): the `err` passed to this callback is not
                        // checked in the visible lines.
      return backend.runJob(uuid, identifier,
                            function (err, job) {
    // NOTE(review): the leading '....' on the next line appears to mark code
    // omitted from this excerpt; `job_trace` and `job_log` are not defined in
    // the visible source.
    ....var runJobOpts = {
                                job: job,
                                trace: job_trace,
                                log: job_log
                            };
                            return runJob(runJobOpts, function (err) {
                                // The job is no longer tracked once runJob
                                // reports back, whatever the outcome.
                                delete job_runners[job.uuid];
    
                                // The string 'retry' is a sentinel: hand the job
                                // to retryJob instead of reporting a failure.
                                if (err && err === 'retry') {
                                    return retryJob(job, callback);
                                } else if (err) {
                                    return callback(err);
                                }
                                return callback();
                            });
                        });
                    } else {
                        // Not due to run yet: finish this queue task quietly.
                        return callback();
                    }
                });
            };
    
            // We keep a queue with concurrency limit where we'll be pushing new
            // jobs. Every pushed item is handled by `worker`, and the limit is
            // `forks - 1`.
            // NOTE(review): the reason for subtracting one from `forks` is not
            // visible in this excerpt - confirm against the full source.
            var queue = vasync.queue(worker, forks - 1);
    
            // Heartbeat: tells the backend that this runner is active, logs the
            // outcome, and re-arms itself every `activity_interval` milliseconds
            // until the runner starts shutting down.
            function reportActivity() {
                // Handles the backend's answer to the activity report.
                function onReported(failure) {
                    if (!failure) {
                        log.debug({runner: identifier}, 'Runner is active');
                    } else {
                        log.error({err: failure},
                            'Error reporting runner activity');
                    }

                    // A failed report does not stop the heartbeat; only a
                    // shutdown does.
                    if (shutting_down) {
                        return;
                    }
                    ainterval = setTimeout(reportActivity, activity_interval);
                }

                backend.runnerActive(identifier, onReported);
            }
    
            // One polling iteration. Asks the backend whether this runner is
            // idle; when it is not, cancels stale jobs and fetches new jobs to
            // queue (both through vasync.parallel). Afterwards the next
            // iteration is scheduled in `run_interval` milliseconds unless the
            // runner is shutting down. Errors are logged and never stop the
            // polling loop.
            function doPoll() {
                // Re-arm the poll timer unless the runner is shutting down.
                function scheduleNextPoll() {
                    if (!shutting_down) {
                        interval = setTimeout(doPoll, run_interval);
                    }
                }

                // Cancel a single stale job: flag its execution as 'canceled',
                // reload it from the backend and finish it.
                function cancelStaleJob(uuid, done) {
                    backend.updateJobProperty(uuid, 'execution', 'canceled',
                        function (updateErr) {
                        if (updateErr) {
                            return done(updateErr);
                        }
                        return backend.getJob(uuid, function (getErr, job) {
                            if (getErr) {
                                return done(getErr);
                            }
                            return backend.finishJob(job,
                                function (finishErr, finished) {
                                if (finishErr) {
                                    return done(finishErr);
                                }
                                log.info('Stale Job ' + finished.uuid +
                                    ' canceled');
                                return done(null);
                            });
                        });
                    });
                }

                // Fetch stale jobs from runners which stopped reporting
                // activity and cancel them. Always calls back without error so
                // a failure here cannot stop the poll.
                function cancelStaleJobs(cb) {
                    staleJobs(function (err, jobs) {
                        if (err) {
                            log.error({err: err}, 'Error fetching stale jobs');
                            // We will not stop even on error:
                            return cb(null, null);
                        }
                        // Nothing to cancel; also avoids handing a missing
                        // list to vasync.
                        if (!jobs || jobs.length === 0) {
                            return cb(null, null);
                        }
                        return vasync.forEachParallel({
                            inputs: jobs,
                            func: cancelStaleJob
                        }, function (cancelErr) {
                            // Report the failure instead of dropping it, but
                            // keep polling.
                            if (cancelErr) {
                                log.error({err: cancelErr},
                                    'Error canceling stale jobs');
                            }
                            return cb(null, null);
                        });
                    });
                }

                // Push one fetched job onto the work queue unless it is already
                // tracked in rn_jobs; untrack it once the queue worker is done.
                function enqueueJob(job) {
                    if (rn_jobs.indexOf(job) !== -1) {
                        return;
                    }
                    rn_jobs.push(job);
                    queue.push(job, function (err) {
                        // Called once queue worker finished processing the job
                        var position;
                        if (err) {
                            log.error({err: err}, 'Error running job');
                        } else {
                            // Success is only reported when there was no error.
                            log.info('Job with uuid ' + job +
                                ' ran successfully');
                        }
                        position = rn_jobs.indexOf(job);
                        if (position !== -1) {
                            rn_jobs.splice(position, 1);
                        }
                    });
                }

                // Fetch jobs to process and queue them. Always calls back
                // without error.
                function fetchJobsToProcess(cb) {
                    var fetch = slots - 1;
                    if (isNaN(fetch) || fetch <= 0) {
                        log.info('No available slots. ' +
                            'Waiting next iteration');
                        return cb(null, null);
                    }
                    return backend.nextJobs(0, fetch, function (err, jobs) {
                        // Error fetching jobs
                        if (err) {
                            log.error({err: err}, 'Error fetching jobs');
                            // We will not stop even on error:
                            return cb(null, null);
                        }
                        // No queued jobs
                        if (!jobs) {
                            return cb(null, null);
                        }
                        // Got jobs, let's see if we can run them:
                        jobs.forEach(function (job) {
                            enqueueJob(job);
                        });
                        return cb(null, null);
                    });
                }

                backend.isRunnerIdle(identifier, function (idle) {
                    // Only an explicit `false` means there is work to do.
                    if (idle !== false) {
                        log.info('Runner idle.');
                        return scheduleNextPoll();
                    }
                    return vasync.parallel({
                        funcs: [cancelStaleJobs, fetchJobsToProcess]
                    }, function () {
                        scheduleNextPoll();
                    });
                });
            }
    
            // Start the activity heartbeat.
            reportActivity();
            // Start polling: each iteration cancels stale jobs left behind by
            // runners that stopped reporting activity and fetches new jobs to
            // push onto the queue.
            doPoll();
        }
    runner.run = run;
    return runner;
    }
     
     
  • 相关阅读:
    在mac上搭建python环境
    iOS开发-你真的会用SDWebImage?(转发)
    iOS tableview 优化总结
    Masonry 实现输入框随键盘位置改变
    sudo 权限问题
    nodejs save遇到的一个坑
    iOS app的webview注入JS遇到的坑
    HW18-广搜
    HW17-深搜
    HW16-动归2
  • 原文地址:https://www.cnblogs.com/justart/p/13546055.html
Copyright © 2011-2022 走看看