zoukankan      html  css  js  c++  java
  • 线程、进程、协程

    线程:

    线程可以理解成轻量的进程,实际在linux两者几乎没有区别,唯一的区别是线程并不产生新的地址空间和资源。

    Threading用于提供线程的相关操作,线程是应用程序中工作的最小单元。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import threading
    import time
        
    def show(arg):
        time.sleep(1)
        print('thread'+str(arg))
        
    for i in range(10):
        t = threading.Thread(target=show, args=(i,))
        t.start()

    上述代码创建了10个前台线程,然后控制器就交给了cpu,CPU根据内部算法进行调度,

    更多的方法:

    • start 线程准备就绪,等待CPU调度
    • setName  为线程设置名字
    • getName  获取线程名称
    • setDaemon  设置后台线程或者前台线程(默认False) 

        如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止

        如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

    •  join   逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
    • run  线程被cpu调度后自动执行线程对象的run方法

    线程锁:

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。所以,可能出现如下问题:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time
    
    gl_num = 0
    
    def show(arg):
        global gl_num
        time.sleep(1)
        gl_num +=1
        print gl_num
    
    for i in range(10):
        t = threading.Thread(target=show, args=(i,))
        t.start()
    
    print 'main thread stop'
    未使用锁
    加上线程锁

    event

    python线程的事件用于主线程控制其他线程的执行,线程主要提供了三个方法:set、wait、clear

    事件处理的机制:全局定义了一个"flag",值为False,,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear:将"flag"设置为False
    • set:将"flag"设置为True
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    #!/usr/bin/env python
    # -*- coding:utf-8 -*- 
    import threading
       
    def do(event):
        print 'start'
        event.wait()
        print 'execute'
       
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
       
    event_obj.clear()
    inp = input('input:')
    if inp == 'true':
        event_obj.set()

    使用线程队列

    当多个线程需要共享数据的或者资源的时候,可能会使线程的使用变得复杂,线程的模块提供了许多同步原语,包括信号量、条件变量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列,相比较而言,队列更容易处理,并且更容易处理,并且可以使线程编程更加安全,因为他们能够有效地传递单个线程资源的所有访问,并支持更加清晰的、可读性更高的设计模式。

    url获取序列:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import urllib2
    import time
             
    hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
            "http://ibm.com", "http://apple.com"]
             
    start = time.time()
            #grabs urls of hosts and prints first 1024 bytes of page
    for host in hosts:
         url = urllib2.urlopen(host)
         print url.read(1024)
             
    print("Elapsed Time: %s" % (time.time() - start))
     

    简单版本的线程池:

    #!/usr/bin/env python 
    # -*- coding:utf-8 -*- 
    import Queue 
    import threading 
      
      
    class ThreadPool(object): 
      
        def __init__(self, max_num=20): 
            self.queue = queue.Queue(max_num) 
            for i in xrange(max_num): 
                self.queue.put(threading.Thread) 
      
        def get_thread(self): 
            return self.queue.get() 
      
        def add_thread(self): 
            self.queue.put(threading.Thread) 
      
    pool = ThreadPool(10) 
      
    def func(arg, p): 
        print arg 
        import time 
        time.sleep(2) 
        p.add_thread() 
      
      
    for i in xrange(30): 
        thread = pool.get_thread() 
        t = thread(target=func, args=(i, pool)) 
        t.start() 
    简单版的线程池
    import queue 
    import threading 
    import contextlib 
      
    StopEvent = object() 
      
    class ThreadPool(object): 
      
        def __init__(self, max_num): 
            self.q = queue.Queue(max_num) 
            self.max_num = max_num 
            self.cancel = False
            self.generate_list = [] 
            self.free_list = [] 
      
        def run(self, func, args, callback=None): 
            """ 
            线程池执行一个任务 
            :param func: 任务函数 
            :param args: 任务函数所需参数 
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 
            :return: 如果线程池已经终止,则返回True否则None 
            """
            if self.cancel: 
                return True
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 
                self.generate_thread() 
            w = (func, args, callback,) 
            self.q.put(w) 
      
        def generate_thread(self): 
            """ 
            创建一个线程 
            """
            t = threading.Thread(target=self.call) 
            t.start() 
      
        def call(self): 
            """ 
            循环去获取任务函数并执行任务函数 
            """
            current_thread = threading.currentThread 
            self.generate_list.append(current_thread) 
      
            event = self.q.get() 
            while event != StopEvent: 
                func, arguments, callback = event 
                try: 
                    result = func(*arguments) 
                    success = True
                except Exception, e: 
                    success = False
                    result = None
      
                if callback is not None: 
                    try: 
                        callback(success, result) 
                    except Exception, e: 
                        pass
                with self.worker_state(self.free_list, current_thread): 
                    event = self.q.get() 
            else: 
                self.generate_list.remove(current_thread) 
      
        def terminal(self): 
            """ 
            终止线程池中的所有线程 
            """
            self.cancel = True
            full_size = len(self.generate_list) 
            while full_size: 
                self.q.put(StopEvent) 
                full_size -= 1
      
        @contextlib.contextmanager 
        def worker_state(self, state_list, worker_thread): 
            """ 
            用于记录线程中正在等待的线程数 
            """
            state_list.append(worker_thread) 
            try: 
                yield
            finally: 
                state_list.remove(worker_thread) 
    进阶版的线程池
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()
    
    
    class ThreadPool(object):
    
        def __init__(self, max_num):
            self.q = queue.Queue()
            self.max_num = max_num
    
            self.terminal = False
            self.generate_list = []
            self.free_list = []
    
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
    
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                self.generate_thread()
            w = (func, args, callback,)
            self.q.put(w)
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)
            t.start()
    
        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread
            self.generate_list.append(current_thread)
    
            event = self.q.get()
            while event != StopEvent:
    
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception as e:
                    success = False
                    result = None
    
                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception as e:
                        pass
    
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
            else:
    
                self.generate_list.remove(current_thread)
    
        def close(self):
            """
            执行完所有的任务后,所有线程停止
            """
            full_size = len(self.generate_list)
            while full_size:
                self.q.put(StopEvent)
                full_size -= 1
    
        def terminate(self):
            """
            无论是否还有任务,终止线程
            """
            self.terminal = True
    
            while self.generate_list:
                self.q.put(StopEvent)
    
            self.q.empty()
    
        @contextlib.contextmanager
        def worker_state(self, state_list, worker_thread):
            """
            用于记录线程中正在等待的线程数
            """
            state_list.append(worker_thread)
            try:
                yield
            finally:
                state_list.remove(worker_thread)
    
    # How to use
    
    pool = ThreadPool(5)
    
    def callback(status, result):
        # status, execute action status
        # result, execute action return value
        pass
    
    def action(i):
        time.sleep(1)
        print(i)
    
    for i in range(30):
        ret = pool.run(action, (i,), callback)
    
    # pool.close()
    # pool.terminate()
    线程池更新一
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()
    
    class ThreadPool(object):
        def __init__(self, max_num, max_task_num = None):
            if max_task_num:
                self.q = queue.Queue(max_task_num)
            else:
                self.q = queue.Queue()
            self.max_num = max_num
            self.cancel = False
            self.terminal = False
            self.generate_list = []
            self.free_list = []
    
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
            if self.cancel:
                return
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                self.generate_thread()
            w = (func, args, callback,)
            self.q.put(w)
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)
            t.start()
    
        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread
            self.generate_list.append(current_thread)
    
            event = self.q.get()
            while event != StopEvent:
    
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception as e:
                    success = False
                    result = None
    
                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception as e:
                        pass
    
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
            else:
    
                self.generate_list.remove(current_thread)
    
        def close(self):
            """
            执行完所有的任务后,所有线程停止
            """
            self.cancel = True
            full_size = len(self.generate_list)
            while full_size:
                self.q.put(StopEvent)
                full_size -= 1
    
        def terminate(self):
            """
            无论是否还有任务,终止线程
            """
            self.terminal = True
    
            while self.generate_list:
                self.q.put(StopEvent)
    
            self.q.empty()
    
        @contextlib.contextmanager
        def worker_state(self, state_list, worker_thread):
            """
            用于记录线程中正在等待的线程数
            """
            state_list.append(worker_thread)
            try:
                yield
            finally:
                state_list.remove(worker_thread)
    
    # How to use
    
    pool = ThreadPool(5)
    
    def callback(status, result):
        # status, execute action status
        # result, execute action return value
        pass
    
    def action(i):
        print(i)
    
    for i in range(30):
        ret = pool.run(action, (i,), callback)
    
    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # pool.close()
    # pool.terminate()
    线程池更新二

    queue模块:

    queue就是对队列,它是线程安全的

    举例来说,我们去肯德基吃饭。厨房是给我们做饭的地方,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。

    这个模型也叫生产者-消费者模型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import queue
     
    q = queue.Queue(maxsize=0# 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
     
    q.join()    # 等到队列为kong的时候,在执行别的操作
    q.qsize()   # 返回队列的大小 (不可靠)
    q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
    q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
    q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,                         为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,                          如果队列无法给出放入item的位置,则引发 queue.Full 异常q.get(block=True, timeout=None) #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,                      若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。q.put_nowait(item) #   等效于 put(item,block=False)q.get_nowait() #    等效于 get(item,block=False)

    生产者--消费者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #!/usr/bin/env python
    import Queue
    import threading
     
     
    message = Queue.Queue(10)
     
     
    def producer(i):
        while True:
            message.put(i)
     
     
    def consumer(i):
        while True:
            msg = message.get()
     
     
    for i in range(12):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
     
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()

    multiprocessing模块

    multiprocessing是python的多进程管理包,和threading.Thread类似。直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。

    在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    from multiprocessing import Process
      
    def f(name):
        print('hello', name)
      
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

    进程:

    进程间的数据共享

    在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据, multiprocessing提供了两种方式。

    Shared memory

    数据可以用Value或Array存储在一个共享内存地图里,如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    from multiprocessing import Process, Value, Array
      
    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]
     if __name__ == '__main__':
        num = Value('d', 0.0)
        arr = Array('i', range(10))
      
        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()
      
        print(num.value)
        print(arr[:])

    输出:

    1
    2
    3.1415927
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

    创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。Array(‘i’, range(10))中的‘i’参数:

    1
    2
    ‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint
    ‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double

    Server process

    由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from multiprocessing import Process, Manager
     def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
      
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
      
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
      
            print(d)
            print(l)

    输出:

    1
    {0.25: None, 1: '1', '2': 2}<br>[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

    Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。

    进程池:

    Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    from multiprocessing import Pool
    import time
    def myFun(i):
        time.sleep(2)
        return i+100
     
    def end_call(arg):
        print("end_call",arg)
    p = Pool(5)
     
    # print(p.map(myFun,range(10)))
    for i in range(10):
        p.apply_async(func=myFun,args=(i,),callback=end_call)
    print("end")p.close()
    p.join()
    from multiprocessing import Pool, TimeoutError
    import time
    import os
     
    def f(x):
        return x*x
     
    if __name__ == '__main__':
        # 创建4个进程 
        with Pool(processes=4) as pool:
     
            # 打印 "[0, 1, 4,..., 81]" 
            print(pool.map(f, range(10)))
     
            # 使用任意顺序输出相同的数字, 
            for i in pool.imap_unordered(f, range(10)):
                print(i)
     
            # 异步执行"f(20)" 
            res = pool.apply_async(f, (20,))      # 只运行一个进程 
            print(res.get(timeout=1))             # 输出 "400" 
     
            # 异步执行 "os.getpid()" 
            res = pool.apply_async(os.getpid, ()) # 只运行一个进程 
            print(res.get(timeout=1))             # 输出进程的 PID 
     
            # 运行多个异步执行可能会使用多个进程 
            multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
            print([res.get(timeout=1) for res in multiple_results])
     
            # 是一个进程睡10秒 
            res = pool.apply_async(time.sleep, (10,))
            try:
                print(res.get(timeout=1))
            except TimeoutError:
                print("发现一个 multiprocessing.TimeoutError异常")
     
            print("目前,池中还有其他的工作")
     
        # 退出with块中已经停止的池 
        print("Now the pool is closed and no longer available")
    进程池官方版
    1
    class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    • processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
    • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
    • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
    • context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context注意:Pool对象的方法只可以被创建pool的进程所调用。进程池的方法
    • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

    • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

    • close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

    • terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

    • join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

    • map(func, iterable[, chunksize])¶

    • map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶

    • imap(func, iterable[, chunksize])¶

    • imap_unordered(func, iterable[, chunksize])

    • starmap(func, iterable[, chunksize])¶

    • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

    协程

    协程又叫微线程,从技术的角度来说,“协程就是你可以暂停执行的函数”。如果你把它理解成“就像生成器一样”,那么你就想对了。 线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

    协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

    Event Loop

    Event Loop是一种等待程序分配时间或消息的编程架构。简单的说就是 当事件A发生的时候,我们就去执行事件B。 最简单的例子就是:当我们浏览网页的时候,我们点击页面的某个元素,这个点击事件会被 JavaScript 捕捉到,然后 JavaScript 就会检查这个事件是否绑定了onclick()回调函数来处理这个事件,只要绑定了,onclick()回调函数就会被执行。

    event loop是协程执行的控制点, 如果你希望执行协程, 就需要用到它们。

    event loop提供了如下的特性:

    • 注册、执行、取消延时调用(异步函数)
    • 创建用于通信的client和server协议(工具)
    • 创建和别的程序通信的子进程和协议(工具)
    • 把函数调用送入线程池中

    协程示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import asyncio
      
    async def cor1():
        print("COR1 start")
        await cor2()
        print("COR1 end")
      
    async def cor2():
        print("COR2")
      
    loop = asyncio.get_event_loop()
    loop.run_until_complete(cor1())
    loop.close()

    最后三行是重点:

    • asyncio.get_event_loop()  : asyncio启动默认的event loop 
    • run_until_complete()  :  这个函数是阻塞执行的,知道所有的异步函数执行完成,
    • close()  :  关闭event loop。

    subprocess模块

    通过使用subprocess模块可以创建新的进程,连接到他们的输入/输出/错误管道,并获取他们的返回值。 该模块计划替代及一个旧的模块的方法:

    1
    2
    os.system
    os.spawn*

    使用subprocess模块

    在所有用例调用subprocess时推荐使用run()方法,更高级的用例,可以直接使用subprocess.Popen接口。

    run()方法

    在Python3.5增加的。

    1
    subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False)

    run()默认不会捕捉到标准输出和标准错误输出,要捕捉的话,可以为标准输出和标准错误输出指定subprocess.PIPE(一个特殊值,可被用于Popen的stdin, stdout或 stderr参数,表示一个标准流的管道应该被打开, Popen.communicate()用的最多)。

    • args :args应该是一个字符串或者一个序列。
    • timeout:设置超时时间,会传递给subprocess.Popen.communicate()。如果超时,子进程会被杀死并等待。子进程被终止后会报告一个 TimeoutExpired异常。
    • input参数会传递给subprocess.Popen.communicate(),从而作为subprocess的标准输入。当我们使用的时候,内部的Popen对象会自动创建stdin=PIPE,stdin参数可能不会被使用。
    • check:如果check参数为True,且进程退出的时候得到退出码一个非0的值,会报告一个 CalledProcessError异常
    • shell:shell参数默认为False,此时arg参数应该是一个列表。subprocess.run(["ls","-l"]) ; 当shell=True时,args可以是一个字符串。subprocess.run("ls -l",shell=True)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    >>> ret = subprocess.run(["ls", "-l"])  # doesn't capture output
    CompletedProcess(args=['ls', '-l'], returncode=0)
    >>> print(ret.stdout)
    None
      
    >>> subprocess.run("exit 1", shell=True, check=True)
    Traceback (most recent call last):
      ...
    subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1
      
    >>> ret1 = subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE)
    CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0,
    stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null ')
      
    >>> print(ret.stdout)
    b'crw-rw-rw- 1 root root 1, 3 6xe6x9cx88   8 06:50 /dev/null '

    call方法

    subprocess.call(args, *, stdin=Nonestdout=Nonestderr=Noneshell=Falsetimeout=None

    call()方法等价于:run(..., check=True)和run()方法类所以,只是不支持input参数和check参数; 

    注意: 不要在这个方法里使用stdout=PIPE 或 stderr=PIPE,当到一个管道的输出填满系统的管道缓存时,子进程会被阻塞。  

    check_call方法

    1
    subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)

    check_call()方法等价于: run(..., check=True, stdout=PIPE).stdout

    3.1新增,3.3时增加了timeout参数,3.4时增加了对关键字参数的支持

    check_output()方法

    内部调用的是run()方法,但是会捕捉到stdout

    1
    2
    3
    >>> ret = subprocess.check_output(["ls", "-l", "/dev/null"])
    >>> print(ret)
    b'crw-rw-rw- 1 root root 1, 3 6xe6x9cx88   8 06:50 /dev/null '

    Popen类

    上面的四个方法本质上调用的都是subprocess中的Popen类。

    Popen对象都有以下方法:

    poll() : 检查子进程是否已经终止,返回一个returncode,相当于exit code。

    wait() : 等待子进程终止,返回一个returncode

    communicate(input=None) :和进程进行交互:发送数据到stdin;从stdout和stderr读取数据,直到读取完。等待进程终止。可选参数input会传递数据给子进程,当input=None事,没有数据传递给子进程。 communicate() 返回一个元组 (stdout, stderr). 注意: 读取的数据在内存的buffer中,所以当数据大小大于buffer或者没有限制时,不要使用这个方法。

  • 相关阅读:
    流畅的python,Fluent Python 第四章笔记
    Python中的eval()、exec()及其相关函数(转)
    给自己与初学者关于decode,encode的建议(啥utf-8,GBK)。
    流畅的python,Fluent Python 第三章笔记
    流畅的python,Fluent Python 第二章笔记
    python数组array.array(转帖)
    流畅的python,Fluent Python 第一章笔记
    流畅的Python第五章,一等函数笔记
    python中的__slots__使用极其定义(转)
    load
  • 原文地址:https://www.cnblogs.com/mosson/p/5966016.html
Copyright © 2011-2022 走看看