zoukankan      html  css  js  c++  java
  • 生产者消费者模型

    生产者消费者模型

    一、抢票小程序

    from multiprocessing import Process,Lock
    import time
    import json
    import random
    import os
    
    
    #查看车票子进程
    def seach():
        time.sleep(random.randint(0,3))  #模拟网络IO
        with open('db.txt') as f:   #因为是查看,默认查看,所以就不用写了
            msg = json.load(f)  #将文件中的json数据读为python数据
            print(f'余票{msg["count"]}张')
    
    #买票子进程
    def get():
        #先查看票数
        time.sleep(random.randint(0, 3))  # 模拟读数据的网络IO
        with open('db.txt') as f:
            msg = json.load(f)
            print(f'余票{msg["count"]}张')
    
        if msg["count"] > 0 :
            msg["count"] -= 1
            time.sleep(random.randint(0, 2))  # 模拟写数据的网络IO
            with open('db.txt','w',encoding='utf8') as f:  #把修改后的数据写进去
                json.dump(msg,f)  #将Python数据以json形式写去文件
                print(f'进程{os.getpid()}抢票成功')
        else:
            print('票售空了')
    
    #调动函数子程序
    def Foo(l):   #加锁让程序变为了串行,牺牲了效率,但是保护了数据安全
        seach()
        l.acquire()  #拿钥匙
        get()
        l.release()  #还钥匙
    
    if __name__ == '__main__':
        for i in range(4):
            l = Lock()  #实例化一个锁对象  写在主进程是为了让子进程拿到同一把锁
            p = Process(target=Foo,args=(l,))
            p.start()
            # p.join()  #就这一句话,会把他搞成串行
    
    

    二、队列

    管道(pipe):基于共享的内存空间

    队列(Queue):管道+锁

    from multiprocessing import Queue
    
    q = Queue(2)   #实例化一个队列对象,队列里面最多放四个
    q.put('hahhahahah')   #往队列中放东西
    q.put(1)   #往队列中放东西
    q.put([1,2,3,4])  #队列满了的情况下再放值就会阻塞
    
    q.get()   #从队列中取东西
    q.get()   #从队列中取东西
    q.get()   #放几个,拿几个,如果没有可以拿的值,默认就会一直等着拿值(阻塞)
    
    
    q = Queue(2)   #实例化一个队列对象,队列里面最多放四个
    q.put('yjy',block=True,timeout=1)   #block=True默认阻塞
    q.put('yjy',block=True,timeout=1)   #timeout=1最多等待一秒,如果一秒时队列还哦是满的就报错
    #第三次报错 :queue.Full
    
    q.get(block=True,timeout=2)  #block=True如果队列中空了,会阻塞等待
    q.get(block=True,timeout=2)  #timeout=2,等待两秒,取不到就会报错:queue.Empty
    '''
    在put()和get()中,如果block=False,就不会阻塞,不用等,直接报错
    '''
    
    q.put_nowait()   #相当于block=False
    q.get_nowait()   #相当于block=False
    

    三、生产者消费者模型

    生产者可以不停地生产数据,达到自己最大的生产效率

    消费者可以不停地处理数据,达到自己的最大处理效率

    生产者消费者模型,提高了生产者生产的效率和消费者处理数据的效率

    from multiprocessing import Process,Queue
    import time
    import random
    
    '''生产者进程'''
    def producer(name,food,q):
        for i in range(2):
            time.sleep(random.randint(1,3))
            res = f"{food}{i}"
            q.put(res)
            print(f'33[44m{name} 生产了 {res}33[0m')
    
    
    
    '''消费者进程'''
    def consumer(name,q):
        while True:
            res = q.get()
            if res is None:
                break   #收到为空的信号就结束
            time.sleep(random.randint(1,3))
            print(f"{name}吃了{res}")
    
    if __name__ == '__main__':
        q = Queue()
        #生产者
        p1 = Process(target=producer,args=('yjy','酱大骨',q))
        p2 = Process(target=producer,args=('wwb','酸菜鱼',q))
        p3 = Process(target=producer,args=('hhh','卤猪蹄',q))
    
        #消费者
        c1 = Process(target=consumer,args = ('xx',q,))
        c2 = Process(target=consumer,args = ('yy',q,))
    
        #开始生产,开始吃
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        #必须保证生产者生产完才能发送结束的信号,运用到.join
        p1.join()  #感知子进程的结束
        p2.join()
        p3.join()
    
        #有几个消费者就发送几次信号None
        q.put(None)
        q.put(None) #发送结束信号
        print("主进程结束了")
    
    

    补充: queue不适合传大文件,通常传一些消息.

    四、JoinableQueue实现生产者消费者模型

    4.1方法介绍

    JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

    q.task_done()
    使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

    q.join()
    生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。

    4.2 模型

    from multiprocessing import Process,JoinableQueue
    import time
    import random
    
    '''生产者进程'''
    def producer(name,food,q):
        for i in range(2):
            time.sleep(random.randint(1,3))
            res = f"{food}{i}"
            q.put(res)
            print(f'33[44m{name} 生产了 {res}33[0m')
        q.join()  #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。
    
    
    
    '''消费者进程'''
    def consumer(name,q):
        while True:
            res = q.get()
            time.sleep(random.randint(1,3))
            print(f"{name}吃了{res}")
            q.task_done()  #向q.join()发送一次信号,证明一个数据已经被取走了
    
    if __name__ == '__main__':
        q = JoinableQueue()
        #生产者
        p1 = Process(target=producer,args=('yjy','酱大骨',q))
        p2 = Process(target=producer,args=('wwb','酸菜鱼',q))
        p3 = Process(target=producer,args=('hhh','卤猪蹄',q))
    
        #消费者
        c1 = Process(target=consumer,args = ('xx',q,))
        c2 = Process(target=consumer,args = ('yy',q,))
        c1.daemon = True  #将他设置为守护进程
        c2.daemon = True  #将他设置为守护进程
    
        #开始生产,开始吃
        l = [p1,p2,p3,c1,c2]
        for i in l :
            i.start()
    
        #必须保证生产者生产完才能发送结束的信号,运用到.join
        p1.join()  #感知子进程的结束
        p2.join()
        p3.join()
        print("主进程结束了")
    
    '''
    主进程等p1,p2,p3
    P1,P2,p3等c,c2
    p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
    '''
    
  • 相关阅读:
    hibernate一对多查询
    hibernate关联关系查询
    Cookie&&session
    JSP&&EL&&JSTL
    servlet下的request&&response
    servlet
    mysql命令
    html小结
    RabbitMQ初步学习和使用
    爬虫简单案例
  • 原文地址:https://www.cnblogs.com/yanjiayi098-001/p/11528440.html
Copyright © 2011-2022 走看看