zoukankan      html  css  js  c++  java
  • 多进程的数据共享

    老师的博客:http://www.cnblogs.com/Eva-J/articles/8253549.html#_label14

    Pipe

    pipe是管道但是不是很推荐使用,因为有着不安全的危险,queue就相当于pipe加上lock,比较安全,但是的注意他们的close的时间,详见python3中的day38的笔记

    下面是老师的代码

    from multiprocessing import Lock,Pipe,Process
    def producer(con,pro,name,food):
        con.close()
        for i in range(100):
            f = '%s生产%s%s'%(name,food,i)
            print(f)
            pro.send(f)
        pro.send(None)
        pro.send(None)
        pro.send(None)
        pro.close()
    
    def consumer(con,pro,name,lock):
        pro.close()
        while True:
                lock.acquire()
                food = con.recv()
                lock.release()
                if food is None:
                    con.close()
                    break
                print('%s吃了%s' % (name, food))
    if __name__ == '__main__':
        con,pro = Pipe()
        lock= Lock()
        p = Process(target=producer,args=(con,pro,'egon','泔水'))
        c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
        c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
        c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
        c1.start()
        c2.start()
        c3.start()
        p.start()
        con.close()
        pro.close()
    # pipe 数据不安全性
    # IPC
    # 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象
    
    # 队列 进程之间数据安全的
    # 管道 + 锁

    manager

    老师的代码

    from multiprocessing import Manager,Process,Lock
    def main(dic,lock):
        dic['count'] -= 1
    
    if __name__ == '__main__':
        m = Manager()
        l = Lock()
        dic=m.dict({'count':100})
        p_lst = []
        for i in range(50):
            p = Process(target=main,args=(dic,l))
            p.start()
            p_lst.append(p)
        for i in p_lst: i.join()
        print('主进程',dic)

    运行几次后,你会发现得到的结果不一样,所以也存在数据不安全的现象,所以一般推荐使用queue比较安全,queue,pipe都是可是实现数据通信的。

    数据池pool

    参数介绍:

    Pool([numprocess [,initializer [, initargs]]]):创建进程池

    1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值

    2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None

    3 initargs:是要传给initializer的参数组

    1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
    必须从不同线程调用p.apply()函数或者使用p.apply_async()
    ''' 3 4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
    将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
    ''' 6 7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 8 9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    3 obj.ready():如果调用完成,返回True
    4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    5 obj.wait([timeout]):等待结果变为可用。
    6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

    进程池的效率要高许多,详见老师的博客

    import time
    from multiprocessing import Pool,Process
    def func(n):
        n=n
        for i in range(10):
            n+=i
        print(n)
    if __name__ == '__main__':
        start = time.time()
        pool = Pool(5)               # 5个进程
        pool.map(func,range(100))    # 100个任务
        t1 = time.time() - start
    
        start = time.time()
        p_lst = []
        for i in range(100):#100个任务
            p = Process(target=func,args=(i,))
            p_lst.append(p)
            p.start()
        for p in p_lst :p.join()
        t2 = time.time() - start
        print(t1,t2)
    测试代码

    进程池的三种调用方式:

    def fun(name):
        for i in range(10):
    
            print('my name is %s'%name,getpid())
    if __name__=='__main__':
        pool=Pool(3)
        pool.map(fun,(1,2,3,4,5,6,7,8,9,10))
        print('end')
        '''你可以发现进程号最多最多只有3个
        ,你可以看end的输出始终一直在最后面)
        map(函数名,加上一个可迭代的对象)'''
    第一种
    import os
    import time
    from multiprocessing import Pool
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s' % n,os.getpid())
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            p.apply_async(func,args=(i,))
        p.close()  # 结束进程池接收任务
        p.join()   # 感知进程池中的任务执行结束
    '''注意此种调用方法时真真的异步,如果你不加入jion的的话你连打印的值就不可能显示出来
    这个方法时一个一个的调,只是进程在那几个进程池中而已,而且传参的方式也不一样,前面一个是位置参数,后面一个是
    第二个是默认参数须要=,具体自己看源码'''
    第二种
    import os
    import time
    from multiprocessing import Pool
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s' % n,os.getpid())
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            p.apply(func,args=(i,))
        p.close()  # 结束进程池接收任务,及不能再想进池中加入显得代码,就是不能调用p了
        p.apply(func,args=(10000,))
    '''注意此种调用方法是同步的
    这个方法时一个一个的调,只是进程在那几个进程池中而已,而且传参的方式也不一样,前面一个是位置参数,后面一个是
    第二个是默认参数须要=,具体自己看源码
    其实apply_async的形式是一样的
    不用加join'''
    第三种
    import time
    from multiprocessing import Pool
    def func(name):
        print('你传递的参数是%s'%name)
        return '你的返回值'
    if __name__=='__main__':
        p=Pool(2)
    
        a=p.apply(func,args=('alex',))
        b=p.apply_async(func, args=('jin',))
        c = p.map(func, ('alex', 1))
        time.sleep(1)
        print(a)
        print(b)
        print(c,type(c))
        # print(c)
    '''输出:你传递的参数是alex
    你传递的参数是jin
    你传递的参数是alex
    你传递的参数是1
    你的返回值
    <multiprocessing.pool.ApplyResult object at 0x0000000002BE8198>
    ['你的返回值', '你的返回值'] <class 'list'>
    说明了apply是有返回值的
    而apply_async这是一种方法
    map由于其特殊性,返回值所有函数返回值组成的list'''
    返回值

    总结一下:

    1.三种放方法的调用方式不同,map的调用时的参数是位置参数,必须传,第一个是函数名字,而且第二个是可迭代的对象,调用的是一群函数,返回值是list。

    2.apply是第一个是位置参数,第二个是默认参数了,调用的是一个函数,而且有返回值

    3.apply_asnyc与applyl一样的调用,也只能调用一个函数

    4.比较:apply与map是同步的。不需要添加join

      而apply_async如果主进程的时间太快则不会打印其内容的

    5.p.close就是不能再往里里面添加新的进程了,pool里面的代码运行完毕后就结束了

    更正一下:在apply_async是能拿到返回值的,但是需要.get()来调用的

    map也是异步的,当时自动的添加了join,阻塞了,所以显示觉得是同步的。

    另外,如果想要apply-async到达阻塞的效果,有两种放法,一种是通过get获取返回值,另外一种是先close在join也能达到前面的效果,但是再也不能往池子里加入代码了

    回调函数

    from multiprocessing import Pool
    from time import sleep
    def func(name):
        return name
    def func2(age):
        print('my name is %s'%age)
        return 'i am this func2’s retrun'
    if __name__=='__main__':
        p=Pool(5)
        a= p.apply_async(func,args=('alex',),callback=func2)
        # b = p.apply(func, args=('alex',), callback=func2)
        p.close()
        p.join()
        print(a)
        # print(b)
    '''输出结果;my name is alex
    <multiprocessing.pool.ApplyResult object at 0x0000000002BE35F8>
    只有apply_aspnc才有回调函数,而且就是一样接受不到返回值,只是方法
    原理是前面的函数的返回值带入callback函数的参数'''
    from multiprocessing import Pool
    def func1(n):
        return n+1
    
    def func2(m):
        print(m)
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in  range(10,20):
            a=p.apply_async(func1,args=(i,),callback=func2)
            a.get()
    #看一下这个是通过,get来达到依次执行的效果的
    View Code

     下面是老师的总结,可以看一下

    # 管道
    # 数据的共享 Manager dict list
    # 进程池
        # cpu个数+1
        # ret = map(func,iterable)
            # 异步 自带close和join
            # 所有结果的[]
        # apply
            # 同步的:只有当func执行完之后,才会继续向下执行其他代码
            # ret = apply(func,args=())
            # 返回值就是func的return
        # apply_async
            # 异步的:当func被注册进入一个进程之后,程序就继续向下执行
            # apply_async(func,args=())
            # 返回值 : apply_async返回的对象obj
            #          为了用户能从中获取func的返回值obj.get()
            # get会阻塞直到对应的func执行完毕拿到结果
            # 使用apply_async给进程池分配任务,
            # 需要先close后join来保持多进程和主进程代码的同步性

  • 相关阅读:
    面经分享 | B站 | 数据分析 | 2021.1--转载
    TensorFlow 2.0 学习笔记--第六章 循环神经网络
    TensorFlow 2.0 学习笔记--第五章 神经网络卷积计算
    TensorFlow 2.0 学习笔记--第一章 神经网络计算过程及介绍
    免费服务器
    Nginx采坑日记(后台响应ResponseEntity时,Nginx将部分数据过滤)
    Vue 注意事项
    服务熔断&服务降级
    阿里微服务解决方案-Alibaba Cloud之负载均衡(Feign)(五)
    阿里微服务解决方案-Alibaba Cloud之服务消费方(Feign)(四)
  • 原文地址:https://www.cnblogs.com/accolade/p/10580888.html
Copyright © 2011-2022 走看看