zoukankan      html  css  js  c++  java
  • Python并行编程(十三):进程池和mpi4py模块

    1、基本概念

          多进程库提供了Pool类来实现简单的多进程任务。Pool类有以下方法:

          - apply():直到得到结果之前一直阻塞。

          - apply_async():这是apply()方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。

          - map():这是内置的map函数的并行版本,在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。

          - map_async():这是map的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时,会自动调用回调函数,除非调用失败。回调函数应该立即完成,否则,持有result的进程将被阻塞。

    2、测试用例

          创建四个进程池,然后使用map方法进行一个简单的计算。

    import multiprocessing
    
    def function_square(data):
        result = data * data
        return result
    
    if __name__ == "__main__":
        inputs = list(range(100))
        pool = multiprocessing.Pool(processes=4)
        pool_outputs = pool.map(function_square, inputs)
        pool.close()
        pool.join()
        print("pool: ", pool_outputs)

          pool.map方法将一些独立的任务提交给进程池。pool.map和内置map的执行结果相同,但pool.map是通过多个并行进程计算的。

    3、mpi4py模块

          Python提供了很多MPI模块写并行程序。其中mpi4py在MPI-1/2顶层构建,提供了面向对象的接口,紧跟C++绑定的MPI-2。MPI是C语言用户可以无需学习新的接口就可以使用这个库。

          此模块包含的主要的应用:

          - 点对点通讯

          - 集体通讯

          - 拓扑

    4、安装mpi4py

          安装mpich:https://www.microsoft.com/en-us/download/confirmation.aspx?id=56727

          下载并安装msmpisetup.exe

           安装完成后安装目录如下:

           

          将bin目录添加到系统环境中:

          

          用cmd输入并显示如下即为安装成功

          

          安装mpi4py

          pip install mpi4py

          MPI测试用例

    from mpi4py import MPI
    
    def mpi_test(rank):
        print("I am rank %s" %rank)
    
    
    if __name__ == "__main__":
    
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        mpi_test(rank)
        print("Hello world from process", rank)

          使用mpi运行文件

         

          在MPI中,并行程序中不同进程用一个非负整数来区别,如果我们有P个进程,那么rank会从0到P-1分配。

          MPI拿到rank的函数如下:rank = comm.Get_rank()

          这个函数返回调用它的进程的rank,comm叫做交流者,用于区别不同的进程集合:comm = MPI.COMM_WORLD

     5、MPI点对点通讯

          MPI提供的最实用的一个特性是点对点通讯。两个不同的进程之间可以通过点对点通讯交换数据:一个进程是接收者,一个进程是发送者。

          Python的mpi4py通过下面两个函数提供了点对点通讯功能:

          - Comm.Send(data, process_destination):通过它在交流组中的排名来区分发送给不同进程的数据。

          - Comm.Recv(process_source):接收来自源进程的数据,也是通过在交流组中的排名来分分的。

          Comm变量表示交流着,定义了可以互相通讯的进程组:

          comm  = MKPI.COMM_WORLD

          交换信息测试用例: 

    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    rank = comm.rank
    print("My rank is :",rank)
    
    if rank == 0:
        data = 10000000
        destination_process = 4
        comm.send(data, dest=destination_process)
        print("sending data %s to process %d" %(data, destination_process))
    
    if rank == 1:
        destination_process = 8
        data = "hello,I am rank 1"
    
        comm.send(data, dest=destination_process)
        print("sending data %s to process %d" %(data, destination_process))
    
    if rank == 4:
        data = comm.recv(source=0)
        print("data received is = %s" %data)
    
    if rank == 8:
        data1 = comm.recv(source=1)
        print("data received is = %s" %data1)

          运行结果:

          

          通过mpiexec -n 9运行9个互相通讯的进程,使用rank的值来区分每个进程。

          整个过程分为两部分,发送者发送数据,接收者接收数据,二者必须都指定发送方/接收方,source=为指定发送者。如果有发送的数据没有被接收,程序会阻塞。

          comm.send()和comm.recv()函数都是阻塞的函数,他们会一直阻塞调用者,直到数据使用完成,同时在MPI中,有两种方式发送和接收数据:

          - buffer模式

          - 同步模式

          在buffer模式中,只要需要发送的数据被拷贝到buffer中,执行权就会交回到主程序,此时数据并非已经发送/接收完成。在同步模式中,只有函数真正的结束发送/接收任务之后才会返回。

    6、避免死锁

          mpi4py没有提供特定的功能来解决这种情况,但是提供了一些程序员必须遵守的规则来避免死锁的问题。

          出现死锁的情况:

          

    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    rank = comm.rank
    print("my rank is :",rank)
    
    if rank == 1:
        data_send = "a"
        destination_process = 5
        source_process = 5
        data_received = comm.recv(source=source_process)
        comm.send(data_send, dest=destination_process)
    
        print("sending data %s to process %d" %(data_send, destination_process))
        print("data received is = %s" %data_received)
    
    if rank == 5:
        data_send = "b"
        destination_process = 1
        source_process = 1
        data_received = comm.recv(source=source_process)
        comm.send(data_send, dest=destination_process)
        print("sending data %s to process %d" % (data_send, destination_process))
        print("data received is = %s" % data_received)

          运行结果:

          

          进程1和进程5产生阻塞,程序阻塞。

          此时两个进程都在等待对方,发生阻塞,因为recv和send都是阻塞的,两个函数都先使用的recv,所以调用者都在等待他们完成。所以讲上述代码改为如下即可解决阻塞:

    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    rank = comm.rank
    print("my rank is :",rank)
    
    if rank == 1:
        data_send = "a"
        destination_process = 5
        source_process = 5
        comm.send(data_send, dest=destination_process)
        data_received = comm.recv(source=source_process)
    
        print("sending data %s to process %d" %(data_send, destination_process))
        print("data received is = %s" %data_received)
    
    if rank == 5:
        data_send = "b"
        destination_process = 1
        source_process = 1
        data_received = comm.recv(source=source_process)
        comm.send(data_send, dest=destination_process)
        print("sending data %s to process %d" % (data_send, destination_process))
        print("data received is = %s" % data_received)

          将其中一个函数的recv和send顺序调换。

          运行结果:

          

          也可通过Sendrecv函数解决,代码如下:

    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    rank = comm.rank
    print("my rank is :",rank)
    
    if rank == 1:
        data_send = "a"
        destination_process = 5
        source_process = 5
        # comm.send(data_send, dest=destination_process)
        # data_received = comm.recv(source=source_process)
        data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)
    
        print("sending data %s to process %d" %(data_send, destination_process))
        print("data received is = %s" %data_received)
    
    if rank == 5:
        data_send = "b"
        destination_process = 1
        source_process = 1
        # data_received = comm.recv(source=source_process)
        # comm.send(data_send, dest=destination_process)
        data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)
        print("sending data %s to process %d" % (data_send, destination_process))
        print("data received is = %s" % data_received)

          运行结果:

          

    7、集体通讯:Broadcast

          在并行代码的开发中,会经常需要在多个进程间共享某个变量运行时的值,或操作多个进程提供的变量。MPI库提供了在多个进程之间交换信息的方法,将所有进程变成通讯者的这种方法叫做集体交流。因此,一个集体交流通常是2个以上的进程,也可以称为广播——一个进程将消息发送给其他进程。mpi4py模块通过以下方式提供广播的功能:

    buf = comm.bcast(data_to_share, rank_of_root_process)

          这个函数将root消息中包含的信息发送给属于comm通讯组其他的进程,每个进程必须通过相同的root和comm来调用它。

          

          测试代码:

    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    
    if rank == 0:
        variable_to_share = 100
    else:
        variable_to_share = None
    
    variable_to_share = comm.bcast(variable_to_share, root=0)
    print("process = %d  variable shared = %d" %(rank, variable_to_share))

          运行结果:

          

          rank等于0的root进程初始化了一个变量,variable_to_share,值为100,然后声明了一个广播variable_to_share = comm.bcast(variable_to_share, root=0)

          这个变量将通过通讯组发送给其他进程。

          集体通讯允许组中的多个进程同时进行数据交流。在mpi4py模块中,只提供了阻塞版本的集体通讯(阻塞调用者,直到缓存中的数据全部安全发送。)

          广泛应用的集体通讯应该是:

                - 组中的进程提供通讯的屏障

                - 通讯方式包括:

                      - 将一个进程的数据广播到组中其他进程中

                      - 从其他进程收集数据发给一个进程

                      - 从一个进程散播数据到其他进程中

                - 减少操作

     8、集体通讯:Scatter

          scatter函数和广播很像,但是不同的是comm.bcast将相同的数据发送给所有在监听的进程,comm.scatter可以将数据放在数据中,发送给不同的进程。

          

          comm.scatter函数接收一个array,根据进程的rank将其中的元素发给不同的进程,第一个元素发送给进程0,第二个元素发给进程1,以此类推。

          测试用例:

    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    
    # array_to_share = ["a","b","c","d","e","f","g","h","i","j"]
    if rank == 0:
        array_to_share = [0,1,2,3,4,5,6,7,8,9]
    else:
        array_to_share = None
    
    recvbuf = comm.scatter(array_to_share, root=0)
    print("Process = %d  recvbuf = %s" %(rank, recvbuf))

          执行结果:

          

          注意:列表中的元素个数,需要个进程保持一致。否则会出现如下错误。

          

     9、集体通讯:gather

          gather函数基本上是反向的scatter,即收集所有进程发送到root进程数据。方法如下:

    recvbuf = comm.gather(sendbuf, rank_of_root_process)

          sendbuf是要发送的数据,rank_of_root_process代表要接收数据的进程。

          

          测试用例:

    from mpi4py import MPI
    
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    # print(size)
    rank = comm.Get_rank()
    data = "process %s" %rank
    # print("start %s"%data)
    data = comm.gather(data, root=0)
    # print(data)
    if rank == 0:
        print("rank = %s receiving data to other process" %rank)
        for i in range(1, size):
            #data[i] = (i+1) ** 2
            value = data[i]
            print("process %s receiving %s from process %s" %(rank, value, i))
        # print(data)

          执行结果:

          

     10、使用Alltoall通讯

          Alltoall集体通讯结合了scatter和gather的功能。在mpi4py中,有以下三类的Alltoall集体通讯。

          - comm.Alltoall(sendbuf, recvbuf);

          - comm.Alltoallv(sendbuf, recvbuf);

          - comm.Alltoallw(sendbuf, recvbuf);

          Alltoall测试用例:

    from mpi4py import MPI
    import numpy
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    
    a_size = 1
    
    # print("numpy arange: %s" %numpy.arange(size, dtype=int))
    senddata = (rank+1)*numpy.arange(size, dtype=int)
    recvdata = numpy.empty(size * a_size, dtype=int)
    print("senddata is %s , recvdata is %s" %(senddata, recvdata))
    # print("Recvdata is %s: , 
     numpy.empty is %s" %(recvdata, numpy.empty(size * a_size, dtype=int)))
    
    comm.Alltoall(senddata, recvdata)
    print("process %s sending %s, receiving %s" %(rank, senddata, recvdata))

          运行结果:

          

          comm.alltoall方法将task j的sendbuf的第j个对象拷贝到task i中,recvbuf的第j个对象,一一对应。发送过程如图:

          

          可以将左右两个方格看做xy轴,结果一一对应,如左图的(0,0)对应的值为0,其对应的有图的值为右图的(0,0)也为0。左图的3,4对应的值为16,右图(4,3)也为16。

          P0包含的数据[0 1 2 3 4],它将值0赋值给自己,1传给进程P1,2传给进程P2,3传给进程P3,以此类推。

          相同的P1的数据为[0 2 4 6 8] , 它将0传给P0,2传给P1,4传给P2,以此类推。

          All-to-all定制通讯也叫全部交换,这种操作经常用于各种并发算法中,比如快速傅里叶变换,矩阵变换,样本排序以及一些数据库的 Join 操作。

     11、简化操作

          同comm.gather一样,comm.reduce接收一个数组,每一个元素是一个进程的输入,然后返回一个数组,每一个元素是进程的输出,返回给root进程。输出的元素包含了简化的结果。

          简化定义如下:comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)

          这里需要注意的是,参数op和comm.gather不同,它代表你想应用在数据上的操作,mpi4py模块代表定义了一系列的简化操作,包括:

          - MPI.MAX:返回最大的元素

          - MPI.MIN:返回最小的元素

          - MPI.SUM:对所有的元素相加

          - MPI.PROD:对所有元素相乘

          - MPI.LAND:对所有元素进行逻辑操作

          - MPI.MAXLOC:返回最大值,以及拥有它的进程

          - MPI.MINLOC:返回最小值,以及拥有它的进程

          测试用例:

    import numpy as np
    from mpi4py import MPI
    
    comm = MPI.COMM_WORLD
    size = comm.size
    rank = comm.rank
    array_size = 3
    recvdata = np.zeros(array_size, dtype=np.int)
    senddata = (rank+1)*np.arange(size, dtype=np.int)
    print("+++++++++++++%s+++++++++++++%s++++++++++++" %(recvdata, senddata))
    print("Process %s sending %s" %(rank, senddata))
    comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)
    print("on task %s, after Reduce: data = %s" %(rank, recvdata))

          执行结果:

          

          MPI.SUM为求和操作,过程如下:

          

          简化操作将每个task的第i个元素相加,然后放回到P0进程(root进程)的第i个元素中。

  • 相关阅读:
    MySQL mysqldump数据导出详解
    FTP上传下载 C#辅助类
    FastDFS java 辅助类
    Ajax 提交表单【包括文件上传】
    bootstrap-table 基础用法
    MVC dropdownlist 后端设置select属性后前端依然不能默认选中的解决方法
    jQuery实现鼠标移到元素上动态提示消息框效果
    给Jquery动态添加的元素添加事件
    centos7部署mysql5.7一主多从
    iOS浏览器 new Date() 返回 NaN
  • 原文地址:https://www.cnblogs.com/dukuan/p/9811057.html
Copyright © 2011-2022 走看看