在使用Queue模块+多线程模拟生产者+消费者问题时,遇到了一个小问题,现在记录下来。供可能会遇到类似问题的初学者们参考。
该问题的完整参考代码如下。主要实现了以下的功能:在一个线程中,开启生产者模式,生成出来的object会put进一个Queue对象queue中。除此以外,在n个线程中(本代码中n为5,nfuncs = 5),开启消费者模式,每一个消费者线程会在一个while循环中不断地从queue中消耗一个object(使用get方法),直到生产者生产出的object被全部消耗(本代码中设为100个object)。
from random import randint
from time import sleep
from Queue import Queue
from threading import Thread,Lock,currentThread
lock = Lock()
def writes(queue):
print "producing object for Q...",
queue.put('xxx', 1)
print "size now", queue.qsize()
def readQ(queue):
val = queue.get(1)
print "consumed object from Q... size now",
queue.qsize()
print currentThread().name
def writer(queue, loops):
for i in range(loops):
lock.acquire()
writes(queue)
lock.release()
sleep(0.1)
def read(queue):
while queue.qsize() >0 :
sleep(randint(2, 4))
lock.acquire()
readQ(queue)
lock.release()
funcs = [writer, read]
nfuncs = range(5)
def main():
nloops1 = 100
q = Queue(1024)
thread = []
t = Thread(target = writer, args = (q, nloops1))
thread.append(t)
for i in nfuncs:
t = Thread(target = read, args = (q,))
thread.append(t)
for i in range(len(thread)):
thread[i].start()
for i in range(len(thread)):
thread[i].join()
print "All Done"
if __name__ == '__main__':
main()
为了防止生产速度跟不上消费的速度,生产线程中每次生产仅间隔0.1秒,且在消费线程中每次消费之前,随机sleep 2~3秒。
在运行之后,生产的object数量达到100(实际不会print 100号object的生成,因为在生产的过程中已经开始消费),然后多个线程开始消费。然而在把object数量消费至0以后,线程们并不会结束,既“print “”All done“””语句一直没有被执行。
思考以后,得出了三种解决途径:
1.在线程的Join方法中加入参数timeout,如果线程阻塞,线程运行时间达到timeout时,将中止该线程。
该方法的缺点在于当生产数量不确定时,timeout的时间无法很好的确定。如果join的时间太短,可能有的进程还在运行,主进程就继续运行了。如果join的时间太长,在线程很多的情况下,将会阻塞很长的一段时间。
2.试图考虑为什么线程会阻塞。
发现在read函数中,如前所述,在每次消费之前,随机sleep 2~3秒。于是可能会出现以下的问题。
当 queue.qsize() = 1的时候,某个线程x进入了while循环,然后开始睡眠2~3秒,在这个睡眠过程中,GUI切换至其它的线程,此时由于线程x处于睡眠,并没有调用readq函数,因此queue中仍然有一个元素。以此类推,每个进程都在queue.qsize() = 1时进入了while循环,然后最早结束睡眠的线程将调用readq函数中的queue.get(1)。之后其它进程在调用queue.get(1)时,将会因为queue中缺少元素阻塞。
解决方法如下:
def read(queue):
sleep(randint(2, 4))
while queue.qsize():
lock.acquire()
readQ(queue)
lock.release()
sleep(randint(2, 4))
在while之前睡眠,且将每次消费时所需的睡眠放至readQ函数之后。可避免多个线程同时进入while循环,该改进将不会
引起阻塞。
3.Queue中的get方法,若将其参数设置为非0,则不会因为队列中没有可用元素而阻塞。将get的参数设置为0,
利用try/excep语句,当get不到数据引起异常时,excep一个break,中断线程。