zoukankan      html  css  js  c++  java
  • Python中的生产者消费者模型

    ---恢复内容开始---

    了解知识点:

    1、守护进程:

    ·什么是守护进程:

    守护进程其实就是一个‘子进程’,守护即伴随,守护进程会伴随主进程的代码运行完毕后而死掉

    ·为何用守护进程:

    当该子进程内的代码在父进程代码运行完毕后就没有存在的意义了,就应该将进程设置为守护进程,会在父进程代码结束后死掉

     1 from multiprocessing import Process
     2 
     3 import time,os
     4 
     5 def task(name):
     6     print('%s is running'%name)
     7     time.sleep(3)
     8 
     9 if __name__ == '__main__':
    10     p1=Process(target=task,args=('守护进程',))
    11     p2=Process(target=task,args=('正常的子进程',))
    12     p1.daemon=True  # 一定要放到p.start()之前
    13     p1.start()
    14     p2.start()
    15     print('')
    16     
    守护进程举例

    以下是守护进程会迷惑人的范例:

     1 #主进程代码运行完毕,守护进程就会结束
     2 from multiprocessing import Process
     3 import time
     4 def foo():
     5     print(123)
     6     time.sleep(1)
     7     print("end123")
     8 
     9 def bar():
    10     print(456)
    11     time.sleep(3)
    12     print("end456")
    13 
    14 if __name__ == '__main__':
    15     p1=Process(target=foo)
    16     p2=Process(target=bar)
    17 
    18     p1.daemon=True
    19     p1.start()
    20     p2.start()
    21     print("main-------")
    22 
    23     '''
    24     main-------
    25     456
    26     enn456
    27     '''
    28 
    29 
    30     '''
    31     main-------
    32     123
    33     456
    34     enn456
    35     '''
    36 
    37     '''
    38     123
    39     main-------
    40     456
    41     end456
    42     '''
    View Code

    2、互斥锁:

    互斥锁:可以将要执行任务的部分代码(只涉及到修改共享数据的代码)变成串行

    join:是要执行任务的所有代码整体串行

    强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()。否者程序停在原地。
    互斥锁vs join: 
    大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)
    区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
    区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统)

    互斥锁
     1 from multiprocessing import Process,Lock
     2 import json
     3 import os
     4 import time
     5 import random
     6 
     7 def check():
     8     time.sleep(1) # 模拟网路延迟
     9     with open('db.txt','rt',encoding='utf-8') as f:
    10         dic=json.load(f)
    11     print('%s 查看到剩余票数 [%s]' %(os.getpid(),dic['count']))
    12 
    13 def get():
    14     with open('db.txt','rt',encoding='utf-8') as f:
    15         dic=json.load(f)
    16     time.sleep(2)
    17     if dic['count'] > 0:
    18         # 有票
    19         dic['count']-=1
    20         time.sleep(random.randint(1,3))
    21         with open('db.txt','wt',encoding='utf-8') as f:
    22             json.dump(dic,f)
    23         print('%s 购票成功' %os.getpid())
    24     else:
    25         print('%s 没有余票' %os.getpid())
    26 
    27 
    28 def task(mutex):
    29     # 查票
    30     check()
    31 
    32     #购票
    33     mutex.acquire() # 互斥锁不能连续的acquire,必须是release以后才能重新acquire
    34     get()
    35     mutex.release()
    36 
    37 
    38 
    39     # with mutex:
    40     #     get()
    41 
    42 if __name__ == '__main__':
    43     mutex=Lock()
    44     for i in  range(10):
    45         p=Process(target=task,args=(mutex,))
    46         p.start()
    47         # p.join()
    模拟抢票

    3、IPC通信机制

    进程之间通信必须找到一种介质,该介质必须满足
    1、是所有进程共享的
    2、必须是内存空间
    附加:帮我们自动处理好锁的问题
     
    a、   from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
    b、   IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
    c、   IPC中的管道(Pipe),共享,内存,需自己解决锁的问题

    a、用Manager(了解知识点)

     1 from multiprocessing import Process,Manager,Lock
     2 import time
     3 
     4 mutex=Lock()
     5 
     6 def task(dic,lock):
     7     lock.acquire()
     8     temp=dic['num']
     9     time.sleep(0.1)
    10     dic['num']=temp-1
    11     lock.release()
    12 
    13 if __name__ == '__main__':
    14     m=Manager()
    15     dic=m.dict({'num':10})
    16 
    17     l=[]
    18     for i in range(10):
    19         p=Process(target=task,args=(dic,mutex))
    20         l.append(p)
    21         p.start()
    22     for p in l:
    23         p.join()
    24     print(dic)
    View Code

    b、用队列Queue

    1)共享的空间

    2)是内存空间

    3)自动帮我们处理好锁定问题

     1 from multiprocessing import Queue
     2 q=Queue(3)  #设置队列中maxsize个数为三
     3 q.put('first')
     4 q.put({'second':None})
     5 q.put('')
     6 # q.put(4)   #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
     7 print(q.get())
     8 print(q.get())
     9 print(q.get())
    10 
    11 强调:
    12 1、队列用来存成进程之间沟通的消息,数据量不应该过大
    13 2、maxsize的值超过的内存限制就变得毫无意义
    14                                                               
     1 了解:
     2 q=Queue(3)
     3 q.put('first',block=False)
     4 q.put('second',block=False)
     5 q.put('third',block=False)
     6 q.put('fourth',block=False)  #报错 queue.Full
     7 
     8 q.put('first',block=True)
     9 q.put('second',block=True)
    10 q.put('third',block=True)
    11 q.put('fourth',block=True,timeout=3)  #等待3秒后若还进不去报错。注意timeout不能和block=False连用
    12 
    13 q.get(block=False)
    14 q.get(block=False)
    15 q.get(block=False)
    16 q.get(block=False)           #报错 queue.Empty
    17 
    18 q.get(block=True)
    19 q.get(block=True)
    20 q.get(block=True)
    21 q.get(block=True,timeout=2)    #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用
    了解

    4、生产者与消费者模型

    该模型中包含两类重要的角色:
    1、生产者:将负责造数据的任务比喻为生产者
    2、消费者:接收生产者造出的数据来做进一步的处理,该类人物被比喻成消费者
     
    实现生产者消费者模型三要素
    1、生产者
    2、消费者
    3、队列
    什么时候用该模型:
    程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
     
    该模型的好处:
    1、实现了生产者与消费者解耦和
    2、平衡了生产者的生产力与消费者的处理数据的能力

    注意:生产者消费者模型是解决问题的思路不是技术。可以用进程和队列来实现,也可以用其他的来实现。

     1 from multiprocessing import JoinableQueue,Process
     2 import time
     3 import os
     4 import random
     5 
     6 def producer(name,food,q):
     7     for i in range(3):
     8         res='%s%s' %(food,i)
     9         time.sleep(random.randint(1,3))
    10         # 往队列里丢
    11         q.put(res)
    12         print('33[45m%s 生产了 %s33[0m' %(name,res))
    13     # q.put(None)
    14 
    15 def consumer(name,q):
    16     while True:
    17         #从队列里取走
    18         res=q.get()
    19         if res is None:break
    20         time.sleep(random.randint(1,3))
    21         print('33[46m%s 吃了 %s33[0m' %(name,res))
    22         q.task_done()
    23 
    24 if __name__ == '__main__':
    25     q=JoinableQueue()
    26     # 生产者们
    27     p1=Process(target=producer,args=('egon','包子',q,))
    28     p2=Process(target=producer,args=('杨军','泔水',q,))
    29     p3=Process(target=producer,args=('猴老师','',q,))
    30     # 消费者们
    31     c1=Process(target=consumer,args=('Alex',q,))
    32     c2=Process(target=consumer,args=('wupeiqidsb',q,))
    33     c1.daemon=True
    34     c2.daemon=True
    35 
    36     p1.start()
    37     p2.start()
    38     p3.start()
    39     c1.start()
    40     c2.start()
    41 
    42     p1.join()
    43     p2.join()
    44     p3.join()
    45     q.join() #等待队列被取干净
    46     # q.join() 结束意味着
    47     # 主进程的代码运行完毕--->(生产者运行完毕)+队列中的数据也被取干净了->消费者没有存在的意义
    48 
    49     # print('主')
  • 相关阅读:
    第二阶段冲刺第七天
    学习进度表_十五周
    第二阶段冲刺第六天
    第二阶段冲刺第五天
    第二阶段冲刺第四天
    第二阶段冲刺第三天
    Beta阶段项目总结
    Alpha阶段项目总结
    第二次冲刺站立会议10
    第二次冲刺站立会议09
  • 原文地址:https://www.cnblogs.com/huyingsakai/p/9300835.html
Copyright © 2011-2022 走看看