zoukankan      html  css  js  c++  java
  • 生产者消费模型

    生产者消费模型

    模型就是解决某个问题固定方法和套路

    e.g.

    生产者和消费者

    存在的问题 :效率不同,往往双方之间的处理速度不一样,导致双方需要等待对方

    tips:

    1将双方解开耦合,让不同进程负责不同的任务

    2提供一个共享的容器,来平衡双方的能力,之所以用进程队列是因为可以在进程之间共享

    case:

    from multiprocessing import Process,Queue
    import requests
    import re,os,time,random
    
    #生产者的任务
    def task(url,q):
        i=0
        for url in urls:
            response =requests.get(url)
            text=response.text
            #将生产完成的数放入队列中
            time.sleep(random.random())
            q.put(text)
            i+=1
            print(os.getpid(),"生产了第%s个数据"%i)
            
            
    #消费者的任务
    def customer(q):
        i=0
        while True:
            text=q.get()
            time.sleep(random.random())
            res =re.findall('src=//(.*?)width',text)
            
            i+=1
            print('第%s任务获取到%s个img'%(i,len(res))
    if __name__=='__main__'
    	url=[
            "http://www.baidu.com",
            "http://www.baidu.com",
            "http://www.baidu.com",
            "http://www.baidu.com",
        ]
          #创建一个双方能共享的容器
          q=Queue()
        #生产者进程
        p1=Process(target=product,args=(url,q))
        p1.start()
        
        #消费这进程
        c=Process(target=customer,args=(q,))
        c.start()
        
    

    遇到的问题:

    1消费不知道何时结束

    加了一个模块joinableQueue -------->和 继承Queue 用法一致

    增加了join和taskDone(任务完成)

    join这个是阻塞函数,会阻塞直到taskdone的调用次数等于存入的元素个数,可以用于表示队列任务处理完成

    完善上述代码案列case

    from multiprocessing import Process,joinableQueue
    import requests
    import re,os,time,random
    """热狗为例子"""
    #生产者任务
    def product(q,name):
        for i in range(5):
            dog ="%s的热狗%s"%(name,(i+1))
            time.sleep(random.random())
            print('生产了',dog)
            q.put(dog)
            
    #吃热狗
    def customer(q):
        while True:
            dog=q.get()
            time.sleep(random.random())
            print('消费了%s'%dog)
            q.task_done()    #标记这个任务处理完成
    if __name__=="__main__":
        #创建一个双方共享的容器
        q=JoinableQueue()
        
        #生产者的进程
        p1=Process(target=product,args=(q,"上海分店"))
        p2=Process(target=product,args=(q,"北京分店"))
        
        p1.start()
        p2.start()
        
        
        #消费者进程
        c=Process(target=customer,args=(q,))
        
        #c.deamon=True  可以将消费者设置为守护进程,将主进程确认 任务全部完成时 可以随着主进程一起结束
        c.start()
        
        p1.join()
        p2.join()  #代码走到这儿代表生产方已经完成
        
        q.join()   #意味着队列中的任务都处理完成了
        
        
        #结束所有任务
        c.terminate()   #直接终止消费者进程
        
        #解决问题的关键
        1确定生成者的任务完成
        2确定生出来的数据已经全部处理完成
    
  • 相关阅读:
    【Teradata】DSA服务器tdactivemq重启清理消息队列步骤
    什么是5G
    【时序数据库】十分钟系列
    16_Android的数据存储_ SharedPreference、XML和JSON
    15_Android文件读写操作
    14_TTS
    13_拍照、录像和音频
    12_Sensor简单实例
    11_SurfaceView绘图
    10_多点触摸交互处理
  • 原文地址:https://www.cnblogs.com/zhuyuanying123--/p/11134130.html
Copyright © 2011-2022 走看看