Python 在不依赖第三方库的前提下,对于定时器的实现并不是很完美,但是这不意味着我们无法实现。
阅读了网上的一些资料,得出一些结论,顺手写了一个基类的定时器(Python3)
BaseTimer:
1 # -*- coding: utf-8 -*- 2 3 4 from abc import ABCMeta, abstractmethod 5 import time 6 import threading 7 8 class BaseTimer(threading.Thread): 9 """ 10 基础定时器抽象类,可以随意继承本类并且实现exec抽象方法即可定时产生任务 11 """ 12 __metaclass__ = ABCMeta 13 def __init__(self,howtime=1.0,enduring=True): 14 """ 15 howtime 每次定时的时间 16 enduring 是否是一个持久性的任务,用这个可以控制开关 17 """ 18 19 self.enduring = enduring 20 self.howtime = howtime 21 threading.Thread.__init__(self) 22 23 def run(self): 24 time.sleep(self.howtime) #至少执行一次 任务 25 self.exec() 26 while self.enduring: #是否持久,或者是否进行运行 27 time.sleep(self.howtime) 28 self.exec() #每次开始执行任务 29 30 @abstractmethod 31 def exec(self): 32 """抽象方法,子类实现""" 33 pass 34 35 def destroy(self): 36 """销毁自身""" 37 self.enduring = False 38 del self 39 40 def stop(self): 41 self.enduring = False 42 43 def restart(self): 44 self.enduring = True 45 46 def get_status(self): 47 return self.enduring 48
如何使用?
我们来建立一个新的任务,这个任务是过一段时间就输出:
1 class TimerMask(BaseTimer): 2 """定时任务类,不同的业务逻辑""" 3 def __init__(self,howtime=1.0,enduring=True): 4 BaseTimer.__init__(self,howtime,enduring) 5 self.ws=0 6 7 def exec(self): 8 print("HelloWorld!") 9 self.ws = self.ws + 1 #这里是过0.8秒输出一次 10 if self.ws >5: 11 self.destroy()
加入如下:
1 if __name__ == "__main__": 2 Q_w = 0 3 w = TimerMask(howtime=0.8) 4 print("-") 5 w.start() 6 #这里线程输出这些,做其他事情的 7 while True: 8 time.sleep(0.4) #0.4秒 9 print("- ",Q_w," - WMask:",w) 10 Q_w += 1 11 pass
输出:
于是你可能会想问,那岂不是每个不同的行为都要写一个继承了BaseTimer的类来做事呢,其实不然,你可以写个函数调用的TimerFunc类:
1 class TimerFunc(BaseTimer): 2 """可传递任何函数的定时任务类""" 3 def __init__(self,func,howtime=1.0,enduring=True): 4 BaseTimer.__init__(self,howtime,enduring) 5 self.func = func 6 7 def exec(self): 8 self.func() #调用函数 9 10 #使用方法: 11 def doing(): 12 print("Hello") 13 14 w = TimerFunc(doing) 15 w.start()
输出:"Hello",并且会每隔1秒执行一次
是不是觉得可以做一堆事情了?你可以自由发挥,继承BaseTimer类
1 w = TimerFunc(doing,5,False) #这样就可以定义延迟5秒使用了~ 2 w.start()
在搜索资料的时候,找到了网上大部分的实现方法,其实都差不多,感兴趣你可以看看:
1 import threading ,time 2 from time import sleep, ctime 3 class Timer(threading.Thread): 4 """ 5 very simple but useless timer. 6 """ 7 def __init__(self, seconds): 8 self.runTime = seconds 9 threading.Thread.__init__(self) 10 def run(self): 11 time.sleep(self.runTime) 12 print ("Buzzzz!! Time's up!") 13 14 class CountDownTimer(Timer): 15 """ 16 a timer that can counts down the seconds. 17 """ 18 def run(self): 19 counter = self.runTime 20 for sec in range(self.runTime): 21 print (counter) 22 time.sleep(1.0) 23 counter -= 1 24 print ("Done") 25 26 class CountDownExec(CountDownTimer): 27 """ 28 a timer that execute an action at the end of the timer run. 29 """ 30 def __init__(self, seconds, action, args=[]): 31 self.args = args 32 self.action = action 33 CountDownTimer.__init__(self, seconds) 34 def run(self): 35 CountDownTimer.run(self) 36 self.action(self.args) 37 38 def myAction(args=[]): 39 print ("Performing my action with args:") 40 print (args) 41 42 if __name__ == "__main__": 43 t = CountDownExec(3, myAction, ["hello", "world"]) 44 t.start() 45 print("2333")
1 ''' 2 使用sched模块实现的timer,sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法 3 Created on 2013-7-31 4 5 @author: Eric 6 ''' 7 import time, sched 8 9 #被调度触发的函数 10 def event_func(msg): 11 print("Current Time:", time.strftime("%y-%m-%d %H:%M:%S"), 'msg:', msg) 12 13 def run_function(): 14 #初始化sched模块的scheduler类 15 s = sched.scheduler(time.time, time.sleep) 16 #设置一个调度,因为time.sleep()的时间是一秒,所以timer的间隔时间就是sleep的时间,加上enter的第一个参数 17 s.enter(0, 2, event_func, ("Timer event.",)) 18 s.run() 19 20 def timer1(): 21 while True: 22 #sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法 23 time.sleep(1) 24 run_function() 25 26 if __name__ == "__main__": 27 timer1()