zoukankan      html  css  js  c++  java
  • python并发编程基础之守护进程、队列、锁

    并发编程2

    1.守护进程

    什么是守护进程?

    表示进程A守护进程B,当被守护进程B结束后,进程A也就结束。

    from multiprocessing import Process
    import time
    ​
    def task():
        print('妃子的一生')
        time.sleep(15)
        print('妃子死了')
    ​
    if __name__ == '__main__':
        fz = Process(target=task)
        fz.daemon = True #将子进程作为主进程的守护进程。必须在开启之前设置
        fz.start()
        print('皇帝登基了')
        time.sleep(1)
        print('做了10年皇帝')
        time.sleep(5)
        print('皇帝驾崩了!')

    上面的代码说明了什么叫守护进程,需要注意的是,守护进程须在进程开启前设置,就是改变了,本质其实就是改变了Process类里面的daemon属性,默认是False

    应用场景:之所以开启子进程,是为了帮助主进程完成某个任务,然而主进程认为自己的事情一旦完成,子进程就没必要存在,那就可以将子进程设置未主进程的守护进程。

    例如:QQ下载文件,QQ退出,文件就终止下载。

     

    2、互斥锁

    1.什么是锁(Lock)?

    锁是multiprocessing模块中的一个类(Lock)。

    2.为什么要有锁?

    串行效率低,但是数据不会出现问题。并发效率高,但是数据可能出错。

    当多进程共享一个数据时,可能造成数据错乱。

                    1.使用join方法可以让数据不会错乱,但是会让进程变为串行。效率不高。

    2.使用lock可以将数据加锁,其他程序使用时需等待当前程序使用完成。lock只是使部分代码串行,其他部分还是并发。

    3.互斥锁的示例

    Lock类的两个方法acquire(锁)和release(释放),acquire将状态变为True,  release是将状态变为False

    锁的本质就是一个bool类型,在执行前会先判断这个值。

    def acquire(self, blocking=True, timeout=-1):
        passdef release(self):
        pass
    #锁的使用
    #注意:使用锁的必须保证锁是同一个
    from multiprocessing import Process,Lock
    import random
    import time
    ​
    ​
    def task1(lock):
        lock.acquire()
        print('1111')
        time.sleep(random.randint(1, 3))
        print('11111111')
        lock.release()
    def task2(lock):
        lock.acquire()
        print('222222')
        time.sleep(random.randint(1,2))
        print('2222222222222')
        lock.release()
    def task3(lock):
        lock.acquire()
        print('3333333333')
        time.sleep(random.randint(1, 3))
        print('33333333333333333333333')
        lock.release()
    ​
    if __name__ == '__main__':
        lock = Lock()
        p1 = Process(target=task1, args=(lock,))
        p1.start()
        p2 = Process(target=task2, args=(lock,))
        p2.start()
        p3 = Process(target=task3, args=(lock,))
        p3.start()
        
     ###输出结果除了1的位置固定,后两个不固定,但是如果出现了2,则必须运行完2
    1111
    11111111
    3333333333
    33333333333333333333333
    222222
    2222222222222

    4.锁的应用

    应用场景:购票软件的购票过程。

    逻辑分析:1.用户都可以查看余票

        2.但是其中一个用户发起请求购票,其他用户不能发起请求购票(锁状态)

    import json
    from multiprocessing import Process,Lock
    import random
    import time
    #查看余票功能
    def check_ticket(name):
        time.sleep(random.randint(1, 3))
        with open('ticket.json', mode='rt', encoding='utf-8')as f:
            dic = json.load(f)
            print('%s正在查看余票,  余票:%s 张' % (name, dic['count']))
    #购票功能
    def buy_ticket(name):
        with open('ticket.json', mode='rt', encoding='utf-8')as f:
            dic = json.load(f)
            if dic['count'] > 0:
                time.sleep(random.randint(1, 3))
                dic['count'] -= 1
                print('%s购票成功! 余票:%s张' % (name, dic['count']))
                with open('ticket.json', mode='wt', encoding='utf-8')as f:
                    json.dump(dic, f)
    #用户购票流程
    def task(name,lock):
        check_ticket(name)
        lock.acquire()
        buy_ticket(name)
        lock.release()
        
    if __name__ == '__main__':
        lock = Lock()
        for i in range(1, 11):
            p = Process(target=task, args=('用户%s' % i, lock))
            p.start()

    5.重入锁(RLock)

    RLock类,可重入的锁。Lock类只能有一个acquire方法,RLock可以有多个,如果在多进程中使用Rlock,并且一个进程A执行了多次acquire,其他进程B想要获取锁,则需将所有的acquire释放可以获取。

    from multiprocessing import RLock
    lock = RLock()
    lock.acquire()
    lock.acquire()
    lock.acquire()
    print('haha')
    lock.release()
    lock.release()
    lock.release()
    #注意重入锁,锁几次就需要开几次

    6.死锁

    1.什么是死锁?

    死锁是指锁无法打开,导致程序卡死。必须是多把锁才能形成死锁。正常开发时,要避免这种情况。通常一把锁可以解决大多数问题。

    ##以一个吃饭需要盘子l1 和筷子 l2 二者缺一不可来说明死锁
    from multiprocessing import Process,Lock
    import time
    def task1(l1, l2, i):
        l1.acquire()
        print('盘子被%s抢走了'% i)
        time.sleep(2)
    ​
        l2.acquire()
        print('筷子被%s抢走了'% i)
        print('吃饭!')
        l1.release()
        l2.release()
    ​
    def task2(l1, l2, i):
        l2.acquire()      ###task2先抢了筷子,导致task1无法完成后续动作
        print('筷子被%s抢走了' % i)
    ​
    ​
        l1.acquire()
        print('盘子被%s抢走了'% i)
        print('吃饭!')
        l1.release()
        l2.release()
    ​
    ​
    if __name__ == '__main__':
        l1 = Lock()
        l2 = Lock()
        Process(target=task1, args=(l1, l2, 1)).start()   #开启两个进程
        Process(target=task2, args=(l1, l2, 2)).start()
        
    ​
        
    #在开发中要避免这种问题。如果有使用则应该注意将task2调整锁的位置。
    def task2(l1, l2, i):
        l1.acquire()
        print('盘子被%s抢走了'% i)
        l2.acquire()      
        print('筷子被%s抢走了' % i)
        print('吃饭!')
        l1.release()
        l2.release()

    7.ICP进程间通讯

    由于进程间内存是相互独立的,所以需要对应积极的方案能够使得进程之间可以相互传递数据。

    解决方案:

    1.使用共享文件。多个进程同时读写同一个文件。但是IO速度慢,传输的数据大小不受限制。

    2.管道。基于内存,速度块,但是是单向的,用起来麻烦。

    3.队列。申请共享内存空间,多个进程可以共享这个内存区域。

    #创建共享文件
    from multiprocessing import Manager,Process,Lock
    def work(d):
        # with lock:
            d['count']-=1if __name__ == '__main__':
    ​
        with Manager() as m:
            dic=m.dict({'count':100}) #创建一个共享的字典
            p_l=[]
            for i in range(100):
                p=Process(target=work,args=(dic,))
                p_l.append(p)
                p.start()
    ​
            for p in p_l:
                p.join()
            print(dic)

    8.队列

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。

    队列也是一种数据容器,其特点:先进先出。

    与堆栈相反:先进后出。  (函数的调用就是一种类似堆栈方式)

    创建队列的类(底层就是以管道和锁定的方式实现):

    1.Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递

    2.maxsize是队列中允许最大项数,省略则无大小限制。

    主要方法

    q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
    q.get_nowait():同q.get(False)
    q.put_nowait():同q.put(False)
    q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

    队列的优点是:保证数据不会错乱,即使在多进程中。因为put和get都是阻塞

    from multiprocessing import Queue
    ​
    q = Queue(3)    #创建一个队列,队列最多存三个数据
    q.put('ming')   #存数据
    q.put(123)
    print(q.get())  #put默认会阻塞,当队列装满时程序会卡住
    print(q.get())
    q.put('xian')   #
    q.put('daidai',False)  #put第二个参数设置为False会强行将数据放入队列
    q.put('j', True, timeout=3)    #timeout等待时间

        9.生产者消费者模型

    1.什么是生产者消费者模型?

    生产者:生产数据的一方。

    消费者:处理使用数据的一方。

    例如:一个爬虫程序:1.爬去数据     2.解析数据

    2.怎么实现?

    1.将两个不同任务分配给不同的进程。

    2.提供一个进程共享的数据容器

     

    #模型示例:
    from multiprocessing import Process, Queue
    import time
    import random
    ​
    ​
    def get_data(q):
        for num in range(1, 6):
            print('正在爬取第%s个数据' % num)
            time.sleep(random.randint(1, 2))
            print('第%s个数据爬取完成。' % num)
            q.put('第%s个数据' % num)
    ​
    ​
    def parse_data(q):
        for num in range(1, 6):
            data = q.get()
            print('正在解析%s' % data)
            time.sleep(random.randint(1, 2))
            print('%s解析完成。' % data)
    if __name__ == '__main__':
        q = Queue(5)  # 创建一个可以存放5个数据的队列
        #生产者模型
        producer = Process(target=get_data, args=(q,))
        producer.start()
        #消费者模型
        consumer = Process(target=parse_data, args=(q,))
        consumer.start()
    ###输出结果
    正在爬取第1个数据
    第1个数据爬取完成。
    正在爬取第2个数据
    正在解析第1个数据
    第2个数据爬取完成。
    正在爬取第3个数据
    第1个数据解析完成。
    正在解析第2个数据
    第3个数据爬取完成。
    正在爬取第4个数据
    第2个数据解析完成。
    正在解析第3个数据
    第3个数据解析完成。
    第4个数据爬取完成。
    正在爬取第5个数据
    正在解析第4个数据
    第5个数据爬取完成。
    第4个数据解析完成。
    正在解析第5个数据
    第5个数据解析完成。
    ###可以看出两个模型之间互不干扰,相互独立。
  • 相关阅读:
    计算机网络中的多路复用技术
    ActiveMQ之一--ActiveMQ入门
    ehcache介绍
    I/O模型之二:Linux IO模式及 select、poll、epoll详解
    【甘道夫】HBase(0.96以上版本号)过滤器Filter具体解释及实例代码
    Android Studio安装及主题字体配置
    HDU 2136 Largest prime factor 參考代码
    update更新两个字段
    Hadoop对小文件的解决方式
    赵雅智_ContentProvider
  • 原文地址:https://www.cnblogs.com/5j421/p/10198001.html
Copyright © 2011-2022 走看看