zoukankan      html  css  js  c++  java
  • 生产者和消费者模型

    day37 生产者和消费者模型

    多道技术

    • 空间复用: 多个程序共用一个内存条, 彼此隔离, 物理级别的隔离
    • 时间复用: 公用一个cup

    切换的情况:

    • io切换时, 占用的时间过长

    串行:一个任务完完整整的运行结束, 在运行下一个任务

    并发: 看起来是同时执行多个任务, 是单核的

    并行: 真正的做到了同时执行多个任务, 是多核的

    开启并发编程的两种方式

    # 方式一
    from mulitprocessing import Process
    
    def task():
        pass
    
    if __name__ == '__main__':
        p = Process(terget=task)
        p.start()
        
    # 方式二
    from mulitprocessing import Process
    
    class MyP(Process):
        def run(self):
            pass
        
    if __name__ == '__main__':
        p = MyP()
        p.start() #给操作系统发送一个请求, 操作系统去开进程
        #会申请一个空间,把父进程的代码完整的copy一份放进去, 然后去运行代码
    
    

    join阻塞

    from mulitprocessing import Process
    import time
    
    def task(s):
        time.sleep(s)
        pass
    
    if __name__ == '__main__':
        p = Process(terget=task,args=(1,))
        p1 = Process(terget=task,args=(2,))
        p.start()
        p1.start()
        p.join() #等一秒
        p1.join() #等一秒, 
        print('主') #主进程要等待所有的子进程结束才会结束,因为主进程要在结束前回收僵尸进程.
        
    

    僵尸进程: 没有死透的子进程

    孤儿进程: 子进程运行的过程中父进程死了就变成了孤儿进程, 被进程init接管.

    父进程一直不死, 会一直开启子进程,意味着占用过多的cpu并且不回收,

    解决方案: 强制的杀死这个父进程.

    守护进程

    守护进程的本质也是一个进程

    主进程的代码执行完毕守护进程直接结束.在主进程结束之前,守护进程一直运行

    from mulitprocessing import Process
    import time
    
    def task():
        print('守护进程 start')
        time.sleep(3)
        print('守护进程 end')
        pass
    
    if __name__ == '__main__':
        p = Process(terget=task)
        p.daemon = True #开启一个守护进程
        p.start()
        time.sleep(3)
        print('主')
    

    进程锁

    进程锁是把锁住的代码变成了串行

    #用进程锁的方法实现抢票
    
    from multiprocessing import Process,Lock
    import json,time,os
    
    def search():
        time.sleep(2)
        #基于硬盘的,打开一个文件
        with open('db.txt',mode='rt',encoding='utf-8') as f:
            res = json.load(f)
            print(f'还剩{res["count"]}')
            
    def get():
        with open('db.txt',mode='rt',encoding='utf-8') as f:
            res = json.load(f)
            # print(f'还剩{res["count"]}')
        time.sleep(1) # 模拟网络io
        if res['count'] > 0:
            res['count'] -= 1
            with open('db.txt',mode='wt',encoding='utf-8') as f:
                json.dump(res,f)
                print(f'进程{os.getpid()} 抢票成功')
            time.sleep(1.5) # 模拟网络io
        else:
            print('票已经售空啦!!!!!!!!!!!')
            
       
    def task(lock):
        search()
        
        #锁住
        lock.acquire()
        get()
        lock.release()
        #执行完后释放
        
    if __name__ == '__main__':
        lock = Lock() # 写在主进程是为了让子进程拿到同一把锁.
        for i in range(15):
            p = Process(target=task,args=(lock,))
            p.start()
            
    #进程锁: 是把锁住的代码变成了串行
    #join 是把所有的子进程变成了串行
    
    #为了数据安全的考虑, 串行会牺牲掉效率
    

    队列

    进程间通信使用IPC

    创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    from mulitprocessing import Process,Queue
    
    #案列一
    q = Queue()
    q.put(1) #put放入一个值
    q.put(2)
    print(q.get())#get拿一个值
    print(q.get())
    print(q.get())#队列里面的值已经拿完了,默认会等在这里拿值
    
    
    #案例二
    q = Queue(2) #限制它只能放2个值
    q.put(1)
    q.put(2)#这里已经放满了
    q.put(3)#队列满时在放值, 会阻塞
    
    
    #案例三(以下均做了解)
    q = Queue(2) #限制三个
    q.put('kang', block=True, timeout=2)
    q.put('kang', block=True, timeout=2)#这里放满了
    
    q.put('kang', block=True, timeout=2)#put里的 block=True 如果放满了会等待, timeout是最多等待ns,如果等待ns后队列还是满的就会报错
    
    #案例四
    q = Queue()
    q.put(1)
    q.get()
    q.get(block=True, timeout=3)#阻塞取值, timeout最多等待3秒, 超过时间会报错
    
    #案例五
    q = Queue(2)
    q.put(1)
    q.put(2)
    
    q.put('kang',block=False) #对于put来说block=False 如果队列满了就直接报错
    
    q = Queue()
    q.put(1)
    q.get()
    
    q.get(block=False)# block=False 拿不到值不阻塞, 会直接报错
    
    案例六
    q = Queue(1)
    q.put(1)
    q.get
    q.put_nowait('333') #等同于block = False 会报错
    q.get_nowait() #等同于block=False 会报错
    
    
    
    

    生产者和消费者模型

    #生产者: 生产数据的任务
    #消费者: 处理数据的任务
    
    #生产者--对列(盆)-->消费者
    #生产者可以不停的生产,达到了自己的最大生产效率,消费者可以不停的消费,达到自己最大的消费效率,
    #生产者和消费者模型大大提高了生产者和消费者的效率.
    #补充: 使用Queue不适合传大文件, 通常传一些消息.
    
    #案例一
    from multiprocessing inport Process, Queue
    import time, random
    
    def producer(q,name,food): #
        #生产者
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1,3))
            res = f'{food}{i}'
            q.put(res)
            
    def consumer(q, name):
        #消费者
        while True:
            res = q.get(timeout=5)
            if res is None:
                break
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
            
    if __name__ == '__main__':
        q = Queue()
        p1 = Procerss(target=producer, args=(q,'kang','包子'))
        p2 = Procerss(target=producer, args=(q,'lin','饺子'))
        p3 = Procerss(target=producer, args=(q,'kang','排骨'))
        c1 = Process(target=consumer,args=(q,'大哥'))
        c2 = Process(target=consumer,args=(q,'二哥'))
        
        p1.start()
        p2.start()
        p3.start()
     
        p1.join()
        p2.join()
        p3.join() # 生产者生产完毕
        c1.start()
        c2.start()
        q.put(None)# 几个消费者put几次
        q.put(None)
    
    #案例二
    from multiprocessing inport Process, Queue,JoinableQueue
    import time, random
    
    def producer(q,name,food): #
        #生产者
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1,3))
            res = f'{food}{i}'
            q.put(res)
            
    def consumer(q, name):
        #消费者
        while True:
            res = q.get(timeout=5)
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
            q.task_done()
            
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Procerss(target=producer, args=(q.'kang','包子'))
        p2 = Procerss(target=producer, args=(q.'lin','饺子'))
        p3 = Procerss(target=producer, args=(q.'kang','排骨'))
        c1 = Process(target=consumer,args=(q,'大哥'))
        c2 = Process(target=consumer,args=(q,'二哥'))
        p1.start()
        p2.start()
        p3.start()
        c1.daemon = True #开启守护进程
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        p3.join() # 生产者生产完毕
        
        q.join()# 分析
        # 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了.
        #这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.守护进程的概念.
        
    #JoinableQueue解析
    
       
    from multiprocessing import Process,Queue,JoinableQueue
    
    
    q = JoinableQueue()
    
    q.put('zhao') # 放队列里一个任务
    q.put('qian')
    
    print(q.get())
    q.task_done() # 完成了一次任务
    print(q.get())
    q.task_done() # 完成了一次任务
    q.join() #计数器不为0的时候 阻塞等待计数器为0后通过
    
    # 想象成一个计数器 :put +1   task_done -1
        
        
        
    
    
         
            
    
    
  • 相关阅读:
    redis持久化方案之RDB
    redis实现分布式锁
    redis数据类型(图解)
    js上拉加载
    apicloud直接上传图片
    layer.confirm
    json数组去重
    js解决手机键盘影响定位的问题
    click禁用事件
    ipcloud上传裁切图片,保存为base64再压缩传给后台
  • 原文地址:https://www.cnblogs.com/kangwy/p/11530292.html
Copyright © 2011-2022 走看看