zoukankan      html  css  js  c++  java
  • quartz详解3:quartz数据库集群-锁机制

    http://blog.itpub.NET/11627468/viewspace-1764753/

    一、quartz数据库锁


    其中,QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,其表结构如下:

    点击(此处)折叠或打开

    1. --QRTZ_LOCKS表结构
    2. CREATE TABLE `QRTZ_LOCKS` (
    3.   `LOCK_NAME` varchar(40) NOT NULL,
    4.    PRIMARY KEY (`LOCK_NAME`)
    5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    6. --QRTZ_LOCKS记录
    7. +-----------------+ 
    8. | LOCK_NAME |
    9. +-----------------+ 
    10. | CALENDAR_ACCESS |
    11. | JOB_ACCESS |
    12. | MISFIRE_ACCESS |
    13. | STATE_ACCESS |
    14. | TRIGGER_ACCESS |
    15. +-----------------+
    注:此表结构在2.2版本有新增字段,这里暂时不考虑。
    可以看出QRTZ_LOCKS中有5条记录,代表5把锁,分别用于实现多个Quartz Node对Job、Trigger、Calendar访问的同步控制。 
    关于行锁的机制:
    1、mysql >  set autocommit=0;    //先把mysql设置为不自动提交。
    2、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ;     //线程一通过for update 可以把这行锁住
    3、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ;     //线程二通过for update 无法获得锁,线程等待。
    4、commit;        //线程一通过commit 释放锁
    5、 //线程二可以访问到数据,线程不再等待。

    所以,通过这个机制,一次只能有一个线程来操作 加锁 -  操作 - 释放锁。  如果 操作 的时间过长的话,会带来集群间的主线程等待。
    数据库行锁是一种悲观锁,锁表时其它线程无法查询。

    源码中关于数据库集群加锁的方法有如下几种:
    1、executeInNonManagedTXLock方法的含义是自己管理事务,不让容器管理事务的加锁方法。

    点击(此处)折叠或打开

    1. executeInNonManagedTXLock(
    2.             String lockName,
    3.             TransactionCallback<T> txCallback , final TransactionValidator<T> txValidator )
    三个参数lockName的值是上面所说的TRIGGER_ACCESS,表示要加锁的类型。
    txCallback是加锁后再回调的方法。
    txValidator是验证方法,一般为null
    函数先执行加锁,再回调要操作的方法,然后再解锁。
    看一下源码:

    点击(此处)折叠或打开

    1. if (lockName != null) {
    2.                 // If we aren't using db locks, then delay getting DB connection 
    3.                 // until after acquiring the lock since it isn't needed.
    4.                 if (getLockHandler().requiresConnection()) {
    5.                     conn = getNonManagedTXConnection();
    6.                 }
    7.                 
    8.                 transOwner = getLockHandler().obtainLock(conn, lockName);
    9.             }
    10.             
    11.             if (conn == null) {
    12.                 conn = getNonManagedTXConnection();
    13.             }
    14.             
    15.             final T result = txCallback.execute(conn);
    16.             try {
    17.                 commitConnection(conn);
    18.             } catch (JobPersistenceException e) {
    19.                 rollbackConnection(conn);
    20.                 if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
    21.                     @Override
    22.                     public Boolean execute(Connection conn) throws JobPersistenceException {
    23.                         return txValidator.validate(conn, result);
    24.                     }
    25.                 })) {
    26.                     throw e;
    27.                 }
    28.             }
    29.             Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
    30.             if(sigTime != null && sigTime >= 0) {
    31.                 signalSchedulingChangeImmediately(sigTime);
    32.             }
    33.             
    34.             return result;
    35.         } catch (JobPersistenceException e) {
    36.             rollbackConnection(conn);
    37.     throw e;
    38.   } catch (RuntimeException e) {
                  rollbackConnection(conn);
                  throw new JobPersistenceException("Unexpected runtime exception: "
                          + e.getMessage(), e);
              } finally {
                  try {
                      releaseLock(lockName, transOwner);
                  } finally {
                      cleanupConnection(conn);
                  }
              }

            

    2、如果不是通过这种回调方法的加锁,一般是:
    getLockHandler().obtainLock
    执行
    commitConnection(conn)
    releaseLock
    cleanupConnection


    二、源码分析锁
    目前代码中行锁只用到了STATE_ACCESS 和TRIGGER_ACCESS 这两种。

    1、TRIGGER_ACCESS
    先了解一篇文章,通过源码来分析quartz是如何通过加锁来实现集群环境,触发器状态的一致性。 
    http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml
    可以看到触发器的操作主要用主线程StdScheduleThread来完成,不管是获取需要触发的30S内的触发器,还是触发过程。select和update触发器表时
    都会先加锁,后解锁。如果数据库资源竞争比较大的话,锁会影响整个性能。可以考虑将任务信息放在分布式内存,如redis上进行处理。数据库只是定时从redis上load数据下来做统计。
    参考:quartz详解2:quartz由浅入深   查看第四章第1,2节
    实现都在JobStoreSupport类 
    加锁类型 加锁方法 底层数据库操作 备注
    executeInNonManagedTXLock acquireNextTrigger selectTriggerToAcquire
    selectTrigger
    selectJobDetail
    insertFiredTrigger
    查询需要点火的trigger
    选择需要执行的trigger加入到fired_trigger表
    for执行 triggerFired selectJobDetail
    selectCalendar
    updateFiredTrigger
    triggerExists updateTrigger
    点火trigger
    修改trigger状态为可执行状态。
    recoverJobs updateTriggerStatesFromOtherStates
    hasMisfiredTriggersInState doUpdateOfMisfiredTrigger
    selectTriggersForRecoveringJobs
    selectTriggersInState
    deleteFiredTriggers
    非集群环境下重新执行
    failed与misfired的trigger
    retryExecuteInNonManagedTXLock releaseAcquiredTrigger updateTriggerStateFromOtherState
    deleteFiredTrigger
    异常情况下重新释放trigger到初使状态。
    triggeredJobComplete selectTriggerStatus
    removeTrigger   updateTriggerState
    deleteFiredTrigger
    触发JOB任务完成后的处理。
    obtainLock recoverMisfiredJobs hasMisfiredTriggersInState doUpdateOfMisfiredTrigger 重新执行misfired的trigger
    可以在启动时执行,也可以由misfired线程定期执行。
    clusterRecover selectInstancesFiredTriggerRecords
    updateTriggerStatesForJobFromOtherState
    storeTrigger
    deleteFiredTriggers
    selectFiredTriggerRecords
    removeTrigger
    deleteSchedulerState
    集群有结点faied,让JOB能重新执行。
    executeInLock
    数据库集群里等同于
    executeInNonManagedTXLock
    storeJobAndTrigger updateJobDetail insertJobDetail
    triggerExists
    selectJobDetail
    updateTrigger insertTrigger
    保存JOB和TRIGGER配置
    storeJob   保存JOB
    removeJob   删除JOB
    removeJobs   批量删除JOB
    removeTriggers   批量删除triggers
    storeJobsAndTriggers   保存JOB和多个trigger配置
    removeTrigger   删除trigger
    replaceTrigger   替换trigger
    storeCalendar   保存定时日期
    removeCalendar   删除定时日期
    clearAllSchedulingData   清除所有定时数据
    pauseTrigger   停止触发器
    pauseJob   停止任务
    pauseJobs   批量停止任务
    resumeTrigger   恢复触发器
    resumeJob   恢复任务
    resumeJobs   批量恢复任务
    pauseTriggers   批量停止触发器
    resumeTriggers   批量恢复触发器
    pauseAll   停止所有
    resumeAll   恢复所有













    ---



    2、STATE_TRIGGER
    实现都在JobStoreSupport类 
    加锁类型 加锁方法 底层数据库操作 备注
    obtainLock doCheckin clusterCheckIn 判断集群状态
    先用LOCK_STATE_ACCESS锁集群状态
    再用LOCK_TRIGGER_ACCESS恢复集群运行
         






    ---
  • 相关阅读:
    属性与字段的区别
    修改LVDS支持1024*600分辨率
    Altium designer 10如何设置标题栏
    嵌入式C开发人员的最好的0x10道笔试题
    进程线程及堆栈关系的总结
    GDB调试
    c语言
    如何使用autotools生成Makefile
    ubuntu NFS
    Ubuntu安装配置TFTP服务
  • 原文地址:https://www.cnblogs.com/SimplifyIT/p/6588365.html
Copyright © 2011-2022 走看看