zoukankan      html  css  js  c++  java
  • python 多进程详解(Multiprocessing模块)

    python 多进程(MultiProcess)

    1.Process

    创建进程的类Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组,kwargs表示调用对象的字典,name为别名,group实质上不使用。
    方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,start()启动某个进程。join(timeout),使主调进程阻塞,直至被调用子进程运行结束或超时(如指定timeout)。
    属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

    1.1 创建函数并作为单个进程

    import multiprocessing
    import time
    
    
    def worker(interval):
        n = 5
        while n > 0:
            print("The time is {0}".format(time.ctime()))
            time.sleep(interval)
            n -= 1
    
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target=worker, args=(3,))
        p.start()
        print("p.pid:", p.pid)
        print("p.name:", p.name)
        print("p.is_alive:", p.is_alive())
    
    

    结果

    p.pid: 2460
    p.name: Process-1
    p.is_alive: True
    The time is Tue Aug  4 17:31:02 2020
    The time is Tue Aug  4 17:31:05 2020
    The time is Tue Aug  4 17:31:08 2020
    The time is Tue Aug  4 17:31:11 2020
    The time is Tue Aug  4 17:31:14 2020
    

    1.2 创建函数并作为多个进程

    import multiprocessing
    import time
    
    def worker_1(interval):
        print("worker_1")
        time.sleep(interval)
        print("end worker_1")
    
    def worker_2(interval):
        print("worker_2")
        time.sleep(interval)
        print("end worker_2")
    
    def worker_3(interval):
        print("worker_3")
        time.sleep(interval)
        print("end worker_3")
    
    if __name__ == "__main__":
        p1 = multiprocessing.Process(target = worker_1, args = (2,))
        p2 = multiprocessing.Process(target = worker_2, args = (3,))
        p3 = multiprocessing.Process(target = worker_3, args = (4,))
    
        p1.start()
        p2.start()
        p3.start()
    
        print("The number of CPU is:" + str(multiprocessing.cpu_count()))
        for p in multiprocessing.active_children():
            print("child   p.name:" + p.name + "	p.id" + str(p.pid))
        print("END!!!!!!!!!!!!!!!!!")
    

    结果

    The number of CPU is:8
    child   p.name:Process-2	p.id2897
    child   p.name:Process-1	p.id2896
    child   p.name:Process-3	p.id2898
    END!!!!!!!!!!!!!!!!!
    worker_1
    worker_3
    worker_2
    end worker_1
    end worker_2
    end worker_3
    

    1.3 daemon程序对比结果(不加daemon属性)

    import multiprocessing
    import time
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()));
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()));
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target = worker, args = (3,))
        p.start()
        print("end!")
    

    结果

    end!
    work start:Tue Aug  4 17:37:02 2020
    work end:Tue Aug  4 17:37:05 2020
    

    1.4 daemon程序对比(设置daemon为True)

    import multiprocessing
    import time
    
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target=worker, args=(3,))
        p.daemon = True
        p.start()
        print("end!")
    

    结果

    end!
    

    子进程设置了daemon属性,主进程结束,它们就随着结束了。

    1.5 设置daemon执行完结束的方法

    import multiprocessing
    import time
    
    
    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target=worker, args=(3,))
        p.daemon = True
        p.start()
        p.join()
        print("end!")
    

    结果

    work start:Tue Aug  4 17:41:11 2020
    work end:Tue Aug  4 17:41:14 2020
    end!
    

    2.Lock

    当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突

    import multiprocessing
    import sys
    
    
    def worker_with(lock, f):
        with lock:
            fs = open(f, 'a+')
            n = 10
            while n > 1:
                fs.write("Lockd acquired via with
    ")
                n -= 1
            fs.close()
    
    
    def worker_no_with(lock, f):
        lock.acquire()
        try:
            fs = open(f, 'a+')
            n = 10
            while n > 1:
                fs.write("Lock acquired directly
    ")
                n -= 1
            fs.close()
        finally:
            lock.release()
    
    
    if __name__ == "__main__":
        lock = multiprocessing.Lock()  # 声明一个锁
        f = "file.txt"
        w = multiprocessing.Process(target=worker_with, args=(lock, f))
        nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
        w.start()
        nw.start()
        print("end")
    

    结果

    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lock acquired directly
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    Lockd acquired via with
    

    3.Semaphore

    信号量Semaphore是一个计数器,控制对公共资源或者临界区域的访问量,信号量可以指定同时访问资源或者进入临界区域的进程数。每次有一个进程获得信号量时,计数器-1,若计数器为0时,其他进程就停止访问信号量,一直阻塞直到其他进程释放信号量。
    常用方法和属性
    acquire(blocking = True, timeout=None)
    请求一个信号量
    release()
    释放一个信号量

    import time, random
    from multiprocessing import Process, Semaphore
    
    def ktv(i, sem):
        sem.acquire()
        print('%s 走进ktv' %i)
        time.sleep(random.randint(1, 5))
        print('%s 走出ktv' %i)
        sem.release()
    
    if __name__ == "__main__":
        sem = Semaphore(4)
        for i in range(5):
            p = Process(target=ktv, args=(i, sem))
            p.start()
    

    结果

    0 走进ktv
    3 走进ktv
    4 走进ktv
    1 走进ktv
    1 走出ktv
    2 走进ktv
    2 走出ktv
    0 走出ktv
    3 走出ktv
    4 走出ktv
    

    4.Event

    Python 多进程中 Event 是用来实现进程间同步通信的(当然多线程中也可以用event)。事件event运行的机制是:全局定义了一个Flag,如果Flag值为 False,当程序执行event.wait()方法时就会阻塞,如果Flag值为True时,程序执行event.wait()方法时不会阻塞继续执行。
    常用方法和属性
    wait(time)方法:等待 time 时间后,执行下一步。或者在调用 event.set() 后立即执行下一步。
    set()方法:将Flag的值改成True。
    clear()方法:将Flag的值改成False。
    is_set()方法:判断当前的Flag的值。

    import time
    import random
    from multiprocessing import Process
    from multiprocessing import Event
    
    
    def now():
        return str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
    
    
    def traffic_light(e):  # 红绿灯
        print(now() + ' 33[31m红灯亮33[0m')  # Flag 默认是False
        while True:
            if e.is_set():  # 如果是绿灯
                time.sleep(2)  # 2秒后
                print(now() + ' 33[31m红灯亮33[0m')  # 转为红灯
                e.clear()  # 设置为False
    
            else:  # 如果是红灯
                time.sleep(2)  # 2秒后
                print(now() + ' 33[32m绿灯亮33[0m')  # 转为绿灯
                e.set()  # 设置为True
    
    def people(e, i):
        if not e.is_set():
            print(now() + ' people %s 在等待' % i)
            e.wait()
            print(now() + ' people %s 通过了' % i)
    
    
    if __name__ == '__main__':
        e = Event()  # 默认为 False,红灯亮
        p = Process(target=traffic_light, args=(e,))  # 红绿灯进程
        p.daemon = True
        p.start()
        process_list = []
        for i in range(6):  # 6人过马路
            time.sleep(random.randrange(0, 4, 2))
            p = Process(target=people, args=(e, i))
            p.start()
            process_list.append(p)
    
        for p in process_list:
            p.join()
    

    结果

    2020-08-04 17:53:34 红灯亮
    2020-08-04 17:53:36 people 0 在等待
    2020-08-04 17:53:36 绿灯亮
    2020-08-04 17:53:36 people 0 通过了
    2020-08-04 17:53:38 红灯亮
    2020-08-04 17:53:39 people 3 在等待
    2020-08-04 17:53:40 people 2 在等待
    2020-08-04 17:53:40 绿灯亮
    2020-08-04 17:53:40 people 2 通过了
    2020-08-04 17:53:40 people 3 通过了
    

    5.Queue

    常用方法和属性
    Queue([maxsize])
    创建共享的进程队列。
    参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
    底层队列使用管道和锁定实现
    q.get( [ block [ ,timeout ] ] )
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。
    block用于控制阻塞行为,默认为True阻塞进程. 如果设置为False,不阻塞但将引发Queue.Empty异常(定义在Queue模块中)。
    timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    q.put(item [, block [,timeout ] ] )
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。
    block控制阻塞行为,默认为True阻塞。如果设置为False,不阻塞但将引发Queue.Empty异常(定义在Queue库模块中)。
    timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

    # encoding: utf-8
    
    import os
    import time
    from multiprocessing import Queue, Process, freeze_support
    
    
    def inputQ(queue):
        info = str(os.getpid()) + "(put):" + str(time.asctime())
        queue.put(info)
    
    
    def outputQ(queue):
        info = queue.get()
        print('%s%s 33[32m%s33[0m' % (str(os.getpid()), '(get):', info))
    
    
    if __name__ == '__main__':
        freeze_support()
        record1 = []  # store input process
        record2 = []  # stroe output process
        queue = Queue(3)
    
        # 输入进程
        for i in range(10):
            process = Process(target=inputQ, args=(queue,))
            process.start()
            record1.append(process)
        # 输出进程
        for i in range(10):
            process = Process(target=outputQ, args=(queue,))
            process.start()
            record2.append(process)
    
        for p in record1:
            p.join()
        for p in record2:
            p.join()
    

    结果

    7647(get): 7641(put):Tue Aug  4 17:56:15 2020
    7649(get): 7637(put):Tue Aug  4 17:56:15 2020
    7648(get): 7639(put):Tue Aug  4 17:56:15 2020
    7646(get): 7636(put):Tue Aug  4 17:56:15 2020
    7651(get): 7642(put):Tue Aug  4 17:56:15 2020
    7650(get): 7640(put):Tue Aug  4 17:56:15 2020
    7654(get): 7638(put):Tue Aug  4 17:56:15 2020
    7652(get): 7643(put):Tue Aug  4 17:56:15 2020
    7655(get): 7645(put):Tue Aug  4 17:56:15 2020
    7653(get): 7644(put):Tue Aug  4 17:56:15 2020
    

    6.Pipe

    Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
    send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

    import multiprocessing
    import time
    
    
    def proc1(pipe):
        while True:
            for i in range(10000):
                print("send: %s" % (i))
                pipe.send(i)
                time.sleep(1)
    
    
    def proc2(pipe):
        while True:
            print("proc2 rev:", pipe.recv())
            time.sleep(1)
    
    
    
    
    
    if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
        p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    
        p1.start()
        p2.start()
    
        p1.join()
        p2.join()
    
    

    结果

    send: 0
    proc2 rev: 0
    send: 1
    proc2 rev: 1
    send: 2
    proc2 rev: 2进程池是多个需要被执行的任务在进程池外面排队等待获取进程对象去执行自己, 而信号量是一堆进程等待着去执行一段逻辑代码.
    
    信号量不能控制创建多少个进程, 但是可以控制同时多少个进程能够执行.
    进程池能控制可以创建多少个进程.
    send: 3
    proc2 rev: 3
    send: 4
    proc2 rev: 4
    send: 5
    proc2 rev: 5
    send: 6
    proc2 rev: 6
    ...
    

    7.Pool

    在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    进程池是多个需要被执行的任务在进程池外面排队等待获取进程对象去执行自己, 而信号量是一堆进程等待着去执行一段逻辑代码.
    信号量不能控制创建多少个进程, 但是可以控制同时多少个进程能够执行.
    进程池能控制可以创建多少个进程.

    7.1 非阻塞

    # coding: utf-8
    import multiprocessing
    import time
    
    
    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")
    
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=3)
        for i in range(4):
            msg = "hello %d" % (i)
            # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
            pool.apply_async(func, (msg, ))
    
        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
        print("Sub-process(es) done.")
    

    结果

    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    msg: hello 0
    msg: hello 1
    msg: hello 2
    end
    end
    end
    msg: hello 3
    end
    Sub-process(es) done.
    

    7.2 阻塞

    #coding: utf-8
    import multiprocessing
    import time
    
    
    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")
    
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=3)
        for i in range(4):
            msg = "hello %d" % (i)
            pool.apply(func, (msg, ))  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    
        print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()  # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
        print("Sub-process(es) done.")
    

    结果

    msg: hello 0
    end
    msg: hello 1
    end
    msg: hello 2
    end
    msg: hello 3
    end
    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    Sub-process(es) done.
    

    7.3 使用进程池,关注结果

    import multiprocessing
    import time
    
    
    def func(msg):
        print("msg:", msg)
        time.sleep(3)
        print("end")
        return "done" + msg
    
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4)
        result = []
        for i in range(3):
            msg = "hello %d" % (i)
            result.append(pool.apply_async(func, (msg, )))
        pool.close()
        pool.join()
        for res in result:
            print(":::", res.get())
        print("Sub-process(es) done.")
    

    结果

    msg: hello 0
    msg: hello 1
    msg: hello 2
    end
    end
    end
    ::: donehello 0
    ::: donehello 1
    ::: donehello 2
    Sub-process(es) done.
    

    8.数据共享

    进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的。
    虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此。

    8.1 list

    # -*-encoding:utf-8-*-
    from multiprocessing import Process, Manager
    from time import sleep
    
    
    def thread_a_main(sync_data_pool):  # A 进程主函数,存入100+的数
        for ix in range(100, 105):
            sleep(1)
            sync_data_pool.append(ix)
    
    
    def thread_b_main(sync_data_pool):  # B 进程主函数,存入300+的数
        for ix in range(300, 309):
            sleep(0.6)
            sync_data_pool.append(ix)
    
    
    def _test_case_000():  # 测试用例
        manager = Manager()  # multiprocessing 中的 Manager 是一个工厂方法,直接获取一个 SyncManager 的实例
        sync_data_pool = manager.list()  # 利用 SyncManager 的实例来创建同步数据池
        Process(target=thread_a_main, args=(
            sync_data_pool, )).start()  # 创建并启动 A 进程
        Process(target=thread_b_main, args=(
            sync_data_pool, )).start()  # 创建并启动 B 进程
        for ix in range(6):  # C 进程(主进程)中实时的去查看数据池中的数据
            sleep(1)
            print(sync_data_pool)
    
    
    if '__main__' == __name__:  # 将测试用例单独列出
        _test_case_000()
    

    结果

    [300]
    [300, 100, 301, 302]
    [300, 100, 301, 302, 101, 303]
    [300, 100, 301, 302, 101, 303, 304, 102, 305]
    [300, 100, 301, 302, 101, 303, 304, 102, 305, 103, 306, 307]
    [300, 100, 301, 302, 101, 303, 304, 102, 305, 103, 306, 307, 104, 308]
    

    8.2 dict

    from multiprocessing import Manager, Lock, Process
    import time
    
    
    def worker(d, key, value):
        print(key, value)
        d[key] = value
    
    
    if __name__ == '__main__':
        mgr = Manager()
        d = mgr.dict()
        jobs = [Process(
            target=worker, args=(d, i, i*2)) for i in range(10)]
    
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        print('Results:')
        print(d)
    

    结果

    6 12
    7 14
    9 18
    0 0
    8 16
    2 4
    5 10
    1 2
    3 6
    4 8
    Results:
    {6: 12, 7: 14, 9: 18, 0: 0, 8: 16, 2: 4, 5: 10, 1: 2, 3: 6, 4: 8}
    
  • 相关阅读:
    Hibernate提供的内置标识符生成器
    ThreadLocal解析
    save()/saveOrUpdate()/merge()的区别
    Hibernate中主键生成策略
    session/SessionFactory线程非安全和线程安全
    load/get延迟加载和及时加载
    最长公共子序列:递归,非递归实现
    c语言,递归翻转一个单链表,c实现单链表
    最长递增子序列(Longest Increase Subsequence)
    求一串数字中——和最大的连续子序列; 求一串数字差值的绝对值最小的两个数字
  • 原文地址:https://www.cnblogs.com/buyizhiyou/p/13438251.html
Copyright © 2011-2022 走看看