zoukankan      html  css  js  c++  java
  • 【MPI学习4】MPI并行程序设计模式:非阻塞通信MPI程序设计

    这一章讲了MPI非阻塞通信的原理和一些函数接口,最后再用非阻塞通信方式实现Jacobi迭代,记录学习中的一些知识。

    (1)阻塞通信与非阻塞通信

    阻塞通信调用时,整个程序只能执行通信相关的内容,而无法执行计算相关的内容;

    非阻塞调用的初衷是尽量让通信和计算重叠进行,提高程序整体执行效率。

    整体对比见下图:

    (2)非阻塞通信的要素

    非阻塞通信调用返回意味着通信开始启动;而非阻塞通信完成则需要调用其他的接口来查询。

    要素1:非阻塞通信的调用接口

    要素2:非阻塞通信的完成查询接口

    理想的非阻塞通信设计应该如下:

    非阻塞通信的 发送 和 接受 过程都需要同时具备以上两个要素,“调用+完成”

    “调用”按照通信方式的不同(标准、缓存、同步、就绪),有各种函数接口,具体用到哪个就查手册的性质。

    这里“完成”是重点,因为程序员需要知道非阻塞调用是否执行完成了,来做下一步的操作。

    MPI为“完成”定义了一个内部变量MPI_Request request,每个request与一个在非阻塞调用发生时与该调用发生关联(这里的调用包括发送和接收)。

    “完成”不区分通信方式的不同,统一用MPI_Wait系列函数来完成,这里对MPI_Wait函数做一点说明:

    1)MPI_Wait(MPI_Request *request),均等着request执行完毕了,再往下进行

    2)对于非重复非阻塞通信,MPI_Wait系列函数调用的返回,还意味着request对象被释放了,程序员不用再显式释放request变量。

    3)对于重复非阻塞通信,MPI_Wait系列函数调用的返回,意味着将于request对象关联的非阻塞通信处于不激活状态,并不释放request

    关于2)3)看后面的代码示例就了解了

    (3)非阻塞调用实现Jacobi迭代

    有了非阻塞调用的技术,可以再将Jacobi迭代的程序效率提升,其总体的实现思路如下:

    1)先计算Jacobi迭代下次计算所需要的边界数据,这些数据与每个计算节点中的计算无关,可以先独立计算好

    2)启动非阻塞通信,将边界数据在进程间传递

    3)计算每个计算节点可以独立计算的部分;此时,2)中启动的非阻塞通信也在进行中,这时通信和计算就重叠了

    4)等着非阻塞通信完成,再进行下一次迭代

    再回顾一下之前用阻塞通信实现Jacobi迭代的思路:

    1)先传递边界数据

    2)等着数据都传递完了,再进行计算

    3)等着计算完成了,进行下一次迭代

    可以看到阻塞通信中实现Jacobi迭代的程序中,在同一计算节点下,通信和计算是分别进行的,效率不如非阻塞通信。

    总结起来:

    “单机程序 → 阻塞通信MPI程序” 实现单机计算到多机计算,用并行代替串行提高效率。

    “阻塞MPI程序 → 非阻塞MPI程序” 不仅将多台机器之间的并行,而且还能将每台机器的通信与计算过程并行,实现更高效的并行。 

    (4)非阻塞通信实现Jacobi迭代的代码

    书上的源代码是Fortan的,数据存储是列优先的,矩阵按列分块;下面的代码是我翻译的C的代码,数据存储是行优先的,矩阵按行分块。

      1 #include "mpi.h"
      2 #include <stdio.h>
      3 #include <stdlib.h>
      4 
      5 #define N 8
      6 #define SIZE N/4
      7 #define T 2
      8 
      9 void print_matrix(int myid, float myRows[][N]);
     10 
     11 int main(int argc, char *argv[])
     12 {
     13     float matrix1[SIZE+2][N], matrix2[SIZE+2][N];
     14     int myid;
     15     MPI_Status status[4];
     16     MPI_Request request[4];
     17 
     18     MPI_Init(&argc, &argv);
     19     MPI_Comm_rank(MPI_COMM_WORLD, &myid);
     20 
     21     // 初始化
     22     int i,j;
     23     for(i=0; i<SIZE+2; i++)
     24     {
     25         for(j=0; j<N; j++)
     26         {
     27             matrix1[i][j] = matrix2[i][j] = 0;
     28         }
     29     }
     30     if(0==myid) // 按行划分 上面第一分块矩阵 上边界
     31     {
     32         for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N;
     33     }
     34     if (3==myid) { // 按行划分 最下面一分块矩阵 下边界
     35         for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N;
     36     }
     37     for(i=1; i<SIZE+1; i++) // 每个矩阵的两侧边界
     38     {
     39         matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N;
     40     }
     41     // 引入虚拟进程 并计算每个进程上下相邻进程
     42     int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1;
     43     int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1;
     44     // jacobi迭代过程
     45     int t,row,col;
     46     for(t=0; t<T; t++)
     47     {
     48         // 1 计算边界数据
     49         if(0==myid) // 最上的矩阵块
     50         {
     51             for (col=1; col<N-1; col++)
     52             {
     53                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
     54             }
     55         }
     56         else if (3==myid) { // 最下的矩阵块
     57             for (col=1; col<N-1; col++)
     58             {
     59                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
     60             }
     61         }
     62         else {
     63             for(col=1; col<N-1; col++) // 中间的矩阵块
     64             {
     65                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
     66                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
     67             }
     68         }
     69         // 2 利用非阻塞函数传递边界数据 为下一次计算做准备
     70         int tag1 = 1, tag2 = 2;
     71         MPI_Isend(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]);
     72         MPI_Isend(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]);
     73         MPI_Irecv(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]);
     74         MPI_Irecv(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]);
     75         // 3 计算中间数据
     76         int begin_row = 0==myid ? 2 : 1;
     77         int end_row = 3==myid ? (SIZE-1) : SIZE; 
     78         for (row=begin_row; row<end_row; row++)
     79         {
     80             for (col=1; col<N-1; col++)
     81             {
     82                 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25;
     83             }
     84         }
     85         // 4 更新矩阵 并等待各个进程间数据传递完毕
     86         for (row=begin_row; row<=end_row; row++)
     87         {
     88             for (col=1; col<N-1; col++)
     89             {
     90                 matrix1[row][col] = matrix2[row][col];
     91             }
     92         }
     93         MPI_Waitall(4, &request[0], &status[0]);
     94     }
     95     MPI_Barrier(MPI_COMM_WORLD);
     96     print_matrix(myid, matrix1);
     97     MPI_Finalize();
     98 }
     99 
    100 
    101 void print_matrix(int myid, float myRows[][N])
    102 {
    103     int i,j;
    104     int buf[1];
    105     MPI_Status status;
    106     buf[0] = 1;
    107     if ( myid>0 ) {
    108         MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status);
    109     }
    110     printf("Result in process %d:
    ", myid);
    111     for ( i = 0; i<SIZE+2; i++)
    112     {
    113         for ( j = 0; j<N; j++)
    114             printf("%1.3f	", myRows[i][j]);
    115         printf("
    ");
    116     }
    117     if ( myid<3 ) {
    118         MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD);
    119     }
    120     MPI_Barrier(MPI_COMM_WORLD);
    121 }

    程序的执行结果如下:

    上述程序设计的逻辑如下:

    1)各个分块矩阵的边界数据是可以需要通信交换的

    2)先计算边界数据,尽量把需要通信交换而且又相对独立的数据先计算出来

    3)用非阻塞通信传递分块矩阵的边界数据;同时每个节点内计算内部的数据;计算与通信并行

    4)等到每个计算节点的2个发送、2个接收,总共4个非阻塞调用都完成了,进行下一轮迭代

    (5)重复非阻塞通信

    上面实现Jacobi迭代的代码中,以进程1和进程2为例:

    1)迭代一轮二者之间就需要互相通信一次

    2)每次互相通信,随着MPI_Wait的执行,request通信对象释放,两个进程通信完全被切断了

    3)两个进程之间每次通信,有一些通信连接操作都是重复的,最好不用每次通信都重新执行这些连接操作,以此提高效率

    4)因此,比上面实现jacobi迭代更优化一些的做法是:每次不完全掐断两个进程的非阻塞通信,保持那些基础的通用的操作,每次迭代只需要更新需要传输的数据,再激活两个进程之间的非阻塞通信

    依照上面的思路,MPI给出了重复非阻塞的通信调用实现。用重复非阻塞的通信再实现一次Jacobi迭代,代码如下:

      1 #include "mpi.h"
      2 #include <stdio.h>
      3 #include <stdlib.h>
      4 
      5 #define N 8
      6 #define SIZE N/4
      7 #define T 2
      8 
      9 void print_matrix(int myid, float myRows[][N]);
     10 
     11 int main(int argc, char *argv[])
     12 {
     13     float matrix1[SIZE+2][N], matrix2[SIZE+2][N];
     14     int myid;
     15     MPI_Status status[4];
     16     MPI_Request request[4];
     17 
     18     MPI_Init(&argc, &argv);
     19     MPI_Comm_rank(MPI_COMM_WORLD, &myid);
     20 
     21     // 初始化
     22     int i,j;
     23     for(i=0; i<SIZE+2; i++)
     24     {
     25         for(j=0; j<N; j++)
     26         {
     27             matrix1[i][j] = matrix2[i][j] = 0;
     28         }
     29     }
     30     if(0==myid) // 按行划分 上面第一分块矩阵 上边界
     31     {
     32         for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N;
     33     }
     34     if (3==myid) { // 按行划分 最下面一分块矩阵 下边界
     35         for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N;
     36     }
     37     for(i=1; i<SIZE+1; i++) // 每个矩阵的两侧边界
     38     {
     39         matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N;
     40     }
     41     // 引入虚拟进程 并计算每个进程上下相邻进程
     42     int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1;
     43     int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1;
     44     // 初始化重复非阻塞通信
     45     int tag1 = 1, tag2 = 2;
     46     MPI_Send_init(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]);
     47     MPI_Send_init(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]);
     48     MPI_Recv_init(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]);
     49     MPI_Recv_init(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]);
     50     // jacobi迭代过程
     51     int t,row,col;
     52     for(t=0; t<T; t++)
     53     {
     54         // 1 计算边界数据
     55         if(0==myid) // 最上的矩阵块
     56         {
     57             for (col=1; col<N-1; col++)
     58             {
     59                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
     60             }
     61         }
     62         else if (3==myid) { // 最下的矩阵块
     63             for (col=1; col<N-1; col++)
     64             {
     65                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
     66             }
     67         }
     68         else {
     69             for(col=1; col<N-1; col++) // 中间的矩阵块
     70             {
     71                 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25;
     72                 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25;
     73             }
     74         }
     75         // 2 启动重复非阻塞通信
     76         MPI_Startall(4, &request[0]);
     77         // 3 计算中间数据
     78         int begin_row = 0==myid ? 2 : 1;
     79         int end_row = 3==myid ? (SIZE-1) : SIZE; 
     80         for (row=begin_row; row<end_row; row++)
     81         {
     82             for (col=1; col<N-1; col++)
     83             {
     84                 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25;
     85             }
     86         }
     87         // 4 更新矩阵 并等待各个进程间数据传递完毕
     88         for (row=begin_row; row<=end_row; row++)
     89         {
     90             for (col=1; col<N-1; col++)
     91             {
     92                 matrix1[row][col] = matrix2[row][col];
     93             }
     94         }
     95         MPI_Waitall(4, &request[0], &status[0]);
     96     }
     97     int n;
     98     for(n = 0; n < 4; n++) MPI_Request_free(&request[n]); // 释放非阻塞通信对象
     99     MPI_Barrier(MPI_COMM_WORLD);
    100     print_matrix(myid, matrix1);
    101     MPI_Finalize();
    102 }
    103 
    104 
    105 void print_matrix(int myid, float myRows[][N])
    106 {
    107     int i,j;
    108     int buf[1];
    109     MPI_Status status;
    110     buf[0] = 1;
    111     if ( myid>0 ) {
    112         MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status);
    113     }
    114     printf("Result in process %d:
    ", myid);
    115     for ( i = 0; i<SIZE+2; i++)
    116     {
    117         for ( j = 0; j<N; j++)
    118             printf("%1.3f	", myRows[i][j]);
    119         printf("
    ");
    120     }
    121     if ( myid<3 ) {
    122         MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD);
    123     }
    124     MPI_Barrier(MPI_COMM_WORLD);
    125 }

    1)上述的代码不难理解,可以查阅相关函数手册;最核心的思想就是,如果两个进程有多次迭代通信,就可以用这种重复非阻塞的通信函数。

    2)另外,对于重复非阻塞通信的调用,在调用MPI_Wait系列函数时,不会释放与通信关联的request函数(即上面说的保持一些共性的通信设定操作,不完全掐断),因此,需要在line98中,程序员手动释放非则色通信操作对象

  • 相关阅读:
    P1629 邮递员送信
    P1119 灾后重建
    最短路问题
    P1194 买礼物
    最小生成树
    P1038 神经网络
    P2661 信息传递
    mysql 5.7启动报错
    docker flannel网络部署和路由走向分析
    k8s---无头服务
  • 原文地址:https://www.cnblogs.com/xbf9xbf/p/5211129.html
Copyright © 2011-2022 走看看