尽管在Python中可以使用各种锁和同步原语的组合编写非常传统的多线程程序,但有一种首推的编程方式要优于其他所有编程方式
即将多线程程序组织为多个独立人物的集合,这些任务之间通过消息队列进行通信
queue模块(在python2中叫Queue)实现了各种多生产者-多消费者队列,可用于在执行的多个线程之间安全地交换信息。
queue模块定义了三种不同的队列类
Queue([maxsize])
创造一个FIFO(first-in-first-out,先进先出)队列。maxsize是队列中可以放入的项的最大数量,如果省略maxsize
参数或将它置为0,队列大小将为无穷大。
LifoQueue([maxsize])
创建一个LIFO(last-in-first-out,后进先出)队列(也叫栈)
PriorityQueue([maxsize])
创建一个优先级队列,其中项按照优先级从低到高依次排列。使用这种队列时,项应该是(priority, data)形式的元组,
其中priority是一个数字。
队列类的实例q具有一下方法
q.qsize()
返回队列的正确大小。因为其他线程可能正在更新队列,此方法返回的数字不完全可靠
q.empty()
如果队列为空,返回True,否则返回False
q.full()
如果队列已满,返回True,否则返回False
q.put(item[,block [,timeout]])
将item放入队列。如果可选参数block为True(默认值),调用者将被阻塞直到队列中出现可用的空闲位置为止。
否则(block为False),队列满时将引发Full异常。timeout提供可选的超时值,单位为秒。如果出现超时,将引发
Full异常
q.put_nowait(item)
等价于q.put(item, False)方法
q.get(block [,timeout])
从队列中删除一项,然后返回这个项。如果可选参数block为True(默认值),调用者将阻塞,直到队列中出现可用的
空闲闲置。否则(block为False),队列为空将引发Empty异常。timeout提供可选的超时值,单位为秒。如果出现超时
将引发Empty异常。
q.get_nowait()
等价于get(0)方法
q.task_done()
队列中数据的消费数据用来指示对于项的处理已经结束。如果使用此方法,那么队列中删除的每一项都应该调用一次。
q.join()
阻塞直到队列中的所有项均被删除和处理为止。一旦为队列中的每一项都调用了一次q.task_done()方法,此方法将会直接返回。
1 import threading 2 from queue import Queue 3 4 5 class WorkerThread(threading.Thread): 6 def __init__(self, *args, **kwargs): 7 threading.Thread.__init__(self, *args, **kwargs) 8 self.input_queue = Queue() 9 10 def send(self, item): 11 self.input_queue.put(item) 12 13 def close(self): 14 self.input_queue.put(None) 15 self.input_queue.join() 16 17 def run(self): 18 while True: 19 item = self.input_queue.get() 20 if item is None: 21 break 22 # 处理项(使用有用的工作代替) 23 print(item) 24 self.input_queue.task_done() 25 # 完成,指示收到和返回了哨兵 26 self.input_queue.task_done() 27 return 28 29 30 # 使用示例 31 w = WorkerThread() 32 w.start() 33 w.send('hello') # 将项发给工作线程 34 w.send('world') 35 w.close()
1 import queue 2 import threading 3 import time 4 5 exitFlag = 0 6 7 8 class MyThread (threading.Thread): 9 def __init__(self, threadID, name, q): 10 threading.Thread.__init__(self) 11 self.threadID = threadID 12 self.name = name 13 self.q = q 14 15 def run(self): 16 print("开启线程:" + self.name) 17 process_data(self.name, self.q) 18 print("退出线程:" + self.name) 19 20 21 def process_data(threadName, q): 22 while not exitFlag: 23 queueLock.acquire() 24 if not workQueue.empty(): 25 data = q.get() 26 queueLock.release() 27 print("%s processing %s" % (threadName, data)) 28 else: 29 queueLock.release() 30 time.sleep(1) 31 32 33 threadList = ["Thread-1", "Thread-2", "Thread-3"] 34 nameList = ["One", "Two", "Three", "Four", "Five"] 35 queueLock = threading.Lock() 36 workQueue = queue.Queue(10) 37 threads = [] 38 threadID = 1 39 40 # 创建新线程 41 for tName in threadList: 42 thread = MyThread(threadID, tName, workQueue) 43 thread.start() 44 threads.append(thread) 45 threadID += 1 46 47 # 填充队列 48 queueLock.acquire() 49 for word in nameList: 50 workQueue.put(word) 51 queueLock.release() 52 53 # 等待队列清空 54 while not workQueue.empty(): 55 pass 56 57 # 通知线程是时候退出 58 exitFlag = 1 59 60 # 等待所有线程完成 61 for t in threads: 62 t.join() 63 print("退出主线程") 64 65 66 # http://www.runoob.com/python3/python3-multithreading.html