zoukankan      html  css  js  c++  java
  • day35 【守护进程、 互斥锁、 IPC机制 [进程之间通信主要:Q队列]、 生产者与消费者模型】

    今日内容

    一、守护进程

    p.daemon = True  父进程结束时【a进程中创建了子进程b,b标记为】,子进程出跟着结结束 【定义守护进程一定要放在子进程p.start前】

    使用场景:如QQ接收到一个视频文件,于是开启了一个子进程来下载,QQ退出了,下载就没必要继续了

    并发安全问题:多个进程争抢资源时,导致数据不安全。如同用一个打印机,打印错乱;抢票系统,余票为1,被多人抢购。

    二、互斥锁  将共享的资源锁住,待该进程释放后,其它进程才可以操作,保证了数据的安全有序【本质上是限制了其他进程的执行 】

      使用:from umltiprocessing import Lock                                                 等同:  global   my_lock

         name = Lock() #产生一个锁对象,                                                       if my_lock == False:

         lock. acquire()   #上锁 【子进程中调用】                                                           my_lock = True

         lock. release()   # 释放角锁                                                                                  被锁住的代码

                                                                                                                                                    my_lock = False

        与 join 的区别:【join 也可以保证数据的安全】

        join :  将导致子进程中所有代码全都串行;原本多个进程之间是公平竞争,join后就固定顺序了

        LOCK锁: 可以锁定一部分【即公共资源加锁】代码,其他代码还是并发。

            【 被锁的代码称之为粒度,粒度越大意味着锁住的代码越多,并发的就少了,效率就低,反之亦然

        最主要的区别:join 固定了某进程先执行,且所有进程任务全部串行

            锁维系了并发,让部分代码串行,可以随意锁定代码,自行调整粒度  

       两个注意点:1)想要保住数据安全,必须保证所有进程使用同一把锁

           2)加锁解锁需一 一对应,避免锁死 :a = Lock()                             a=Lock()

                            a. acquire ()                           a.acquire()

                            print( ' 抢到了!‘)                        print( ' 抢到了!‘ )

                            a. acquire()                             a. release()

      例:模拟抢票:

    from multiprocessing import Process, Lock
    import time
    import random
    import json, os

    mute = Lock()

    def search(i, lock):
    time.sleep(random.randint(0, 2))
    with open('db.json', 'rt', encoding='utf-8') as f:
    count_dic = json.load(f)
    temp = count_dic['count']
    print('%s查得余票%s' %(i,temp))

    def get(i, lock):
    time.sleep(random.randint(0, 3))
    lock.acquire()
    with open('db.json', 'rt', encoding='utf-8') as f:
    count_dic = json.load(f)
    temp = count_dic['count']
    if temp > 0:
    count_dic['count'] -= 1
    #写入文件
    with open('db.json', 'wt', encoding='utf-8') as f:
    json.dump(count_dic,f)
    print('%s抢票成功' % i)
    print('%s 说余票为0' % i)
    lock.release()

    def task(i,lock):
    search(i, lock)
    get(i, lock)


    if __name__ == '__main__':
    for i in range(10):
    obj = Process(target=task,args=(i,mute))
    obj.start()

    三、IPC (inter-Process Communication)  进程与进程间的通信 【进程们对共享数据的操作】

    方式:

    1)管道:只能单向通信,【数据都是二进制】

    2)文件:在硬盘上创意共享文件

        缺点:速度慢                   优点:数据量几乎没有限制

    3)socket: 编程复杂度较高

    4)进程之间共享的,且在内存中:Manager类

         Manager类 --  申请开辟一个共享内存空间,创建出来的数据结构,具备进程间共享的特点【问题:数据错乱需自己加锁处理

    from multiprocessing import Process, Manager, Lock
    import time
    mutex = Lock()

    def task(dic, lock):
    lock.acquire()
    temp = dic['num']
    time.sleep(0.1)
    dic['num'] = temp - 1
    lock.release()

    if __name__ == '__main__':
    m = Manager()
    dic = m.dict({'num': 10})

    l = []
    for i in range(10):
    p = Process(target=task, args=(dic,mutex))
    l.append(p)
    p.start()
    for i in l:
    i.join()
    print(dic)

    5)multiprocessing 之 Queue 【队列】帮助我们处理了锁的问题 --  IPC机制核心内容:【共享的空间;是内存空间;自动处理锁问题】

          队列是一种特殊的数据结构,先存储的先取   ----    【队列用来存进程之间沟通的消息,数据量不应该过大,是实现IPC机制的一种方式】  

       对立面是堆栈,先存储的最后取出

                     【堆栈扩展:函数嵌套调用时,执行顺序是先进后出 ---   也称之为函数栈;调用函数时,函数入栈 函数结束,函数出栈】

          队列用法:

      from multiprocessing import  Queue

         obj = Queue(maxsize =-1)  # 创建队列,默认没有数量限制,实际也不可能创建无数多个元素【受内存限制】

     obj_q = Queue(3)

         obj_q.put('' first'')

      obj_q.put({’second': None})

      obj_q.put([ ])

         obj_q.put('' ooo")  #如果容量已经满了,再调用put时将进入阻塞状态,必须列队中有数据被取出,腾出空位置,才会继续执行放入该数据

    print(obj.get())  #first
    obj.put(())
    print(obj.get()) #{’second': None
    print(obj.get()) # [ ]
    print(obj.get()) # ooo

    q.get(block=True,timeout=2);  block 表示是否阻塞 默认是阻塞的可以等,timeout等的时间,默认无限期等

                  当block设置为False时,若队列为空则抛出异常,timeout值也就无效

    q.put(block=True,timeout=2); block 表示是否阻塞 默认是阻塞的,timeout等的时间,默认无限期等

                 当设置为False 并且队列满了时 抛出异常,timeout值无效

    timeout 表示阻塞的超时时间 ,超过时间还是没有值或还是没位置则抛出异常 【仅在block为True有效】

     四、生产者与消费者模型:一种解决问题的套路

       处理某问题的方式:解除两类进程之间的对数据的相互依赖,即一类【数据生产者】与另一类【数据使用者】进程处理速度不平衡,一方快一方慢,导致一方需等待另一方

    --  该模型中的两类:数据创造者(进程)比喻为生产者、

              接收生产者(进程)造出来的数据进一步处理,该类进程比喻为消费者

              例如:爬虫,爬取数据, 分析数据

    --  实现生产者消费者模型三要素: 1. 生产者   2. 消费者    3. 队列

    --  该模型的益处/有点:

      1.实现了生产者与消费者解藕合

      2. 平衡了生产力与消费力,即生产者与消费者不直接对接,生产者将数据放入队列,消费者从队列中取

    --  该模式的应用场景:

      如果程序中有明显的两类任务,一类任务是负责生产数据,另外一类是处理数据就可以考虑用该模型

     案例1:

    import time
    import random
    from multiprocessing import Process,Queue

    def consumer(name, q):
    while True:
    res = q.get()
    time.sleep(random.randint(1,3))
    print('33[46m消费者 == > %s 吃了 %s 33[0m'%(name, res))

    def producer(name, q, food):
    for i in range(5):
    time.sleep(random.randint(1, 2))
    res = '%s%s'%(food, i)
    q.put(res)
    print('33[39m生产者===> %s生产了%s33[0m' %(name, res))

    if __name__ == '__main__':
    obj_q = Queue()

    #生产者
    p1= Process(target=producer, args=('egon', obj_q,'玉米'))
    p2= Process(target=producer, args=('jerry', obj_q,'包子'))
    p3= Process(target=producer, args=('owen', obj_q,'米饭'))

    #消费者
    c1 = Process(target=consumer, args=('alex', obj_q,))
    c2 = Process(target=consumer, args=('fanny', obj_q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    ----------------------------------------------------------------------------------------------------------------------------------------------------

    案例2:

    import time
    import random
    from multiprocessing import Process, Queue


    def eat(q):
    for i in range(10):
    rose = q.get()
    time.sleep(random.randint(1, 2))
    print('33[46m%s吃完了33[0m' % rose)


    def make_rose(q):
    for i in range(10):
    time.sleep(random.randint(0, 3))
    print('33[45m%s盘西红柿生产完成33[0m' % i)
    rose = '%s盘西红柿' % i
    q.put(rose)


    if __name__ == '__main__':
    q = Queue()
    make_p = Process(target=make_rose, args=(q,))
    eat_p = Process(target=eat, args=(q,))

    make_p.start()
    eat_p.start()
  • 相关阅读:
    Reusable action with query database
    Oracle实现分组统计记录
    Oracle行列转换的几种实现方法
    junit私有方法测试
    Junit实现抽象类测试(二)
    C++的性能C#的产能?! .Net Native 系列《二》:.NET Native开发流程详解
    C++的性能C#的产能?! .Net Native 系列向导
    c++的性能, c#的产能?!鱼和熊掌可以兼得,.NET NATIVE初窥
    辞职敬礼
    WPF 心路历程
  • 原文地址:https://www.cnblogs.com/qingqinxu/p/10968620.html
Copyright © 2011-2022 走看看