zoukankan      html  css  js  c++  java
  • 并发编程之多进程

    一、multiprocessing 模块介绍

    Python 中的多线程无法利用多核优势,如果想要充分地使用多核 CPU 的资源(os.cpu_count()查看),在 Python 中大部分情况需要使用多进程。Python提供了 multiprocessing。

    multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块 threading 的编程接口类似。

    multiprocessing 模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等组件。

    需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

    二、Process 类的介绍

    1、创建进程的类

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程
     
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args 指定的为传给 target 函数的位置参数,是一个元组形式,必须有逗号

    2、参数介绍

    group参数未使用,值始终为None
    
    target表示调用对象,即子进程要执行的任务
    
    args表示调用对象的位置参数元组,args=(1,2,'qiu',)
    
    kwargs表示调用对象的字典,kwargs={'name':'qiu','age':18}
    
    name为子进程的名称

    3、方法介绍

    p.start():启动进程,并调用该子进程中的 p.run() 
    p.run():进程启动时运行的方法,正是它去调用 target 指定的函数,我们自定义类的类中一定要实现该方法  
    
    p.terminate(): 强制终止进程 p,不会进行任何清理操作,如果 p 创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。
    如果 p 还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive(): 如果 p 仍然运行,返回 True
    
    p.join([timeout]): 主线程等待 p 终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,
    需要强调的是,p.join只能join住 start 开启的进程,而不能 join 住 run 开启的进程

    4、属性介绍

    p.daemon:默认值为 False,如果设为 True,代表 p 为后台运行的守护进程,当 p 的父进程终止时,p 也随之终止,并且设定为 True 后,p 不能创建自己的新进程,必须在 p.start() 之前设置
    
    p.name: 进程的名称
    
    p.pid:进程的pid
    
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    
    p.authkey: 进程的身份验证键,默认是由 os.urandom() 随机生成的 32 字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

    三、Process类的使用

    注意:在 Windows 中 Process() 必须放到 if __name__ == '__main__': 下

    Since Windows has no fork, the multiprocessing module starts a new Python
    process and imports the calling module.
        If Process() gets called upon import, then this sets off an infinite succession of
    new processes (or until your machine runs out of resources).
        This is the reason for hiding calls to Process() inside
     
        if __name__ == "__main__"
        since statements inside this if-statement will not get called upon import.
        由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
        如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
    这是隐藏对Process()的内部调用,使用if __name__ == “__main __”,这个if语句
    中的语句将不会在导入时被调用。

    创建并开启子进程的两种方式

    from multiprocessing import Process
    import time
    
    def task(name):
        print("%s is running" %name)
        time.sleep(3)
        print("%s is done" %name)
    
    if __name__ == '__main__':
        p = Process(target=task, args=("qiu",))
        # p = Process(target=task, kwargs={"name": "qiu"})
    
        # p.start()只是向操作系统发送了一个开启子进程的信号, 操作系统才能开启子进程,
        # 涉及到申请内存空间, 要将父进程的数据拷贝到子进程, 要将CPU调到子进程里运行子进程的代码
        # 才会有 is running的显示, 这都是一系列的硬件操作
        # 所以print("主")这行代码运行速度要快一些
        p.start()
        print("")
    方式一
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
    
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):
            print("%s is running" %self.name)
            time.sleep(3)
            print("%s is done" %self.name)
    
    if __name__ == '__main__':
        p = MyProcess("qiu")
        p.start()
        print("")
    方式二

    四、join方法

    在主进程运行过程中如果想要并发的执行其他任务,我们可以开启子进程,此时主进程的任务和子进程的任务分为两种情况:

      一种情况是:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源

      还有一种情况是:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要一种机制能够让主进程监测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是 join 方法的作用。 

    from multiprocessing import Process
    import time
    
    def task(name, n):
        print("%s is running" %name)
        time.sleep(n)
        print("%s is done" %name)
    
    if __name__ == '__main__':
        p1 = Process(target=task, args=("Process 1", 1))
        p2 = Process(target=task, args=("Process 2", 2))
        p3 = Process(target=task, args=("Process 3", 3))
    
        start = time.time()
        p1.start()
        p2.start()
        p3.start()
    
        p1.join()
        p2.join()
        p3.join()
        print("主进程", time.time() - start)
    join

      人会有疑问,既然 join 是等待进程结束,那么我像下面 join 下去,进程不就变成串行了的吗?

      当然不是了,必须明确 join 是让谁等:进程只要 start 就会在开始运行了,所以 p1 到 p3.start() 时,系统中已经有三个并发的进程了,而 p1.join() 是在等 p1 结束,p1 只要不结束主线程就会一直卡在原地,这也是问题的关键。join 是让主线程等,而 p1-p3 仍然是并发执行的,p1.join() 的时候,其余 p2,p3 仍然在运行,等 p1.join() 结束,可能 p2,p3 早已经结束了,这样 p2.join(),p3.join() 直接通过检测,无需等待。所以 3 个 join 花费的总时间仍然是耗费时间最长的那个进程运行的时间,所以这里即便交换 join 的顺序,执行的时间仍然是 3 秒多一点,多出来的那零点几秒是开启进程以及进程切换的时间。

    from multiprocessing import Process
    import time
    
    def task(name, n):
        print("%s is running" %name)
        time.sleep(n)
        print("%s is done" %name)
    
    if __name__ == '__main__':
        p1 = Process(target=task, args=("Process 1", 1))
        p2 = Process(target=task, args=("Process 2", 2))
        p3 = Process(target=task, args=("Process 3", 3))
    
        start = time.time()
        p1.start()
        p2.start()
        p3.start()
    
        p3.join()
        p1.join()
        p2.join()
    
        print("主进程", time.time() - start)
    交换join的顺序

    join 是让主进程在原地等待,等待子进程运行完毕,不会影响子进程的执行

    上面的代码可以使用 for 循环简写

    from multiprocessing import Process
    import time
    
    def task(name, n):
        print("%s is running" %name)
        time.sleep(n)
        print("%s is done" %name)
    
    if __name__ == '__main__':
    
        start = time.time()
        p_l = []
        for i in range(1, 4):
            p = Process(target=task, args=("Process %s" %i, i))
            p_l.append(p)
            p.start()
    
        for p in p_l:
            p.join()
    
        print("主进程", time.time() - start)
    使用for循环简写

    进程间的内存空间互相隔离

    from multiprocessing import Process
    
    n = 100
    
    def task():
        global n
        n = 0
    
    if __name__ == '__main__':
        p = Process(target=task)
        p.start()
        p.join()
        print("主进程内的:", n)
    View Code

    五、僵尸进程与孤儿进程

      僵尸进程:一个进程使用 fork 创建子进程,如果子进程退出,而父进程并没有调用 wait 或 waitpid 获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程

      我们知道在 Unix/Linux 中,正常情况下子进程是通过父进程创建的,子进程在创建新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程到底什么时候结束,如果子进程一结束就立刻回收其全部资源,那么在父进程内将无法获取子进程的状态信息。因此,Unix 提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息:

      1、在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。但是仍然为其保留一定的信息(包括进程号、退出状态、运行时间等)

      2、直到父进程通过 wait/waitpid 来取时才释放。但这样就导致了问题,如果进程不调用 wait/waitpid 的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程。此即为僵尸进程的危害,应当避免。

      任何一个子进程(init除外)在 exit() 之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在 exit() 之后,父进程没有来得及处理,这时用 ps 命令就能看到子进程的状态是 “Z” 。如果父进程能及时 处理,可能用 ps 命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。  如果父进程在子进程结束之前退出,则子进程将由 init 接管。init 将会以父进程的身份对僵尸状态的子进程进行处理。

      孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被 init 进程(进程号为 1)所收养,并由 init 进程对它们完成状态收集工作。

      孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了 init 进程身上,init 进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为 init,而 init 进程会循环地 wait() 它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init 进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。

    六、守护进程

    主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比皇帝身边的老太监,皇帝一死老太监就跟着殉葬。

    关于守护进程需要强调两点:

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就 ok 了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止 

    from multiprocessing import Process
    import time
    import random
    
    
    def task(name):
        print('%s is running' % name)
        time.sleep(random.randrange(1, 3))
        print('%s is done' % name)
    
    
    if __name__ == '__main__':
        p = Process(target=task, args=('qiu',))
        p.daemon = True  # 一定要在p.start()前设置, 设置p为守护进程, 禁止p创建子进程, 并且父进程代码执行结束, p即终止运行
        p.start()
        print('')  # 只要终端打印出这一行内容, 那么守护进程p也就跟着结束掉了
    守护进程

    七、互斥锁

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或者打印终端是没有问题的,但是带来的是竞争,竞争带来的结果是错乱,如下:多个进程模拟多个人执行抢票任务

    # db.json文件内容:
    {"count": 1}
    
    import json, time, random
    from multiprocessing import Process
    
    def search(name):
        with open("db.json", "rt", encoding="utf-8") as f:
            dic = json.load(f)
        time.sleep(1)   # 模拟查看票数的网络延迟
        print("%s查看到余票为%s张" %(name, dic["count"]))
    
    def get(name):
        with open("db.json", "rt", encoding="utf-8") as f:
            dic = json.load(f)
        if dic["count"] > 0:
            dic["count"] -= 1
            time.sleep(random.randint(1, 3))    # 模拟买票的网络延迟
    
            with open("db.json", "wt", encoding="utf-8") as f:
                json.dump(dic, f)
                print("%s购票成功" %name)
        else:
            print("%s查看到没有票了" %name)
    
    def task(name):
        search(name)
        get(name)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=task, args=("路人%s " %i,))
            p.start()
    
    # 运行结果
    路人0 查看到余票为1张
    路人1 查看到余票为1张
    路人2 查看到余票为1张
    路人3 查看到余票为1张
    路人4 查看到余票为1张
    路人5 查看到余票为1张
    路人6 查看到余票为1张
    路人7 查看到余票为1张
    路人9 查看到余票为1张
    路人8 查看到余票为1张
    路人0 购票成功
    路人3 购票成功
    路人4 购票成功
    路人7 购票成功
    路人9 购票成功
    路人1 购票成功
    路人2 购票成功
    路人6 购票成功
    路人8 购票成功
    路人5 购票成功
    View Code

    可以看到,它们10个人是并发运行,效率高,但竞争写同一文件,数据写入错乱,只有一张票,却成功卖给了10个人。之前学到 join,但 join 的操作相当于指定了买票的顺序,只能由第一个人买,并不能保证数据的安全性。

    所以要进行加锁处理。而互斥锁的意思就是互相排斥,如果把多个进程比喻为多个人,互斥锁的工作原理就是多个人都要去争抢同一个资源:卫生间,一个人抢到卫生间后上一把锁,其他人都要等着,等到这个完成任务后释放锁,其他人才有可能有一个抢到......所以互斥锁的原理,就是把部分的并发改成串行,降低了效率,但保证了数据安全不错乱

    import json, time, random
    from multiprocessing import Process, Lock
    
    def search(name):
        with open("db.json", "rt", encoding="utf-8") as f:
            dic = json.load(f)
        time.sleep(1)   # 模拟查看票数的网络延迟
        print("%s查看到余票为%s张" %(name, dic["count"]))
    
    def get(name):
        with open("db.json", "rt", encoding="utf-8") as f:
            dic = json.load(f)
        if dic["count"] > 0:
            dic["count"] -= 1
            time.sleep(random.randint(1, 3))    # 模拟买票的网络延迟
    
            with open("db.json", "wt", encoding="utf-8") as f:
                json.dump(dic, f)
                print("%s 购票成功" %name)
        else:
            print("%s 查看到没有票了" %name)
    
    def task(name, mutex):
        search(name)    # 并发
        mutex.acquire()     # 加锁
        get(name)       # 串行
        mutex.release()     # 释放锁
    
        # with mutex:   # 相当于mutex.acquire(),执行完子代码块自动执行mutex.release()
        #     get(name)
    
    if __name__ == '__main__':
        mutex = Lock()
        for i in range(10):
            p = Process(target=task, args=("路人%s " %i, mutex))
            p.start()
    
    # 运行结果
    路人0 查看到余票为1张
    路人1 查看到余票为1张
    路人3 查看到余票为1张
    路人2 查看到余票为1张
    路人4 查看到余票为1张
    路人5 查看到余票为1张
    路人6 查看到余票为1张
    路人7 查看到余票为1张
    路人8 查看到余票为1张
    路人9 查看到余票为1张
    路人0  购票成功
    路人1  查看到没有票了
    路人3  查看到没有票了
    路人2  查看到没有票了
    路人4  查看到没有票了
    路人5  查看到没有票了
    路人6  查看到没有票了
    路人7  查看到没有票了
    路人8  查看到没有票了
    路人9  查看到没有票了
    View Code

     互斥锁与 join

    使用 join 可以将并发变成串行,互斥锁的原理也是将并发变成串行,那我们直接使用 join 就可以了啊,为何还要互斥锁?

    import json, time, random
    from multiprocessing import Process
    
    def search(name):
        with open("db.json", "rt", encoding="utf-8") as f:
            dic = json.load(f)
        time.sleep(1)   # 模拟查看票数的网络延迟
        print("%s查看到余票为%s张" %(name, dic["count"]))
    
    def get(name):
        with open("db.json", "rt", encoding="utf-8") as f:
            dic = json.load(f)
        if dic["count"] > 0:
            dic["count"] -= 1
            time.sleep(random.randint(1, 3))    # 模拟买票的网络延迟
    
            with open("db.json", "wt", encoding="utf-8") as f:
                json.dump(dic, f)
                print("%s 购票成功" %name)
        else:
            print("%s 查看到没有票了" %name)
    
    def task(name):
        search(name)
        get(name)
    
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=task, args=("路人%s " %i,))
            p.start()
            p.join()
    
    # 运行结果
    路人0 查看到余票为0张
    路人0  查看到没有票了
    路人1 查看到余票为0张
    路人1  查看到没有票了
    路人2 查看到余票为0张
    路人2  查看到没有票了
    路人3 查看到余票为0张
    路人3  查看到没有票了
    路人4 查看到余票为0张
    路人4  查看到没有票了
    路人5 查看到余票为0张
    路人5  查看到没有票了
    路人6 查看到余票为0张
    路人6  查看到没有票了
    路人7 查看到余票为0张
    路人7  查看到没有票了
    路人8 查看到余票为0张
    路人8  查看到没有票了
    路人9 查看到余票为0张
    路人9  查看到没有票了
    View Code

    发现使用 join 将并发改成串行,确实能保证数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发的去查询而无需考虑数据准确与否,此时 join 与互斥锁的区别就显而易见了,join 是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行

    总结:

    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

    虽然可以用文件共享数据实现进程间通信,但问题是:

      1、效率低(共享数据基于文件,而文件是硬盘上的数据)

      2、需要自己加锁处理

    因此我们最好找寻一种解决方案能够兼顾:

      1、效率高(多个进程共享一块内存的数据)

      2、帮我们处理好锁问题

    这就是 mutiprocessing 模块为我们提供的基于消息的 IPC 通信机制:队列和管道

    队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。

    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    八、队列

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing 模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

    from multiprocessing import Queue
    
    q = Queue(3)
    
    # put ,get ,put_nowait,get_nowait,full,empty
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.full())  # 满了
    # q.put(4) # 再放就阻塞住了
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())  # 空了
    # print(q.get()) # 再取就阻塞住了
    队列的使用
  • 相关阅读:
    wireshark如何抓取本机包
    模拟post请求方法
    Spring Boot中使用RabbitMQ
    Dubbo注册中心的四种配置方式详解
    spring扩展点之三:Spring 的监听事件 ApplicationListener 和 ApplicationEvent 用法,在spring启动后做些事情
    zookeeper 大量连接断开重连原因排查
    分布式一致性协议之:Gossip(八卦)算法
    MongoDB分析工具之一:explain()语句分析工具
    MongoDB分析工具之二:MongoDB分析器Profile
    MySQL安装
  • 原文地址:https://www.cnblogs.com/qiuxirufeng/p/9925744.html
Copyright © 2011-2022 走看看