基数排序号称线性时间排序算法中性能最好,速度最快的排序算法。本文将简要概括其算法思想,串行代码及其并行化。
一、基数排序算法简介
1. 算法思想
基数排序属于“分配式排序”(distribution sort),是非比较类线性时间排序的一种,又称“桶子法”(bucket sort),顾名思义,它是透过键值的部分信息,将要排序的元素分配至某些“桶”中,藉以达到排序的作用。
2. 算法过程描述
基数排序(以整形为例),将整形10进制按每位拆分,然后从低位到高位依次比较各个位。主要分为两个过程:
(1)分配,先从个位开始,根据位值(0-9)分别放到0~9号桶中(比如64,个位为4,则放入4号桶中);
(2)收集,再将放置在0~9号桶中的数据按顺序放到数组中;
重复(1)(2)过程,从个位到最高位(比如32位无符号整形最大数4294967296,最高位为第10位)。基数排序的方式可以采用LSD(Least significant digital)或MSD(Most significant digital),LSD的排序方式由键值的最右边开始,而MSD则相反,由键值的最左边开始。
以【520 350 72 383 15 442 352 86 158 352】序列为例,排序过程描述如下:
排序完毕!
3. 复杂度分析
平均时间复杂度:O(dn)(d即表示整形的最高位数)。
空间复杂度:O(10n) (10表示0~9,用于存储临时的序列) 。
稳定性:稳定。
二、算法实现
1. C/C++串行版本
/********************************************************
*函数名称:GetDigitInPos
*参数说明:num 一个整形数据
* pos 表示要获得的整形的第pos位数据
*说明: 找到num的从低到高的第pos位的数据
*********************************************************/
inline int GetDigitInPos(int num,int pos)
{
int temp = 1;
for (int i = 0; i < pos - 1; i++)
temp *= 10;
return (num / temp) % 10;
}
/********************************************************
*函数名称:RadixSort
*参数说明:unorderArray无序数组;
* dataNum无序数据个数
*说明: 基数排序
*时间复杂度:O(dn),d无序数最大位数,n无序数个数
*********************************************************/
#define RADIX 10 //整形排序,基数为10,需要十个桶
#define KEYNUM 10 //关键字位数,这里为整形位数
inline void RadixSort(int* unorderArray, int dataNum)
{
int *radixArrays[RADIX]; //分别为0~9基数的存放空间
for (int i=0; i<10; i++)
{
radixArrays[i] = (int *)malloc(sizeof(int)*(dataNum + 1));
radixArrays[i][0] = 0; //index为0处记录这组数据的个数
}
for (int pos=1; pos<=KEYNUM; pos++) //从个位开始入桶并出桶
{
for (int i=0; i<dataNum; i++) //分配过程
{
int num = GetDigitInPos(unorderArray[i],pos);
int index = ++radixArrays[num][0];
radixArrays[num][index] = unorderArray[i];
}
for (int i=0, j=0; i<RADIX; i++)//收集
{
for (int k = 1; k <= radixArrays[i][0]; k++)
unorderArray[j++] = radixArrays[i][k];
radixArrays[i][0] = 0; //出桶完毕,复位
}
}
}
串行性能在本机测试100 * 1024 * 1024 =100M个32bits整型,需要12.536s,以下是本机软硬件参数,为Linux平台。
2. C/C++并行版本
基于串行版本,在Linux平台利用Pthreads实现多线程并行执行,提升基数排序的性能。
2.1并行思路
将待排序数组逻辑分块,将每个块分配给不同的线程执行,达到并行的效果。待各个块内排好序后,扫描各个块,进行整合,实现总体排序。
2.2实现代码
串行代码稍作修改,传入偏移量,用于逻辑分块,修改如下:
/********************************************************
*函数名称:RadixSort
*参数说明:unorderArray无序数组;
* dataNum块内数据个数
*说明: 基数排序
*时间复杂度:O(dn),d无序数最大位数,n无序数个数
*********************************************************/
inline void RadixSort(int* unorderArray,int offset,int dataNum)
{
int *radixArrays[RADIX]; //分别为0~9基数的存放空间
for (int i=0; i<10; i++)
{
radixArrays[i] = (int *)malloc(sizeof(int)*(dataNum + 1));
radixArrays[i][0] = 0; //index为0处记录这组数据的个数
}
for (int pos=1; pos<=KEYNUM; pos++) //从个位开始入桶并出桶
{
for (int i=0; i<dataNum; i++) //分配过程
{
int num = GetDigitInPos(unorderArray[i+offset],pos);
int index = ++radixArrays[num][0];
radixArrays[num][index] = unorderArray[i+offset];
}
for (int i=0, j=0; i<RADIX; i++)//收集
{
for (int k = 1; k <= radixArrays[i][0]; k++)
unorderArray[offset+j++] = radixArrays[i][k];
radixArrays[i][0] = 0; //出桶完毕,复位
}
}
}
加入线程函数,分配给各个线程,用于调用基数排序。
#define DataNum 100*1024*1024
int threadNum=0;
/****************************************
*函数名称:radix_exec
*参数: para指针,用于接收线程下边,表示第几个线程
*说明: 调用基数排序
*****************************************/
void* radix_exec(void *para)
{
int threadIndex=*(int*)para;
int blockLen=DataNum/threadNum;
int offset=threadIndex*blockLen;
RadixSort(randInt,offset,blockLen);
}
对块内有序的数组将各个块扫面归并,所需函数:
/***********************************************
*函数名称:mergeBlocks
*参数: pDataArray:块内有序的数组 arrayLen:数组长度
* blockNum:块数 resultArray:存放排序的结果
*说明: 合并有序的块
************************************************/
inline void mergeBlocks(int* const pDataArray,int arrayLen,const int blockNum,int* const resultArray)
{
int blockLen=arrayLen/blockNum;
int blockIndex[blockNum];//各个块中元素在数组中的下标,VC可能不支持变量作为数组的长度,解决办法可使用宏定义
for(int i=0;i<blockNum;++i)//初始化块内元素起始下标
{
blockIndex[i]=i*blockLen;
}
int smallest=0;
for(int i=0;i<arrayLen;++i)//扫描所有块内的所有元素
{
for(int j=0;j<blockNum;++j)//以第一个未扫描完的块内元素作为最小数
{
if(blockIndex[j]<(j*blockLen+blockLen))
{
smallest=pDataArray[blockIndex[j]];
break;
}
}
for(int j=0;j<blockNum;++j)//扫描各个块,寻找最小数
{
if((blockIndex[j]<(j*blockLen+blockLen))&&(pDataArray[blockIndex[j]]<smallest))
{
smallest=pDataArray[blockIndex[j]];
}
}
for(int j=0;j<blockNum;++j)//确定哪个块内元素下标进行自增
{
if((blockIndex[j]<(j*blockLen+blockLen))&&(pDataArray[blockIndex[j]]==smallest))
{
++blockIndex[j];
break;
}
}
resultArray[i]=smallest;//本次循环最小数放入结果数组
}
}
main函数中创建多线程完成并行排序,代码如下:
int main(int argc,char* argv[])
{
int threadBum=8;
int blockNum=threadNum;
struct timeval ts,te;
srand(time(NULL));
cout<<"RAND_MAX:"<<RAND_MAX<<endl;
for(int i=0;i<DataNum;++i)
{
randInt[i]=rand();
}
pthread_t tid[blockNum],ret[blockNum],threadIndex[blockNum];
//--------Radix sort-------
gettimeofday(&ts,NULL);
for(int i = 0; i < threadNum; ++i)
{
threadIndex[i]=i;
ret[i] = pthread_create(&tid[i], NULL,radix_exec,(void *)(threadIndex+i));
if(ret[i] != 0){
cout<<"thread "<<i<<" create error!"<<endl;
break;
}
}
for(int i = 0; i <threadNum; ++i)
{
pthread_join(tid[i], NULL);
}
mergeBlocks(randInt,DataNum,threadNum,resultInt);
gettimeofday(&te,NULL);
cout<<"RadixSort time: "<<(te.tv_sec-ts.tv_sec)*1000+(te.tv_usec-ts.tv_usec)/1000<<"ms"<<endl;
}
8线程情况下,测试性能为5.123s,加速比2.45。针对机器的缓存大小,通过提高缓存命中率,可继续进行算法优化,提高排序性能。
3.编译参数简介
Linux使用icpc编译器进行编译,可换g++进行编译。编译命令及参数如下:
icpc -std=c++11 -vec-report -O2 -o RadixSort radixSort.cpp
-std=c++11:采用C++2011标准
-vec-report:打印向量优化情况
-O2:采用二级优化,内含SSE向量优化
-o:指明可执行程序名称
参考文献
[1]Nadathur Satish,etc., Fast Sort on CPUs and GPUs: A Case for Bandwidth Oblivious SIMD Sort, SIGMOD’10, 2010
[2]http://blog.csdn.net/cjf_iceking/article/details/7943609#comments
版权声明:本文为博主原创文章,未经博主允许不得转载。