zoukankan      html  css  js  c++  java
  • 进程2

    一、 管道

    创建管道的类

    Pipe([duplex]):在进程之间创建一条管道,并返回元祖(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生process对象之前产生管道

           参数介绍

    dumplex

           默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送

           主要方法:

    Conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另一端已经关闭,那么recv方法会抛出EOFError。

    Conn1.send(obj):通过连接发送对象。Obj是与序列化兼容的任意对象

    其他方法:

    Conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法

    Conn1.fileno():返回连接使用的整数文件描述符

    Conn1.poll(【timeout】):如果连接上的数据可用,返回True。Timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达

    Conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。Maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发ioError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发eoferror异常

    Conn.send_bytes(buffer[,offset[,size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收

    Conn1.recv_bytes_info(buffer[,offset])接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。Offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发bufferTooShort异常

    from multiprocessing import Pipe,Process
    
    def func(conn1,):
        # conn2.close()
        msg = conn1.recv()
        print('>>>>>',msg)
    
    if __name__ == '__main__':
        conn1,conn2 = Pipe()
    
        p = Process(target=func,args=(conn1,))
        p.start()
        # conn1.close()
        # conn2.close()
        conn2.send('小鬼!')
    
        print('主进程结束')

    应该特别注意管道端点的正确管理问题。如果生产者或消费者都没有使用管道的某个端点,就应将它关闭。这也说明了为何生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记要执行这些步骤,程序可能在消费者中的recv()操作上挂起(就是阻塞)。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道的相同一端就会能生成EOFError异常。因此,在生产者中关闭通道不会有任何效果,除非消费者也关闭了相同的管道端点

    主进程将管道的两端都传给子进程,子进程和主进程共用管道的两种报错情况,都是在recv接收的时候报错的:

    1, 主进程和子进程的管道的相同一端都关闭了,出现EOFError

    2, 如果你管道的一端在主进程和子进程中都关闭了,但是你还用这个关闭的一端去接收消息,那么就会出现OSError

     

    所以你关闭管道的时候,就会容易出现问题,需要将所有只有这个管道的进程中的两端全部关闭才行,当然也可以通过异常捕获(try:except EOFerror)来处理。

           虽然我们在主进程和子进程中都打印了conn1一端的对象,发现两个不再同一个地址,但是子进程中的管道和主进程的管道还是可以通信的,因为管道是同一套,系统能够记录

           我们的目的就是关闭所有的通道,那么主进程和子进程进行通信的时候,可以给子进程传管道的一端就够了,并且用我们之前学到的,信息发送完后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中管道了。

    from multiprocessing import Process,Pipe
    
    def consumer(p,name):
        produce,consume = p
        produce.close()
        while 1:
            try:
                baozi = consume.recv()
                print('%s 收到包子:%s'%(name,baozi))
            except EOFError:
                break
    def producer(seq,p):
        produce,consume = p
        consume.close()
        for i in seq:
            produce.send(i)
    
    if __name__ == '__main__':
        produce,consume = Pipe()
        c1 = Process(target=consumer,args=((produce,consume),'c1'))
        c1.start()
    
        seq = (i for i in range(10))
        producer(seq,(produce,consume))
        produce.close()
        consume.close()
    
        c1.join()
        print('主进程')

    关于管道会造成数据不安全问题的官方解释:

    Pipe方法返回的两个连接对象表示管道的两端。每个连接对象都有send和recv方法(除其他之外)。注意,如果两个进程(或线程)试图同时从管道的同一端读取或写入数据,那么管道中的数据可能会损坏。当然,在使用管道的不同端部的过程中不存在损坏风险

    from multiprocessing import Process,Pipe,Lock
    
    def consumer(p,name,lock):
        produce, consume=p
        produce.close()
        while True:
            lock.acquire()
            baozi=consume.recv()
            lock.release()
            if baozi:
                print('%s 收到包子:%s' %(name,baozi))
            else:
                consume.close()
                break
    
    
    def producer(p,n):
        produce, consume=p
        consume.close()
        for i in range(n):
            produce.send(i)
        produce.send(None)
        produce.send(None)
        produce.close()
    
    if __name__ == '__main__':
        produce,consume=Pipe()
        lock = Lock()
        c1=Process(target=consumer,args=((produce,consume),'c1',lock))
        c2=Process(target=consumer,args=((produce,consume),'c2',lock))
        p1=Process(target=producer,args=((produce,consume),10))
        c1.start()
        c2.start()
        p1.start()
    
        produce.close()
        consume.close()
    
        c1.join()
        c2.join()
        p1.join()
        print('主进程')
    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
    with lock:
    d['count'] -=1
    if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
    dic = m.dict({'count':100})
    p_1 = []
    for i in range(100):
    p = Process(target=work,args=(dic,lock))
    p_1.append(p)
    p.start()
    for p in p_1:
    p.join()
    print(dic)

    一、 数据共享

    进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题,应该尽量避免共享数据的方式,以后会使用数据库来解决进程之间的数据共享问题

    总结一下,进程之间的通信:队列,管道,数据共享也算

    一、 进程池和mutiprocess.poll

    为什么要有进程池?进程池的概念

           在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万割任务需要被执行的时候,我们就需要去船舰成千上万个进程吗?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者开启程序。

    定义一个池子,有需求来了,就拿一个池中的进程来处理任务,等处理完毕,进程并不关闭,而是将进程再放回池中继续等待任务。如果有很多任务需要执行,池中的数量不够,任务就要等待之前的进程执行完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程再运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

    import time
    from multiprocessing import Process,Pool

    def func(i):
        num = 0
        for j in range(5):
            num += i

    if __name__ == '__main__':
        pool = Pool(4)  #
        p_list = []
        start_time = time.time()
        for i in range(500):
            p = Process(target=func,args=(i,))
            p_list.append(p)
            p.start()
        [pp.join() for pp in p_list]
        end_time = time.time()
        print(end_time - start_time)
        s_time = time.time()
        pool.map(func,range(500))  #map
        e_time = time.time()
        print(e_time - s_time)



     

     

  • 相关阅读:
    pronunciation techniques
    contraction prnounciation
    洛谷P2097 资料分发1 题解
    洛谷P1482 Cantor表(升级版) 题解
    洛谷P1615 西游记公司 题解
    洛谷P2676 超级书架 题解
    洛谷P1554 梦中的统计 题解
    洛谷P1720 月落乌啼算钱 题解
    洛谷P1420 最长连号 题解
    洛谷P1634 禽兽的传染病 题解
  • 原文地址:https://www.cnblogs.com/DanielYang11/p/10039316.html
Copyright © 2011-2022 走看看