zoukankan      html  css  js  c++  java
  • Agenda文档与源码学习

    Agenda定时任务学习

    Agenda配置

    • database(url, [collectionName]):指定数据库URL,如果没有给出集合名字,则默认使用“agendaJobs”
    • mongo(dbInstance):使用一个已存在的mongoDB本地客户端实例
    • name(name):用来设置“lastModifiedBy”字段内容,如果使用多进程任务,并且想看是哪个任务队列最后一次执行这个任务,该字段会很有用。
    • processEvery(interval):用来指定Agenda查询数据库的频率,看哪些任务需要执行。Agenda内部使用“setTimeout”字段保证误差在3ms左右。降低频率将导致更少的数据库查询,但更多的作业存储在内存中。另外值得注意的是,如果作业队列关闭,存储在内存中的任何尚未运行的作业都将被锁定,这意味着您可能必须等待锁定过期。默认值是'5秒'。
    • maxConcurrency(number):最大任务同时执行数,默认值20.
    • defaultConcurrency(number):特定任务在某时刻同时执行数,默认值5.
    • lockLimit(number):同时最多可以被锁住的任务数量,默认值为0,表示无限制
    • defaultLockLimit(number):特定任务在某时刻默认上锁的数量,默认值为0,表示无限制
    • defaultLockLifetime(number):指定默认锁定时长,默认值10分钟,任务当在默认锁定时长之前完成的话会被解锁,这个锁对任务崩溃或超时会很有用。
    • sort(query):指定查找并锁定下一个任务的查找顺序,默认值{ nextRunAt: 1, priority: -1 }

    一些问题

    任务执行的顺序是什么?
    任务默认遵循带优先级的先进先出执行顺序。不过也修改sort参数来改变执行规则。

    lockLimit和maxConcurrency的区别是什么?
    agenda会将任务一个接一个的上锁,并在mongodb数据库中设置lockedAt参数,并创建Job类实例缓存到_lockedJobs数组中,这个过程默认没有限制,但可以使用lockLimit进行限制,如果所有任务都需要被执行,则agenda会尝试上锁所有任务,并将任务从_lockedJobs数组中发到_runningJobs数组中,这些任务运行后会执行用户的代码,这可以通过maxConcurrency参数进行限制。
    但如果许多任务在同一时间执行,你或许会发现他们甚至没有被加载进来,因为他们会尽可能多地给任务上锁,尽管没有足够的并发来处理他们。里有调整lockLimit和maxConcurrency参数来解决这个问题。

    源码

    保存任务

    agenda/save-job.js
    如果任务已经存在则更新数据库中的信息,如果不存在则插入一条记录。(其他无关代码省略)

    module.exports = async function(job) {
      try {
        // 保存任务信息,并设置最后修改者为当前agenda队列进程
        const props = job.toJSON();
        props.lastModifiedBy = this._name;
        // 获取当前时间并设置默认查询选项
        const now = new Date();
        const protect = {};
        let update = {$set: props};
        // 如果当前任务已经有id,则修改相关属性,例如谁最后一次修改的它,并返回
        if (id) {
          const result = await this._collection.findOneAndUpdate(
            {_id: id},
            update,
            {returnOriginal: false}
          );
          return processDbResult(job, result);
        }
        // 否则,直接插入一条
        const result = await this._collection.insertOne(props);
        return processDbResult(job, result);
      } catch (error) {
        throw error;
      }
    };
    //一个判断任务是否应该立即执行的方法
    const processDbResult = (job, result) => {
      let res = result.ops ? result.ops : result.value;
      if (res) {
        // 如果是个数组,则只取第一个
        if (Array.isArray(res)) {
          res = res[0];
        }
        job.attrs.nextRunAt = res.nextRunAt;
        // 如果当前时间大于任务下一次应该执行的时间,则立即执行任务
        if (job.attrs.nextRunAt && job.attrs.nextRunAt < this._nextScanAt) {
          processJobs.call(this, job);
        }
      }
    
      // Return the Job instance
      return job;
    };

    查找任务

    agenda/find-and-lock-next-job.js

    • JOB_PROCESS_WHERE_QUERY:查找要运行的任务,条件:(name=任务名字,disabled字段不为true)&&((lockedAt字段不为null,nextRunAt(下次运行时间)字段小于等于_nextScanAt下次扫描时间(当前时间+扫描间隔时间)||(lockedAt字段小于等于lockDeadline锁过期时间(当前时间-锁的时长))
    • JOB_PROCESS_SET_QUERY:给任务上锁,设置lockedAt字段为当前时间。
    • JOB_RETURN_QUERY:设置返回内容,返回更新后的文档并排序。
    const JOB_PROCESS_WHERE_QUERY = {
      $and: [{
        name: jobName,
        disabled: {$ne: true}
      }, {
        $or: [{
          lockedAt: {$eq: null},
          nextRunAt: {$lte: this._nextScanAt}
        }, {
          lockedAt: {$lte: lockDeadline}
        }]
      }]
    };
    const JOB_PROCESS_SET_QUERY = {$set: {lockedAt: now}};
    const JOB_RETURN_QUERY = {returnOriginal: false, sort: this._sort};
    const result = await this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY);
    
  • 相关阅读:
    007_排序_多重排序
    Locust 运行模式
    Locust介绍
    Locust环境搭建及报错解决
    8-02全局变量与输出语句
    8-01变量
    7-15ALL、 ANY、SOME子查询
    7-14 EXISTS子查询
    7-13IN和NOT IN 子查询
    7-12简单子查询
  • 原文地址:https://www.cnblogs.com/aojun/p/15315022.html
Copyright © 2011-2022 走看看