zoukankan      html  css  js  c++  java
  • 第三十一天- 进程串行(锁) 队列 生成者消费者模型

    1.进程同步/串行(锁)

      进程之间数据不共享,但共享同一套文件系统,所以访问同一个文件,或同一个打印终端,没有问题,但共享带来的是竞争容易错乱,如抢票时。这就需让进程一个个的进去保证数据安全,也就是加锁处理,Lock

      并发,效率高,但是竞争同一个文件时,导致数据混乱

      加锁,由并发改成了串行,牺牲了运行效率,但避免数据竞争

      

    以模拟抢票为例:

     1 # 注意:首先在当前文件目录下创建一个名为db的文件
     2 # 文件db的内容为:{"count":1},只有这一行数据,并且注意,每次运行完了之后,文件中的1变成了0,你需要手动将0改为1,然后在去运行代码。
     3 # 注意一定要用双引号,不然json无法识别
     4 # 加锁保证数据安全,不出现混乱
     5 from multiprocessing import Process,Lock
     6 import time,json,random
     7 
     8 
     9 # 查看剩余票数
    10 def search(i):
    11     dic=json.load(open('db')) # 打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
    12     print('客户%s查看剩余票数%s' %(i,dic['count']))
    13 
    14 
    15 # 抢票
    16 def get(i):
    17     dic = json.load(open('db'))
    18     time.sleep(0.1)       # 模拟读数据的网络延迟,那么进程之间的切换,所有人拿到的字典都是{"count": 1},也就是每个人都拿到了这一票。
    19     if dic['count'] > 0:
    20         dic['count'] -= 1
    21         time.sleep(random.randint(0,1))   # 模拟写数据的网络延迟
    22         json.dump(dic,open('db','w'))
    23         # 若不加限制最终导致,每个人显示都抢到了票,这就出现了问题
    24         print('客户%s购票成功'%i)
    25     else:
    26         print('sorry,客户%s 没票了亲!'%i)
    27 
    28 
    29 def task(i,lock):
    30     search(i)
    31     # 抢票时是发生数据变化的时候,所以我们将锁加到这里,让进程串行执行
    32     lock.acquire()  # 加锁
    33     get(i)
    34     lock.release()  # 解锁
    35 
    36 
    37 if __name__ == '__main__':
    38     lock = Lock() # 创建一个锁
    39     for i in range(10): # 模拟并发10个客户端抢票
    40         p = Process(target=task,args=(i,lock,)) # 将锁作为参数传给task函数
    41         p.start()
    加锁模拟抢票

    总结:

      加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,速度是慢了,但牺牲了速度却保证了数据安全。

      因此需一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是 mutiprocessing 模块提供的基于消息的IPC通信机制:队列和管道(见后续)。

      

    2.进程守护

      子进程是不会随着主进程结束而结束,子进程全部执行完后,程序才结束,那如果需求主进程结束,子进程必须跟着结束,怎么办?这就需要用到守护进程了!

      运用:如,系统关机,其他一切都要跟着结束

     1 import time
     2 from multiprocessing import Process
     3 
     4 def func1(m):
     5     time.sleep(1)
     6     print('我是func1',m)
     7 
     8 
     9 # 注意:进程之间是互相独立的,主进程代码运行结束,不管有没有运行完,守护进程随即终止
    10 if __name__ == '__main__':
    11     p = Process(target=func1,args=(666,))
    12     p.daemon = True  # 守护进程,在start之前
    13     p.start()
    14 
    15     print('主进程执行结束')

    总结:

      其一:守护进程会在主进程代码执行结束后就终止
      其二:守护进程内无法再开启子进程,否则出异常

    3.队列

      进程之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持 队列和管道,这两种方式都是使用消息传递队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出,取出就没有这个数据了。

      方法介绍:

     1 '''
     2 q = Queue([maxsize]) 
     3 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
     4 Queue的实例q具有以下方法:
     5 
     6 q.get( [ block [ ,timeout ] ] ) 
     7 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
     8 
     9 q.get_nowait( ) 
    10 同q.get(False)方法。
    11 
    12 q.put(item [, block [,timeout ] ] ) 
    13 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    14 
    15 q.qsize() 
    16 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    17 
    18 
    19 q.empty() 
    20 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    21 
    22 q.full() 
    23 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)
    24 
    25 '''

      示例代码:

     1 from multiprocessing import Process,Queue
     2 # Queue 先进先出 fifo first in first out,队列里面的数据,只能取一次,取出就没了
     3 
     4 q = Queue(3)  # Queue(参数)可理解成一个可限制长度(参数)的列表
     5 # 添加数据
     6 # print(q.full())
     7 q.put(4)
     8 q.put(3)
     9 q.put(2)
    10 # print(q.full())  # 查看序列是否满了,但不可信的(如多进程时)
    11 
    12 # 取出数据
    13 print('---------')
    14 # print(q.empty())
    15 print(q.get())
    16 print(q.get())
    17 print(q.get())
    18 # print(q.empty())  # 查看序列是否空了,但是不可信的(如多进程时)
    19 print('---------')
    20 print(q.get()) # 超出长度会一直停在这等待,直到有数据给他
    21 
    22 # 用try优化上面代码
    23 # for i in range(4):
    24 #     try:
    25 #         s = q.get_nowait()
    26 #         # s = q.get(False) # 等同nowait
    27 #         print('=====',s)
    28 #
    29 #     except:
    30 #         print('没有数据了,去干别的吧...')
    31 #
    队列参考代码

      基于队列的进程通信:

     1 from multiprocessing import Process,Queue
     2 
     3 
     4 def func(q):
     5     # 拿出数据
     6     res = q.get()
     7     print(res)
     8     print(q.get())
     9 
    10 
    11 if __name__ == '__main__':
    12     q = Queue(5)
    13     q.put('hello')  # 添加数据
    14     q.put('emmm')
    15     p = Process(target=func,args=(q,))
    16     p.start()
    17 
    18     print('主进程结束')
    19 
    20 # 队列的数据是安全的,先进先出,且取一次出来就没有了
    View Code

    4.生产消费模式

      在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题我们需要通过一个容器(缓冲区)来解决生产者和消费者的强耦合问题。

      生产消费模式图解:

    基于队列来实现一个生产者消费者模型:

     1 # 解耦合
     2 import time
     3 from multiprocessing import Process,Queue
     4 
     5 
     6 def producer(q):
     7     for i in range(10):
     8         time.sleep(0.5)
     9         q.put('包子%s号'%i)
    10         print('包子%s号做好了'%i)
    11     q.put(None)  # None表示没有 防止后面死循环
    12 
    13 
    14 def consumer(q):
    15     while 1:
    16         baozi = q.get()
    17         if baozi == None:
    18             break
    19         time.sleep(1)
    20         print('%s被吃掉了'%baozi)
    21 
    22 
    23 if __name__ == '__main__':
    24     q = Queue(10)  # 创建一个队列,耦合生产者和消费者,p1和p2共享q(独立于进程的一个空间)
    25     p1 = Process(target=producer,args=(q,))
    26     p2 = Process(target=consumer,args=(q,))
    27     p1.start()
    28     p2.start()
    View Code

    总结:

     1 # 生产者消费者模型总结
     2 
     3 # 程序中有两类角色
     4 一类负责生产数据(生产者)
     5 一类负责处理数据(消费者)
     6 
     7 # 引入生产者消费者模型为了解决的问题是:
     8 平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
     9 
    10 # 如何实现:
    11 生产者 < -->队列 <—— > 消费者
    12 # 生产者消费者模型实现类程序的解耦和
    13 
    14 生产者消费者模型总结

    5.joinableQueue

      有多个生产者和多个消费者时,由于队列是进程安全的,一个进程拿走了结束信号,另外一个进程就拿不到了,所以使用时需要消费者发送消息给生产者已使用。

    1 #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    2 
    3    #参数介绍:
    4     maxsize是队列中允许最大项数,省略则无大小限制。    
    5   #方法介绍:
    6     JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    7     q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    8     q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。
     1 import time
     2 from multiprocessing import Process,JoinableQueue
     3 
     4 def producer(q):
     5     for i in range(10):
     6         time.sleep(0.5)
     7         q.put('包子%s号'%i)
     8         print('包子%s号生产完毕'%i)
     9     print('aaaaaaaaaaaaa')
    10     q.join()  #
    11     print('包子卖完了')
    12 
    13 def consumer(q):
    14     while 1:
    15         baozi = q.get()
    16         time.sleep(0.8)
    17         print('%s被吃掉了'%baozi)
    18         q.task_done()  # 给队列发送一个任务处理完了的信号
    19 
    20 if __name__ == '__main__':
    21 
    22     q = JoinableQueue()
    23     p1 = Process(target=producer,args=(q,))
    24     p2 = Process(target=consumer,args=(q,))
    25     p2.daemon = True
    26     p1.start()
    27     p2.start()
    28     p1.join()  # 主进程等着生产者进程的结束才结束 ,生产者结束意味着q获得了10个task_done的信号,
    简单示例
     1 # 与queque类似,多了 q.task_done()  q.join()
     2 from multiprocessing import Process,JoinableQueue
     3 import time,random,os
     4 
     5 
     6 def consumer(q):
     7     while True:
     8         res=q.get()
     9         # time.sleep(random.randint(1,3))
    10         time.sleep(random.random())
    11         print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    12         q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走并执行完了
    13 
    14 
    15 def producer(name,q):
    16     for i in range(10):
    17         # time.sleep(random.randint(1,3))
    18         time.sleep(random.random())
    19         res='%s%s' %(name,i)
    20         q.put(res)
    21         print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    22     print('%s生产结束'%name)
    23     q.join() # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。
    24     print('%s生产结束~~~~~~'%name)
    25 
    26 
    27 if __name__ == '__main__':
    28     q=JoinableQueue()
    29     # 生产者们:即厨师们
    30     p1=Process(target=producer,args=('包子',q))
    31     p2=Process(target=producer,args=('骨头',q))
    32     p3=Process(target=producer,args=('泔水',q))
    33 
    34     # 消费者们:即吃货们
    35     c1=Process(target=consumer,args=(q,))
    36     c2=Process(target=consumer,args=(q,))
    37     c1.daemon=True
    38     c2.daemon=True
    39     # 如果不加守护,那么主进程结束不了,但是加了守护之后,必须确保生产者的内容生产完并且被处理完了,所有必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,并且能够确保守护进程在所有任务执行完成之后才随着主进程的结束而结束。
    40 
    41     # 开始
    42     p_l=[p1,p2,p3,c1,c2]
    43     for p in p_l:
    44         p.start()
    45 
    46     p1.join() # 我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的所有的人任务都已经被处理完了
    47     p2.join()
    48     p3.join()
    49     print('主程序')
    稍复杂示例参考
  • 相关阅读:
    Notes about "Exploring Expect"
    Reuse Sonar Checkstyle Violation Report for Custom Data Analysis
    Eclipse带参数调试的方法
    MIT Scheme Development on Ubuntu
    Manage Historical Snapshots in Sonarqube
    U盘自动弹出脚本
    hg的常用配置
    Java程序员的推荐阅读书籍
    使用shared memory 计算矩阵乘法 (其实并没有加速多少)
    CUDA 笔记
  • 原文地址:https://www.cnblogs.com/xi1419/p/10035791.html
Copyright © 2011-2022 走看看