Using Queues for Task Control
1 FIFO and LIFO Queues
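FIFO (First In First Out) and LIFO (Last In First Out) are two kinds of queue. A FIFO queue hands items out in the order they were put in, while a LIFO queue hands out the most recently added item first. In Python's queue module they are provided by Queue and LifoQueue respectively, and the two orders can be used to implement different queueing behaviour.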
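To see the two orders in isolation, here is a minimal single-threaded sketch. The helper name drain is hypothetical and not part of the original code: it puts the same five integers into a Queue and into a LifoQueue and reads them back. Using empty() as the loop condition is only reliable here because no other thread touches the queues.

```python
from queue import Queue, LifoQueue


def drain(q):
    """Hypothetical helper: put 0..4 into q, then get every item back in order."""
    for i in range(5):
        q.put(i)
    order = []
    # Safe only because this is single-threaded; with several threads,
    # empty() can change between the check and the get().
    while not q.empty():
        order.append(q.get())
    return order


fifo_order = drain(Queue())
lifo_order = drain(LifoQueue())
print('FIFO:', fifo_order)  # FIFO: [0, 1, 2, 3, 4]
print('LIFO:', lifo_order)  # LIFO: [4, 3, 2, 1, 0]
```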
```python
from random import randint
from time import sleep, ctime
from queue import Queue, LifoQueue
from threading import Thread

COUNT = 0


class MyThread(Thread):
    """
    Build up a module to make this subclass more general,
    and get the return value through a function named 'getResult()'
    """
    def __init__(self, func, args, name=''):
        Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        print('Starting', self.name, 'at:', ctime())
        # Call the function here; the two timestamps show how long it ran
        self.res = self.func(*self.args)
        print(self.name, 'finished at:', ctime())


class MyQueue:
    def __init__(self):
        self.funcs = [self.writer, self.reader]
        self.nfuncs = range(len(self.funcs))

    def writeQ(self, queue):
        global COUNT
        print('Producing object OBJ_%d for Q...' % COUNT, end=' ')
        queue.put('OBJ_%d' % COUNT, True)
        print('size now:', queue.qsize())
        COUNT += 1

    def readQ(self, queue):
        # If queue is empty, block here until queue available
        val = queue.get(True)
        print('Consumed object %s from Q... size now:' % val, queue.qsize())

    def writer(self, queue, loops):
        for i in range(loops):
            self.writeQ(queue)
            sleep(randint(1, 3))

    def reader(self, queue, loops):
        for i in range(loops):
            self.readQ(queue)
            sleep(randint(2, 5))

    def main(self):
        nloops = randint(2, 5)
        fifoQ = Queue(32)
        lifoQ = LifoQueue(32)

        # First In First Out mode for Queue
        print('-----Start FIFO Queue-----')
        threads = []
        for i in self.nfuncs:
            threads.append(MyThread(self.funcs[i], (fifoQ, nloops), self.funcs[i].__name__))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        # Last In First Out mode for LifoQueue
        print('-----Start LIFO Queue-----')
        threads = []
        for i in self.nfuncs:
            threads.append(MyThread(self.funcs[i], (lifoQ, nloops), self.funcs[i].__name__))
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        print('All DONE')

if __name__ == '__main__':
    MyQueue().main()
```
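The first part of the code imports the required modules, defines a global counter COUNT, and derives a thread class MyThread from Thread. MyThread calls a given function and keeps its return value; in this example it runs the writer and reader functions and passes the Queue instance to them as an argument.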
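The idea behind MyThread, keeping the called function's return value on the thread object, can be sketched on its own. ResultThread is a hypothetical name used only for this illustration; unlike the original, it initializes res to None, so calling getResult() before the thread has run returns None instead of raising AttributeError.

```python
from threading import Thread


class ResultThread(Thread):
    """Hypothetical minimal version of the MyThread idea:
    run func(*args) in a thread and keep its return value."""

    def __init__(self, func, args=(), name=''):
        # An empty name falls back to Thread's default naming
        super().__init__(name=name or None)
        self.func = func
        self.args = args
        self.res = None  # filled in by run(); read it only after join()

    def run(self):
        self.res = self.func(*self.args)

    def getResult(self):
        return self.res


t = ResultThread(sum, ([1, 2, 3],), name='adder')
t.start()
t.join()  # wait until run() has stored the result
print(t.name, t.getResult())  # adder 6
```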
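The MyQueue class then wraps the queue operations. writeQ calls put on the queue and uses the global counter to name each object it adds (OBJ_0, OBJ_1, ...), while readQ calls get. Both pass True as the block argument, so put waits while the queue is full and get waits while it is empty. writer and reader call writeQ and readQ in a loop, sleeping a random 1 to 3 seconds and 2 to 5 seconds respectively between iterations. Finally, main creates a FIFO queue and a LIFO queue (each with a maximum size of 32) and runs the writer/reader pair against each of them in turn.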
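The True passed to put and get is the block argument. The following sketch assumes a bounded queue of size 2 instead of 32 so that it fills up immediately, and shows what happens when blocking is turned off or limited by a timeout: instead of waiting, the calls raise queue.Full and queue.Empty.

```python
import queue

q = queue.Queue(maxsize=2)   # bounded queue; the article uses a maxsize of 32
q.put('OBJ_0', True)         # block=True, as in writeQ
q.put('OBJ_1', True)

full_raised = False
try:
    q.put('OBJ_2', block=False)   # queue is full: raise instead of waiting
except queue.Full:
    full_raised = True

first, second = q.get(True), q.get(True)   # FIFO: 'OBJ_0', then 'OBJ_1'

empty_raised = False
try:
    q.get(timeout=0.1)            # queue is empty: wait at most 0.1 s, then raise
except queue.Empty:
    empty_raised = True

print(full_raised, first, second, empty_raised)  # True OBJ_0 OBJ_1 True
```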
Running the MyQueue example produces the following output:
```
-----Start FIFO Queue-----
Starting writer at: Tue Aug 1 21:43:22 2017
Producing object OBJ_0 for Q... size now: 1
Starting reader at: Tue Aug 1 21:43:22 2017
Consumed object OBJ_0 from Q... size now: 0
Producing object OBJ_1 for Q... size now: 1
Producing object OBJ_2 for Q... size now: 2
Producing object OBJ_3 for Q... size now: 3
Consumed object OBJ_1 from Q... size now: 2
writer finished at: Tue Aug 1 21:43:26 2017
Consumed object OBJ_2 from Q... size now: 1
Consumed object OBJ_3 from Q... size now: 0
reader finished at: Tue Aug 1 21:43:34 2017
-----Start LIFO Queue-----
Starting writer at: Tue Aug 1 21:43:34 2017
Producing object OBJ_4 for Q... size now: 1
Starting reader at: Tue Aug 1 21:43:34 2017
Consumed object OBJ_4 from Q... size now: 0
Producing object OBJ_5 for Q... size now: 1
Producing object OBJ_6 for Q... size now: 2
Producing object OBJ_7 for Q... size now: 3
writer finished at: Tue Aug 1 21:43:38 2017
Consumed object OBJ_7 from Q... size now: 2
Consumed object OBJ_6 from Q... size now: 1
Consumed object OBJ_5 from Q... size now: 0
reader finished at: Tue Aug 1 21:43:53 2017
All DONE
```
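The output confirms the two orders. In the FIFO run, the objects are consumed in exactly the order they were produced (OBJ_0 to OBJ_3). In the LIFO run, OBJ_4 is consumed immediately because it is the only item in the queue at that moment; OBJ_5, OBJ_6 and OBJ_7 then pile up while the reader sleeps and are consumed newest first (OBJ_7, OBJ_6, OBJ_5).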
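LIFO order only applies to the items that are in the queue when get is called. The following single-threaded sketch replays the interleaving from the LIFO run above by hand; the sequence of put and get calls is copied from that output rather than produced by real threads.

```python
from queue import LifoQueue

q = LifoQueue()
consumed = []

q.put('OBJ_4')
consumed.append(q.get())      # OBJ_4 is the only item waiting, so it comes out

for name in ('OBJ_5', 'OBJ_6', 'OBJ_7'):
    q.put(name)               # the writer runs ahead while the reader sleeps
while not q.empty():
    consumed.append(q.get())  # newest item first

print(consumed)  # ['OBJ_4', 'OBJ_7', 'OBJ_6', 'OBJ_5']
```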
2 Blocking with join and the task_done Signal
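In the queue module, the Queue class provides two methods for tracking whether tasks have been completed: join and task_done. When a thread calls join on a Queue instance, the call blocks until every item that has been put into the queue has been marked with task_done; only then does join return.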
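Note: The Queue source code shows that put increments the instance attribute unfinished_tasks by 1, while get does not decrement it; only task_done decrements it. join waits until unfinished_tasks reaches zero. In other words, join keeps blocking as long as any item that was put into the queue has not yet been acknowledged with task_done, regardless of whether that item has already been taken out with get. Items put while join is waiting are counted as well.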
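The counter behaviour described in the note can be checked directly. This sketch reads unfinished_tasks, which is an implementation detail of CPython's queue.Queue rather than documented public API, so treat it as a debugging aid only. It also shows that calling task_done more times than items were put raises ValueError.

```python
from queue import Queue

q = Queue()
for i in range(3):
    q.put('OBJ_%d' % i)
after_put = (q.qsize(), q.unfinished_tasks)    # (3, 3)

for _ in range(3):
    q.get()
after_get = (q.qsize(), q.unfinished_tasks)    # (0, 3): get() does not decrement

for _ in range(3):
    q.task_done()
after_done = (q.qsize(), q.unfinished_tasks)   # (0, 0)

q.join()  # returns immediately: nothing is left unfinished

too_many = False
try:
    q.task_done()  # one call more than there were put() calls
except ValueError:
    too_many = True

print(after_put, after_get, after_done, too_many)  # (3, 3) (0, 3) (0, 0) True
```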
The following example derives a new class from MyQueue in Queue_FIFO_LIFO.py (the first example) to test join and task_done.
```python
from Queue_FIFO_LIFO import *  # MyQueue, MyThread, Queue, randint, sleep, ...

class NewQueue(MyQueue):
    def __init__(self):
        MyQueue.__init__(self)

    def writer(self, queue, loops):
        for i in range(loops):
            self.writeQ(queue)
            sleep(randint(1, 3))
        print('Producing join here, waiting consumer')
        queue.join()  # block until task_done() has been called for every item

    def reader(self, queue, loops):
        for i in range(loops):
            self.readQ(queue)
            sleep(randint(2, 5))
            print('OBJ_%d task done' % i)
            queue.task_done()  # one task_done() for each get()

    def main(self):
        nloops = randint(2, 5)
        fifoQ = Queue(32)

        print('-----Start FIFO Queue-----')
        threads = []
        for i in self.nfuncs:
            threads.append(MyThread(self.funcs[i], (fifoQ, nloops), self.funcs[i].__name__))
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        print('All DONE')

if __name__ == '__main__':
    NewQueue().main()
```
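After the imports, NewQueue's __init__ simply calls MyQueue's initializer. Because MyQueue.__init__ builds self.funcs from self.writer and self.reader, the list picks up NewQueue's overridden methods. NewQueue overrides writer, reader and main: writer calls join once it has produced all of its objects, reader calls task_done after handling each object, and main runs only the FIFO queue.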
Running NewQueue produces the following output:
```
-----Start FIFO Queue-----
Starting writer at: Wed Aug 2 09:06:40 2017
Producing object OBJ_0 for Q... size now: 1
Starting reader at: Wed Aug 2 09:06:40 2017
Consumed object OBJ_0 from Q... size now: 0
Producing object OBJ_1 for Q... size now: 1
Producing object OBJ_2 for Q... size now: 2
OBJ_0 task done
Consumed object OBJ_1 from Q... size now: 1
Producing object OBJ_3 for Q... size now: 2
Producing object OBJ_4 for Q... size now: 3
Producing join here, waiting consumer
OBJ_1 task done
Consumed object OBJ_2 from Q... size now: 2
OBJ_2 task done
Consumed object OBJ_3 from Q... size now: 1
OBJ_3 task done
Consumed object OBJ_4 from Q... size now: 0
OBJ_4 task done
reader finished at: Wed Aug 2 09:07:02 2017
writer finished at: Wed Aug 2 09:07:02 2017
All DONE
```
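The output shows that once the new writer has finished producing, it blocks in join and waits until the reader has consumed every object and marked each one with task_done. Only then does join return, so writer and reader finish at the same time.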
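A common way to use join and task_done, sketched here as an alternative to the article's approach rather than a rewrite of it, is to let the consumer run in an endless loop and have the producer rely on join to know when everything has been processed. The names consumer and done are hypothetical; calling task_done in a finally clause keeps the count correct even if handling an item raises an exception.

```python
import threading
from queue import Queue


def consumer(q, done):
    """Hypothetical worker: take items forever and mark each one finished."""
    while True:
        item = q.get()
        try:
            done.append(item)  # stand-in for real work
        finally:
            q.task_done()      # always signal, even if the work raised


q = Queue()
done = []
# daemon=True so the endless consumer does not keep the interpreter alive
threading.Thread(target=consumer, args=(q, done), daemon=True).start()

for i in range(5):
    q.put('OBJ_%d' % i)
q.join()  # returns once task_done() has been called for every put()
print(done)  # ['OBJ_0', 'OBJ_1', 'OBJ_2', 'OBJ_3', 'OBJ_4']
```

Unlike NewQueue.reader, this consumer does not need to know the number of loops in advance, and the daemon flag lets the program exit while it is still blocked in get.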
References
《Python 核心编程 第3版》 (Core Python Programming, 3rd edition)