zoukankan      html  css  js  c++  java
  • (三)多进程之守护进程与互斥锁

    一、守护进程


    1,主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比皇帝身边的老太监,皇帝已死老太监就跟着殉葬了。

    关于守护进程需要强调两点:

      其一:守护进程会在主进程代码执行结束后就终止。

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止。

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止。

    from multiprocessing import Process
    import time
    
    def task(name):
        print("%s is running." % name)
        time.sleep(2)
    
    if __name__ == "__main__":
        p = Process(target=task,args=("子进程1",))
        p.daemon = True     # 守护进程一定是要在开启之前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行。
    
        p.start()
    
        print("")
        # 只要终端打印出这行内容,主进程就算执行完了,那么守护进程 p 也就跟着结束掉了。

    2,练习题:思考下列代码的执行结果有可能有哪些情况?why?

    from multiprocessing import Process
    import time
    
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
        p1 = Process(target=foo)
        p2 = Process(target=bar)
        p1.daemon = True
        
        p1.start()
        p2.start()
        print("main-------")
    from multiprocessing import Process
    import time
    
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
        p1 = Process(target=foo)
        p2 = Process(target=bar)
        p1.daemon = True  # 把 p1 设置成守护进程,main------ 打印出来之后,它就跟着没了
    
        p1.start()
        p2.start()
        print("main-------")    # 这行打印出来,主进程就运行完了
    
    # 机器性能高的情况下,会出现,p2 的信号的发送过程中,p1 已经立马开启起来了,会瞬间打印123,然后会打印main----,然后p2起来
    # 但是只要看到main------,绝对不会看到 end123。
    
    """
    main-------
    456
    end456
    """
    result

    二、互斥锁


    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如下:

    # 并发运行,效率高,但竞争同一打印终端,带来了打印错乱。
    from multiprocessing import Process
    import time
    
    def task(name):
        print("%s 1" % name)
        time.sleep(1)
        print("%s 2" % name)
        time.sleep(1)
        print("%s 3" % name)
    
    
    if __name__ == "__main__":
        for i in range(3):
            p = Process(target=task,args=("进程%s" % i,))
            p.start()
    
    
    """
    结果是错乱,三个进程是共享同一个输出终端的,
    共享带来的就是竞争,谁抢到了,谁就打印,这就用到了互斥锁。
    
    进程0 1
    进程2 1
    进程1 1
    进程0 2
    进程1 2
    进程2 2
    进程0 3
    进程1 3
    进程2 3
    """

    如何控制,就是加锁处理。而互斥锁的意思就是互相排斥,如果把多个进程比喻为多个人,互斥锁的工作原理就是多个人都要去争抢同一个资源:卫生间,一个人抢到卫生间后上一把锁,其他人都要等着,等到这个完成任务后释放锁,其他人才有可能有一个抢到......所以互斥锁的原理,就是把并发改成串行,降低了效率,但保证了数据安全不错乱。

    # 由并发变成了串行,牺牲了运行效率,但避免了竞争
    from multiprocessing import Process,Lock
    import time
    
    def task(name,mutex):
        mutex.acquire()     # 加锁
    
        print("%s 1" % name)
        time.sleep(1)
        print("%s 2" % name)
        time.sleep(1)
        print("%s 3" % name)
    
        mutex.release()     # 把锁释放掉
    
    if __name__ == "__main__":
        mutex = Lock()
        for i in range(3):
            p = Process(target=task,args=("进程%s" % i,mutex))    # 确保所有的进程用的是同一把锁,就把这个锁传递给子进程。
            p.start()
    
    
    """
    虽然效率降低了,但是避免了竞争,错乱
    
    进程0 1
    进程0 2
    进程0 3
    进程1 1
    进程1 2
    进程1 3
    进程2 1
    进程2 2
    进程2 3
    """

    三、模拟抢票练习


    多个进程共享同一文件,我们可以把文件当数据库,用多个进程模拟多个人执行抢票任务。

    # 文件db.txt的内容为:{"count":1}
    # 注意一定要用双引号,不然json无法识别
    from multiprocessing import Process
    import json
    import time
    
    def search(name):
        """查票"""
        time.sleep(1)   # 模拟网络延迟
        dic = json.load(open("db.txt","r",encoding="utf-8"))
        print("<%s>查看到剩余票数<%s>" % (name,dic["count"]))
    
    
    def get(name):
        """购票"""
        time.sleep(1)
        dic = json.load(open("db.txt", "r", encoding="utf-8"))
        if dic["count"] > 0:
            dic["count"] -= 1
            time.sleep(3)
            json.dump(dic,open("db.txt","w",encoding="utf-8"))
            print("<%s>购票成功" % name)
    
    def task(name):
        search(name)
        get(name)
    
    if __name__ == "__main__":
        for i in range(1,11):
            p = Process(target=task,args=("路人%s" % i,))
            p.start()
    
    # 并发运行,效率高,但竞争同一文件,数据写入错乱,只有一张票,却成功卖给了10个人
    
    """
    <路人1>查看到剩余票数<1>
    <路人2>查看到剩余票数<1>
    <路人3>查看到剩余票数<1>
    <路人4>查看到剩余票数<1>
    <路人5>查看到剩余票数<1>
    <路人6>查看到剩余票数<1>
    <路人7>查看到剩余票数<1>
    <路人8>查看到剩余票数<1>
    <路人9>查看到剩余票数<1>
    <路人10>查看到剩余票数<1>
    <路人1>购票成功
    <路人2>购票成功
    <路人3>购票成功
    <路人4>购票成功
    <路人5>购票成功
    <路人6>购票成功
    <路人7>购票成功
    <路人8>购票成功
    <路人9>购票成功
    <路人10>购票成功
    """
    并发运行,效率高,但竞争同一文件,数据写入错乱,只有一张票,卖成功给了10个人。
    # 加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
    from multiprocessing import Process,Lock
    import json
    import time
    
    def search(name):
        """查票"""
        time.sleep(1)   # 模拟网络延迟
        dic = json.load(open("db.txt","r",encoding="utf-8"))
        print("<%s>查看到剩余票数<%s>" % (name,dic["count"]))
    
    
    def get(name):
        """购票"""
        time.sleep(1)
        dic = json.load(open("db.txt", "r", encoding="utf-8"))
        if dic["count"] > 0:
            dic["count"] -= 1
            time.sleep(3)
            json.dump(dic,open("db.txt","w",encoding="utf-8"))
            print("<%s>购票成功" % name)
    
    def task(name,mutex):
        search(name)
    
        mutex.acquire()
        get(name)
        mutex.release()
    
    if __name__ == "__main__":
        mutex = Lock()
        for i in range(1,11):
            p = Process(target=task,args=("路人%s" % i,mutex))
            p.start()
    
    """
    <路人1>查看到剩余票数<1>
    <路人2>查看到剩余票数<1>
    <路人3>查看到剩余票数<1>
    <路人4>查看到剩余票数<1>
    <路人5>查看到剩余票数<1>
    <路人6>查看到剩余票数<1>
    <路人7>查看到剩余票数<1>
    <路人8>查看到剩余票数<1>
    <路人9>查看到剩余票数<1>
    <路人10>查看到剩余票数<1>
    <路人1>购票成功
    
    Process finished with exit code 0
    """
    加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据的安全

    四、互斥锁与join


    使用 join 可以将并发变成串行,互斥锁的原理也是将并发变成穿行,那我们直接使用 join 就可以了啊,为何还要互斥锁,那我们就来试一下:

    # 互斥锁与 join
    from multiprocessing import Process
    import json
    import time
    
    def search(name):
        """查票"""
        time.sleep(1)   # 模拟网络延迟
        dic = json.load(open("db.txt","r",encoding="utf-8"))
        print("<%s>查看到剩余票数<%s>" % (name,dic["count"]))
    
    
    def get(name):
        """购票"""
        time.sleep(1)
        dic = json.load(open("db.txt", "r", encoding="utf-8"))
        if dic["count"] > 0:
            dic["count"] -= 1
            time.sleep(3)
            json.dump(dic,open("db.txt","w",encoding="utf-8"))
            print("<%s>购票成功" % name)
        else:
            print("<%s>购票失败" % name)
    
    def task(name):
        search(name)
        get(name)
    
    if __name__ == "__main__":
        for i in range(1,11):
            p = Process(target=task,args=("路人%s" % i,))
            p.start()
            p.join()    # 进程按顺序一个个执行
    
    """
    这种方式连查看票都变成串行的了,而我们只想要购票是串行的。
    <路人1>查看到剩余票数<1>
    <路人1>购票成功
    <路人2>查看到剩余票数<0>
    <路人2>购票失败
    <路人3>查看到剩余票数<0>
    <路人3>购票失败
    <路人4>查看到剩余票数<0>
    <路人4>购票失败
    <路人5>查看到剩余票数<0>
    <路人5>购票失败
    <路人6>查看到剩余票数<0>
    <路人6>购票失败
    <路人7>查看到剩余票数<0>
    <路人7>购票失败
    <路人8>查看到剩余票数<0>
    <路人8>购票失败
    <路人9>查看到剩余票数<0>
    <路人9>购票失败
    <路人10>查看到剩余票数<0>
    <路人10>购票失败
    """

    发现使用 join 将并发改成穿行,确实能保证数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否,此时 join 与互斥锁的区别就显而易见了,join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让 task 函数中的 get 任务串行:

    def task(name,):
        search(name)    # 并发执行
    
        mutex.acquire()
        get(name)   # 串行执行
        mutex.release()

    总结:

    # 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    
    
    
    # 因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    1 队列和管道都是将数据存放于内存中
    2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
  • 相关阅读:
    111
    python 错误宝典
    Node.js Web开发:Connect
    Node.js 中的重要API:HTTP
    Node.js 中的重要API:TCP
    Node.js 中的重要API:命令行工具以及FS API 首个Node应用
    Node.js 中的JS
    Node.js 阻塞式IO与非阻塞式IO与错误处理
    Learning Vue.js 2
    A1046——入门模拟 Shortest Distance
  • 原文地址:https://www.cnblogs.com/zoling7/p/13381103.html
Copyright © 2011-2022 走看看