zoukankan      html  css  js  c++  java
  • 多进程

    多进程multiprocessing

    multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency(并发), effectively side-stepping(有效的避过) the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage(充分利用) multiple processors on a given machine. It runs on both Unix and Windows.

    多进程的用法和多线程差不多

    import multiprocessing
    
    def run():
        print(__name__)
        print("prcess is running")
    
    
    if __name__ == "__main__":  #On Windows the subprocesses will import (i.e. execute) the main module at start. You need to protect the main code like this to avoid creating subprocesses recursively
        p = multiprocessing.Process(target=run)
        p.start()
    
    结果:
    __mp_main__
    prcess is running
    

    To show the individual process IDs involved, here is an expanded example:  

    import multiprocessing,threading
    import os
    
    def info(title):
        print(title)
        print("module name:",__name__)
        print("parent process:",os.getppid())     #父进程
        print("process id:",os.getpid())   #当前进程id
        print(threading.current_thread())  #打印当前线程id
    
    def f(name):
        info("33[31;1m function f33[0m")
        print("hello",name)
    
    if __name__ == "__main__":
        info("33[32;1mmain process line33[0m")
        p = multiprocessing.Process(target=f,args=("lxj",))
        p.start()
        multiprocessing.Queue()
    
    结果:
    main process line
    module name: __main__
    parent process: 7388
    process id: 13352
    <_MainThread(MainThread, started 5828)>
     function f
    module name: __mp_main__
    parent process: 13352
    process id: 608
    <_MainThread(MainThread, started 2948)>
    hello lxj
    

     

    我们直接运行info,也有主进程id,那这个主进程pid是怎么产生的呢?查看windows任务管理器,可以看到,7388pid号即pycharm

     

     

    这里验证了我们之前进程是由线程创建的,我们称为主线程。

    进程间通信

    先来看下线程间通讯

    #子线程共享所属进程的内存空间
    
    def f():
        q.put(1)
    
    if __name__ == "__main__":
        
        q = queue.Queue()           #线程的队列
    
        p = threading.Thread(target=f)
        p.start()
        print(q.get())
        p.join()
    
    结果:
    1
    

    j

    进程间是不共享内存空间的,看下进程间是如何通信的

     Queue

    from multiprocessing import Queue,Process
    import threading
    import queue
    
    
    #换成多进程
    
    def f():
        q.put(1)
    
    if __name__ == "__main__":
    
        q = queue.Queue()
    
        p = Process(target=f)
        p.start()
        print(q.get())
        p.join()
    
    结果:
    报错
    NameError: name 'q' is not defined
    
    因为进程间的内存是不共享的
    
    对上面的程序做修改
    def f(q):
        q.put(1)
    
    if __name__ == "__main__":
    
        q = Queue()   #换成进程间的Queue
    
        p = Process(target=f,args=(q,))  #把q传到子进程
        p.start()
        print(q.get())
        p.join()
    
    结果:
    1

      

    进程间通讯Pipe

    The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

    from multiprocessing import Process,Pipe
    import os
    def f(conn):
        conn.send([1,2,3])
        data = conn.recv()
        print(os.getpid(),os.getppid(),data)
    if __name__ == "__main__":
        parent_conn,child_conn = Pipe()   #两端数据
    
        p = Process(target=f,args=(parent_conn,))  #这里传入一端,随便哪个都行
    
        p.start()
        data = child_conn.recv()           #使用另外一个来接收,有点类似socket客户端服务器模型,可以发多条,收多条
        print(os.getpid(), os.getppid(), data)
        child_conn.send(["asd"])
    
        p.join()
    
    结果:
    5176 11420 [1, 2, 3]
    7304 5176 ['asd']
    

    Queue和Pipe是实现进程间数据的传递,那么如何能使数据共享呢?答案使使用Mangers

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

    A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example

    from multiprocessing import Process,Manager
    import os
    def func(d,l):
        d[os.getpid()] = os.getppid()
        l.append(os.getpid())
    if __name__ == "__main__":
    
        with Manager() as manger:
            d = manger.dict()              #生成一个字典
            l = manger.list(range(3))
            p_list = []
    
            for i in range(10):
                p = Process(target=func,args=(d,l))
                p.start()
                p_list.append(p)
    
            for p in p_list:    #等待进程结束
                p.join()
    
            print(d)
            print(l)
    
    结果:
    {1232: 4656, 13656: 4656, 11668: 4656, 9368: 4656, 2516: 4656, 13772: 4656, 12316: 4656, 6764: 4656, 11788: 4656, 8756: 4656}
    [0, 1, 2, 2516, 11668, 12316, 9368, 13656, 13772, 11788, 6764, 8756, 1232]
    
    可以发现实现了进程间的数据共享
    

    进程同步

    当要用到同一块屏幕时,需要用到锁

    Without using the lock output from the different processes is liable to get all mixed up.

    from multiprocessing import Process, Lock
     
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
     
    if __name__ == '__main__':
        lock = Lock()
     
        for num in range(10):
            Process(target=f, args=(lock, num)).start()  

    进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply
    • apply_async

    apply(同步)用法

    from multiprocessing import Process,Pool
    import os,time
    def foo(i):
        time.sleep(1)
        print("in process",os.getpid())
        return i +100
    if __name__ == "__main__":
        pool = Pool(5)         
    
        for i in range(10):
            pool.apply(func=foo,args=(i,))
    
        print("end")
        pool.close()
        pool.join()
    
    结果:是串行的执行方式
    

    apply_async(异步)用法

    from multiprocessing import Process,Pool
    import os,time
    def foo(i):
        time.sleep(1)
        print("in process",os.getpid())
        return i +100
    if __name__ == "__main__":
        pool = Pool(5)
    
        for i in range(10):
            # pool.apply(func=foo,args=(i,))
            pool.apply_async(func=foo,args=(i,))
        pool.close()
        pool.join()                    #这里必须先close再join(可以看源码),如果这里只close,不join则主进程不会等待进程池执行完,会看到结果直接打印end
        print("end")
    
    #结果会看到5个进程,5个进程的执行效果
    
    from multiprocessing import Process,Pool
    import os,time
    def foo(i):
        time.sleep(1)
        print("in process",i+100,os.getpid())
        return i +100
    
    def bar(arg):                 #arg参数为foo返回的结果
        print("-->exex done:",arg,os.getpid())
    if __name__ == "__main__":
        pool = Pool(5)
        print(os.getpid())
        for i in range(10):
            # pool.apply(func=foo,args=(i,))
            pool.apply_async(func=foo,args=(i,),callback=bar)  #回调函数,当foo函数执行完成,执行回调函数bar
        pool.close()
        pool.join()                    #这里必须先close再join(可以看源码),如果这里close,不join则主进程不会等待进程池执行完
        print("end")
    
    
    #回调函数的用法
    结果:
    12328
    in process 100 4160
    -->exex done: 100 12328
    in process 101 1304
    -->exex done: 101 12328
    in process 102 1988
    -->exex done: 102 12328
    in process 103 4744
    -->exex done: 103 12328
    in process 104 9460
    -->exex done: 104 12328
    in process 105 4160
    -->exex done: 105 12328
    in process 106 1304
    -->exex done: 106 12328
    in process 107 1988
    -->exex done: 107 12328
    in process 108 4744
    -->exex done: 108 12328
    in process 109 9460
    -->exex done: 109 12328
    end
    
    每次执行的结果都不一样,上面的结果我们可以得到回调函数是主进程执行的
    

      

      

  • 相关阅读:
    唐骏:学会高明的让员工加班
    [ZT]微软亚洲技术中心面试题[附非標准答案]
    企业的后ERP时代
    WinForm中使用DXperience控件中XtraForm,如何实现换肤
    世卫组织健康食品排行榜:健康肉类减为三种
    [轉載]在研究院的新兵训练
    [转]怎样面对同事升职?
    今天是个好日子!
    程序员跳槽动机被误读,薪资只是表象(图文)
    2008年个人技术十大趋势
  • 原文地址:https://www.cnblogs.com/zj-luxj/p/7222988.html
Copyright © 2011-2022 走看看