需要你的主线程把处理任务托付给一个工作线程池
Queue.Queue 是协调工作线程池最简单、也最有效率的方法,因为它内部已经实现了对共享数据的保护(加锁和解锁),多个线程可以安全地同时存取。
#-*-coding:utf-8-*- import threading, Queue, time, sys #global varialbes Qin = Queue.Queue() Qout = Queue.Queue() Qerr = Queue.Queue() Pool = [ ] def report_error(): '''put error information into Qerr to report the error''' Qerr.put(sys.exc_info()[:2]) def get_all_from_queue(Q): '''acquire all items of Queue Q ,no need to wait''' try: while True: yield Q.get_nowait() except Queue.Empty: raise StopIteration def do_work_from_queue(): ''' workthread acquire little job and loops working''' while True: command, item = Qin.get() if command == 'stop': break try: #simulate the job of working if command == 'process': result = 'new' + item else: raise ValueError ,'Unknown command %r' %command except: #it's right to except unconditionally,because we report all the mistakes report_error() else: Qout.put(result) def make_and_start_thread_pool(number_of_threads_in_pool=5, daemons=True): '''create a pool has N threads, and make them daemon and start''' for i in range(number_of_threads_in_pool): new_thread = threading.Thread(target = do_work_from_queue) new_thread.setDaemon(daemons) Pool.append(new_thread) new_thread.start() def request_work(data, command = 'process'): '''requesting work as if tuple(command, data)''' Qin.put((command, data)) def get_result(): return Qout.get() def show_all_results(): for result in get_all_from_queue(Qout): print 'Result:', result def show_all_errors(): for etyp, err in get_all_from_queue(Qerr): print 'Error:', etyp, err def stop_and_free_thread_pool(): #sequence is important,first of all, let all the threads stop for i in range(len(Pool)): request_work(None,'stop') #wait every thread stop for existing_thread in Pool: existing_thread.join() #clean thread pool del Pool[:] for i in ('_ba', '_be', '_bo'): request_work(i) make_and_start_thread_pool() stop_and_free_thread_pool() show_all_results() show_all_errors()
运行效果(多个工作线程并发执行,结果的先后顺序可能与下面不同):
Result: new_ba
Result: new_be
Result: new_bo