zoukankan      html  css  js  c++  java
  • 生产者和消费者模型

    day37 生产者和消费者模型

    多道技术

    • 空间复用: 多个程序共用一个内存条, 彼此隔离, 物理级别的隔离
    • 时间复用: 公用一个cup

    切换的情况:

    • io切换时, 占用的时间过长

    串行:一个任务完完整整的运行结束, 在运行下一个任务

    并发: 看起来是同时执行多个任务, 是单核的

    并行: 真正的做到了同时执行多个任务, 是多核的

    开启并发编程的两种方式

    # 方式一
    from mulitprocessing import Process
    
    def task():
        pass
    
    if __name__ == '__main__':
        p = Process(terget=task)
        p.start()
        
    # 方式二
    from mulitprocessing import Process
    
    class MyP(Process):
        def run(self):
            pass
        
    if __name__ == '__main__':
        p = MyP()
        p.start() #给操作系统发送一个请求, 操作系统去开进程
        #会申请一个空间,把父进程的代码完整的copy一份放进去, 然后去运行代码
    
    

    join阻塞

    from mulitprocessing import Process
    import time
    
    def task(s):
        time.sleep(s)
        pass
    
    if __name__ == '__main__':
        p = Process(terget=task,args=(1,))
        p1 = Process(terget=task,args=(2,))
        p.start()
        p1.start()
        p.join() #等一秒
        p1.join() #等一秒, 
        print('主') #主进程要等待所有的子进程结束才会结束,因为主进程要在结束前回收僵尸进程.
        
    

    僵尸进程: 没有死透的子进程

    孤儿进程: 子进程运行的过程中父进程死了就变成了孤儿进程, 被进程init接管.

    父进程一直不死, 会一直开启子进程,意味着占用过多的cpu并且不回收,

    解决方案: 强制的杀死这个父进程.

    守护进程

    守护进程的本质也是一个进程

    主进程的代码执行完毕守护进程直接结束.在主进程结束之前,守护进程一直运行

    from mulitprocessing import Process
    import time
    
    def task():
        print('守护进程 start')
        time.sleep(3)
        print('守护进程 end')
        pass
    
    if __name__ == '__main__':
        p = Process(terget=task)
        p.daemon = True #开启一个守护进程
        p.start()
        time.sleep(3)
        print('主')
    

    进程锁

    进程锁是把锁住的代码变成了串行

    #用进程锁的方法实现抢票
    
    from multiprocessing import Process,Lock
    import json,time,os
    
    def search():
        time.sleep(2)
        #基于硬盘的,打开一个文件
        with open('db.txt',mode='rt',encoding='utf-8') as f:
            res = json.load(f)
            print(f'还剩{res["count"]}')
            
    def get():
        with open('db.txt',mode='rt',encoding='utf-8') as f:
            res = json.load(f)
            # print(f'还剩{res["count"]}')
        time.sleep(1) # 模拟网络io
        if res['count'] > 0:
            res['count'] -= 1
            with open('db.txt',mode='wt',encoding='utf-8') as f:
                json.dump(res,f)
                print(f'进程{os.getpid()} 抢票成功')
            time.sleep(1.5) # 模拟网络io
        else:
            print('票已经售空啦!!!!!!!!!!!')
            
       
    def task(lock):
        search()
        
        #锁住
        lock.acquire()
        get()
        lock.release()
        #执行完后释放
        
    if __name__ == '__main__':
        lock = Lock() # 写在主进程是为了让子进程拿到同一把锁.
        for i in range(15):
            p = Process(target=task,args=(lock,))
            p.start()
            
    #进程锁: 是把锁住的代码变成了串行
    #join 是把所有的子进程变成了串行
    
    #为了数据安全的考虑, 串行会牺牲掉效率
    

    队列

    进程间通信使用IPC

    创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    from mulitprocessing import Process,Queue
    
    #案列一
    q = Queue()
    q.put(1) #put放入一个值
    q.put(2)
    print(q.get())#get拿一个值
    print(q.get())
    print(q.get())#队列里面的值已经拿完了,默认会等在这里拿值
    
    
    #案例二
    q = Queue(2) #限制它只能放2个值
    q.put(1)
    q.put(2)#这里已经放满了
    q.put(3)#队列满时在放值, 会阻塞
    
    
    #案例三(以下均做了解)
    q = Queue(2) #限制三个
    q.put('kang', block=True, timeout=2)
    q.put('kang', block=True, timeout=2)#这里放满了
    
    q.put('kang', block=True, timeout=2)#put里的 block=True 如果放满了会等待, timeout是最多等待ns,如果等待ns后队列还是满的就会报错
    
    #案例四
    q = Queue()
    q.put(1)
    q.get()
    q.get(block=True, timeout=3)#阻塞取值, timeout最多等待3秒, 超过时间会报错
    
    #案例五
    q = Queue(2)
    q.put(1)
    q.put(2)
    
    q.put('kang',block=False) #对于put来说block=False 如果队列满了就直接报错
    
    q = Queue()
    q.put(1)
    q.get()
    
    q.get(block=False)# block=False 拿不到值不阻塞, 会直接报错
    
    案例六
    q = Queue(1)
    q.put(1)
    q.get
    q.put_nowait('333') #等同于block = False 会报错
    q.get_nowait() #等同于block=False 会报错
    
    
    
    

    生产者和消费者模型

    #生产者: 生产数据的任务
    #消费者: 处理数据的任务
    
    #生产者--对列(盆)-->消费者
    #生产者可以不停的生产,达到了自己的最大生产效率,消费者可以不停的消费,达到自己最大的消费效率,
    #生产者和消费者模型大大提高了生产者和消费者的效率.
    #补充: 使用Queue不适合传大文件, 通常传一些消息.
    
    #案例一
    from multiprocessing inport Process, Queue
    import time, random
    
    def producer(q,name,food): #
        #生产者
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1,3))
            res = f'{food}{i}'
            q.put(res)
            
    def consumer(q, name):
        #消费者
        while True:
            res = q.get(timeout=5)
            if res is None:
                break
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
            
    if __name__ == '__main__':
        q = Queue()
        p1 = Procerss(target=producer, args=(q,'kang','包子'))
        p2 = Procerss(target=producer, args=(q,'lin','饺子'))
        p3 = Procerss(target=producer, args=(q,'kang','排骨'))
        c1 = Process(target=consumer,args=(q,'大哥'))
        c2 = Process(target=consumer,args=(q,'二哥'))
        
        p1.start()
        p2.start()
        p3.start()
     
        p1.join()
        p2.join()
        p3.join() # 生产者生产完毕
        c1.start()
        c2.start()
        q.put(None)# 几个消费者put几次
        q.put(None)
    
    #案例二
    from multiprocessing inport Process, Queue,JoinableQueue
    import time, random
    
    def producer(q,name,food): #
        #生产者
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1,3))
            res = f'{food}{i}'
            q.put(res)
            
    def consumer(q, name):
        #消费者
        while True:
            res = q.get(timeout=5)
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
            q.task_done()
            
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Procerss(target=producer, args=(q.'kang','包子'))
        p2 = Procerss(target=producer, args=(q.'lin','饺子'))
        p3 = Procerss(target=producer, args=(q.'kang','排骨'))
        c1 = Process(target=consumer,args=(q,'大哥'))
        c2 = Process(target=consumer,args=(q,'二哥'))
        p1.start()
        p2.start()
        p3.start()
        c1.daemon = True #开启守护进程
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        p3.join() # 生产者生产完毕
        
        q.join()# 分析
        # 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了.
        #这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.守护进程的概念.
        
    #JoinableQueue解析
    
       
    from multiprocessing import Process,Queue,JoinableQueue
    
    
    q = JoinableQueue()
    
    q.put('zhao') # 放队列里一个任务
    q.put('qian')
    
    print(q.get())
    q.task_done() # 完成了一次任务
    print(q.get())
    q.task_done() # 完成了一次任务
    q.join() #计数器不为0的时候 阻塞等待计数器为0后通过
    
    # 想象成一个计数器 :put +1   task_done -1
        
        
        
    
    
         
            
    
    
  • 相关阅读:
    【2018.05.05 C与C++基础】C++中的自动废料收集:概念与问题引入
    【2018.04.27 C与C++基础】关于switch-case及if-else的效率问题
    【2018.04.19 ROS机器人操作系统】机器人控制:运动规划、路径规划及轨迹规划简介之一
    March 11th, 2018 Week 11th Sunday
    March 10th, 2018 Week 10th Saturday
    March 09th, 2018 Week 10th Friday
    March 08th, 2018 Week 10th Thursday
    March 07th, 2018 Week 10th Wednesday
    ubantu之Git使用
    AMS分析 -- 启动过程
  • 原文地址:https://www.cnblogs.com/kangwy/p/11530292.html
Copyright © 2011-2022 走看看