多线程多进程示例代码:
import time
import threading
import multiprocessing
from queue import Queue as threadQueue
from multiprocessing import Queue as processQueue


# Multi-process / multi-thread fan-out helper.
# process_num: number of worker processes
# thread_num: number of worker threads (per process when processes are used)

def main_func(params, process_num, thread_num, interval=1):
    """Process ``params`` concurrently and print progress until finished.

    When ``process_num > 1`` the items are split across worker processes,
    each of which splits its share across threads; otherwise the items are
    split across threads in the current process.  Results are collected in a
    success queue and a failure queue.

    Example:
        params: [1,3,5,7,9,11,2,4,6,8,10,12,13,14,15,16,17,18,19,20,22,24]
        process_num: 1
        thread_num: 5

    Args:
        params: Sliceable sequence of items to process.
        process_num: Number of worker processes; values <= 1 select
            thread-only mode.
        thread_num: Number of worker threads (per process in process mode).
        interval: Seconds to sleep between progress reports.

    Returns:
        True when processing completed (all items accounted for, or every
        worker has exited); False when the wait loop was interrupted
        (e.g. Ctrl+C) or failed with an error.

    Note:
        Progress is read with ``qsize()``; for ``multiprocessing.Queue`` the
        standard library documents that this may raise NotImplementedError
        on platforms such as macOS.
    """
    if process_num > 1:
        sq = processQueue()  # success queue
        fq = processQueue()  # failure queue
        workers = _multi_process(params, process_num, thread_num, sq, fq)
    else:
        sq = threadQueue()  # success queue
        fq = threadQueue()  # failure queue
        workers = _multi_thread(params, thread_num, sq, fq)

    total = len(params)

    # Print progress every ``interval`` seconds.
    s_t = time.time()
    while True:
        try:
            time.sleep(interval)
            ok, failed = sq.qsize(), fq.qsize()
            done = ok + failed
            elapsed = time.time() - s_t
            # Guard both divisions: empty input and a zero-length interval.
            percent = done * 100 / total if total else 100.0
            rate = int(ok / elapsed) if elapsed > 0 else 0
            print('进度:%d/%d (%s) 成功:%d个 失败:%d个 速率:%d个/s'
                  % (done, total, str(percent) + '%', ok, failed, rate))
            # Finished when every item is accounted for, or when no worker
            # is left alive (e.g. a worker died before reporting an item).
            # Tracking our own workers avoids depending on the global
            # thread count, which other threads in the program would skew.
            if done == total or not any(w.is_alive() for w in workers):
                _report_results(sq, fq, s_t)
                return True
        except (KeyboardInterrupt, Exception) as exc:
            # Ctrl+C is the expected way to stop; anything else is reported
            # instead of being silently swallowed.
            if not isinstance(exc, KeyboardInterrupt):
                print('error: %r' % (exc,))
            print('停止,耗时:%s' % time_p(s_t))
            return False


def _report_results(sq, fq, s_t):
    """Drain both queues and print the final summary."""
    print('处理完成 耗时:%s' % time_p(s_t))

    success_values = get_queue_values(sq)
    fail_values = get_queue_values(fq)
    finished = len(success_values) + len(fail_values)
    # Avoid dividing by zero when nothing was produced at all.
    ratio = round(len(success_values) * 100 / finished, 6) if finished else 0.0
    print("成功的个数为: %s" % str(len(success_values)))
    print("失败的个数为: %s" % str(len(fail_values)))
    print("成功的比例为: %s" % str(ratio) + "%")
    print("计算结束.")

    print("结果处理开始...")
    # TODO: post-process success_values / fail_values here.
    print("结果处理结束.")


def _split_params(params, parts):
    """Split ``params`` into ``parts`` equal chunks plus one remainder chunk.

    When ``len(params)`` is not a multiple of ``parts`` the leftover items
    form one extra chunk, so up to ``parts + 1`` chunks are returned.  When
    there are fewer items than ``parts`` a single chunk holds everything.
    """
    size, remainder = divmod(len(params), parts)
    chunks = []
    if size > 0:
        chunks.extend(params[i * size:(i + 1) * size] for i in range(parts))
    if remainder > 0:
        chunks.append(params[parts * size:])
    return chunks


def _multi_process(params, process_num, thread_num, sq, fq):
    """Start daemon processes that each run ``_multi_thread`` on a chunk.

    Returns:
        The list of started ``multiprocessing.Process`` objects.
    """
    processes = []
    for chunk in _split_params(params, process_num):
        # _join=True: the child must wait for its threads, otherwise it
        # would exit as soon as they are started.
        p = multiprocessing.Process(target=_multi_thread,
                                    args=(chunk, thread_num, sq, fq, True))
        p.daemon = True
        p.start()
        processes.append(p)
    return processes


def _multi_thread(params, thread_num, sq, fq, _join=False):
    """Start daemon threads that each run ``_patch`` on a chunk.

    Args:
        params: Items to distribute.
        thread_num: Number of equal-sized chunks/threads.
        sq: Success queue.
        fq: Failure queue.
        _join: When True, block until every started thread has finished.

    Returns:
        The list of started ``threading.Thread`` objects.
    """
    ts = []
    for chunk in _split_params(params, thread_num):
        t = threading.Thread(target=_patch, args=(chunk, sq, fq))
        t.daemon = True
        t.start()
        ts.append(t)
    if _join is True:
        for t in ts:
            t.join()
    return ts


def _patch(params, sq, fq):
    """Adapter that forwards the queues to ``_func`` by keyword."""
    _func(params, success_queue=sq, fail_queue=fq)


def _func(params, success_queue=None, fail_queue=None):
    """Process ``params`` and put each result on the matching queue.

    Placeholder logic: exact ``int`` values count as successes, everything
    else as failures.
    """
    # TODO: replace with the real per-item processing.
    for every_value in params:
        # Exact type check on purpose: bool is a subclass of int and must
        # keep landing in the failure queue.
        if type(every_value) is int:
            success_queue.put(every_value)
        else:
            fail_queue.put(every_value)


def time_p(t_s):
    """Return the time elapsed since ``t_s`` as a compact string.

    Formats: ``'5s'``, ``'2min5s'``, ``'1h1min40s'``, ``'1d1h1min1s'``.
    """
    dt = time.time() - t_s
    if dt < 60:
        return str(int(dt)) + 's'
    minutes, seconds = divmod(int(dt), 60)
    if minutes < 60:
        return '%dmin%ds' % (minutes, seconds)
    hours, minutes = divmod(minutes, 60)
    if hours < 24:
        return '%dh%dmin%ds' % (hours, minutes, seconds)
    days, hours = divmod(hours, 24)
    return '%dd%dh%dmin%ds' % (days, hours, minutes, seconds)


def get_queue_values(q):
    """Drain ``q`` and return its items in retrieval order."""
    ret = []
    # qsize() is checked before each blocking get() so the loop stops once
    # the queue reports no more items.
    while q.qsize() > 0:
        ret.append(q.get())
    return ret


# Example usage (keep the __main__ guard; it is required when worker
# processes are started with the "spawn" start method):
# if __name__ == '__main__':
#     params = [1, 3, 5, 7, 9, 11, 2, 4, 6, 8, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 24]
#     process_num = 1
#     thread_num = 5
#     main_func(params=params, process_num=process_num, thread_num=thread_num)
测试结果: