zoukankan      html  css  js  c++  java
  • 【笔记】:单线程下挂多个定时任务

    单线程多定时任务

      前言:公司业务需求,实例当中大量需要启动定时器的操作;大家都知道python中的定时器用的是threading.Timer,每当启动一个定时器时,程序内部起了一个线程,定时器触发执行结束后,线程自动销毁;这里就涉及到一个问题,如果同时有大量启动定时器的需求时,内部线程过多,程序肯定就崩了,有没有启一个线程就能完成定时器的操作呢?网上查了一些资料,还没有看到能解决目前问题的现成代码,不如自己搞一个试试

    1、初始版本:

    思路:定时器,说白了就是延时执行指定的程序,目前自己重构python里面的定时器不太现实,能力达不到,所以延时操作时还得用到系统定时器,不过我们可以改一下规则;把所有要进行定时操作的程序添加到特定列表中,把列表中定时时间最短程序拿出来,进行threading.Timer(time,callback)绑定,等时间超时触发自定义的callback,执行刚刚列表取出的程序;然后把时间更新,再次把列表中时间最短的程序拿出了,继续threading.Timer绑定,不断的迭代循环;当有新的定时任务加入到列表时,把当前的threading.Timer绑定取消,更新列表中的时间,再次取出最短时间,进行threading.Timer绑定......

    代码:

    import threading
    import time
    
    class Timer():
        '''单线程下的定时器'''
    
        def __init__(self):
            self.queues = []
            self.timer = None
            self.last_time = time.time()
    
        def start(self):
            item = self.get()
            if item:
                self.timer = threading.Timer(item[0],self.execute)
                self.timer.start()
    
        def add(self,item):
            print('add',item)
            self.flush_time()
            self.queues.append(item)
            self.queues.sort(key=lambda x:x[0])
    
            if self.timer:
                self.timer.cancel()
                self.timer = None
            self.start()
    
        def get(self):
            item = None
            if len(self.queues) > 0:
                item = self.queues[0]
            return item
    
        def pop(self):
            item = None
            if len(self.queues) > 0:
                item = self.queues.pop(0)
            return item
    
        def flush_time(self):
            curr_time = time.time()
            for i in self.queues:
                i[0] = i[0] - (curr_time - self.last_time)
            self.last_time = curr_time
    
        def execute(self):
            # if self.timer:
            #     self.timer.cancel()
            #     self.timer = None
            item = self.pop()
            self.flush_time()
            if item:
                callback = item[1]
                args = item[0]
                callback(args)
            self.start()

    执行及输出:

    if __name__ == '__main__':
        # 检测线程数
        def func():
            while True:
                print(threading.active_count())
                time.sleep(1)
        
        f1 = threading.Thread(target=func)
        f1.start()
        
        import logging
        logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")
        def func1(*args):
            logging.info('func1 %s'%args)
            # time.sleep(5)
        
        def func2(*args):
            logging.info('func2 %s' % args)
            # time.sleep(5)
        def func3(*args):
            logging.info('func3 %s' % args)
            # time.sleep(5)
        
        def func4(*args):
            logging.info('func4 %s' % args)
            # time.sleep(5)
        
        def func5(*args):
            logging.info('func5 %s' % args)
            # time.sleep(5)
        
        
        # 测试
        t1 = Timer()
        logging.info('start')
        t1.add([5,func1])
        time.sleep(0.5)
        t1.add([4,func2])
        time.sleep(0.5)
        t1.add([3,func3])
        time.sleep(0.5)
        t1.add([2,func4])
        time.sleep(0.5)
        t1.add([1,func5])
        time.sleep(5)
        t1.add([1,func1])
        t1.add([2,func2])
        t1.add([3,func3])
        t1.add([4,func4])
        t1.add([5,func5])
        
        # 输出
        # 2
        # 07/27/2017 10:36:47 [Thursday] start
        # add [5, <function func1 at 0x000000D79FC77E18>]
        # add [4, <function func2 at 0x000000D79FCA8488>]
        # 3
        # add [3, <function func3 at 0x000000D79FCA8510>]
        # add [2, <function func4 at 0x000000D79FCA8598>]
        # 3
        # add [1, <function func5 at 0x000000D79FCA8620>]
        # 3
        # 07/27/2017 10:36:50 [Thursday] func5 1
        # 07/27/2017 10:36:51 [Thursday] func4 0.498349666595459
        # 3
        # 07/27/2017 10:36:51 [Thursday] func3 0.49782633781433105
        # 07/27/2017 10:36:52 [Thursday] func2 0.49848270416259766
        # 3
        # 07/27/2017 10:36:52 [Thursday] func1 0.48449039459228516
        # 2
        # 2
        # add [1, <function func1 at 0x000000D79FC77E18>]
        # add [2, <function func2 at 0x000000D79FCA8488>]
        # add [3, <function func3 at 0x000000D79FCA8510>]
        # add [4, <function func4 at 0x000000D79FCA8598>]
        # add [5, <function func5 at 0x000000D79FCA8620>]
        # 3
        # 07/27/2017 10:36:55 [Thursday] func1 0.9990766048431396
        # 3
        # 07/27/2017 10:36:56 [Thursday] func2 0.9988017082214355
        # 3
        # 07/27/2017 10:36:57 [Thursday] func3 0.99928879737854
        # 07/27/2017 10:36:58 [Thursday] func4 0.9991350173950195
        # 3
        # 3
        # 07/27/2017 10:36:59 [Thursday] func5 0.9988160133361816
        
    执行代码

    注:查看代码输出,所有的定时器都按照标定的时间依次执行,非常完美,一切看起来很美好,只是看起来,呵呵哒,当你把func里面的time.sleep(5)启用后,线程数蹭蹭的上来了;原因是上个定时器callback还是执行中,下个定时器已经启动了,这时就又新增了一个线程,哎,失败

    2、修订版本

    思路:利用生成者消费者模型,用到threading.Condition条件变量;强制永远启用的是一个Timer!

    代码:

    import time
    import threading
    import logging
    
    class NewTimer(threading.Thread):
        '''单线程下的定时器'''
        def __init__(self):
            super().__init__()
            self.queues = []
            self.timer = None
            self.cond = threading.Condition()
    
        def run(self):
            while True:
                # print('NewTimer',self.queues)
                self.cond.acquire()
                item = self.get()
                callback = None
                if not item:
                    logging.info('NewTimer wait')
                    self.cond.wait()
                elif item[0] <= time.time():
                    new_item = self.pop()
                    callback = new_item[1]
                else:
                    logging.info('NewTimer start sys timer and wait')
                    self.timer = threading.Timer(item[0]-time.time(),self.execute)
                    self.timer.start()
                    self.cond.wait()
                self.cond.release()
    
                if callback:
                    callback(item[0])
    
        def add(self, item):
            # print('add', item)
            self.cond.acquire()
            item[0] = item[0] + time.time()
            self.queues.append(item)
            self.queues.sort(key=lambda x: x[0])
            logging.info('NewTimer add notify')
            if self.timer:
                self.timer.cancel()
                self.timer = None
            self.cond.notify()
            self.cond.release()
    
        def pop(self):
            item = None
            if len(self.queues) > 0:
                item = self.queues.pop(0)
            return item
    
        def get(self):
            item = None
            if len(self.queues) > 0:
                item = self.queues[0]
            return item
    
        def execute(self):
            logging.info('NewTimer execute notify')
            self.cond.acquire()
            self.cond.notify()
            self.cond.release()
    

    执行及输出:

    if __name__ == '__main__':
        def func():
            while True:
                print(threading.active_count())
                time.sleep(1)
    
        f1 = threading.Thread(target=func)
        f1.start()
        logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")
    
        newtimer = NewTimer()
        newtimer.start()
    
        def func1(*args):
            logging.info('func1 %s'%args)
            time.sleep(5)
    
        def func2(*args):
            logging.info('func2 %s' % args)
            time.sleep(5)
        def func3(*args):
            logging.info('func3 %s' % args)
            time.sleep(5)
    
        def func4(*args):
            logging.info('func4 %s' % args)
            time.sleep(5)
    
        def func5(*args):
            logging.info('func5 %s' % args)
            time.sleep(5)
    
        newtimer.add([5,func1])
        newtimer.add([4,func2])
        newtimer.add([3,func3])
        newtimer.add([2,func4])
        newtimer.add([1,func5])
        time.sleep(1)
        newtimer.add([1,func1])
        newtimer.add([2,func2])
        newtimer.add([3,func3])
        newtimer.add([4,func4])
        newtimer.add([5,func5])
    
    # 输出
    # 2
    # 07/27/2017 11:26:19 [Thursday] NewTimer wait
    # 07/27/2017 11:26:19 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:19 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:19 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:19 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:19 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:19 [Thursday] NewTimer start sys timer and wait
    # 07/27/2017 11:26:20 [Thursday] NewTimer execute notify
    # 4
    # 07/27/2017 11:26:20 [Thursday] func5 1501125980.2175007
    # 07/27/2017 11:26:20 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:20 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:20 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:20 [Thursday] NewTimer add notify
    # 07/27/2017 11:26:20 [Thursday] NewTimer add notify
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:26:25 [Thursday] func4 1501125981.2175007
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:26:30 [Thursday] func1 1501125981.218279
    # 3
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:26:35 [Thursday] func3 1501125982.2175007
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:26:40 [Thursday] func2 1501125982.218279
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:26:45 [Thursday] func2 1501125983.2175007
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:26:50 [Thursday] func3 1501125983.218279
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:26:55 [Thursday] func1 1501125984.2175007
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:27:00 [Thursday] func4 1501125984.218279
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:27:05 [Thursday] func5 1501125985.218279
    # 3
    # 3
    # 3
    # 3
    # 3
    # 07/27/2017 11:27:10 [Thursday] NewTimer wait
    输出

    注:这次无论如何测试线程数也不会蹭蹭的上涨,同时可以实现多定时器任务要求;缺点:用到了两线程,没有用到单线程去实现,第二时间精准度问题,需要等待上个定时程序执行完毕,程序才能继续运行

      

  • 相关阅读:
    创建无线网命令行
    网站推荐(多用于IT)
    企业级快速开发平台
    用代码截图去理解MVC原理
    .Net 下开发使用JSON
    EF实体框架数据操作基类
    EF实体框架数据操作接口
    开启GZIP
    EF快速开发定义数据接口类
    仿造w3school的试一试功能,实现左侧编辑框,右侧效果页面
  • 原文地址:https://www.cnblogs.com/lianzhilei/p/7243315.html
Copyright © 2011-2022 走看看