第39条:用Queue来协调各线程之间的工作
如果python程序同时要执行许多事务,那么开发者经常需要协调这些事务。而在各种协调方式中,较为高效的一种,则是采用函数管线。-->流水线。
管线分为许多首尾相连的阶段(phase),每个阶段都有一种具体的函数来负责。程序总是把待处理的新部件添加到管线的开端。每一种函数都可以在它所负责的那个阶段内,并发地处理位于该阶段的部件。等负责本阶段的那个函数,把某个部件处理好之后,该部件就会传送到管线中的下一个阶段,以此类推。涉及阻塞式I/O操作或子进程的工作任务,尤其适合用此办法处理,因为这样的任务,很容易分配到多个Python线程或进程之中。
例如,要构建一个照片处理系统,该系统从数码相机里面持续获取照片、调整尺寸,并将其添加到网络相册中。这样的程序,可以采用三阶段的管线来做。第一阶段获取新图片dowmload。第二阶段把下载好的图片传给缩放函数resize。第三阶段把缩放后的图片交给上传函数upload。
如何将其拼接为一条可以并发处理照片的管线呢?首先,要设计一种任务传递方式-->比如生产者-消费者模型。
1 # 线程安全的生产者-消费者模型 2 class MyQueue(object): 3 def __init__(self): 4 self.items = deque() 5 self.lock = Lock() 6 7 # 生产者-->相机,把新的图片添加到items列表的末端,这个列表用来存 8 # 放待处理的条目 9 def put(self, item): 10 with self.lock: 11 self.items.append(item) 12 13 #消费者-->图片处理管线,从待处理的条目清单顶部移除图片 14 def get(self): 15 with self.lock: 16 return self.items.popleft() 17 18 # 用Python线程来表示管线的各个阶段,这种Worker线程,会从MyQueue这样的队列中取出待处理的任务,并针对该任务运行相关函数,
然后把运行结果放到另一个MyQueue队列里。此外,Worker线程还会记录查询新任务的次数,以及处理完成的任务数量。
Worker四件事:生产队列取消息-->将处理结果放到处理完的队列中;记录查询新任务的次数,以及处理完成的任务数量。
19 class Worker(Thread): 20 def __init__(self, func, in_queue, out_queue): 21 super().__init__() 22 self.func = func # 处理函数 23 self.in_queue = in_queue # 取任务队列 24 self.out_queue = out_queue # 完成任务队列 25 self.polled_count = 0 # 查询新任务次数 26 self.work_done = 0 # 完成任务数量 27 28 # 对于Worker线程来说,最棘手的部分就是如何应对输入队列为空的情 29 况。如果上一个阶段没有及时地把相关任务处理完,那就会引发此问 30 题。-->处理方式:捕获异常。你可以将其想象为生产线上某个环节
发生了阻滞。 31 def run(self): 32 while True: 33 self.polled_count += 1 # 查询新任务次数 34 try: 35 item = self.in_queue.get() 36 except IndexError: 37 sleep(0.01) # No work to do 38 else: 39 result = self.func(item) 40 self.out_queue.put(result) 41 self.work_done += 1 42 43 # 创建相关队列,把整条管线的三个阶段拼接好 44 download_queue = MyQueue() 45 resize_queue = MyQueue() 46 upload_queue = MyQueue() 47 done_queue = MyQueue() 48 threads = [ 49 Worker(download, download_queue,resize_queue), 50 Worker(resize, resize_queue, upload_queue), 51 Worker(upload, upload_queue, done_queue), 52 ] 53 54 # 启动这些线程,并将大量任务添加到管线的第一个阶段 55 for thread in threads: 56 thread.start() 57 for _ in range(1000): 58 download_queue.put(object()) 59 # 用简单的obj对象来模拟download函数所需下载的真实数据 60 while len(done_queue.items) < 1000: 61 # Do something useful while waiting
这个范例程序可以正常运行,但是线程在查询其输入队列并获取新的任务时,可能会产生一种副作用,这是值得我们注意的。run方法中有一段微妙的代码,用来捕获IndexError异常的,而通过下面的输出信息,可以得知:这段代码运行了很多次。
processed = len(done_queue.items) polled = sum(t.polled_count for t in threads) print('Processed', processed, 'items after polling', polled, 'times') >>> Processed 1000 items after polling 3030 times
用Queue类来弥补自编队列的缺陷
内置的queue模块中,有个名叫Queue的类,Queue类使得工作线程无需再频繁地查询输入队列的状态,因为它的get方法会持续阻塞,直到有新的数据加入。
from queue import Queue queue = Queue() def consumer(): print('Consumer waiting') queue.get() # runs after put below print('Consumer done') thread = Thread(target=consumer) thread.start() # 线程虽然已经启动了,但它却并不会立刻就执行完毕,而是会卡在queue.get()那里,我们必须调用Queue实例的put方法,给队列中放入一项任务,方能使queue.get()方法得以返回。 print('Producer putting') queue.put(object()) thread.join() print('Producer done')
为了解决管线的迟滞问题,我们用Queue类来限定队列中待处理的最大任务数量,使得相邻的两个阶段,可以通过该队列平滑地衔接起来。构造Queue时,可以指定缓冲区的容量,如果队列已满,那么后续的put方法就会阻塞。例如,定义一条线程,让该线程先等待片刻,然后再去消费queue队列中的任务
queue = Queue(1) # Buffer size 1 def consumer(): time.sleep(0.1) queue.get() # runs second print('Consumer got 1') queue.get() # runs fourth print('Consumer got 2') thread = Thread(target=consumer) thread.start()
queue.put(object()) # runs first
print('Producer put 1')
queue.put(object()) # runs third
print('Producer put 2')
thread.join()
print('Producer done')
我们还可以通过Queue类的task_done方法来追踪工作进度。有了这个方法,我们就不用再像原来那样,在管线末端的done_queue处进行轮询,而是可以直接判断:管线中的某个阶段,是否已将输入队列中的任务,全都处理完毕。
in_queue = Queue() def consumer(): print('Consumer waiting') work = in_queue.get() # Done second print('Consumer working') # Doing work # ... print('Consumer done') in_queue.task_done() # Done third Thread(target=consumer).start()
现在,生产者线程的代码,既不需要在消费者线程上面调用join方法,也不需要轮询消费者线程。生产者只需在Queue实例上面调用join,并等待in_queue结束即可。即便调用in_queue.join()时队列为空,join也不会立刻返回,必须等消费者线程为队列中的每个条目都调用task_done()之后,生产者线程才可以从join处继续向下执行。
in_queue.put(object()) print('Producer waiting') in_queue.join()#?????? print('Producer done')
我们把这些行为都封装到Queue的子类里面,并且令工作线程可以通过这个ClosableQueue类,判断出自己何时应该停止处理。这个子类定义了close方法,此方法会给队列中添加一个特殊的对象,用以表明该对象之后再也没有其他任务需要处理了
class ClosableQueue(Queue): SENTINEL = object() def close(self): self.put(self.SENTINEL)
然后,为该类定义迭代器,此迭代器在发现特殊对象时,会停止迭代。__iter__方法也会在适当的时机调用task_done,使得开发者可以追踪队列的工作进度。
1 def __init__(self): 2 while True: 3 item = self.get() 4 try: 5 if item is self.SENTINEL: 6 return # Cause the thread to exit 7 yield item 8 finally: 9 self.task_done()
现在,根据ClosableQueue类的行为,来重新定义工作线程。这一次只要for循环耗尽,线程就会退出。
1 class StoppableWorker(Thread): 2 def __init__(self, func, in_queue, out_queue): 3 pass 4 5 def run(self): 6 for item in self.in_queue: 7 result = self.func(item) 8 self.out_queue.put(result) 9 10 download_queue = ClosableQueue() 11 #... 12 threads = [ 13 StoppableWorker(download, download_queue, resize_queue), 14 # ... 15 ] 16 for thread in threads: 17 thread.start() 18 for _ in range(1000): 19 download_queue.put(object()) 20 download_queue.close() 21 22 download_queue.join() 23 resize_queue.close() 24 resize_queue.join() 25 upload_queue.close() 26 upload_queue.join() 27 print(done_queue.qsize(), 'items finished')
要点:
管线是一种优秀的任务处理方式,它可以把处理流程划分为若干阶段,并使用多条Python线程来同时执行这些任务。
构建并发式的管线时,要注意许多问题,其中包括:如何防止某个阶段陷入持续等待的状态之中、如何停止工作线程,以及如何防止内存膨胀等。
Queue类所提供的机制,可以彻底解决上述问题,它具备阻塞式的队列操作、能够指定缓冲区尺寸,而且还支持join方法,这使得开发者可以构建出健壮的管线。