zoukankan      html  css  js  c++  java
  • 并发编程笔记(2)——信号量、事件、队列(进程间的通信)

    内容目录

    • 信号量
    • 事件
    • 队列

    内容详细

    信号量(重点)

    • 可以规定有多少进程使用关键代码,其余进程阻塞,直到有子进程释放

    • 示例:模拟KTV使用,同时只有4个人使用

      import random
      import time
      from multiprocessing import Process
      from multiprocessing import Semaphore  #使用信号量模块
      
      def ktv(i,sem):
          sem.acquire()       #获取钥匙,只有4个进程可以执行,后续阻塞
          print('%s走进KTV'%i)
          time.sleep(random.randint(5,10))    #模拟进程使用的时间(随机秒数)
          print('%s走出KTV'%i)
          sem.release()       #进程结束后还钥匙
      
      if __name__ == '__main__':
          sem = Semaphore(4)      #实例化信号量,规定多少进程可以使用
          for i in range(20):
              p = Process(target=ktv,args=(i,sem))
              p.start()
      

    事件(重点)

    • 通过一个信号来控制多个进程同时执行或阻塞

    • 一个事件被创建之后,默认是阻塞状态

      from multiprocessing import Event   #使用事件模块
      
      # 一个信号可以使所有的进程都进入阻塞状态
      # 也可以控制所有的进程解除阻塞
      # 一个事件被创建之后,默认是阻塞状态
      
      e = Event()     #创建一个事件
      print(e.is_set())   #查看一个事件的状态,默认被设置成阻塞
      e.set()         #将这个事件的状态改为True
      print(e.is_set())
      e.wait()        #依据e.is_set()的值来决定是否阻塞的
      print(123456)
      
      e.clear()       #将这个事件的状态改为False
      print(e.is_set())
      e.wait()        #此时程序为阻塞状态,等待信号变为True
      print('*'*10)
      
    • set 和 clear

      • 分别用来修改一个事件的状态,True或者False
    • is_set用来查看一个事件的状态

    • wait 是依据事件的状态来决定自己是否阻塞

      • is_set()状态为False是阻塞,True是不阻塞
    • 事件示例:经典的红绿灯事件

      # 绿灯来了车通过,红灯车为阻塞状态
      import time
      import random
      from multiprocessing import Event,Process
      
      def cars(e,i):
          if not e.is_set():
              print('car%i在等待'%i)
              e.wait()            # 阻塞 直到得到一个事件状态变为True的信号
          print('33[0;32;40mcar%s通过33[0m'%i)
      
      def light(e):
          while True:
              if e.is_set():
                  e.clear()
                  print('33[31m红灯亮了33[0m')
              else:
                  e.set()
                  print('33[32m绿灯亮了33[0m')
              time.sleep(2)
      
      if __name__ == '__main__':
          e = Event()
          traffic = Process(target=light,args=(e,))
          traffic.start()
          for i in range(20):
              car = Process(target=cars,args=(e,i))
              car.start()
              time.sleep(random.randint(1,5))
      

    队列 --- 进程间的通信

    • 进程间通信-IPC(使用Queue模块)

      import time
      from multiprocessing import Queue   #使用队列模块
      q = Queue(5)        #设置队列中只能有5个进程或数据
      q.put(1)        #往队列中放入1
      q.put(2)
      q.put(3)
      q.put(4)
      q.put(5)        #此时队列已经满了,如果再往里放则为阻塞状态
      print(q.full())     # 查询队列是否满了(True为满了,False为不满)
      
      print(q.get())  #从队列中取出
      print(q.get())
      print(q.get())
      print(q.get())
      print(q.get())  #此时队列已经空了,如果继续取出,则为阻塞状态
      print(q.empty())    # 查询队列是否为空(True为空,False为未空)
      
      q.get_nowait()  #强制取值,此时会报queue.Empty错误,表示队列已经空
      
      # while True:
      #     try:
      #         q.get_nowait()
      #     except:
      #         print('队列已空')
      #         time.sleep(0.5)
      #         while循环1秒会循环上千次,损耗内存,此时加上睡眠时间以避免内存过度损耗
      
    • 注意:

      • q.empty()为检查队列是否为空,有不可靠因素。此方法是实时检测队列是否为空,如果此时生产者有往队列中正在添加的进程时,队列此时为空
    • 简单的多进程队列模型

      from multiprocessing import Queue,Process
      def produce(q):          #生产数据
          q.put('hello')
      
      def consume(q):          #取出数据
          print(q.get())
      
      if __name__ == '__main__':
          q = Queue()     #此队列表示没有限制
          p = Process(target=produce,args=(q,))
          p.start()
          c = Process(target=consume,args=(q,))
          c.start()
      
    • 经典的生产者消费者模型

      • 为解决供需不平衡的问题
      #plan 1:不完整,存在BUG
      import time
      import random
      from multiprocessing import Queue,Process
      
      def consumer(q,name):
          while True:
              food = q.get()
              if food is None:
                  #如果获取到None则终止循环,问题bug:此时如果多个消费者取队列中的None,
                  #只能是第一个进程能取到,剩余进程为阻塞状态。需要往队列中添加与消费者数量相匹配的None。
                  print('%s获取到一个空'%name)
                  break
              print( '33[31m%s消费了%s33[0m'%(name,food))
              time.sleep(random.randint(1,3))
      
      def producer(name,food,q):      # 创建生产者
          for i in range(4):
              time.sleep(random.randint(1,3))
              foods = '%s生产了%s个%s'%(name,i+1,food)
              print(foods)
              q.put(foods)
      
      if __name__ == '__main__':
          q = Queue(20)               # 创建队列,所有进程里最大限制20
          p1 = Process(target=producer,args=('alec','包子',q))
          p1.start()
          p2 = Process(target=producer,args=('yazhou','玉米',q))
          p2.start()
          c1 = Process(target=consumer,args=(q,'喳喳辉'))
          c1.start()
          c2 = Process(target=consumer,args=(q,'古天乐'))
          c2.start()
          p1.join()           #让生产者进程回归到主程序进程
          p2.join()
          q.put(None)         #队列中放入两个None,让消费者取到后结束阻塞状态
          q.put(None)
      
    • 使用Joinablequeue模块

    #plan 2:
    import time
    import random
    from multiprocessing import Process,JoinableQueue
    
    def consumer(q,name):
        while True:
            food = q.get()
            print( '33[31m%s消费了%s33[0m'%(name,food))
            time.sleep(random.randint(1,3))
            q.task_done()   # 每执行一次该命令都会被记录下来,直到队列中的所有数据都执行完此命令
    def producer(name,food,q):   # 创建生产者
        for i in range(4):
            time.sleep(random.randint(1,3))
            foods = '%s生产了%s个%s'%(name,i+1,food)
            print(foods)
            q.put(foods)
        q.join()        # 进程延迟了,进入阻塞状态,直到一个队列中的所有数据全部被处理完毕
    
    if __name__ == '__main__':
        q = JoinableQueue(20)               # 创建队列,所有进程里最大限制20
        p1 = Process(target=producer,args=('alec','包子',q))
        p2 = Process(target=producer,args=('yazhou','玉米',q))
        p1.start()
        p2.start()
    
        c1 = Process(target=consumer,args=(q,'喳喳辉'))
        c2 = Process(target=consumer,args=(q,'古天乐'))
        c1.daemon = True        #设置为守护进程,主进程中的代码执行完毕之后,子进程自动结束
        c2.daemon = True
        c1.start()
        c2.start()
    
        p1.join()           #让生产者进程回归到主程序进程
        p2.join()
    

    此模块运行流程:

    从消费者端看:

    • 每次获取一个数据
    • 处理一个数据
    • 发送一个记号:标志一个数据被处理成功

    从生产者端看:

    • 每次生产一个数据,并放入队列中

    • 对队列中每一个数据刻上记号

    • 当生产者全部生产完毕后,发送join信号,进程为阻塞状态:

      • 此时生产者已经停止生产数据了
      • 等待之前被刻上记号的数据都被消费完
      • 当数据都被处理完时,join阻塞结束
    • 1.consumer中把所有的任务都消耗完

    • 2.producer端的join感知到,停止阻塞

    • 3.所有的producer进程结束# 主进程中的p.join结束

    • 4.主进程中代码结束# 守护进程(消费者的进程)结束

    总结:

    • 消费者模型
  • 相关阅读:
    TCP粘包的拆包处理
    字节序列化
    同步,异步,阻塞和非阻塞
    用Doxygen生成文档
    Visual Studio新建的源文件的默认编码
    Boost编程之获取可执行文件的当前路径
    特征点寻找的基础数据结构和函数
    寻找Harris、Shi-Tomasi和亚像素角点
    特征点的基本概念和如何找到它们
    工业相机与普通相机的差别
  • 原文地址:https://www.cnblogs.com/lynlearnde/p/13471789.html
Copyright © 2011-2022 走看看