zoukankan      html  css  js  c++  java
  • Python初学——多进程Multiprocessing

    1.1 什么是 Multiprocessing

    多线程在同一时间只能处理一个任务。

    可把任务平均分配给每个核,而每个核具有自己的运算空间。

    1.2 添加进程 Process

    与线程类似,如下所示,但是该程序直接运行无结果,因为IDLE不支持多进程,在命令行终端运行才有结果显示

    import multiprocessing as mp
    
    def job(a,b):
        print('abc')
    if __name__=='__main__':
        p1=mp.Process(target=job,args=(1,2))
        p1.start()
        p1.join()

    1.3 存储进程输出 Queue

    不知道为什么下面的这个程序可以在IDLE中正常运行。首先定义了一个job函数作系列数学运算,然后将结果放到res中,在main函数运行,取出queue中存储的结果再进行一次加法运算。

    import multiprocessing as mp
    
    def job(q):
        res=0
        for i in range(1000):
            res+=i+i**2+i**3
        q.put(res)
    
        
    if __name__ == '__main__':
        q=mp.Queue()
        p1 = mp.Process(target=job,args=(q,))#注意当参数只有一个时,应加上逗号
        p2 = mp.Process(target=job,args=(q,)) 
        p1.start()
        p2.start()
        
        p1.join()
        p2.join()
        res1=q.get()
        res2=q.get()
        print(res1+res2)

     结果如下所示:

     

    1.4 效率比对 threading & multiprocessing

     在job函数中定义了数学运算,比较正常情况、多线程和多进程分别的运行时间。

    import multiprocessing as mp
    import threading as td
    import time
    
    def job(q):
        res = 0
        for i in range(10000000):
            res += i+i**2+i**3
        q.put(res) # queue
    
    def multicore():
        q = mp.Queue()
        p1 = mp.Process(target=job, args=(q,))
        p2 = mp.Process(target=job, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        res1 = q.get()
        res2 = q.get()
        print('multicore:' , res1+res2)
    
    def normal():
        res = 0
        for _ in range(2):#线程或进程都构造了两个,进行了两次运算,所以这里循环两次
            for i in range(10000000):
                res += i+i**2+i**3
        print('normal:', res)
    
    def multithread():
        q = mp.Queue()
        t1 = td.Thread(target=job, args=(q,))
        t2 = td.Thread(target=job, args=(q,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        res1 = q.get()
        res2 = q.get()
        print('multithread:', res1+res2)
    
    if __name__ == '__main__':
        st = time.time()
        normal()
        st1= time.time()
        print('normal time:', st1 - st)
        multithread()
        st2 = time.time()
        print('multithread time:', st2 - st1)
        multicore()
        print('multicore time:', time.time()-st2)

     在视频中的运行结果是多进程<正常<多线程,而我的运行结果为下图所示:

     

    综上,多核/多进程运行最快,说明在同时间运行了多个任务,而多线程却不一定会比正常情况下的运行来的快,这和多线程中的GIL有关。

    1.5 进程池

    进程池Pool,就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题。

    import multiprocessing as mp
    
    def job(x):
        return x*x
    
    def multicore():
        pool=mp.Pool(processes=2)#定义一个Pool,并定义CPU核数量为2
        res=pool.map(job,range(10))
        print(res)
        res=pool.apply_async(job,(2,))
        print(res.get())
        multi_res=[pool.apply_async(job,(i,)) for i in range(10)]
        print([res.get()for res in multi_res])
    
    if __name__=='__main__':
        multicore()
        

    运行结果如下所示:

    首先定义一个池子,有了池子之后,就可以让池子对应某一个函数,在上述代码中定义的pool对应job函数。我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。

    接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果

     

    我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况

    打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)

    Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。

    Pool除了可以用map来返回结果之外,还可以用apply_async(),与map不同的是,只能传递一个值,只会放入一个核进行计算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值。所对应的代码为:

    res=pool.apply_async(job,(2,))
    print(res.get())

    运行结果为4。

    由于传入值是可以迭代的,则我们同样可以使用apply_async()来输出多个结果。如果在apply_async()中输入多个传入值:

    res = pool.apply_async(job, (2,3,4,))

    结果会报错:

    TypeError: job() takes exactly 1 argument (3 given)

    即apply_async()只能输入一组参数。

    在此我们将apply_async()放入迭代器中,定义一个新的multi_res

    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

    同样在取出值时需要一个一个取出来

    print([res.get() for res in multi_res])

    apply用迭代器的运行结果与map取出的结果相同。

    note:
    (1)Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数

    (2)map() 放入迭代参数,返回多个结果

    (3)apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

    1.6 共享内存 shared memory

    只有通过共享内存才能让CPU之间进行交流。

    通过Value将数据存储在一个共享的内存表中。

    import multiprocessing as mp
    
    value1 = mp.Value('i', 0) 
    value2 = mp.Value('d', 3.14)

     

     其中,i和d表示数据类型。i为带符号的整型,d为双精浮点类型。更多数据类型可参考网址:https://docs.python.org/3/library/array.html

    在多进程中有一个Array类,可以和共享内存交互,来实现进程之间共享数据。

    和numpy中的不同,这里的Array只能是一维的,并且需要定义数据类型否则会报错。

    array = mp.Array('i', [1, 2, 3, 4])

    1.7 进程锁 Lock

    首先是不加进程锁的运行情况,在下述代码中定义了共享变量v,定义了两个进程,均可对v进行操作。job函数的作用是每隔0.1s输出一次累加num的值,累加值num在两个进程中分别为1和3。

    import multiprocessing as mp
    import time
    
    def job(v,num):
        for _ in range(10):
            time.sleep(0.1)#暂停0.1s,让输出效果更明显
            v.value+=num #v.value获取共享变量值
            print(v.value)
        
    def multicore():
        v=mp.Value('i',0)#定义共享变量
        p1=mp.Process(target=job,args=(v,1))
        p2=mp.Process(target=job,args=(v,3))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    
    if __name__=='__main__':
        multicore()

     运行结果如下所示:

    可以看到两个进程互相抢占共享内存v。

    为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

    首先需要定义一个进程锁:

     l = mp.Lock() # 定义一个进程锁

    然后将进程锁的信息传入各个进程中

     p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
     p2 = mp.Process(target=job, args=(v,3,l)) 

    job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

    def job(v, num, l):
        l.acquire() # 锁住
        for _ in range(5):
            time.sleep(0.1) 
            v.value += num # v.value获取共享内存
            print(v.value)
        l.release() # 释放

    完整代码:

    def job(v, num, l):
        l.acquire() # 锁住
        for _ in range(5):
            time.sleep(0.1) 
            v.value += num # 获取共享内存
            print(v.value)
        l.release() # 释放
    
    def multicore():
        l = mp.Lock() # 定义一个进程锁
        v = mp.Value('i', 0) # 定义共享内存
        p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
        p2 = mp.Process(target=job, args=(v,3,l)) 
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    if __name__ == '__main__':
        multicore()

    运行结果如下所示:

    可以看到进程1运行完之后才运行进程2。

  • 相关阅读:
    BZOJ 3744 Gty的妹子序列
    BZOJ 3872 Ant colony
    BZOJ 1087 互不侵犯
    BZOJ 1070 修车
    BZOJ 2654 tree
    BZOJ 3243 向量内积
    1003 NOIP 模拟赛Day2 城市建设
    CF865D Buy Low Sell High
    CF444A DZY Loves Physics
    Luogu 4310 绝世好题
  • 原文地址:https://www.cnblogs.com/wwf828/p/7344338.html
Copyright © 2011-2022 走看看