zoukankan      html  css  js  c++  java
  • 网编的信号量,管道,事件,进程池

    管道(from multiprocessing import Process,Pipe):
    管道的介绍:
     1 #创建管道的类:
     2 Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
     3 #参数介绍:
     4 dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
     5 #主要方法:
     6     conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
     7     conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
     8  #其他方法:
     9 conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    10 conn1.fileno():返回连接使用的整数文件描述符
    11 conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
    12  
    13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    14 conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
    15  
    16 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

    方法总结:

    Conn1,conn2 = Pipe() #创建一个管道对象,全双工,返回管道的两端,但是一端发送的消息,只能另外一端接收,自己这一端是不能接收的
    Conn1.recv() #接收
    Conn1.send() #发送
    数据接收一次就没有了
    图例:

    关于管道会造成数据不安全问题的官方解释:
        The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
        
    由Pipe方法返回的两个连接对象表示管道的两端。每个连接对象都有send和recv方法(除其他之外)。注意,如果两个进程(或线程)试图同时从管道的同一端读取或写入数据,那么管道中的数据可能会损坏。当然,在使用管道的不同端部的过程中不存在损坏风险。

     

    多个消费者竞争会出现数据不安全的问题的解决方案:加锁
     1 from multiprocessing import Process,Pipe,Lock
     2 
     3 def consumer(p,name,lock):
     4     produce, consume=p
     5     produce.close()
     6     while True:
     7         lock.acquire()
     8         baozi=consume.recv()
     9         lock.release()
    10         if baozi:
    11             print('%s 收到包子:%s' %(name,baozi))
    12         else:
    13             consume.close()
    14             break
    15 
    16 
    17 def producer(p,n):
    18     produce, consume=p
    19     consume.close()
    20     for i in range(n):
    21         produce.send(i)
    22     produce.send(None)
    23     produce.send(None)
    24     produce.close()
    25 
    26 if __name__ == '__main__':
    27     produce,consume=Pipe()
    28     lock = Lock()
    29     c1=Process(target=consumer,args=((produce,consume),'c1',lock))
    30     c2=Process(target=consumer,args=((produce,consume),'c2',lock))
    31     p1=Process(target=producer,args=((produce,consume),10))
    32     c1.start()
    33     c2.start()
    34     p1.start()
    35 
    36     produce.close()
    37     consume.close()
    38 
    39     c1.join()
    40     c2.join()
    41     p1.join()
    42     print('主进程')
    事件(from multiprocessing import Process,Event):
      E = Event()  #初识状态是false
      E.wait() 当事件对象e的状态为false的时候,在wait的地方会阻塞程序,当对象状态为true的时候,直接在这个wait地方继续往下执行
      E.set() 将事件对象的状态改为true,
      E.is_set() 查看状态
      E.clear() 将事件对象的状态改为false
    时间方法的使用:
     1 from multiprocessing import Process,Semaphore,Event
     2 import time,random
     3 
     4 e = Event() #创建一个事件对象
     5 print(e.is_set())  #is_set()查看一个事件的状态,默认为False,可通过set方法改为True
     6 print('look here!')
     7 # e.set()          #将is_set()的状态改为True。
     8 # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
     9 # e.clear()        #将is_set()的状态改为False
    10 # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
    11 e.wait()           #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
    12 print('give me!!')
    13 
    14 #set和clear  修改事件的状态 set-->True   clear-->False
    15 #is_set     用来查看一个事件的状态
    16 #wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞

    通过事件来模拟红绿灯:

     1 from multiprocessing import Process, Event
     2 import time, random
     3 
     4 def car(e, n):
     5     while True:
     6         if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
     7             print('33[31m红灯亮33[0m,car%s等着' % n)
     8             e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
     9             print('33[32m车%s 看见绿灯亮了33[0m' % n)
    10             time.sleep(random.randint(2,4))
    11             if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
    12                 continue
    13             print('车开远了,car', n)
    14             break
    15 
    16 # def police_car(e, n):
    17 #     while True:
    18 #         if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
    19 #             print('33[31m红灯亮33[0m,car%s等着' % n)
    20 #             e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿灯就闯红灯走了
    21 #             if not e.is_set():
    22 #                 print('33[33m红灯,警车先走33[0m,car %s' % n)
    23 #             else:
    24 #                 print('33[33;46m绿灯,警车走33[0m,car %s' % n)
    25 #         break
    26 
    27 def traffic_lights(e, inverval):
    28     while True:
    29         time.sleep(inverval)
    30         if e.is_set():
    31             print('######', e.is_set())
    32             e.clear()  # ---->将is_set()的值设置为False
    33         else:
    34             e.set()    # ---->将is_set()的值设置为True
    35             print('***********',e.is_set())
    36 
    37 
    38 if __name__ == '__main__':
    39     e = Event()
    40     for i in range(10):
    41         p=Process(target=car,args=(e,i,))  # 创建10个进程控制10辆车
    42         time.sleep(random.random(1, 3))    #车不是一下子全过来
    43         p.start()
    44 
    45     # for i in range(5):
    46     #     p = Process(target=police_car, args=(e, i,))  # 创建5个进程控制5辆警车
    47     #     p.start()
    48 
    49     #信号灯必须是单独的进程,因为它不管你车开到哪了,我就按照我红绿灯的规律来闪烁变换,对吧
    50     t = Process(target=traffic_lights, args=(e, 5))  # 创建一个进程控制红绿灯
    51     t.start()
    52 
    53     print('预备~~~~开始!!!')
    信号量(from multiprocessing import Process,Semaphore):
    互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
    假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
    实现:
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
     1 from multiprocessing import Process,Semaphore
     2 import time,random
     3 
     4 def go_ktv(sem,user):
     5     sem.acquire()
     6     print('%s 占到一间ktv小屋' %user)
     7     time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
     8     sem.release()
     9 
    10 if __name__ == '__main__':
    11     sem=Semaphore(4)
    12     p_l=[]
    13     for i in range(13):
    14         p=Process(target=go_ktv,args=(sem,'user%s' %i,))
    15         p.start()
    16         p_l.append(p)
    17 
    18     for i in p_l:
    19         i.join()
    20     print('============》')
    信号量中的常用方法:
      S = semphore(4),内部维护了一个计数器,acquire-1,release+1,为0的时候,其他的进程都要在acquire之前等待
      S.acquire()
      需要锁住的代码
      S.release()
    进程池(from multiprocessing import Process,Pool):

    为什么要有进程池?进程池的概念。

      在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。就看我们上面的一些代码例子,你会发现有些程序是不是执行的时候比较慢才出结果,就是这个原因,那么我们要怎么做呢?

      在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

    进程池的出现:进程的创建和销毁是很有消耗的,影响代码执行效率

    进程池的常用方法:
      Map:异步提交任务,并且传参需要可迭代类型的数据,自带close和join功能
      Res = Apply(f1,args=(i,)) #同步执行任务,必须等任务执行结束才能给进程池提交下一个任务,可以直接拿到返回结果res
      Res_obj = Apply_async(f1,args=(i,)) #异步提交任务,可以直接拿到结果对象,从结果对象里面拿结果,要用get方法,get方法会阻塞程序,没有拿到结果会一直等待   
      Close : 锁住进程池,防止有其他的新的任务在提交给进程池
      Join : 等待着进程池将自己里面的任务都执行完
    代码展示:
     1 import time
     2 from multiprocessing import Process,Pool
     3 
     4 # def f1(n):
     5 #     time.sleep(1)
     6 #     print(n)
     7 
     8 #对比多进程和进程池的效率
     9 def f1(n):
    10     for i in range(5):
    11         n = n + i
    12 if __name__ == '__main__':
    13     #统计进程池执行100个任务的时间
    14     s_time = time.time()
    15     pool = Pool(4)  #里面这个参数是指定进程池中有多少个进程用的,4表示4个进程,如果不传参数,默认开启的进程数一般是cpu的个数
    16     # pool.map(f1,[1,2])  #参数数据必须是可迭代的
    17     pool.map(f1,range(100))  #参数数据必须是可迭代的,异步提交任务,自带join功能
    18     e_time = time.time()
    19     dif_time = e_time - s_time
    20 
    21     #统计100个进程,来执行100个任务的执行时间
    22     p_s_t = time.time() #多进程起始时间
    23     p_list = []
    24     for i in range(100):
    25         p = Process(target=f1,args=(i,))
    26         p.start()
    27         p_list.append(p)
    28         # p.join()
    29     [pp.join() for pp in p_list]
    30     p_e_t = time.time()
    31     p_dif_t = p_e_t - p_s_t
    32     print('进程池的时间:',dif_time)
    33     print('多进程的执行时间:',p_dif_t)
    34     # 结果:
    35     # 进程池的时间: 0.40102291107177734
    36     # 多进程的执行时间: 9.247529029846191


  • 相关阅读:
    贝尔级数
    NOIP2018 退役记
    Codeforces1106F 【BSGS】【矩阵快速幂】【exgcd】
    codeforces1111 简单题【DE】简要题解
    BZOJ4836: [Lydsy1704月赛]二元运算【分治FFT】【卡常(没卡过)】
    BZOJ3771: Triple【生成函数】
    Codeforces 1096G. Lucky Tickets【生成函数】
    Codeforces1099F. Cookies【DP】【线段树】【贪心】【博弈】【沙比提(这是啥算法)】
    Codeforces gym101955 A【树形dp】
    BZOJ3551: [ONTAK2010]Peaks加强版【Kruskal重构树】【主席树】
  • 原文地址:https://www.cnblogs.com/Godisgirl/p/10251871.html
Copyright © 2011-2022 走看看