zoukankan      html  css  js  c++  java
  • Python3学习之路~10.1 多进程、进程间通信、进程池

    一 多进程multiprocessing

    multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

    import multiprocessing,time
    
    def run(name):
        print("hello",name)
        time.sleep(2)
    
    if __name__ == '__main__':
    
        for i in range(10):
            p = multiprocessing.Process(target=run,args=('Bob %s'%i,))
            p.start()
    
    import multiprocessing,time,threading
    
    def thread_run():
        print(threading.get_ident()) #线程号
    
    def run(name):
        print("hello",name)
        t = threading.Thread(target=thread_run,)
        t.start()
        time.sleep(2)
    
    if __name__ == '__main__':
    
        for i in range(10):
            p = multiprocessing.Process(target=run,args=('Bob %s'%i,))
            p.start()
    可以在进程中起线程
    # 在主进程里调用了info,在子进程了又调用了info,我们看看效果?
    # 可以看到,每一个进程都是由父进程启动的。主程序的父进程是pyCharm,子进程的父进程是主进程。
    
    from multiprocessing import Process
    import os
    
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid()) #得到父进程ID
        print('process id:', os.getpid()) #得到进程ID
        print("
    
    ")
    
    def f(name):
        info('33[31;1mfunction f33[0m')
        print('hello', name)
    
    if __name__ == '__main__':
        info('33[32;1mmain process line33[0m')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    
    #####输出:
    ####ain process line
    ####odule name: __main__
    ####arent process: 8268
    ####rocess id: 4448
    
    ####unction f
    ####odule name: __mp_main__
    ####arent process: 4448
    ####rocess id: 9596
    
    ####ello bob
    得到进程ID

    二 进程间通信

    不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

    1.Queues

    首先我们知道,线程之间是数据共享的,子线程放进queue数据,父线程可以取出来。如下示例

    import threading,queue
    
    def f():
        q.put([42,None,'hello'])
    
    if __name__ == '__main__':
        q = queue.Queue()
        t = threading.Thread(target=f)
        t.start()
        print(q.get())
        
    ####输出:[42, None, 'hello']
    线程之间数据共享

    把线程改为进程,会发现报错。

    import multiprocessing,queue
    
    def f():
        q.put([42,None,'hello'])
    
    if __name__ == '__main__':
        q = queue.Queue()
        p = multiprocessing.Process(target=f)
        p.start()
        print(q.get())
    
    ####输出报错:NameError: name 'q' is not defined
    将线程改为进程,尝试数据传输,报错NameError: name 'q' is not defined

    报错的原因是进程之间数据不共享。子进程和父进程分别拥有独立的内存空间,所以子进程是访问不了父进程的queue的。那有什么办法可以使子进程访问到父进程的queue呢?我们可以尝试将这个queue当做变量传给子进程。发现还是报错。

    import multiprocessing,queue
    
    def f(q):
        q.put([42,None,'hello'])
    
    if __name__ == '__main__':
        q = queue.Queue()
        p = multiprocessing.Process(target=f,args=(q,))
        p.start()
        print(q.get())
    
    ####输出报错:TypeError: can't pickle _thread.lock objects
    将线程queue传递给子进程是不可以的,报错TypeError: can't pickle _thread.lock objects

    报错的原因是我们错将线程queue(通过import queue引入)传递给了子进程,实际上我们传递给子进程的应该是进程queue(通过from multiprocessing import  Queue引入)。接下来才是正确的示例:

    from multiprocessing import Process,Queue #引入进程queue
    
    def f(q):
        q.put([42,None,'hello']) #子进程放入数据
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f,args=(q,)) #将q传递给子进程
        p.start()
        print(q.get()) #主进程取出数据
    
    ####输出:[42, None, 'hello']

    上面的例子,我们把进程queue传递给了子进程,表面上看,子进程和父进程共用一个queue,实际上并不是这样,而是子进程克隆了一个父进程的queue,子进程将数据放入克隆queue中,克隆queue将其序列化保存,然后进行反序列化后放到父进程的原始queue中,所以严格意义上子进程和父进程的queue并不是一个共享queue。

    2.Pipes

    要想实现两个进程间的数据传递,除了Queues,还可以使用Pipes。

    Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(以及其他方法)。

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        print('from parent:',conn.recv())
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print('from son:',parent_conn.recv())
        parent_conn.send('hello')
        p.join()

    3.Managers

    Queues和Pipes仅能实现两个进程之间的数据传递,而Managers可以实现进程之间数据的共享。

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

    A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example,

    from multiprocessing import Process, Manager
    import os
    
    def f(d,l):
        d[os.getpid()] = os.getpid()
        l.append(os.getpid())
        print(l)
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()    #生成一个字典,可在多个进程间共享和传递
            l = manager.list(range(5))    #生成一个列表,可在多个进程间共享和传递
            p_list = []
            for i in range(10):
                p = Process(target=f,args=(d,l))
                p.start()
                p_list.append(p)
            for res in p_list: #等待结果
                res.join()
    
            print(d)
    
    [0, 1, 2, 3, 4, 8512]
    [0, 1, 2, 3, 4, 8512, 11060]
    [0, 1, 2, 3, 4, 8512, 11060, 4820]
    [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496]
    [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264]
    [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420]
    [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184]
    [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592]
    [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592, 9808]
    [0, 1, 2, 3, 4, 8512, 11060, 4820, 9496, 4264, 8420, 9184, 6592, 9808, 5064]
    {8512: 8512, 11060: 11060, 4820: 4820, 9496: 9496, 4264: 4264, 8420: 8420, 9184: 9184, 6592: 6592, 9808: 9808, 5064: 5064}
    输出

    进程锁

    虽然进程之间是独立运行的,但是对于各进程来说,终端屏幕是共享的,为了防止输出结果时,各个进程争抢输出,造成打印结果混乱,可以给进程加一把锁。

    from multiprocessing import Process,Lock
    
    def f(l,i):
        l.acquire() #得到锁
        print("hello world",i)
        l.release() #释放锁
    
    if __name__ == '__main__':
        lock = Lock() #生成锁的实例
    
        for num in range(10):
            Process(target=f,args=(lock,num)).start() #将lock传递给子进程

    三 进程池

    我们每起一个进程实际上就是克隆一份父进程数据给子进程使用,起多个进程时就会占用很多内存空间。为了节省开销,我们使用进程池。进程池就是限制同一时间有多少个进程运行。

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply                #同步执行,即串行
    • apply_async     #异步执行,即并发
    from multiprocessing import Pool
    import time,os
    
    def Foo(i):
        time.sleep(2)
        print('my processid is ',os.getpid())
        return  i+100
    
    
    if __name__ == '__main__': #windows上运行进程池必须加这行代码,否则报错
        pool = Pool(5)  #运行进程池中同时放入5个进程
    
        for i in range(10):
            # pool.apply(func=Foo,args=(i,))  #同步执行,即串行
            pool.apply_async(func=Foo, args=(i,))  # 异步执行,即并发,此时有10个进程,同时执行的有5个,其他的挂起
    
        print('end')
        pool.close() #注意:一定要先关闭进程池再join
        pool.join() #表示等进程池中进程执行完毕后称程序再关闭,如果注释,则程序直接关闭。

    下面的例子,实现了主进程起了10个子进程,分别执行Foo函数,每次子进程执行完毕后,父进程回调Bar函数(可观察到执行Bar函数的进程ID与主进程ID相同)。

    from multiprocessing import Pool
    import time,os
    
    def Foo(i):
        time.sleep(2)
        print('my processid is ',os.getpid())
        return  i+100
    
    def Bar(arg):
        print('--exec done:',arg,'my processid is ',os.getpid())
    
    if __name__ == '__main__': #windows上运行进程池必须加这行代码,否则报错
        pool = Pool(5)  #运行进程池中同时放入5个进程
    
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,),callback=Bar)  # callback=回调
    
        print('end',os.getpid())
        pool.close() #注意:先close再join
        pool.join() #表示等进程池中进程执行完毕后称程序再关闭,如果注释,则程序直接关闭。
    View Code
  • 相关阅读:
    局域网组网总目录
    VLAN之间的通信
    DHCP
    ACL
    linux 程序后台运行
    VLAN
    VTP
    dubbox生产者与消费者案例
    String data jpa执行的增删改查
    StringBoot整合Mytais实现数据查询与分页
  • 原文地址:https://www.cnblogs.com/zhengna/p/10558746.html
Copyright © 2011-2022 走看看