zoukankan      html  css  js  c++  java
  • 多线程多进程code

     

    多线程多进程示例代码:

      1 import time
      2 import threading
      3 import multiprocessing
      4 from queue import Queue as threadQueue
      5 from multiprocessing import Queue as processQueue
      6 
      7 
      8 # 多进程多线程
      9 # process_num 进程
     10 # thread_num 线程
     11 
     12 def main_func(params, process_num, thread_num, interval=1):
     13     """
     14     example:
     15         params:[1,3,5,7,9,11,2,4,6,8,10,12,13,14,15,16,17,18,19,20,22,24]
     16         process_num:1
     17         thread_num:5
     18     """
     19     if process_num > 1:
     20         # 成功对列
     21         sq = processQueue()
     22         # 失败对列
     23         fq = processQueue()
     24         _multi_process(params, process_num, thread_num, sq, fq)
     25     else:
     26         # 成功对列
     27         sq = threadQueue()
     28         # 失败对列
     29         fq = threadQueue()
     30         _multi_thread(params, thread_num, sq, fq)
     31 
     32         # 打印进度
     33     s_t = time.time()
     34     while True:
     35         try:
     36             time.sleep(interval)  # 每隔 interval 打印一次
     37             print('进度:%d/%d (%s)    成功:%d个    失败:%d个    速率:%d个/s'
     38                   % (sq.qsize() + fq.qsize(), len(params), str((sq.qsize() + fq.qsize()) * 100 / len(params)) + '%',
     39                      sq.qsize(), fq.qsize(), int(sq.qsize() / (time.time() - s_t))))
     40             if process_num > 1:
     41                 if sq.qsize() + fq.qsize() == len(params) or multiprocessing.active_children() == []:
     42                     print('处理完成    耗时:%s' % str(time_p(s_t)))
     43 
     44                     success_values = get_queue_values(sq)
     45                     fail_values = get_queue_values(fq)
     46                     print("成功的个数为: %s" % str(len(success_values)))
     47                     print("失败的个数为: %s" % str(len(fail_values)))
     48                     print("成功的比例为: %s" % str(
     49                         round(len(success_values) * 100 / (len(success_values) + len(fail_values)), 6)) + "%")
     50                     print("计算结束.")
     51 
     52                     print("结果处理开始...")
     53                     # todo 进行相应的结果处理
     54                     print("结果处理结束.")
     55                     return True
     56             else:
     57                 if sq.qsize() + fq.qsize() == len(params) or threading.active_count() == 1:
     58                     print('处理完成    耗时:%s' % str(time_p(s_t)))
     59 
     60                     success_values = get_queue_values(sq)
     61                     fail_values = get_queue_values(fq)
     62                     print("成功的个数为: %s" % str(len(success_values)))
     63                     print("失败的个数为: %s" % str(len(fail_values)))
     64                     print("成功的比例为: %s" % str(
     65                         round(len(success_values) * 100 / (len(success_values) + len(fail_values)), 6)) + "%")
     66                     print("计算结束.")
     67 
     68                     print("结果处理开始...")
     69                     # todo 进行相应的结果处理
     70                     print("结果处理结束.")
     71                     return True
     72         except:
     73             print('停止,耗时:%s' % str(time_p(s_t)))
     74             return False
     75 
     76 
     77 def _multi_process(params, process_num, thread_num, sq, fq):
     78     num, y = divmod(params, process_num)
     79     if num > 0:
     80         for i in range(process_num):
     81             p = multiprocessing.Process(target=_multi_thread,
     82                                         args=(params[i * num:(i + 1) * num], thread_num, sq, fq, True))
     83             p.daemon = True
     84             p.start()
     85     if y > 0:
     86         p = multiprocessing.Process(target=_multi_thread,
     87                                     args=(params[num * process_num:], thread_num, sq, fq, True))
     88         p.daemon = True
     89         p.start()
     90 
     91 
     92 def _multi_thread(params, thread_num, sq, fq, _join=False):
     93     num, y = divmod(len(params), thread_num)
     94     ts = []
     95     if num > 0:
     96         for i in range(thread_num):
     97             t = threading.Thread(target=_patch, args=(params[i * num:(i + 1) * num], sq, fq))
     98             ts.append(t)
     99             t.daemon = True
    100             t.start()
    101     if y > 0:
    102         t = threading.Thread(target=_patch, args=(params[thread_num * num:], sq, fq))
    103         ts.append(t)
    104         t.daemon = True
    105         t.start()
    106     if _join is True:
    107         for t in ts:
    108             t.join()
    109 
    110 
    111 def _patch(params, sq, fq):
    112     _func(params, success_queue=sq, fail_queue=fq)
    113 
    114 
    115 def _func(params, success_queue=None, fail_queue=None):
    116     # 对params进行处理,并put相应的结果到success_queue和fail_queue
    117     # todo
    118     for index, every_value in enumerate(params):
    119         if type(every_value) == int:
    120             success_queue.put(every_value)
    121         else:
    122             fail_queue.put(every_value)
    123 
    124 
    125 def time_p(t_s):
    126     dt = time.time() - t_s
    127     if dt < 60:
    128         return str(int(dt)) + 's'
    129     elif dt < 3600:
    130         return str(int(dt // 60)) + 'min' + str(int(dt % 60)) + 's'
    131     elif dt < 86400:
    132         return str(int(dt // 3600)) + 'h' + str(int(dt % 3600 // 60)) + 'min' + 
    133                str(int(dt % 3600 % 60)) + 's'
    134 
    135 
    136 def get_queue_values(q):
    137     ret = []
    138     while True:
    139         if q.qsize() > 0:
    140             ret.append(q.get())
    141         else:
    142             break
    143     return ret
    144 
    145 
    146 # params = [1, 3, 5, 7, 9, 11, 2, 4, 6, 8, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 24]
    147 # process_num = 1
    148 # thread_num = 5
    149 # main_func(params=params, process_num=process_num, thread_num=thread_num)

    测试结果:

  • 相关阅读:
    jquery blockUI 扩展插件 Dialog
    ExtJS Form扩展组件[ColorFiled, DateTimeFiled, IconCombo, MultiComboBox, DynamicTreeCombox]
    Struts 笔记
    Spring整合CXF,发布RSETful 风格WebService
    Mybatis传递参数到 xml
    学习Mybatis xml 常用关键语法 Ivin
    一行命令搞定/usr/bin/perl^M: bad interpreter
    js基本功能大全
    Foxmail Server 可以搭建出功能强大的邮件服务器
    Access转成Sql 2008步骤,同时解决自动编号问题,主键,id数值不重置。
  • 原文地址:https://www.cnblogs.com/sunshine-blog/p/14653160.html
Copyright © 2011-2022 走看看