zoukankan      html  css  js  c++  java
  • Python多进程编程

    一 多进程编程

    Python实现多进程的方式有两种:一种方法是os模块中的fork方法,另一种是使用multiprocessing模块。

    前者仅适用于LINUX/UNIX操作系统,对Windows不支持,后者则是跨平台的实现方式。

    第一种方式:使用os模块中的fork方式实现多进程

    import os
    if __name__ == '__main__':
        print 'current Process (%s) start ...'%(os.getpid())
        pid = os.fork()
        if pid < 0:
            print 'error in fork'
        elif pid == 0:
            print 'I am child process(%s) and my parent process is (%s)',(os.getpid(),os.getppid())
        else:
            print 'I(%s) created a chlid process (%s).',(os.getpid(),pid)

    第二种方式:multiprocessing

    由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
    • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
    • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

    Process.PID中保存有PID,如果进程还没有start(),则PID为None。

    window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。

    创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
    方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。join()方法实现进程间的同步。

    #__author: greg
    #date: 2017/9/19 23:52
    from multiprocessing import Process
    import time
    
    def f(name):
        time.sleep(1)
        print('hello', name,time.ctime())
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(3):
            p = Process(target=f, args=('alvin',))
            p_list.append(p)
            p.start()
        for i in p_list:
            i.join()
        print('end')#一个主进程,三个子进程
    
    
    # output:
    # hello alvin Fri Nov 24 19:10:08 2017
    # hello alvin Fri Nov 24 19:10:08 2017
    # hello alvin Fri Nov 24 19:10:08 2017
    # end

    类式调用:

    #__author: greg
    #date: 2017/9/21 20:02
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self):
            super(MyProcess, self).__init__()
            #self.name = name
        def run(self):
            time.sleep(1)
            print ('hello', self.name,time.ctime())
    if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end') #output: # hello MyProcess-1 Fri Nov 24 19:12:17 2017 # hello MyProcess-2 Fri Nov 24 19:12:17 2017 # hello MyProcess-3 Fri Nov 24 19:12:17 2017 # end

    显示进程ID号:

    #__author: greg
    #date: 2017/9/21 20:16
    from multiprocessing import Process
    import os
    import time
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())#父进程号
        print('process id:', os.getpid())#进程号
    
    def f(name):
        info('33[31;1mfunction f33[0m')
        print('hello', name)
    
    if __name__ == '__main__':
        info('33[32;1mmain process line33[0m')
        time.sleep(10)
        p = Process(target=info, args=('bob',))
        p.start()
        p.join()
    
    #output:
    # main process line
    # module name: __main__
    # parent process: 1548 pycharm的进程号
    # process id: 8416  Python进程号
    # bob
    # module name: __mp_main__
    # parent process: 8416  Python进程号
    # process id: 5556  info进程号

    二 Process类

    构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])

      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 进程名; 
      args/kwargs: 要传入方法的参数。

    实例方法:

      is_alive():返回进程是否在运行。

      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

      start():进程准备就绪,等待CPU调度

      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

      terminate():不管任务是否完成,立即停止工作进程

    属性:

      authkey

      daemon:和线程的setDeamon功能一样

      exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

      name:进程名字。

      pid:进程号。

    三 进程间通讯

    不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

    Queues 用来在多个进程间通信

    1. 阻塞模式

    import queue
    import time
    
    q = queue.Queue(10) #创建一个队列
    start=time.time()
    for i in range(10):
    q.put('A')
    time.sleep(0.5)
    end=time.time()
    print(end-start)

    这是一段极其简单的代码(另有两个线程也在操作队列q),我期望每隔0.5秒写一个'A'到队列中,但总是不能如愿:
    间隔时间有时会远远超过0.5秒。
    原来,Queue.put()默认有 block = True 和 timeout两个参数。
    源码:def put(self, item, block=True, timeout=None):
    当 block = True 时,写入是阻塞式的,阻塞时间由 timeout确定。
    当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。
    Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。
    但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常。

    #__author: greg
    #date: 2017/9/21 22:27
    from multiprocessing import Process, Queue
    
    def f(q,n):
        q.put([42, n, 'hello'])
        print('subprocess id',id(q))
    
    if __name__ == '__main__':
        q = Queue()
        p_list=[]
        print('process id',id(q))
        for i in range(3):
            p = Process(target=f, args=(q,i))
            p_list.append(p)
            p.start()
        print(q.get())
        print(q.get())
        print(q.get())
        for i in p_list:
            i.join()
    
    # output
    # process id 2284856854176
    # subprocess id 2607348001872
    # [42, 0, 'hello']
    # subprocess id 1712786975824
    # [42, 2, 'hello']
    # subprocess id 2254764977120
    # [42, 1, 'hello']

    Pipe常用来两个进程间进行通信,两个进程分别位于管道的两端

    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())  # prints "[42, None, 'hello']"
        p.join()

    Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

    send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。 
    #__author: greg
    #date: 2017/9/21 22:57
    import multiprocessing
    import random
    import time,os
    
    def proc_send(pipe,urls):
        for url in urls:
            print("Process(%s) send: %s" %(os.getpid(),url))
            pipe.send(url)
            time.sleep(random.random())
    
    def proc_recv(pipe):
        while True:
            print("Process(%s) rev:%s" %(os.getpid(),pipe.recv()))
            time.sleep(random.random())
    
    if __name__=="__main__":
        pipe=multiprocessing.Pipe()
        p1=multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i)
                                                          for i in range(10)]))
        p2=multiprocessing.Process(target=proc_recv,args=(pipe[1],))
        p1.start()
        p2.start()
        p1.join()
        p2.terminate()

    Manager()返回的管理器对象控制一个服务器进程,该进程持有Python对象,并允许其他进程使用代理来操纵它们。

    #__author: greg
    #date: 2017/9/21 23:10
    from multiprocessing import Process, Manager
    def f(d, l,n):
        d[n] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(n)
        # print(l)
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(5))
            p_list = []
            for i in range(10):
                p = Process(target=f, args=(d, l,i))
                p.start()
                p_list.append(p)
            for res in p_list:
                res.join()
            print(d)
            print(l)

    四 进程同步

    当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

    #__author: greg
    #date: 2017/9/21 23:25
    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for num in range(10):
            Process(target=f, args=(lock, num)).start()

    五 进程池 Pool类

    Pool可以提供指定数量的进程供用户使用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程来执行该请求

    但如果池中的进程数已经达到规定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。

    # -*- coding: utf-8 -*-
    # 2017/11/24 20:15
    from multiprocessing import Pool
    import os, time, random
    
    def run_task(name):
        print('Task %s (pid = %s) is running...' % (name, os.getpid()))
        time.sleep(random.random() * 3)
        print('Task %s end.' % name)
    
    if __name__=='__main__':
        print('Current process %s.' % os.getpid())
        p = Pool(processes=3)
        for i in range(5):
            p.apply_async(run_task, args=(i,))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
    
    
    """
    Current process 9788.
    Waiting for all subprocesses done...
    Task 0 (pid = 5916) is running...
    Task 1 (pid = 3740) is running...
    Task 2 (pid = 6964) is running...
    Task 2 end.
    Task 3 (pid = 6964) is running...
    Task 1 end.
    Task 4 (pid = 3740) is running...
    Task 0 end.
    Task 3 end.
    Task 4 end.
    All subprocesses done.
    """
    • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞
    • close()    关闭pool,使其不在接受新的任务。
    • terminate()    结束工作进程,不在处理未完成的任务。
    • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

    每次最多运行3个进程,当一个任务结束了,新的任务依次添加进来,任务执行使用的进程依然是原来的进程,这一点通过进程的pid可以看出来。

  • 相关阅读:
    C# API 大全
    也说_T、_TEXT、TEXT、L
    项脊轩志--归有光
    C# tostring()汇总
    StructLayout特性
    关于C#静态构造函数的几点说明
    C#生成DLL文件
    做.net的早晚会用到,并且网上还没有这方面的正确资料或几乎很少
    C# 实现屏幕键盘 (ScreenKeyboard)
    Microsoft .NET Native
  • 原文地址:https://www.cnblogs.com/ningxin18/p/7892222.html
Copyright © 2011-2022 走看看