zoukankan      html  css  js  c++  java
  • Python的并发并行[2] -> 队列[1] -> 使用队列进行任务控制

    使用队列进行任务控制


    1 FIFOLIFO队列

    FIFO(First In First Out)与LIFO(Last In First Out)分别是两种队列形式,在FIFO中,满足先入先出的队列方式,而LIFO则是后入先出的队列形式,利用这两种方式可以实现不同的队列功能。

     1 from random import randint
     2 from time import sleep, ctime
     3 from queue import Queue, LifoQueue
     4 from threading import Thread
     5 
     6 COUNT = 0
     7 
     8 
     9 class MyThread(Thread):
    10     """
    11     Bulid up a Module to make this subclass more general
    12     And get return value by add a function named 'getResult()'
    13     """
    14     def __init__(self, func, args, name=''):
    15         Thread.__init__(self)
    16         self.name = name
    17         self.func = func
    18         self.args = args
    19 
    20     def getResult(self):
    21         return self.res
    22 
    23     def run(self):
    24         print('Starting', self.name, 'at:', ctime())
    25         # Call function here and calculate the running time
    26         self.res = self.func(*self.args)
    27         print(self.name, 'finished at:', ctime())
    28 
    29 
    30 class MyQueue():
    31     def __init__(self):
    32         self.funcs = [self.writer, self.reader]
    33         self.nfuncs = range(len(self.funcs))
    34 
    35     def writeQ(self, queue):
    36         global COUNT
    37         print('Producing object OBJ_%d for Q...' % COUNT, end=' ')
    38         queue.put('OBJ_%d' % COUNT, True)
    39         print('size now:', queue.qsize())
    40         COUNT += 1
    41     
    42     def readQ(self, queue):
    43         # If queue is empty, block here until queue available
    44         val = queue.get(True)
    45         print('Consumed object %s from Q... size now:' % val, queue.qsize())
    46     
    47     def writer(self, queue, loops):
    48         for i in range(loops):
    49             self.writeQ(queue)
    50             sleep(randint(1, 3))
    51     
    52     def reader(self, queue, loops):
    53         for i in range(loops):
    54             self.readQ(queue)
    55             sleep(randint(2, 5))
    56     
    57     def main(self):
    58         nloops = randint(2, 5)
    59         fifoQ = Queue(32)
    60         lifoQ = LifoQueue(32)
    61     
    62         # First In First Out mode for Queue
    63         print('-----Start FIFO Queue-----')
    64         threads = []
    65         for i in self.nfuncs:
    66             threads.append(MyThread(self.funcs[i], (fifoQ, nloops), self.funcs[i].__name__))
    67         for t in threads:
    68             t.start()
    69         for t in threads:
    70             t.join()
    71         # Last In First Out mode for LifoQueue
    72         print('-----Start LIFO Queue-----')
    73         threads = []
    74         for i in self.nfuncs:
    75             threads.append(MyThread(self.funcs[i], (lifoQ, nloops), self.funcs[i].__name__))
    76         for t in threads:
    77             t.start()
    78         for t in threads:
    79             t.join()
    80     
    81         print('All DONE')
    82 
    83 if __name__ == '__main__':
    84      MyQueue().main()

    第 1-27 行,首先对需要的模块进行导入,并定义一个全局变量的计数器,派生一个MyThread线程类,用于调用函数及其返回值(本例中MyThread可用于接受writer和reader函数,同时将Queue的实例作为参数传给这两个函数)。

    第 30-79 行,定义一个队列类,用于进行队列一系列处理,其中writeQ与readQ会分别对队列执行put和get函数,在writeQ中利用全局变量设置每个加入队列的对象的名字。而writer和reader则会利用循环多次执行writeQ和readQ函数。最后定义一个main函数,用于生成队列,同时调用FIFO以及LIFO两种队列方式。

    运行得到结果

    -----Start FIFO Queue-----  
    Starting writer at: Tue Aug  1 21:43:22 2017  
    Producing object OBJ_0 for Q... size now: 1  
    Starting reader at: Tue Aug  1 21:43:22 2017  
    Consumed object OBJ_0 from Q... size now: 0  
    Producing object OBJ_1 for Q... size now: 1  
    Producing object OBJ_2 for Q... size now: 2  
    Producing object OBJ_3 for Q... size now: 3  
    Consumed object OBJ_1 from Q... size now: 2  
    writer finished at: Tue Aug  1 21:43:26 2017  
    Consumed object OBJ_2 from Q... size now: 1  
    Consumed object OBJ_3 from Q... size now: 0  
    reader finished at: Tue Aug  1 21:43:34 2017  
    -----Start LIFO Queue-----  
    Starting writer at: Tue Aug  1 21:43:34 2017  
    Producing object OBJ_4 for Q... size now: 1  
    Starting reader at: Tue Aug  1 21:43:34 2017  
    Consumed object OBJ_4 from Q... size now: 0  
    Producing object OBJ_5 for Q... size now: 1  
    Producing object OBJ_6 for Q... size now: 2  
    Producing object OBJ_7 for Q... size now: 3  
    writer finished at: Tue Aug  1 21:43:38 2017  
    Consumed object OBJ_7 from Q... size now: 2  
    Consumed object OBJ_6 from Q... size now: 1  
    Consumed object OBJ_5 from Q... size now: 0  
    reader finished at: Tue Aug  1 21:43:53 2017  
    All DONE  
    View Code

    从输出可以看出,FIFO满足先入先出,LIFO满足后入先出的队列形式。

    2 join挂起与task_done信号

    在queue模块中,Queue类提供了两个用于跟踪监测任务完成的函数,join和task_done,对于join函数来说,当Queue的类实例调用了join函数挂起时,join函数会阻塞等待,一直到join之前进入队列的所有任务全部标记为task_done后才会解除阻塞。

    Note: 通过查看Queue的源码可以看出,在调用put函数时,会对类变量unfinished_tasks进行数值加1,而调用get函数时并不会将unfinished_tasks进行减1,只有调用task_done函数才会导致变量减1。而调用join函数时,join函数会对这个unfinished_tasks变量进行获取,也就是说,join函数会获取到在调用之前所有被put进队列里的任务中,还没有调用过task_done函数的任务数量,无论这个任务是否已经被get出列。

    下面的例子中,以Queue_FIFO_LIFO.py中的MyQueue为基类,派生出一个新类,用于测试join函数与task_done函数。

     1 from Queue_FIFO_LIFO import *
     2 
     3 class NewQueue(MyQueue):
     4     def __init__(self):
     5         MyQueue.__init__(self)
     6 
     7     def writer(self, queue, loops):
     8         for i in range(loops):
     9             self.writeQ(queue)
    10             sleep(randint(1, 3))
    11         print('Producing join here, waiting consumer')
    12         queue.join()
    13     
    14     def reader(self, queue, loops):
    15         for i in range(loops):
    16             self.readQ(queue)
    17             sleep(randint(2, 5))
    18             print('OBJ_%d task done' % i)
    19             queue.task_done()
    20 
    21     def main(self):
    22         nloops = randint(2, 5)
    23         fifoQ = Queue(32)
    24 
    25         print('-----Start FIFO Queue-----')
    26         threads = []
    27         for i in self.nfuncs:
    28             threads.append(MyThread(self.funcs[i], (fifoQ, nloops), self.funcs[i].__name__))
    29         for t in threads:
    30             t.start()
    31         for t in threads:
    32             t.join()
    33 
    34         print('All DONE')
    35 
    36 if __name__ == '__main__':
    37     NewQueue().main()

    上面的代码,在导入模块后,调用MyQueue的初始化函数进行初始化设置。在新类NewQueue中,对原基类的writer和reader以及main方法进行了重载,加入了join函数和task_done函数,并在main函数中只采用FIFO队列进行试验。

    运行得到结果

    -----Start FIFO Queue-----  
    Starting writer at: Wed Aug  2 09:06:40 2017  
    Producing object OBJ_0 for Q... size now: 1  
    Starting reader at: Wed Aug  2 09:06:40 2017  
    Consumed object OBJ_0 from Q... size now: 0  
    Producing object OBJ_1 for Q... size now: 1  
    Producing object OBJ_2 for Q... size now: 2  
    OBJ_0 task done  
    Consumed object OBJ_1 from Q... size now: 1  
    Producing object OBJ_3 for Q... size now: 2  
    Producing object OBJ_4 for Q... size now: 3  
    Producing join here, waiting consumer  
    OBJ_1 task done  
    Consumed object OBJ_2 from Q... size now: 2  
    OBJ_2 task done  
    Consumed object OBJ_3 from Q... size now: 1  
    OBJ_3 task done  
    Consumed object OBJ_4 from Q... size now: 0  
    OBJ_4 task done  
    reader finished at: Wed Aug  2 09:07:02 2017  
    writer finished at: Wed Aug  2 09:07:02 2017  
    All DONE  
    View Code

    通过得到的结果可以看出,当新类里的writer完成了自己的Producing任务后,会由join挂起,一直等待直到reader的Consuming全部完成且标记task_done之后,才会解除挂起,此时writer和reader将会一起结束退出。

    相关阅读


    1. 多线程的建立

    2. queue 模块

    参考链接 


     

    《Python 核心编程 第3版》

  • 相关阅读:
    AIMS 2013中的性能报告工具不能运行的解决办法
    读懂AIMS 2013中的性能分析报告
    在线研讨会网络视频讲座 方案设计利器Autodesk Infrastructure Modeler 2013
    Using New Profiling API to Analyze Performance of AIMS 2013
    Map 3D 2013 新功能和新API WebCast视频下载
    为Autodesk Infrastructure Map Server(AIMS) Mobile Viewer创建自定义控件
    ADN新开了云计算Cloud和移动计算Mobile相关技术的博客
    JavaScript修改css样式style
    文本编辑神器awk
    jquery 开发总结1
  • 原文地址:https://www.cnblogs.com/stacklike/p/8166854.html
Copyright © 2011-2022 走看看