  • Two ways to run periodic tasks in OpenStack, with source code analysis

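    Once a process is running, you may want one of its methods to execute periodically. OpenStack offers two ways to do this: inherit from periodic_task.PeriodicTasks, or use loopingcall.py. The implementation of each is described below.
    (1) Inheriting from periodic_task.PeriodicTasks
             Start with the PeriodicTasks class itself.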
    144 @six.add_metaclass(_PeriodicTasksMeta)
    145 class PeriodicTasks(object):
    146     def __init__(self):
    147         super(PeriodicTasks, self).__init__()
    148         self._periodic_last_run = {}
    149         for name, task in self._periodic_tasks:
    150             self._periodic_last_run[name] = task._periodic_last_run
    152     def run_periodic_tasks(self, context, raise_on_error=False):
    153         """Tasks to be run at a periodic interval."""
    154         idle_for = DEFAULT_INTERVAL
    155         for task_name, task in self._periodic_tasks:
    156             full_task_name = '.'.join([self.__class__.__name__, task_name])
    158             spacing = self._periodic_spacing[task_name]
    159             last_run = self._periodic_last_run[task_name]
    161             # If a periodic task is _nearly_ due, then we'll run it early
    162             if spacing is not None:
    163                 idle_for = min(idle_for, spacing)
    164                 if last_run is not None:
    165                     delta = last_run + spacing - time.time()
    166                     if delta > 0.2:
    167                         idle_for = min(idle_for, delta)
    168                         continue
    170             LOG.debug("Running periodic task %(full_task_name)s",
    171                       {"full_task_name": full_task_name})
    172             self._periodic_last_run[task_name] = time.time()
    174             try:
    175                 task(self, context)
    176             except Exception as e:
    177                 if raise_on_error:
    178                     raise
    179                 LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"),
    180                               {"full_task_name": full_task_name, "e": e})
    181             time.sleep(0)
    183         return idle_for
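    run_periodic_tasks is the function that launches the individual periodic tasks. Several data structures matter here. self._periodic_tasks records each task's name together with its function handle, and self._periodic_spacing records the interval between runs for each task. __init__ also builds self._periodic_last_run, which records when each task last ran. At run time, the last run time and the spacing decide whether a task runs (lines 162~168): a task that is not due for more than 0.2 seconds is skipped, and the return value idle_for is the shortest time until the next task is due. So where do self._periodic_tasks and self._periodic_spacing come from? They are built by a metaclass: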
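    The due check in lines 162~168 can be tried in isolation. The following is a minimal sketch rather than the library code: next_idle is a hypothetical helper, the tasks are plain names instead of functions, and the value of DEFAULT_INTERVAL is an assumption, since the excerpt above does not show it.

```python
import time

DEFAULT_INTERVAL = 60.0  # assumed value; the excerpt does not show the real one


def next_idle(names, spacing, last_run, now=None):
    """Return (due_names, idle_for) using the rule from lines 162~168.

    Hypothetical helper for illustration only; it is not part of OpenStack.
    """
    now = time.time() if now is None else now
    idle_for = DEFAULT_INTERVAL
    due = []
    for name in names:
        sp = spacing[name]
        lr = last_run[name]
        if sp is not None:
            idle_for = min(idle_for, sp)
            if lr is not None:
                delta = lr + sp - now
                if delta > 0.2:
                    # Not due yet: remember how long until it is, then skip it.
                    idle_for = min(idle_for, delta)
                    continue
        # Due now (or runs on every pass because its spacing is None).
        due.append(name)
        last_run[name] = now
    return due, idle_for


spacing = {"a": 10, "b": None, "c": 30}
last_run = {"a": 100.0, "b": None, "c": 100.0}
due, idle_for = next_idle(["a", "b", "c"], spacing, last_run, now=105.0)
```

    With these inputs only "b" is due, because its spacing is None, and the caller would sleep for the 5 seconds remaining until "a" is due.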
    100 class _PeriodicTasksMeta(type):
    101     def __init__(cls, names, bases, dict_):
    102         """Metaclass that allows us to collect decorated periodic tasks."""
    103         super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
    105         # NOTE(sirp): if the attribute is not present then we must be the base
    106         # class, so, go ahead an initialize it. If the attribute is present,
    107         # then we're a subclass so make a copy of it so we don't step on our
    108         # parent's toes.
    109         try:
    110             cls._periodic_tasks = cls._periodic_tasks[:]
    111         except AttributeError:
    112             cls._periodic_tasks = []
    114         try:
    115             cls._periodic_spacing = cls._periodic_spacing.copy()
    116         except AttributeError:
    117             cls._periodic_spacing = {}
    119         for value in cls.__dict__.values():
    120             if getattr(value, '_periodic_task', False):
    121                 task = value
    122                 name = task.__name__
    124                 if task._periodic_spacing < 0:
    125                     LOG.info(_LI('Skipping periodic task %(task)s because '
    126                                  'its interval is negative'),
    127                              {'task': name})
    128                     continue
    129                 if not task._periodic_enabled:
    130                     LOG.info(_LI('Skipping periodic task %(task)s because '
    131                                  'it is disabled'),
    132                              {'task': name})
    133                     continue
    135                 # A periodic spacing of zero indicates that this task should
    136                 # be run every pass
    137                 if task._periodic_spacing == 0:
    138                     task._periodic_spacing = None
    140                 cls._periodic_tasks.append((name, task))
    141                 cls._periodic_spacing[name] = task._periodic_spacing
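    Lines 109~117 add two class variables, _periodic_tasks and _periodic_spacing. The statement for value in cls.__dict__.values() walks the members of the class, which are mostly functions. Any member that has a _periodic_task attribute equal to True is added to _periodic_tasks and _periodic_spacing, unless its spacing is negative or it is disabled. That leaves the task itself: a task is just a function in the class, so why does it carry the _periodic_task and _periodic_spacing attributes? That is the decorator's job.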
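    Before turning to the decorator, the collection step can be tried on its own. This is a minimal sketch under stated assumptions: it uses Python 3 metaclass syntax instead of six.add_metaclass, the names CollectTasksMeta, mark, Base, Manager and SubManager are hypothetical, and the checks for negative spacing and disabled tasks are left out.

```python
class CollectTasksMeta(type):
    """Collect methods that carry a _periodic_task marker (simplified)."""

    def __init__(cls, name, bases, dict_):
        super().__init__(name, bases, dict_)
        # Copy the parent's containers so a subclass never mutates them;
        # the base class has none yet, hence the AttributeError branch.
        try:
            cls._periodic_tasks = cls._periodic_tasks[:]
        except AttributeError:
            cls._periodic_tasks = []
        try:
            cls._periodic_spacing = cls._periodic_spacing.copy()
        except AttributeError:
            cls._periodic_spacing = {}

        # Only members defined directly on this class are inspected.
        for value in cls.__dict__.values():
            if getattr(value, "_periodic_task", False):
                cls._periodic_tasks.append((value.__name__, value))
                cls._periodic_spacing[value.__name__] = value._periodic_spacing


def mark(spacing):
    """Hypothetical stand-in for the real decorator: it only sets markers."""
    def decorator(f):
        f._periodic_task = True
        f._periodic_spacing = spacing
        return f
    return decorator


class Base(metaclass=CollectTasksMeta):
    pass


class Manager(Base):
    @mark(10)
    def sync(self, context):
        return "sync"

    def helper(self):  # no marker, so the metaclass ignores it
        return "not a task"


class SubManager(Manager):
    @mark(5)
    def report(self, context):
        return "report"


manager_names = [n for n, _ in Manager._periodic_tasks]
sub_names = [n for n, _ in SubManager._periodic_tasks]
```

    SubManager ends up with both sync and report, while Manager still lists only sync, which is the point of copying the containers instead of sharing them.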
    @periodic_task.periodic_task(spacing=…, run_immediately=...)
    def f(args,kwargs):
     42 def periodic_task(*args, **kwargs):
     43     """Decorator to indicate that a method is a periodic task.
     45     This decorator can be used in two ways:
     47         1. Without arguments '@periodic_task', this will be run on every cycle
     48            of the periodic scheduler.
     50         2. With arguments:
     51            @periodic_task(spacing=N [, run_immediately=[True|False]])
     52            this will be run on approximately every N seconds. If this number is
     53            negative the periodic task will be disabled. If the run_immediately
     54            argument is provided and has a value of 'True', the first run of the
     55            task will be shortly after task scheduler starts.  If
     56            run_immediately is omitted or set to 'False', the first time the
     57            task runs will be approximately N seconds after the task scheduler
     58            starts.
     59     """
     60     def decorator(f):
     61         # Test for old style invocation
     62         if 'ticks_between_runs' in kwargs:
     63             raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
     65         # Control if run at all
     66         f._periodic_task = True
     67         f._periodic_external_ok = kwargs.pop('external_process_ok', False)
     68         if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
     69             f._periodic_enabled = False
     70         else:
     71             f._periodic_enabled = kwargs.pop('enabled', True)
     73         # Control frequency
     74         f._periodic_spacing = kwargs.pop('spacing', 0)
     75         f._periodic_immediate = kwargs.pop('run_immediately', False)
     76         if f._periodic_immediate:
     77             f._periodic_last_run = None
     78         else:
     79             f._periodic_last_run = time.time()
     80         return f
     82     # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
     83     # and without parents.
     84     #
     85     # In the 'with-parents' case (with kwargs present), this function needs to
     86     # return a decorator function since the interpreter will invoke it like:
     87     #
     88     #   periodic_task(*args, **kwargs)(f)
     89     #
     90     # In the 'without-parents' case, the original function will be passed
     91     # in as the first argument, like:
     92     #
     93     #   periodic_task(f)
     94     if kwargs:
     95         return decorator
     96     else:
     97         return decorator(args[0])
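    The dual-use trick at the end of the decorator (lines 94~97) is easier to see in a stripped-down form. This is a sketch, not the library code: periodic, every_pass and every_30s are hypothetical names, and the external_process_ok and enabled handling is omitted.

```python
import time


def periodic(*args, **kwargs):
    """Simplified decorator usable as @periodic or @periodic(spacing=N)."""
    def decorator(f):
        f._periodic_task = True
        f._periodic_spacing = kwargs.pop("spacing", 0)
        f._periodic_immediate = kwargs.pop("run_immediately", False)
        # None means "never run", so the scheduler runs the task on its
        # first pass; a timestamp makes it wait one full spacing first.
        f._periodic_last_run = None if f._periodic_immediate else time.time()
        return f

    if kwargs:
        # Called as periodic(spacing=...): hand back the real decorator.
        return decorator
    # Called as periodic(f): the function itself is the first argument.
    return decorator(args[0])


@periodic
def every_pass():
    pass


@periodic(spacing=30, run_immediately=True)
def every_30s():
    pass
```

    One consequence of branching on kwargs is that this sketch does not accept empty parentheses, @periodic(): neither args nor kwargs is populated, so args[0] raises IndexError.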
    (2) The other approach is to use loopingcall.py, which also lives in nova/openstack/common.
    Usage: obj = loopingcall.FixedIntervalLoopingCall(f, *args, **kwargs), followed by obj.start(interval, initial_delay).
     62 class FixedIntervalLoopingCall(LoopingCallBase):
     63     """A fixed interval looping call."""
     65     def start(self, interval, initial_delay=None):
     66         self._running = True
     67         done = event.Event()
     69         def _inner():
     70             if initial_delay:
     71                 greenthread.sleep(initial_delay)
     73             try:
     74                 while self._running:
     75                     start = timeutils.utcnow()
     76                     self.f(*self.args, **self.kw)
     77                     end = timeutils.utcnow()
     78                     if not self._running:
     79                         break
     80                     delay = interval - timeutils.delta_seconds(start, end)
     81                     if delay <= 0:
     82                         LOG.warn(_LW('task run outlasted interval by %s sec') %
     83                                  -delay)
     84                     greenthread.sleep(delay if delay > 0 else 0)
     85             except LoopingCallDone as e:
     86                 self.stop()
     87                 done.send(e.retvalue)
     88             except Exception:
     89                 LOG.exception(_LE('in fixed duration looping call'))
     90                 done.send_exception(*sys.exc_info())
     91                 return
     92             else:
     93                 done.send(True)
     95         self.done = done
     97         greenthread.spawn_n(_inner)
     98         return self.done
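    start() runs the periodic task, and initial_delay is an optional delay before the first run. On every iteration, lines 75~84 record the start and end time of the task. If the elapsed time (end minus start) is at least interval, the loop does not wait and starts the next run immediately: greenthread.sleep(delay if delay > 0 else 0)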
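    The same fixed-interval logic can be sketched without eventlet. The version below swaps in the standard threading module for green threads, so it is an illustration of the timing rule rather than the OpenStack implementation. FixedIntervalLoop, LoopDone and tick are hypothetical names, with LoopDone standing in for LoopingCallDone.

```python
import threading
import time


class LoopDone(Exception):
    """Raised by the callback to end the loop (stand-in for LoopingCallDone)."""

    def __init__(self, retvalue=True):
        super().__init__()
        self.retvalue = retvalue


class FixedIntervalLoop:
    """Call f repeatedly, aiming for one call every `interval` seconds."""

    def __init__(self, f, *args, **kw):
        self.f, self.args, self.kw = f, args, kw
        self._stop = threading.Event()
        self.result = None

    def _run(self, interval, initial_delay):
        if initial_delay:
            self._stop.wait(initial_delay)
        try:
            while not self._stop.is_set():
                start = time.monotonic()
                self.f(*self.args, **self.kw)
                elapsed = time.monotonic() - start
                delay = interval - elapsed
                # A task that overran its interval is rerun without waiting.
                self._stop.wait(max(delay, 0))
        except LoopDone as e:
            self.result = e.retvalue

    def start(self, interval, initial_delay=None):
        thread = threading.Thread(
            target=self._run, args=(interval, initial_delay), daemon=True)
        thread.start()
        return thread

    def stop(self):
        self._stop.set()


calls = []


def tick():
    calls.append(time.monotonic())
    if len(calls) == 3:
        raise LoopDone(retvalue="finished")


loop = FixedIntervalLoop(tick)
loop.start(interval=0.05).join(timeout=2)
```

    Here tick ends the loop itself by raising LoopDone on its third call, mirroring how a callback can finish a FixedIntervalLoopingCall by raising LoopingCallDone.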
  • Original article: https://www.cnblogs.com/yuhan-TB/p/4085074.html