zoukankan      html  css  js  c++  java
  • Python多线程与队列

    Python多线程与Queue队列多线程在感官上类似于同时执行多个程序,虽然由于GIL的存在,在Python中无法实现线程的真正并行,但是对于某些场景,多线程仍不失为一个有效的处理方法:

    1,不紧急的,无需阻塞主线程的任务,此时可以利用多线程在后台慢慢处理;
    2,IO密集型操作,比如文件读写、用户输入和网络请求等,此时多线程可以近似达到甚至优于多进程的表现;

    多线程的基本使用不再赘述,以下语法便可轻松实现:

    1 def task(args1, args2):
    2     pass
    3 
    4 Thread(
    5     target=task,
    6     args=(args1, args2)
    7 ).start()


    这里我们重点关注线程通信。

    假设有这么一种场景:有一批源数据,指定一个操作系数N,需要分别对其进行与N的加减乘除操作,并将结果汇总。
    当然这里的加减乘除只是一种简单处理,在实际的生产环境中,它其实代表了一步较为复杂的业务操作,并包含了较多的IO处理。

    自然我们想到可以开启多线程处理,那么紧接着的问题便是:如何划分线程,是根据处理步骤划分,还是根据源数据划分?

    对于前者,我们把涉及的业务操作单独划分位一个线程,即有4个线程分别进行加减乘除的操作,显然上一个线程的结果是下一个线程的输入,这类似于流水线操作;

    而后者则是把源数据分为若干份,每份启动一个线程进行处理,最终把结果汇总。一般来说,我们推荐第一种方式。因为在一个线程中完成所有的操作不如每步一个线程清晰明了,

    尤其是在一些复杂的场景下,会加大单个线程的出错概率和测试难度。

    那么我们将开辟4个线程,分别执行加减乘除操作。最后一个除法线程结束则任务完成:

     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 
     4 from Queue import Queue
     5 from threading import Thread
     6 
     7 
     8 class NumberHandler(object):
     9     def __init__(self, n):
    10         self.n = n
    11 
    12     def add(self, num):
    13         return num + self.n
    14 
    15     def subtract(self, num):
    16         return num - self.n
    17 
    18     def multiply(self, num):
    19         return num * self.n * self.n
    20 
    21     def divide(self, num):
    22         return num / self.n
    23 
    24 
    25 class ClosableQueue(Queue):
    26     SENTINEL = object()
    27 
    28     def close(self):
    29         self.put(self.SENTINEL)
    30 
    31     def __iter__(self):
    32         while True:
    33             item = self.get()
    34             try:
    35                 if item is self.SENTINEL:
    36                     return
    37                 yield item
    38             finally:
    39                 self.task_done()
    40 
    41 
    42 class StoppableWorker(Thread):
    43     def __init__(self, func, in_queue, out_queue):
    44         super(StoppableWorker, self).__init__()
    45         self.in_queue = in_queue
    46         self.out_queue = out_queue
    47         self.func = func
    48 
    49     def run(self):
    50         for item in self.in_queue:
    51             result = self.func(item)
    52             self.out_queue.put(result)
    53             print self.func
    54 
    55 
    56 if __name__ == '__main__':
    57     source_queue = ClosableQueue()
    58     add_queue = ClosableQueue()
    59     subtract_queue = ClosableQueue()
    60     multiply_queue = ClosableQueue()
    61     divide_queue = ClosableQueue()
    62     result_queue = ClosableQueue()
    63 
    64     number_handler = NumberHandler(5)
    65 
    66     threads = [
    67         StoppableWorker(number_handler.add, add_queue, subtract_queue),
    68         StoppableWorker(number_handler.subtract, subtract_queue, multiply_queue),
    69         StoppableWorker(number_handler.multiply, multiply_queue, divide_queue),
    70         StoppableWorker(number_handler.divide, divide_queue, result_queue),
    71     ]
    72 
    73     for _thread in threads:
    74         _thread.start()
    75 
    76     for i in range(10):
    77         add_queue.put(i)
    78 
    79     add_queue.close()
    80     add_queue.join()
    81     print 'add job done...'
    82     subtract_queue.close()
    83     subtract_queue.join()
    84     print 'subtract job done...'
    85     multiply_queue.close()
    86     multiply_queue.join()
    87     print 'multiply job done...'
    88     divide_queue.close()
    89     divide_queue.join()
    90     print 'divide job done...'
    91     result_queue.close()
    92 
    93     print "%s items finished, result: %s" % (result_queue.qsize(), result_queue)
    94 
    95     for i in result_queue:
    96         print i

    运行结果:

    线程执行日志:

     总的结果:

     可见线程交叉运行,但是任务却是顺序结束,这符合我们的预期。

    值得注意的是,我们在ClosableQueue定义了一个close()方法,通过放入一个特殊的类变量SENTINEL告诉队列应该关闭。此外,由于直接加减乘除结果不变,因此我特意乘了两次来便于我们判断结果。

    总结:

    1. Queue是一种高效的任务处理方式,它可以把任务处理流程划分为若干阶段,并使用多条python线程来同时执行这些子任务;

    2. Queue类具备阻塞式的队列操作、能够指定缓冲区尺寸,而且还支 持join方法,这使得开发者可以构建出健壮的流水线。

  • 相关阅读:
    LoadRunner系统资源监视
    Loadrunner web_url函数学习(转贴)
    Chrome的开发者工具(Chrome Developer Tools)
    浏览器对同一域名进行请求的最大并发连接数(转贴)
    转贴---Performance Counter(包含最全的Windows计数器解释)
    去掉html代码中多余琐碎的标签
    你永远不知道什么地方有笔误
    office2016开发者选项在哪?
    [VBA]检测一个中文字符串是否包含在另一个字符串中
    电子发票怎么开
  • 原文地址:https://www.cnblogs.com/yonguo123/p/11901050.html
Copyright © 2011-2022 走看看