zoukankan      html  css  js  c++  java
  • 25 Apr 18 守护进程 互斥锁 抢票系统 IPC通信机制 生产者消费者模型

    25 Apr 18
    一、上节课复习
    if __name__ == '__main__': 放在最后面
    obj.join(1)  #只等1秒
     
    二、守护进程
    from multiprocessing import Process
    import time
     
    def task(name):
        print('%s is running' % name)
        time.sleep(3)
     
    if __name__ == '__main__':
        obj = Process(target=task, args=('egon',))
        obj.daemon=True    #将obj变成守护进程,主进程执行完毕后子进程跟着结束
        obj.start()  # 发送信号给操作系统
    print('主')
     
    三、互斥锁
    强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()。否者程序停在原地。
     
    互斥锁vs join: 
    大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)
    区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
    区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统)
     
    from multiprocessing import Process,Lock
    import time,random
     
    mutex=Lock()
     
    def task1(lock):
        lock.acquire() 
        print('task1:名字是egon')
        time.sleep(random.randint(1,3))
        print('task1:性别是male')
        time.sleep(random.randint(1,3))
        print('task1:年龄是18')
        lock.release()
     
    def task2(lock):
        lock.acquire()
        print('task2:名字是alex')
        time.sleep(random.randint(1,3))
        print('task2:性别是male')
        time.sleep(random.randint(1,3))
        print('task2:年龄是78')
        lock.release()
     
    def task3(lock):
        lock.acquire()
        print('task3:名字是lxx')
        time.sleep(random.randint(1,3))
        print('task3:性别是female')
        time.sleep(random.randint(1,3))
        print('task3:年龄是30')
        lock.release()
     
    if __name__ == '__main__':
        p1=Process(target=task1,args=(mutex,))
        p2=Process(target=task2,args=(mutex,))
        p3=Process(target=task3,args=(mutex,))
     
        p1.start()
        p2.start()
        p3.start()
     
    四、抢票系统
    import json
    import time
    import random
    import os
    from multiprocessing import Process,Lock
     
    mutex=Lock()
     
    def search():
        time.sleep(random.randint(1,3))
        with open('db.json','r',encoding='utf-8') as f:
            dic=json.load(f)
            print('%s 剩余票数:%s' %(os.getpid(),dic['count']))
     
    def get():
        with open('db.json','r',encoding='utf-8') as f:
            dic=json.load(f)
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3))
            with open('db.json','w',encoding='utf-8') as f:
                json.dump(dic,f)
            print('%s 购票成功' %os.getpid())
     
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
     
    if __name__ == '__main__':
        for i in range(10):
            p=Process(target=task,args=(mutex,))
            p.start()
     
    五、IPC通信机制
    进程之间通信必须找到一种介质,该介质必须满足
    1、是所有进程共享的
    2、必须是内存空间
    附加:帮我们自动处理好锁的问题
     
    a、 from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
    b、 IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
    c、 IPC中的管道(Pipe),共享,内存,需自己解决锁的问题
     
    a、 用Manager
    from multiprocessing import Process,Manager,Lock
    import time
     
    mutex=Lock()
     
    def task(dic,lock):
        lock.acquire()
        temp=dic['num']
        time.sleep(0.1)
        dic['num']=temp-1
        lock.release()
     
    if __name__ == '__main__':
        m=Manager()
        dic=m.dict({'num':10})
     
        l=[]
        for i in range(10):
            p=Process(target=task,args=(dic,mutex))
            l.append(p)
            p.start()
     
        for p in l:
            p.join()
    print(dic)
     
    b、 用队列Queue
    1)共享的空间
    2)是内存空间
    3)自动帮我们处理好锁定问题
     
    from multiprocessing import Queue
    q=Queue(3)  #设置队列中maxsize个数为三
    q.put('first')
    q.put({'second':None})
    q.put('三')
    # q.put(4)   #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
    print(q.get())
    print(q.get())
    print(q.get())
     
    强调:
    1、队列用来存成进程之间沟通的消息,数据量不应该过大
    2、maxsize的值超过的内存限制就变得毫无意义
                                                                  
                                                                                                                                                                                                          
    了解:
    q=Queue(3)
    q.put('first',block=False)
    q.put('second',block=False)
    q.put('third',block=False)
    q.put('fourth',block=False)  #报错 queue.Full
     
    q.put('first',block=True)
    q.put('second',block=True)
    q.put('third',block=True)
    q.put('fourth',block=True,timeout=3)  #等待3秒后若还进不去报错。注意timeout不能和block=False连用
     
    q.get(block=False)
    q.get(block=False)
    q.get(block=False)
    q.get(block=False)           #报错 queue.Empty
     
    q.get(block=True)
    q.get(block=True)
    q.get(block=True)
    q.get(block=True,timeout=2)    #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用
     
    六、生产者消费者模型
    该模型中包含两类重要的角色:
    1、生产者:将负责造数据的任务比喻为生产者
    2、消费者:接收生产者造出的数据来做进一步的处理,该类人物被比喻成消费者
     
    实现生产者消费者模型三要素
    1、生产者
    2、消费者
    3、队列
     
    什么时候用该模型:
    程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
     
    该模型的好处:
    1、实现了生产者与消费者解耦和
    2、平衡了生产者的生产力与消费者处理数据的能力
     
    import time
    import random
    from multiprocessing import Process,Queue
     
    def consumer(name,q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[46m消费者===》%s 吃了 %s33[0m' %(name,res))
     
    def producer(name,q,food):
        for i in range(5):
            time.sleep(random.randint(1,2))
            res='%s%s' %(food,i)
            q.put(res)
            print('33[45m生产者者===》%s 生产了 %s33[0m' %(name,res))
     
    if __name__ == '__main__':
        #1、共享的盆
        q=Queue()
     
        #2、生产者们
        p1=Process(target=producer,args=('egon',q,'包子'))
        p2=Process(target=producer,args=('刘清政',q,'泔水'))
        p3=Process(target=producer,args=('杨军',q,'米饭'))
     
        #3、消费者们
        c1=Process(target=consumer,args=('alex',q))
        c2=Process(target=consumer,args=('梁书东',q))
     
        p1.start()
        p2.start()
        p3.start()
        c1.start()
    c2.start()
     
    生产者消费者模型是解决问题的思路不是技术。可以用进程和队列来实现,也可以用其他的来实现。
  • 相关阅读:
    博客园美化-SimpleMemor
    Java多线程-synchronized与ReentrantLock
    springboot中删除@SessionAttributes注解的属性
    SSM整合笔记
    Spring中xml和注解方式使用AOP
    Mysql 数据库基本操作
    Mysql 二进制包安装
    named piped tcp proxy 下载
    docker容器中日志文件过大处理方法
    自动做bond的脚本
  • 原文地址:https://www.cnblogs.com/zhangyaqian/p/py20180425.html
Copyright © 2011-2022 走看看