zoukankan      html  css  js  c++  java
  • Python初学——多进程Multiprocessing

    1.1 什么是 Multiprocessing

    多线程在同一时间只能处理一个任务。

    可把任务平均分配给每个核,而每个核具有自己的运算空间。

    1.2 添加进程 Process

    与线程类似,如下所示,但是该程序直接运行无结果,因为IDLE不支持多进程,在命令行终端运行才有结果显示

    import multiprocessing as mp
    
    def job(a,b):
        print('abc')
    if __name__=='__main__':
        p1=mp.Process(target=job,args=(1,2))
        p1.start()
        p1.join()

    1.3 存储进程输出 Queue

    不知道为什么下面的这个程序可以在IDLE中正常运行。首先定义了一个job函数作系列数学运算,然后将结果放到res中,在main函数运行,取出queue中存储的结果再进行一次加法运算。

    import multiprocessing as mp
    
    def job(q):
        res=0
        for i in range(1000):
            res+=i+i**2+i**3
        q.put(res)
    
        
    if __name__ == '__main__':
        q=mp.Queue()
        p1 = mp.Process(target=job,args=(q,))#注意当参数只有一个时,应加上逗号
        p2 = mp.Process(target=job,args=(q,)) 
        p1.start()
        p2.start()
        
        p1.join()
        p2.join()
        res1=q.get()
        res2=q.get()
        print(res1+res2)

     结果如下所示:

     

    1.4 效率比对 threading & multiprocessing

     在job函数中定义了数学运算,比较正常情况、多线程和多进程分别的运行时间。

    import multiprocessing as mp
    import threading as td
    import time
    
    def job(q):
        res = 0
        for i in range(10000000):
            res += i+i**2+i**3
        q.put(res) # queue
    
    def multicore():
        q = mp.Queue()
        p1 = mp.Process(target=job, args=(q,))
        p2 = mp.Process(target=job, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        res1 = q.get()
        res2 = q.get()
        print('multicore:' , res1+res2)
    
    def normal():
        res = 0
        for _ in range(2):#线程或进程都构造了两个,进行了两次运算,所以这里循环两次
            for i in range(10000000):
                res += i+i**2+i**3
        print('normal:', res)
    
    def multithread():
        q = mp.Queue()
        t1 = td.Thread(target=job, args=(q,))
        t2 = td.Thread(target=job, args=(q,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        res1 = q.get()
        res2 = q.get()
        print('multithread:', res1+res2)
    
    if __name__ == '__main__':
        st = time.time()
        normal()
        st1= time.time()
        print('normal time:', st1 - st)
        multithread()
        st2 = time.time()
        print('multithread time:', st2 - st1)
        multicore()
        print('multicore time:', time.time()-st2)

     在视频中的运行结果是多进程<正常<多线程,而我的运行结果为下图所示:

     

    综上,多核/多进程运行最快,说明在同时间运行了多个任务,而多线程却不一定会比正常情况下的运行来的快,这和多线程中的GIL有关。

    1.5 进程池

    进程池Pool,就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题。

    import multiprocessing as mp
    
    def job(x):
        return x*x
    
    def multicore():
        pool=mp.Pool(processes=2)#定义一个Pool,并定义CPU核数量为2
        res=pool.map(job,range(10))
        print(res)
        res=pool.apply_async(job,(2,))
        print(res.get())
        multi_res=[pool.apply_async(job,(i,)) for i in range(10)]
        print([res.get()for res in multi_res])
    
    if __name__=='__main__':
        multicore()
        

    运行结果如下所示:

    首先定义一个池子,有了池子之后,就可以让池子对应某一个函数,在上述代码中定义的pool对应job函数。我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。

    接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果

     

    我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况

    打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)

    Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。

    Pool除了可以用map来返回结果之外,还可以用apply_async(),与map不同的是,只能传递一个值,只会放入一个核进行计算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值。所对应的代码为:

    res=pool.apply_async(job,(2,))
    print(res.get())

    运行结果为4。

    由于传入值是可以迭代的,则我们同样可以使用apply_async()来输出多个结果。如果在apply_async()中输入多个传入值:

    res = pool.apply_async(job, (2,3,4,))

    结果会报错:

    TypeError: job() takes exactly 1 argument (3 given)

    即apply_async()只能输入一组参数。

    在此我们将apply_async()放入迭代器中,定义一个新的multi_res

    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

    同样在取出值时需要一个一个取出来

    print([res.get() for res in multi_res])

    apply用迭代器的运行结果与map取出的结果相同。

    note:
    (1)Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数

    (2)map() 放入迭代参数,返回多个结果

    (3)apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

    1.6 共享内存 shared memory

    只有通过共享内存才能让CPU之间进行交流。

    通过Value将数据存储在一个共享的内存表中。

    import multiprocessing as mp
    
    value1 = mp.Value('i', 0) 
    value2 = mp.Value('d', 3.14)

     

     其中,i和d表示数据类型。i为带符号的整型,d为双精浮点类型。更多数据类型可参考网址:https://docs.python.org/3/library/array.html

    在多进程中有一个Array类,可以和共享内存交互,来实现进程之间共享数据。

    和numpy中的不同,这里的Array只能是一维的,并且需要定义数据类型否则会报错。

    array = mp.Array('i', [1, 2, 3, 4])

    1.7 进程锁 Lock

    首先是不加进程锁的运行情况,在下述代码中定义了共享变量v,定义了两个进程,均可对v进行操作。job函数的作用是每隔0.1s输出一次累加num的值,累加值num在两个进程中分别为1和3。

    import multiprocessing as mp
    import time
    
    def job(v,num):
        for _ in range(10):
            time.sleep(0.1)#暂停0.1s,让输出效果更明显
            v.value+=num #v.value获取共享变量值
            print(v.value)
        
    def multicore():
        v=mp.Value('i',0)#定义共享变量
        p1=mp.Process(target=job,args=(v,1))
        p2=mp.Process(target=job,args=(v,3))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    
    if __name__=='__main__':
        multicore()

     运行结果如下所示:

    可以看到两个进程互相抢占共享内存v。

    为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

    首先需要定义一个进程锁:

     l = mp.Lock() # 定义一个进程锁

    然后将进程锁的信息传入各个进程中

     p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
     p2 = mp.Process(target=job, args=(v,3,l)) 

    job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

    def job(v, num, l):
        l.acquire() # 锁住
        for _ in range(5):
            time.sleep(0.1) 
            v.value += num # v.value获取共享内存
            print(v.value)
        l.release() # 释放

    完整代码:

    def job(v, num, l):
        l.acquire() # 锁住
        for _ in range(5):
            time.sleep(0.1) 
            v.value += num # 获取共享内存
            print(v.value)
        l.release() # 释放
    
    def multicore():
        l = mp.Lock() # 定义一个进程锁
        v = mp.Value('i', 0) # 定义共享内存
        p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
        p2 = mp.Process(target=job, args=(v,3,l)) 
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    if __name__ == '__main__':
        multicore()

    运行结果如下所示:

    可以看到进程1运行完之后才运行进程2。

  • 相关阅读:
    7.12-7.19 id、w、who、last、lastb、lastlog
    查询登录信息 w, who*, id, tty, last, finger
    [rhel-media] :Yum软件仓库唯一标识符,避免与其他仓库冲突。
    :整数 跳转到该行 Vim中常用的命令
    spec2006与spec2000的对比简要说明
    Linux性能监控与分析之--- CPU
    Android实现登录小demo
    python学习笔记——旧类与新类继承中的构造函数
    tair ldb存储引擎性能測试方案
    串口通讯方式1编程
  • 原文地址:https://www.cnblogs.com/wwf828/p/7344338.html
Copyright © 2011-2022 走看看