zoukankan      html  css  js  c++  java
  • 分布式定时任务框架——python定时任务框架APScheduler扩展

    http://bbs.7boo.org/forum.php?mod=viewthread&tid=14546



            如果将定时任务部署在一台服务器上,那么这个定时任务就是整个系统的单点,这台服务器出现故障的话会影响服务。对于可以冗余的任务(重复运行不影响服务),可以部署在多台服务器上,让他们同时执行,这样就可以很简单的避免单点。但是如果任务不允许冗余,最多只能有一台服务器执行任务,那么前面的方法显然行不通。本篇文章就向大家介绍如何避免这种互斥任务的单点问题,最后再介绍一下基于APScheduler的分布式定时任务框架,这个框架是通过多个项目的实践总结而成的。

            对于运行在同一台服务器上的两个进程,可以通过加锁实现互斥执行,而对于运行在多个服务器上的任务仍然可以通过用加锁实现互斥,不过这个锁是分布式锁。这个分布式锁并没有那么神秘,实际上只要一个提供原子性的数据库即可。比如,在数据库的locks表里有一个记录(lock record),包含属性:

            name:锁的名字,互斥的任务需要用名字相同的锁。

            active_ip:持有锁的服务器的ip。

            update_time:上次持有锁的时间,其他非活跃的服务器通过这个属性判断活跃的服务器是否超时,如果超时,则会争夺锁。

            一个持有锁的服务器通过不断的发送心跳,来更新这个记录,心跳的内容就是持有锁的时间戳(update_time),以及本机ip。也就是说,通过发送心跳来保证当前的服务器是活跃的,而其他服务器通过lock record中的update_time来判断当前活跃的服务器是否超时,一旦超时,其他的服务器就会去争夺锁,接管任务的执行,并发送心跳更新active_ip。

            通过上面描述,这个框架中最重要的两个概念就是分布式锁和心跳。下面看一下分布式定时任务框架中是如何实现这两点的。当然,这个框架依赖于APScheduler,所以必须安装这个模块,具体APScheduler的介绍见我的另一篇文章,因为依赖APScheduler,所以这个框架很简单,只有一个类:

    from apscheduler.scheduler import Scheduler
    import datetime
    import time
    import socket
    import struct
    import fcntl

    def get_ip(ifname):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        return socket.inet_ntoa(fcntl.ioctl(
            s.fileno(),
            0x8915, # SIOCGIFADDR
            struct.pack('256s', ifname[:15])
        )[20:24])
        

    class MutexScheduler(Scheduler):
        def __init__(self, gconfig={}, **options):
            Scheduler.__init__(self, gconfig, **options)
            self.ip = get_ip('eth0')

        def mutex(self, lock = None, heartbeat = None, lock_else = None, 
                unactive_interval = datetime.timedelta(seconds = 30)):
            
            def mutex_func_gen(func):
                def mtx_func():

                    if lock:
                        lock_rec = lock()
                        now = datetime.datetime.now()

                        # execute mutex job when the server is active, or the other server is timeout.
                        if not lock_rec or lock_rec['active_ip'] == self.ip or (lock_rec['update_time'] and now - lock_rec['update_time'] >= unactive_interval):  
                            if lock_rec:
                                del lock_rec['active_ip']
                                del lock_rec['update_time']

                            if not lock_rec:
                                lock_rec = {}

                            lock_attrs = func(**lock_rec)
                            if not lock_attrs:
                                lock_attrs = {}

                            # send heart beat
                            heartbeat(self.ip, now, **lock_attrs)
                        else: 
                            lock_else(lock_rec)
                    else:
                        func()

                return mtx_func

            self.mtx_func_gen = mutex_func_gen

            def inner(func):
                return func

            return inner

        def cron_schedule(self, **options):
            def inner(func):
                if hasattr(self, 'mtx_func_gen'):
                    func = self.mtx_func_gen(func)

                func.job = self.add_cron_job(func, **options)
                return func
            return inner

            mutex方法是核心,通过装饰器的方式提供互斥功能。在使用时:

    @sched.mutex(lock = my_lock, heartbeat = my_heartbeat)
    @sched.cron_schedule(second = '*')
    def my_job(**attrs):
        print 'my_job ticks'

            mutex装饰器必须用在cron_schedule装饰器之前,mutex主要是组装job。mutex的参数有:

                    lock:函数,用于获取锁记录(lock record),函数原型:lock()。lock的返回值时dict,就是锁记录内容。

                    heartbeat:函数,用于发出心跳,函数原型:heartbeat(ip, now, **attrs)。ip是本机ip;now是当前时间戳;attrs是一个dict,用于在锁记录中存放一些其他用户自定义信息。

                    lock_else:函数,在没有获得锁时执行,函数原型:lock_else(lock_rec)。lock_rec是锁记录,包含active_ip,update_time以及用户自定义的属性。

                    unactive_interval:datetime.timedelta类型,超时时间,也就是说当前时间减去update_time大于unactive_interval的话,就代表超时。

            在使用这个类时,必须实现自己的lock,heartbeat以及lock_else函数。

            job的原型是job(**attrs),attrs就是存放在锁记录中的用户自定义属性,job可以有dict类型的返回值,这个返回值会存入锁记录中。

            下面,看一下具体使用的例子,使用的mongodb存放分布式锁。

    import apscheduler.events
    import datetime
    import time
    import pymongo
    import sys

    sys.path.append('../src/')

    import mtxscheduler

    sched = mtxscheduler.MutexScheduler()

    mongo = pymongo.Connection(host = '127.0.0.1', port = 27017)
    lock_store = mongo['lockstore']['locks']

    def lock():
        print 'lock()'
        now = datetime.datetime.now() - datetime.timedelta(seconds = 3)
        lck = lock_store.find_one({'name': 't'})
        return lck

    def hb(ip, now, **attrs):
        print 'heartbeat()'
        attrs['active_ip'] = ip
        attrs['update_time'] = now
        lock_store.update({'name': 't'}, {'$set': attrs}, upsert = True)

    def le(lock_rec):
        if lock_rec:
            print 'active ip', lock_rec['active_ip']
        else:
             print 'lock else'

    i = 0

    @sched.mutex(lock = lock, heartbeat = hb, lock_else = le)
    @sched.cron_schedule(second = '*')
    def job(**attr):
        global i
        i += 1
        print i

    def err_listener(ev):
        if ev.exception:
            print sys.exc_info()
                
    sched.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR)

    sched.start()
    time.sleep(10)

            这里用到了mongodb的python driver,可以通过命令安装:

    easy_install pymongo

            easy_install的安装件另一篇文章。

            这个任务很简单就是定时打印整数序列。同时在两台服务器上部署运行,可以发现只有一台服务器会输出整数序列。

            使用起来还是很方便的。源代码见github,其中还有使用redis存储锁,已经在锁记录中存放自定义信息的例子。

  • 相关阅读:
    花生壳 manjaro 安装
    manjaro+apache+django+mod_wsgi 安装
    arch linux或 Manjaro下安装 微信 wechat deepin-wine-wechat
    BBU+RRU基本介绍
    黑马python01——基础
    NNLearning阶段性总结01
    【信息论】——第二讲
    10.09——今日文章收集
    pygame安装【在pycharm的IDE project下】
    Git笔记——01
  • 原文地址:https://www.cnblogs.com/dingxiaoyue/p/4926814.html
Copyright © 2011-2022 走看看