zoukankan      html  css  js  c++  java
  • ~~并发编程(五):重要方法~~

    进击のpython

    *****

    并发编程——重要方法


    前面提到的方法都是比较基本的,比较基础

    这个小节我们着重了解一下以下几个方法:

    1.守护进程

    2.互斥锁

    3.队列


    守护进程

    守护进程,就有点像古时候的封建奴隶制下的皇上与太监,皇上死了,太监就得陪着殉葬,守护着皇上

    守护进程就是这样的关系,当主进程结束,守护进程随之消失

    守护进程一般是怎么用呢?如果我们有两个进程并发执行,那我们就可以开一个主进程,再开一个子进程

    但是呢,如果我们开的这个子进程在主进程结束之后就没什么用了,那这个时候就建议用守护进程了

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程

    当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程

    必须在p.start()之前设置

    from multiprocessing import Process
    
    
    def func(name, *args, **kwargs):
        print(f'{name}执行了')
        pass
    
    
    if __name__ == '__main__':
        p = Process(target=func, args=('子进程',))
        # p.daemon = True
        p.start()
        print('我是主进程... ...')
    
    

    正常开启子进程,都不是守护进程,由于声明之后还要有一段时间才能执行子进程

    所以先打印 “我是主进程... ...” ,再打印 “子进程被执行了”

    但是当我把注释的语句打开,p就变成了守护进程(注意,是在start上面)

    由于还是主进程会先打印,就会导致,子进程还没打印就陪葬了

    就只会打印“我是主进程... ...”


    互斥锁

    首先啊,原则上进程之间是不能够相互通信的,但是捏,进程之间还是共享一套文件资源的

    所以访问同一个文件或者同一个打印终端,是没有问题的

    但是计算机是贪婪地,有共享,就会有竞争

    有竞争,如果没有限制,没有秩序,就会混乱

    import time
    from multiprocessing import Process
    
    
    def func(name, *args, **kwargs):
        print(f'{name}正在执行.. ..')
        time.sleep(2)
        print(f'{name}执行完毕!')
    
        pass
    
    
    if __name__ == '__main__':
        for i in range(3):
            p = Process(target=func, args=(f'进程{i}',))
            p.start()
    
    

    我想达到的效果是不是应该是

    进程0正在执行.. ..
    进程0执行完毕!
    进程1正在执行.. ..
    进程1执行完毕!
    进程2正在执行.. ..
    进程2执行完毕!
    

    而实际情况打印的是:

    进程1正在执行.. ..
    进程0正在执行.. ..
    进程2正在执行.. ..
    进程1执行完毕!
    进程0执行完毕!
    进程2执行完毕!
    

    这就出现了竞争(多打印几次,还会出现别的结果)

    就好像你去上厕所,但是大家都不排队,你挤进去了,别人还往里挤

    还有的时候开门看看你,所以为了解决这个问题,我们一般都在卫生间加把锁

    上厕所的时候锁一下门,我不完事,谁也别想进来

    是对于程序也是,加一个锁,就让程序变得有序了

    其实也就相当于把并发变成了串行,诚然降低了效率,但是保证了数据的安全

    import time
    from multiprocessing import Process, Lock
    
    
    def func(name, lock, *args, **kwargs):
        lock.acquire()
        print(f'{name}正在执行.. ..')
        time.sleep(2)
        print(f'{name}执行完毕!')
        lock.release()
        pass
    
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(3):
            p = Process(target=func, args=(f'进程{i}', lock))
            p.start()
    
    

    lock = Lock(),对象的实例化,实例化出一个锁

    然后锁,有关锁(acquire)就有开锁(release)

    这样就保证某个子进程在用打印的时候,别的子进程只能在外面等待,就达到了我们的目的

    抢票系统

    这个经典的例子也可以通过这个抢票系统来帮助理解

    首先所有人都可以查票,如果有票就可以买

    我们可以先写一下

    import json
    
    from multiprocessing import Process
    
    
    def search(name, *args, **kwargs):
        msg = json.load(open('db.json'))
        print(f'{name}查看票数,当前有{msg["count"]}张')
        pass
    
    
    def get(name, *args, **kwargs):
        msg = json.load(open('db.json'))
        if msg["count"] > 0:
            msg["count"] -= 1
            json.dump(msg, open('db.json', 'w'))
            print(f'{name}买到票了,当前有{msg["count"]}张')
    
    
    def main(name):
        search(name)
        get(name)
    
    
    if __name__ == '__main__':
        for i in range(1, 5):
            p = Process(target=main, args=(f"第{i}个人",))
            p.start()
    
    json.dump({'count': 1}, open("db.json", 'w'))
    
    

    执行一下

    第2个人查看票数,当前有1张
    第2个人买到票了,当前有0张
    第4个人查看票数,当前有1张
    第3个人查看票数,当前有1张
    第4个人买到票了,当前有0张
    第3个人买到票了,当前有0张
    第1个人查看票数,当前有1张
    第1个人买到票了,当前有0张
    

    问题出现了,只有一张票,但是我们却卖出去四张

    这是不对的,应该是查询大家一起查,买票只能一个一个买

    所以可以这样:

    import json
    import time
    
    from multiprocessing import Process, Lock
    
    
    def search(name, *args, **kwargs):
        msg = json.load(open('db.json'))
        print(f'{name}查看票数,当前有{msg["count"]}张')
        pass
    
    
    def get(name,lock, *args, **kwargs):
        lock.acquire()
        msg = json.load(open('db.json'))
        if msg["count"] > 0:
            msg["count"] -= 1
            json.dump(msg, open('db.json', 'w'))
            print(f'{name}买到票了,当前有{msg["count"]}张')
        else:
            print(f'{name}没买到票!')
        lock.release()
    def main(name,lock):
        time.sleep(1) # 模拟网络延迟
        search(name)
        get(name,lock)
    
    
    if __name__ == '__main__':
        lock = Lock()
    
        for i in range(1, 5):
            p = Process(target=main, args=(f"第{i}个人",lock))
            p.start()
    
    json.dump({'count': 1}, open("db.json", 'w'))
    
    

    输出结果就是我们想的那样了,只有一个人买到了

    第2个人查看票数,当前有1张
    第2个人买到票了,当前有0张
    第4个人查看票数,当前有0张
    第4个人没买到票!
    第3个人查看票数,当前有0张
    第3个人没买到票!
    第1个人查看票数,当前有0张
    第1个人没买到票!
    

    横向对比join方法,在刚开始的时候我们也利用join方法进行了等待的操作

    我们看看用join怎么做

    import json
    import time
    
    from multiprocessing import Process, Lock
    
    
    def search(name, *args, **kwargs):
        msg = json.load(open('db.json'))
        print(f'{name}查看票数,当前有{msg["count"]}张')
        pass
    
    
    def get(name, lock, *args, **kwargs):
        lock.acquire()
        msg = json.load(open('db.json'))
        if msg["count"] > 0:
            msg["count"] -= 1
            json.dump(msg, open('db.json', 'w'))
            print(f'{name}买到票了,当前有{msg["count"]}张')
        else:
            print(f'{name}没买到票!')
        lock.release()
    
    
    def main(name, lock):
        time.sleep(1)  # 模拟网络延迟
        search(name)
        get(name, lock)
    
    
    if __name__ == '__main__':
        lock = Lock()
    
        for i in range(1, 5):
            p = Process(target=main, args=(f"第{i}个人", lock))
            p.start()
            p.join()
    
    

    发现也可以达到我哦们想要的效果,但是查票的过程也变成串行了

    很明显大家查票时应该是并发地去查询而无需考虑数据准确与否

    此时join与互斥锁的区别就显而易见了:

    join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行

    比如只让main函数中的get任务串行


    队列

    进程彼此之间互相隔离,要实现进程间通信(IPC)

    multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

    是消息传递,不是大数据!

    看一下队列中有什么

    class Queue(object):
        def __init__(self, maxsize=-1):
            self._maxsize = maxsize
    

    maxsize:可放入的允许的最大项数,如果不设置,默认无限

    ​ 但是队列是基于内存的,真正的无限其实取决于内存

     def qsize(self):
            return 0
    
        def empty(self):
            return False
    
        def full(self):
            return False
    
        def put(self, obj, block=True, timeout=None):
            pass
    
        def put_nowait(self, obj):
            pass
    
        def get(self, block=True, timeout=None):
            pass
    
        def get_nowait(self):
            pass
    
        def close(self):
            pass
    
        def join_thread(self):
            pass
    
        def cancel_join_thread(self):
            pass
    

    队列的方法大概是上述这些

    put():往队列里放信息

    get():从队列里拿数据

    empty():判断队列是否为空

    full():判断队列满没满

    同时也要记住,队列是“先进先出”

    当队列满了,再往里放,就会阻塞

    当队列空了,再往外取,也会阻塞

    from multiprocessing import Queue
    
    q = Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.full())
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())
    

    可以看到打印出来的是1 2 3

    因为我们放的顺序就是1 2 3


    *****
    *****
  • 相关阅读:
    【分享】马化腾:产品设计与用户体验
    《JavaScript高级程序设计》读书笔记(八):Function类及闭包
    《JavaScript高级程序设计》阅读笔记(七):ECMAScript中的语句
    SET XACT_ABORT各种用法及显示结果
    发布一款域名监控小工具——Domain(IP)Watcher
    【转】C#正则表达式整理备忘
    《JavaScript高级程序设计》阅读笔记(一):ECMAScript基础
    Entity Framework多对多关系实践(manytomany)
    jQuery插件原来如此简单——jQuery插件的机制及实战
    《JavaScript高级程序设计》阅读笔记(二):ECMAScript中的原始类型
  • 原文地址:https://www.cnblogs.com/jevious/p/11402260.html
Copyright © 2011-2022 走看看