zoukankan      html  css  js  c++  java
  • multiprocessing- 基于进程的并行性

    介绍
    multiprocessing是一个使用类似于threading模块的API支持生成进程的包。该multiprocessing软件包提供本地和远程并发,通过使用子进程而不是线程有效地支持 全局解释器锁。multiprocessing模块充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。

    该multiprocessing模块还引入了threading模块中没有模拟的API 。一个主要的例子是该 Pool对象提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨过程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用Pool

    from multiprocessing import Pool
    
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        with Pool(5) as p:
            print(p.map(f, [1, 2, 3]))
    
    >>>[1, 4, 9]
    
    #encoding:utf-8
    # __author__ = 'donghao'
    # __time__ = 2019/4/1 11:27
    from multiprocessing import Pool
    import time
    import os
    # 进程池
    # 大量进程创建,使用pool的方法
    
    def worker(msg):
        start = time.time()
        print('%s开始执行,进程号%d'%(msg,os.getpid()))
        time.sleep(1)
        end = time.time()
        print('耗时%0.2f'%(end-start))
    
    
    if __name__ == '__main__':
        po = Pool(3)
        for i in range(10):
            po.apply_async(worker, (i,))
    
        print('——tart____')
        po.close()  # 关闭进程池,关闭后不再接受新的请求
        po.join()  # 等待所有的子进程执行完成,必须放到close之后
    
    

    apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
    close() 关闭pool,使其不在接受新的任务。
    terminate() 结束工作进程,不在处理未完成的任务。
    join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

    Process
    multiprocessing,通过创建Process 对象然后调用其start()方法来生成进程。 Process 遵循API的threading.Thread

    from multiprocessing import Process
    
    def f(name):
        print('hello', name)
    
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    

    显示所涉及的各个进程ID

    from multiprocessing import Process
    import os
    
    def info(title):
        print(title)
        print('module name:', __name__)
        print('父进程:', os.getppid())
        print('进程:', os.getpid())
    
    def f(name):
        info('函数 f')
        print('我是', name)
    
    if __name__ == '__main__':
        info('main line')
        p = Process(target=f, args=('鲁班七号',))
        p.start()
        p.join()
        
    >>>
    main line
    module name: __main__
    父进程: 1668
    进程: 1368
    函数 f
    module name: __mp_main__
    父进程: 1368
    进程: 4644
    我是 鲁班七号
    

    multiprocessing 支持进程之间的两种通信
    队列

    这个Queue是近乎克隆的queue.Queue。例如:

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put(['鲁班七号', '妲己', '后裔'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints ['鲁班七号', '妲己', '后裔']
        p.join()
    

    队列是线程和进程安全的。

    管道

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send(['鲁班七号', '妲己', '后裔'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn,child_conn = Pipe()
        p = Process(target=f,args=(child_conn,))
        p.start()
        print(parent_conn.recv())
        p.join()
        parent_conn.close()
    
    

    返回的两个连接对象Pipe()表示管道的两端。每个连接对象都有send()recv()方法(以及其他)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险

    进程间的同步

    multiprocessing包含所有同步原语的等价物threading。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

    from multiprocessing import Process, Lock
    def f(l, i):
        print('hello world', i)
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()
    

    不使用来自不同进程的锁输出容易被混淆。

    进程间共享状态

    在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。
    但是,如果您确实需要使用某些共享数据,那么 multiprocessing提供了几种方法。

    共享内存

    可以使用Value或 将数据存储在共享存储器映射中Array。例如,以下代码

    from multiprocessing import Process, Value, Array
    
    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]
    
    if __name__ == '__main__':
        num = Value('d', 0.0)
        arr = Array('i', range(10))
    
        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()
    
        print(num.value)
        print(arr[:])
    
        
    >>>
    3.1415927
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
    

    服务器进程
    Manager()控制器返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们
    通过返回的经理Manager()将支持类型

    list,dict,Namespace,Lock, RLock,Semaphore,BoundedSemaphore, Condition,Event,Barrier, Queue,Value和Array

    例如

    from multiprocessing import Process, Manager
    
    def f(d, l, kills):
        d['name'] = '程咬金'
        d['slogan'] = '真男人,必须要有强健的肌肉,身体和精神'
        d['装备'] = None
        l.reverse()
        kills.append('后裔')
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
            kills = manager.list(['达摩','鲁班七号'])
            p = Process(target=f, args=(d, l, kills))
            p.start()
            p.join()
    
            print(d)
            print(l)
            print(kills)
    
    >>>
    {'name': '程咬金', 'slogan': '真男人,必须要有强健的肌肉,身体和精神', '装备': None}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
    ['达摩', '鲁班七号', '后裔']
    

    服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

    daemon程序

    # 不加daemon属性
    import multiprocessing
    import time
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target = worker, args = (3,))
        p.start()
        print("end!")
    >>>
    end!
    work start:Mon Apr  1 16:08:40 2019
    work end:Mon Apr  1 16:08:43 2019
    
    #加上daemon属性
    import multiprocessing
    import time
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target = worker, args = (3,))
        p.daemon = True
        p.start()
        print("end!")
    >>>
    end!
    

    注:因子进程设置了daemon属性,主进程结束,它们就随着结束了。

    Event用来实现进程间同步通信。

    import multiprocessing
    import time
    
    def wait_for_event(e):
        print("wait_for_event: starting")
        e.wait()
        print("wairt_for_event: e.is_set()->" + str(e.is_set()))
    
    def wait_for_event_timeout(e, t):
        print("wait_for_event_timeout:starting")
        e.wait(t)
        print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
    
    if __name__ == "__main__":
        e = multiprocessing.Event()
        w1 = multiprocessing.Process(name = "block",
                target = wait_for_event,
                args = (e,))
    
        w2 = multiprocessing.Process(name = "non-block",
                target = wait_for_event_timeout,
                args = (e, 1))
        w1.start()
        w2.start()
    
        time.sleep(5)
    
        e.set()
        print("main: event is set")
    
    >>>
    wait_for_event: starting
    wait_for_event_timeout:starting
    wait_for_event_timeout:e.is_set->False
    main: event is set
    wairt_for_event: e.is_set()->True
    

    文件拷贝器:

    #encoding:utf-8
    # __author__ = 'donghao'
    # __time__ = 2019/4/1 14:14
    from multiprocessing import pool,Manager,Queue
    import os,time
    
    def mycopy(old_file_name, new_file_name, filename, queue):
        f = open(old_file_name+'/' + filename,'rb')
        content = f.read()
        f.close()
        w = open(new_file_name+'/' + filename,'wb')
        w.write(content)
        w.close()
        queue.put(filename)
    
    def main():
        old_file_name = input('请输入文件名称')
        path = os.listdir(old_file_name)
        length = len(path)
        po = pool.Pool(5)
        queue = Manager().Queue()
        try:
            new_file_name = old_file_name+'[副本]'
            os.mkdir(new_file_name)
        except:
            pass
        for filename in path:
            po.apply_async(mycopy,args=(old_file_name, new_file_name, filename, queue))
        po.close()
        copy_file_nums = 0
        while True:
            filename = queue.get()
            copy_file_nums += 1
            print('
     拷贝进度: %0.2f %%'%(copy_file_nums*100/length),end='')
            if copy_file_nums >= length:
                break
        print('
    文件拷贝成功!')
        po.join()
    
    if __name__ == '__main__':
        main()
    
  • 相关阅读:
    浅尝EffectiveCSharp_6
    浅尝EffectiveCSharp_7
    浅尝EffectiveCSharp_9
    CLR_via_C#.3rd 翻译[1.6 框架类库]
    浅尝EffectiveC#_11
    CLR_via_C#.3rd 翻译[1.9 与非托管代码的操作]
    wcf学习笔记_2(修改wcf配置文件)
    CLR_via_C#.3rd 翻译[1.4.2 不安全代码]
    CLR_via_C#.3rd 翻译[1.4.1 IL与验证]
    CLR_via_C#.3rd 翻译[1.8 通用语言规范]
  • 原文地址:https://www.cnblogs.com/donghaoblogs/p/10637129.html
Copyright © 2011-2022 走看看