zoukankan      html  css  js  c++  java
  • Python_多进程multiprocessing

    Python 多进程库 multiprocessing ,支持子进程、通信、数据共享、执行不同形式的同步

    多进程,绕过gil ,实现多核的利用,多进程也是原生进程,由操作系统维护

    在pycharm中,可能没有办法正常使用multiprocessing.Process,最好是在Linux中运行

    Process 用于创建进程模块
    Pool 用于创建管理进程池
    Queue 用于进程通信,资源共享
    Pipe 用于管道通信
    Manager 用于资源共享,同步进程                   

    1.Process类


    Process(group = None,target =None,name=None, args=[ ], kwargs={ })

    group 线程组
    target 要执行的方法
    name 进程名
    args/kwargs 要传入方法的参数                                       

    process属性&方法:

    authkey 进程的身份验证密钥
    daemon 同thread的setDaemon,守护进程
    exitcode 进程运行时为None,若为—N,则表示被信号N结束
    pid 进程号
    name 进程名
    is_alive() 返回进程是否正在运行
    join([timeout]) 阻塞到线程结束或到timeout值 
    start() 进程准备就绪,等待CPU调度
    run() start()调用run方法,如果实例进程时未制定传入target,start执行默认run()方法。         
    terminate() 不管任务是否完成,立即停止工作进程

    多进程的创建:

    #!/usr/bin/python
    # -*- coding:utf-8 -*-
    '''多进程的创建'''
    from multiprocessing import Process
    import time
    
    def fun(name):
        time.sleep(1)
        print('hello,%s' % name)
        print('----')
    
    if __name__ =='__main__':
        for i in range(5):                # 进程同步
            p = Process(target=fun, args=('Presley',))
            p.start()
        p.join()
        print('结束。')
    多进程

    进程id :

    #!/usr/bin/python3
    # -*- coding:utf-8 -*-
    
    from multiprocessing import Process
    import os
    def info(title):
        print(title)
        print('moudle name :',__name__)
        print('parent process id ', os.getppid())         
        print('process id ', os.getpid())                  
    
    
    if __name__ =='__main__':
        info('hei. ')          # pycharm id和 主进程id             
        for i in range(3):
            p = Process(target=info, args=('Presley',))     # 主进程id   和 info 子进程id
            p.start()
            p.join()
    View Code
    hei. 
    moudle name : __main__
    parent process id  1610
    process id  1826
    Presley
    moudle name : __main__
    parent process id  1826
    process id  1827
    Presley
    moudle name : __main__
    parent process id  1826
    process id  1828
    Presley
    moudle name : __main__
    parent process id  1826
    process id  1829
    result

     

    2.Queue类 


    不同进程间内存是不共享的,想要实现两个进程间的数据交换,可以用Queue进行进程间通讯

    queue是在多进程中做了一层封装的队列,以保证在当前进程里进程安全

    方法:queue

     进程中的队,以保证进程安全

    from multiprocessing import Process,Queue
    def info(q):
        #  global q       # 错误,queue中 ,global 不行,因为子进程无法访问父进程的内存数据
        q.put([34, None, 'yes'])
    
    
    if __name__ =='__main__':
        q = Queue()
        for i in range(3):
            p = Process(target=info, args=[q,])      # 多个子进程的数据可以都可以放父进程数据
            p.start()
            print('来自父进程%s:%s'%(i, q.get()))
        p.join()
    多进程_queue
    来自父进程0:[34, None, 'yes']
    来自父进程1:[34, None, 'yes']
    来自父进程2:[34, None, 'yes']
    result

    3.Pipe类


    管道操作(双向队列):会返回一对对象,管道的两端分别赋给子进程和父进程

    和队列操作差不多,所以一般运用队列较多

    方法:

    send() 发送序列
    recv() 接收序列
    fileno()  返回一个整型的文件描述符
    close() 退出
    poll()  判断子进程是否结束
    send_bytes() 以bytes格式发送序列
    recv_bytes() 以bytes格式接收序列                                   
    from multiprocessing import Process,Pipe
    import time
    def info(conn):
        time.sleep(0.5)
        conn.send([32,None,'ni hao wa'])
    
        conn.close()
    
    if __name__=='__main__':
        conn_parent ,conn_child = Pipe()
        print(conn_parent.fileno())       
    
        for i in range(3):
            p = Process(target=info,args=(conn_child,))
            print(bool(conn_child.poll))        # 进程是否结束
            p.start()
            # 如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
            print('父端接收%s:%s'% (i,conn_parent.recv()))
    
        p.join()
    多进程_Pipe
    200
    True
    父端接收0:[32, None, 'ni hao wa']
    True
    父端接收1:[32, None, 'ni hao wa']
    True
    父端接收2:[32, None, 'ni hao wa']
    result

    4.Manager


    通过Manager可以简单的使用list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barries,Value+Arrary等类型的高级接口

    Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全

     例:对list,dict的应用例子:

    #!/usr/bin/python3
    # -*- coding:utf-8 -*-
    from multiprocessing import Process,Manager
    
    def fun(d,l,n):
        d[2] = '3'
        d['e'] = 'e'
        d[34] = None
        l.append(n)
        print(l)
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list()
            join_list = []
            for i in range(6):
                p = Process(target=fun, args=(d,l,i))
                p.start()
                join_list.append(p)
            for res in join_list:
                res.join()
                print(l)
            print(d)
    example
    [5]
    [5, 2]
    [5, 2, 3]
    [5, 2, 3, 0]
    [5, 2, 3, 0, 4]
    [5, 2, 3, 0, 4, 1]
    [5, 2, 3, 0, 4, 1]
    [5, 2, 3, 0, 4, 1]
    [5, 2, 3, 0, 4, 1]
    [5, 2, 3, 0, 4, 1]
    [5, 2, 3, 0, 4, 1]
    [5, 2, 3, 0, 4, 1]
    {2: '3', 'e': 'e', 34: None}
    result

    Manager的详细参考:https://www.aliyun.com/jiaocheng/490316.html

    5.Pool 类(进程池)


    当进程数过多时,用于限制进程数

     异步:进程并行

    同步:进程串行

    方法:

    apply_async(func,args,kwds,callback)

    进程异步,并行(func:执行一个函数,args/ dwds:进程参数,callback:Foo执行结果返回到callback执行的函数中)                 

    apply(func,args,kwds) 进程同步,串行
    close() 关闭进程池
    terminate() 结束工作进程,不在处理未完成的任务
    join() 主进程阻塞,等待子进程执行完毕
    from multiprocessing import Pool,freeze_support
    import time
    
    def Foo(i):
        time.sleep(1)
        print('exec..')
        return i+100     # 返回到Bar中
    
    
    def Bar(arg):
        print('来自Foo 的i :',arg)   # 接收 Foo中 的返回值
    
    if __name__ == '__main__':
        freeze_support()       # 仅在Windows上才导入此模块进程程序才不会出错,Linux上不用
        pool = Pool(5)     # 限制每次进行的进程数为 5
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,),callback=Bar)  # 进程异步    # callback 把前面func的放在Bar中打印
            # pool.apply(func=Foo, args=(i,))         # 同步,串行   # 没有callback属性
        print('结束。。')
        pool.close()           # 注意:join必须放在close()后面,否则将不会等待子进程打印结束,而直接结束
        pool.join()
    进程池
    结束。。
    exec..
    exec..
    exec..
    exec..
    exec..
    来自Foo 的i : 104
    来自Foo 的i : 102
    来自Foo 的i : 103
    来自Foo 的i : 100
    来自Foo 的i : 101
    exec..
    exec..
    exec..
    exec..
    exec..
    来自Foo 的i : 105
    来自Foo 的i : 106
    来自Foo 的i : 107
    来自Foo 的i : 108
    来自Foo 的i : 109
    异步结果
    exec..
    exec..
    exec..
    exec..
    exec..
    exec..
    exec..
    exec..
    exec..
    exec..
    结束。。
    同步结果
  • 相关阅读:
    rhel 7.0 配置centos yum源(2016/12/8),成功!
    rosetta2014/2015安装时出现INCLUDE(keyerror)错误,解决。
    显示python已安装模块及路径,添加修改模块搜索路径
    sort
    linux 查看磁盘剩余命令
    cat hesA/Models/score_tgt.sc| awk '{ print $2,$19}' | sort -n -k 1
    Python_sys模块
    Python_os模块
    Python_datetime模块
    Python_time模块
  • 原文地址:https://www.cnblogs.com/Vera-y/p/10013778.html
Copyright © 2011-2022 走看看