zoukankan      html  css  js  c++  java
  • day 035 管道 和数据共享

    管道

    #创建管道的类:
    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
    #参数介绍:
    dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
     #其他方法:
    conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    conn1.fileno():返回连接使用的整数文件描述符
    conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
     
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收   
     
    conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
    管道报错

    主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错情况,都是在recv接收的时候报错的:

        1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError;

        2.如果你管道的一端在主进程和子进程中都关闭了,但是你还用这个关闭的一端去接收消息,那么就会出现OSError;

        所以你关闭管道的时候,就容易出现问题,需要将所有只用这个管道的进程中的两端全部关闭才行。当然也可以通过异常捕获(try:except EOFerror)来处理。

        虽然我们在主进程和子进程中都打印了一下conn1一端的对象,发现两个不再同一个地址,但是子进程中的管道和主进程中的管道还是可以通信的,因为管道是同一套,系统能够记录。    

        我们的目的就是关闭所有的管道,那么主进程和子进程进行通信的时候,可以给子进程传管道的一端就够了,并且用我们之前学到的,信息发送完之后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中的管道了。

    数据共享:

    Manager模块介绍

    多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,不加锁就会出现错误的结果,进程不安全的,所以也需要加锁

    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
            d['count']-=1
    
    if __name__ == '__main__':
        lock=Lock()    
        with Manager() as m:
            dic=m.dict({'count':100})   #字典形式
            p_l=[]
            for i in range(100):
                p=Process(target=work,args=(dic,lock))
                p_l.append(p)
                p.start()
            for p in p_l:
                p.join()
            print(dic)

    进程池和multiprocess Pool 模块

    创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务(高级一些的进程池可以根据你的并发量,搞成动态增加或减少进程池中的进程数量的操作),不会开启其他进程,提高操作系统效率,减少空间的占用等。

    定义池子将创建定量的进程:去完成任务,3个出去再回到池子中但进程不关闭,在出去执行任务,直到将任务执行完成结束

    进程池的主要方法:

    p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
    p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。  得到的是返回结果也可以不返回结果
    '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
       
    p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
     
    进程池的实例:
    进程池的效比进程的效率高很多
    map方法是异步执行并且自带close和join方法
    from multiprocessing import Process,Pool
    import time
    def func(n):
        for i in range(5):
            n=n=i
        print(n)
    if __name__ == '__main__':
        pool_start_time=time.time()
        pool= Pool(4)
        pool.map(func,range(100))
        pool_end_time=time.time()
        pool_dif_time=pool_end_time-pool_start_time
        p_s_time=time.time()
        p_list=[]
        for i in range(100):
            p2=Process(target=func,args=(i,))
            p2.start()
            p_list.append(p2)
        [p.join() for p in p_list]
        p_e_time=time.time()
        p_dif_time=p_e_time-p_s_time
        print("进程池时间",pool_dif_time)
        print("多进程时间",p_dif_time)

    进程池的同步异步:

    同步:

    from multiprocessing import Process,Pool
    import time
    def func(i):
        time.sleep(1)
        return i**2
    if __name__ == '__main__':
        p=Pool(4)
        for i in range(10):
            res=p.apply(func,args=(i,))   #同步的方法多个进程完成任务  可以得到返回值也可以不提取返回值
            print(res)

    异步:

    from multiprocessing import Process,Pool
    import time
    import os
    def func(i):
        time.sleep(2)
        # print(os.getpid())
        return i**2
    if __name__ == '__main__':
        p=Pool(4)
        res_list=[]
        for i in range(10):
            res=p.apply_async(func,args=(i,))   #异步的方法  得到返回值
            res_list.append(res)
        for i in res_list:
            print(i.get())

    回调函数:

    进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。

    import os
    from multiprocessing import Pool
    
    def func1(n):
        print('func1>>',os.getpid())
        print('func1')
        return n*n
    
    def func2(nn):
        print('func2>>',os.getpid())
        print('func2')
        print(nn)
        # import time
        # time.sleep(0.5)
    if __name__ == '__main__':
        print('主进程:',os.getpid())
        p = Pool(5)
        #args里面的10给了func1,func1的返回值作为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值
        # for i in range(10,20): #如果是多个进程来执行任务,那么当所有子进程将结果给了回调函数之后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。我们上面func2里面注销的时间模块打开看看
        #     p.apply_async(func1,args=(i,),callback=func2)
        p.apply_async(func1,args=(10,),callback=func2)
    
        p.close()
        p.join()
    
    #结果
    # 主进程: 11852  #发现回调函数是在主进程中完成的,其实如果是在子进程中完成的,那我们直接将代码写在子进程的任务函数func1里面就行了,对不对,这也是为什么称为回调函数的原因。
    # func1>> 17332
    # func1
    # func2>> 11852
    # func2
    # 100
     
  • 相关阅读:
    Android.mk添加第三方jar包
    关于回调函数
    Ubuntu下GIT服务器的安装与配置
    三星I9100在Ubuntu下用Adb调试
    Android检测网络是否可用和主机是否可达
    keepalived配置文件解析系列之(一)keepalived简介及配置文件介绍
    keepalived配置文件解析系列之(三)配置文件解析过程
    C语言中的位域(bitfield)概念
    popen和变长参数库函数(va_xxx)用法举例及命令执行失败情况探讨
    《深入理解Linux内核》条目式笔记 _3
  • 原文地址:https://www.cnblogs.com/litieshuai/p/9857826.html
Copyright © 2011-2022 走看看