zoukankan      html  css  js  c++  java
  • Parallel Computing–Cannon算法 (MPI 实现)

    原理不解释,直接上代码

    代码中被注释的源程序可用于打印中间结果,检查运算是否正确。

    #include "mpi.h"
    #include <math.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    
    
    void scatter_matrix(int* fstream,int n1,int n2,int*Q,int root,int tag){
    	/*每个矩阵块的大小*/
    	int rows=(n1+root-1)/root;
    	int cols=(n2+root-1)/root;
    	int* tmp_matrix=(int*)malloc(rows*cols*sizeof(int));
    	
    	int i,j;
    	memset(Q,0,rows*cols*sizeof(int));
    	for(i=0;i<root;i++){
    		for(j=0;j<root;j++){
    			int p=0,q=0;
    			int imin=i*rows*n2;
    			int jmin=j*cols;
    			memset(tmp_matrix,0,sizeof(tmp_matrix));
    
    			/*在划分矩阵时,由于地空间不连续,需要另开辟一个数组连续的保存起来,以便于调用MPI_Send*/
    			for(p=0;p<rows;p++,imin+=n2){
    				for(q=0;q<cols;q++){
    					tmp_matrix[p*cols+q]=fstream[imin+jmin+q];
    				}
    			}
    			if(i==0&&j==0){
    				/*进程0 不需要使用MPI_Send将数据发送给自己,直接使用memcpy将结果拷贝即可*/
    				memcpy(Q,tmp_matrix,rows*cols*sizeof(int));
    			}else{
    				/*将分块发送给位于i行,j列的进程*/
    				MPI_Send(tmp_matrix,rows*cols,MPI_INT,i*root+j,tag,MPI_COMM_WORLD);
    			}	
    		}
    	}
    }
    /*
     *@row:矩阵所在的行
     *@col:矩阵所在的列
     *@sp:sp=root=sqrt(nprocs)
     *@return 根据行列号计算进程实际编号
    */
    int get_index(int row,int col,int sp){
    	int tmp=((row+sp)%sp)*sp+(col+sp)%sp;
    	return tmp;
    }
    /*计算矩阵乘法,将结果存入C中*/
    void matrix_multi(int* A,int *B,int *C,int n1,int n2,int n3,int myid){
    	int i=0,j=0,k=0;
    	int* tmp_C=(int*)malloc(n1*n3*sizeof(int));
    	memset(tmp_C,0,sizeof(int)*n1*n3);
    	
    	for(i=0;i<n1;i++){
    		for(j=0;j<n3;j++){
    			for(k=0;k<n2;k++){
    				tmp_C[i*n3+j]+=A[i*n2+k]*B[k*n3+j];
    			}
    			C[i*n3+j]+=tmp_C[i*n3+j];
    		}
    		
    	}
    	
    }
    
    /*用于矩阵下标定位对齐*/
    void shuffle(int*A,int*buf_A,int buf_A_size,int *B,int*buf_B,int buf_B_size,int root,int myid){
    	int i,j;
    	MPI_Status status;
    	int cur_col=0;
    	int cur_row=0;
    	/*通过进程编号计算获得当前进程所在的行号和列号*/
    	cur_row=myid/root;
    	cur_col=myid-cur_row*root;
    	/*对于矩阵A,第i行的矩阵需要向左平移i次*/
    	for(i=0;i<cur_row;i++){
    		/*接收来自右边的数据,并将当前矩阵发送给左边的进程*/
    		MPI_Sendrecv(A,buf_A_size,MPI_INT,get_index(cur_row,cur_col-1,root),102,
    			     buf_A,buf_A_size,MPI_INT,get_index(cur_row,cur_col+1,root),102,MPI_COMM_WORLD,&status);
    		memcpy(A,buf_A,buf_A_size*sizeof(int));/*buf_A用于通信时缓存矩阵*/
    		memset(buf_A,0,buf_A_size*sizeof(int));	
    	}
    	/*对于矩阵B,第j列的矩阵需要向上平移j次*/	
    	for(j=0;j<cur_col;j++){
    		/*接收来自下边的数据,并将当前矩阵发送给上边的进程*/
    		MPI_Sendrecv(B,buf_B_size,MPI_INT,get_index(cur_row-1,cur_col,root),103,
    			     buf_B,buf_B_size,MPI_INT,get_index(cur_row+1,cur_col,root),103,MPI_COMM_WORLD,&status);
    		memcpy(B,buf_B,buf_B_size*sizeof(int));/*buf_B用于通信时缓存矩阵*/
    		memset(buf_B,0,buf_B_size*sizeof(int));
    	}
    	/*printf("I have shuffled!
    ");*/
    }
    void cannon(int*A,int*buf_A,int buf_A_size,int *B,int*buf_B,int buf_B_size,
    	int *C,int buf_C_size,int row_a,int col_a,int col_b,int root,int myid){
    	MPI_Status status;
    	double elapsed_time,multiply_time=0,passdata_time=0;
    	int i,j;
    	memset(C,0,sizeof(int)*buf_C_size);
    	int cur_col=0;
    	int cur_row=0;
    	/*通过进程编号计算获得当前进程所在的行号和列号*/
    	cur_row=myid/root;
    	cur_col=myid-cur_row*root;
    	
    
    	for(i=0;i<root;i++){/*一共需要循环root次,root=sqrt(nprocs)*/
    		elapsed_time=MPI_Wtime();
    		matrix_multi(A,B,C,row_a,col_a,col_b,myid);/*计算矩阵乘法*/
    		elapsed_time=MPI_Wtime()-elapsed_time;
    		multiply_time+=elapsed_time;
    		/*elapsed_time=MPI_Wtime();		*/
    		/*接收来自右边(row,col+1)的数据,并将当前矩阵发送给左边(row,col-1)的进程*/
    		MPI_Sendrecv(A,buf_A_size,MPI_INT,get_index(cur_row,cur_col-1,root),102,
    			     buf_A,buf_A_size,MPI_INT,get_index(cur_row,cur_col+1,root),102,MPI_COMM_WORLD,&status);
    		/*接收来自下边(row+1,col)的数据,并将当前矩阵发送给上边(row-1,col)的进程*/
    		MPI_Sendrecv(B,buf_B_size,MPI_INT,get_index(cur_row-1,cur_col,root),103,
    			     buf_B,buf_B_size,MPI_INT,get_index(cur_row+1,cur_col,root),103,MPI_COMM_WORLD,&status);
    		/*elapsed_time=MPI_Wtime()-elapsed_time;
    		passdata_time+=elapsed_time;*/
    		memcpy(B,buf_B,buf_B_size*sizeof(int));/*将buf_B中的数据拷贝至B中*/
    		memcpy(A,buf_A,buf_A_size*sizeof(int));/*将buf_A中的数据拷贝至A中*/
    		
    	}
    	/*将计算结果发送给数组C*/
    	MPI_Send(C,row_a*col_b,MPI_INT,0,104,MPI_COMM_WORLD);
    	
    	printf("proc:%d, passdata time:%lf    multiply time:%lf
    ",myid,passdata_time,multiply_time);
    }
     
    void gather_matrix(int *fstream,int n1,int n3,int*C,int root,FILE*fhc){
    	MPI_Status status;
    	int rows=(n1+root-1)/root;
    	int cols=(n3+root-1)/root;
    	int* tmp_matrix=(int*)malloc(rows*cols*sizeof(int));
    	int i,j;
    
    	for(i=0;i<root;i++){
    		for(j=0;j<root;j++){
    			int p,q;
    			int imin=i*rows*n3;
    			int jmin=j*cols;
    			memset(tmp_matrix,0,sizeof(tmp_matrix));
    			/*接收来自各个进程的数据*/
    			MPI_Recv(tmp_matrix,rows*cols,MPI_INT,i*root+j,104,MPI_COMM_WORLD,&status);	
    			/*printf("I am passed proc:%d 
    ",i*root+j);*/
    			/*将接收的矩阵tmp拼接到矩阵C中去,需要按照合理顺序拼接,否则结果会出错*/
    			for(p=0;p<rows;p++,imin+=n3){
    				for(q=0;q<cols;q++){
    					fstream[imin+jmin+q]=tmp_matrix[p*cols+q];
    					/*printf("%d ",((int*)fstream)[imin+jmin+q]);*/
    				}
    				/*printf("
    ");*/
    			}
    		}
    	}
    
    	/*将结果打印到文件中*/
    	for(i=0;i<n1;i++){
    		for(j=0;j<n3;j++){
    			fprintf(fhc,"%d ",fstream[i*n3+j]);
    		}
    		fprintf(fhc,"
    ");
    	}
    }
    
    int main(int argc,char**argv){
    	int myid,numprocs;
    	int i,j;
    	MPI_Status status;
    	int root=0;
    	int dim[3];
    	double elapsed_time=0;
    	int max_rows_a,max_cols_a,max_rows_b,max_cols_b;
    	int buf_A_size,buf_B_size,buf_C_size;
    	FILE* fhc;
    	/*suppose A:n1*n2 ,B:n2*n3;n1,n2,n3 are read from input file*/
    	int n1,n2,n3;
    	/*buffer for matrix A,B,C will be shifted ,so they each have two buffer*/
    	int *A,*B,*C,*buf_A,*buf_B;
    
    	/*on proc0,buffers to cache matrix files of A,B and C*/
    	int *fstream_a=NULL,*fstream_b=NULL,*fstream_c=NULL;
    	MPI_Init(&argc,&argv);/*初始化*/
    	MPI_Comm_rank(MPI_COMM_WORLD,&myid);/*获取当前进程ID*/
    	MPI_Comm_size(MPI_COMM_WORLD,&numprocs);/*获取全部进程数量*/
    
    	root=sqrt(numprocs);
    	if(numprocs!=root*root){
    		/*如果进程总数不是平方数,则结束程序*/
    		printf("process number must be a squre!
    ");
    		exit(-1);
    	}
    
    	/*on proc0,preprocess the command line,read in file
    	 for A,B and put their sizes in dim[]*/
    	if(myid==0){
    		FILE *file_a,*file_b,*file_c;
    		int n1,n2,n3;
    		int i,j;
    		file_a=fopen(argv[1],"r");/*打开文件a,文件名从运行时给的参数中获得*/
    		file_b=fopen(argv[2],"r");/*打开文件b,文件名从运行时给的参数中获得*/
    		fscanf(file_a,"%d %d",&n1,&n2);/*从文件a中读取矩阵A的行数,列数*/
    		fscanf(file_b,"%d %d",&n2,&n3);/*从文件b中读取矩阵B的行数,列数*/
    	
    		dim[0]=n1,dim[1]=n2,dim[2]=n3;
    		fstream_a=(int*)malloc(n1*n2*sizeof(int));/*分配一块内存,用于将矩阵A读入内存*/
    		fstream_b=(int*)malloc(n2*n3*sizeof(int));/*分配一块内存,用于将矩阵B读入内存*/
    		/*printf("Yeah! I got n1=%d,n2=%d,n3=%d
    ",n1,n2,n3);*/
    		/*读入矩阵A,保存在fstream_a中*/
    		for(i=0;i<n1;i++)
    			for(j=0;j<n2;j++)
    			fscanf(file_a,"%d",&((int*)fstream_a)[i*n2+j]);
    		/*读入矩阵B,保存在fstream_b中*/	
    		for(i=0;i<n2;i++)
    			for(j=0;j<n3;j++)
    				fscanf(file_b,"%d",&((int*)fstream_b)[i*n3+j]);
    	}
    	/*将矩阵的行数,列数通过Bcast广播给所有进程*/
    	MPI_Bcast(dim,3,MPI_INT,0,MPI_COMM_WORLD);
    	n1=dim[0],n2=dim[1],n3=dim[2];
    
    	/*begin new version*/
    
    	max_rows_a=(n1+root-1)/root;/*子矩阵块A的行数*/
    	max_cols_a=(n2+root-1)/root;/*子矩阵块A的列数*/
    	max_rows_b=max_cols_a;      /*子矩阵块B的行数*/
    	max_cols_b=(n3+root-1)/root;/*子矩阵块B的列数*/
    	buf_A_size=max_rows_a*max_cols_a;/*子矩阵块A的大小*/
    	buf_B_size=max_rows_b*max_cols_b;/*子矩阵块B的大小*/
    	buf_C_size=max_rows_a*max_cols_b;/*子矩阵块C的大小*/
    
    
    	/*给A,,buf_A,buf_B,B,C分配内存空间,其中buf_A,buf_B用于通讯中的缓存*/
    	A=(int*)malloc(sizeof(int)*buf_A_size);
    	buf_A=(int*)malloc(sizeof(int)*buf_A_size);
    	B=(int*)malloc(sizeof(int)*buf_B_size);
    	buf_B=(int*)malloc(sizeof(int)*buf_B_size);
    	C=(int*)malloc(sizeof(int)*buf_C_size);
    	if(A==NULL||buf_A==NULL||B==NULL||buf_B==NULL||C==NULL)
    	{
    		/*如果内存申请失败,就退出*/
    		printf("Memory allocation failed!
    ");
    		exit(-1);
    	}
    
    	/*proc 0 scatter A,B to other procs in a 2D block distribution fashion*/
    	if(myid==0){
    		/*printf("max_rows_a:%d
    ",max_rows_a);
    		printf("max_cols_a:%d
    ",max_cols_a);
    		printf("max_rows_b:%d
    ",max_rows_b);
    		printf("max_cols_b:%d
    ",max_cols_b);*/
    		/*进程0 将矩阵A,B划分成小块,分发给其他进程*/
    		scatter_matrix((int*)fstream_a,n1,n2,A,root,100);
    		/*printf("I am debuging!
    ");*/
    		scatter_matrix((int*)fstream_b,n2,n3,B,root,101);
    		/*printf("I am finding fault!
    ");*/
    	}else{
    		/*其他进程接收来自进程0 发送的矩阵A,B*/
    		MPI_Recv(A,max_rows_a*max_cols_a,MPI_INT,0,100,MPI_COMM_WORLD,&status);
    		MPI_Recv(B,max_rows_b*max_cols_b,MPI_INT,0,101,MPI_COMM_WORLD,&status);
    	}
    
    	MPI_Barrier(MPI_COMM_WORLD);/*等待全部进程完成数据接收工作。*/
    
    	/*printf("I am proc %d
    ",myid);
    	for(i=0;i<max_rows_a;i++){
    		printf("%d:      ",myid);
    		for(j=0;j<max_cols_a;j++){
    			printf("%d ",A[i*max_cols_a+j]);
    		}
    		printf("
    ");
    	}
    	printf("I am proc %d
    ",myid);
    	for(i=0;i<max_rows_b;i++){
    		printf("%d:      ",myid);
    		for(j=0;j<max_cols_b;j++){
    			printf("%d ",B[i*max_cols_b+j]);
    		}
    		printf("
    ");
    	}*/
    
    	
    	/*compute C=A*B by Cannon algorithm*/
    	/*矩阵块必须定位对齐,先做预处理*/	
    	shuffle(A,buf_A,buf_A_size,B,buf_B,buf_B_size,root,myid);
    	elapsed_time=MPI_Wtime();
    	/*包含cannon全部内容*/
    	cannon(A,buf_A,buf_A_size,B,buf_B,buf_B_size,
    		C,buf_C_size,max_rows_a,max_cols_a,max_cols_b,root,myid);
    	MPI_Barrier(MPI_COMM_WORLD);
    	elapsed_time=MPI_Wtime()-elapsed_time;/*统计cannon算法实际耗时*/
    	
    
    	MPI_Barrier(MPI_COMM_WORLD);/*等待所有进程完成cannon算法,将结果发送给进程0*/
    
    
    	int fsize_c=sizeof(int)*n1*n3;
    	if(myid==0){
    		/*进程0创建文件写句柄,准备将计算结果写入文件中*/
    		if(!(fhc=fopen(argv[3],"w"))){
    			printf("Cant't open file %s
    ",argv[3]);
    			MPI_Finalize();
    		}
    		fstream_c=(int*)malloc(fsize_c);
    		/*进程0 接收来自各个进程的结果矩阵,拼接成一个完整的结果,写入文件,持久化数据结果*/
    		gather_matrix(fstream_c,n1,n3,C,root,fhc);
    	}
    	
    	MPI_Barrier(MPI_COMM_WORLD);    /*make sure proc 0 read all it needs*/
    
    	if(myid==0){
    		int i,j;
    		printf("Cannon algorithm :multiply a %d* %d with a %d*%d, use %lf(s)
    ",
    			n1,n2,n2,n3,elapsed_time);
    		/*printf("I have finished!
    ");*/
    		fclose(fhc);/*关闭文件读写句柄*/
    		/*释放申请的内存空间*/
    		free(fstream_a);
    		free(fstream_b);
    		free(fstream_c);
    	}
    
    	/*释放申请的内存空间*/
    	free(A);free(buf_A);
    	free(B);free(buf_B);
    	free(C);
    	MPI_Finalize();
    	return 0;
    }
    
  • 相关阅读:
    SQL 左外连接,右外连接,全连接,内连接
    CentOS 7.0默认使用的是firewall作为防火墙
    Python-WEB自动化-三种等待方式
    Python + Selenium的web自动化框架
    BS架构和CS架构的区别
    ERROR 1062 (23000): Duplicate entry '%-root' for key 'PRIMARY'
    CentOS 7镜像下载
    CentOS7各个版本镜像下载地址
    CentOS7的Minimal版本安装
    Python 文字转语音
  • 原文地址:https://www.cnblogs.com/ZJUT-jiangnan/p/3934557.html
Copyright © 2011-2022 走看看