海量数据多路归并排序的c++实现(归并时利用了败者树) - harryshayne - 博客园
海量数据多路归并排序的c++实现(归并时利用了败者树)
问题:如何给10^7个数据量的磁盘文件排序(《编程珠玑》第一章)
下面的问题描述及相关文字都参考于CSDN中JULY的博客,在此对JULY表示感谢。JULY的博客地址如下:
http://blog.csdn.net/v_JULY_v/article/details/6451990
1、问题描述:
输入:一个最多含有n个不重复的正整数(也就是说可能含有少于n个不重复正整数)的文件,其中每个数都小于等于n,且n=10^7。
输出:得到按从小到大升序排列的包含所有输入的整数的列表。
条件:最多有大约1MB的内存空间可用,但磁盘空间足够。且要求运行时间在5分钟以下,10秒为最佳结果。2、算法描述:
在编程珠玑中,描述了三种解决方法,分别是外排多路归并法、多通道排序法和位图排序法,在待排序文件中不含重复数的情况下,位图排序法是最高效的,但在更一般的情况下,外排多路归并法具有通用性,因此,本文描述这种外排法。
假设文件中整数个数为N(N是亿级的),整数之间用空格分开。首先分多次从该文件中读取M(十万级)个整数,每次将M个整数在内存中使用内部排序之后存入临时文件,这样就得到多个外部文件,对应于多个外部文件,我们可以利用多路归并将各个临时文件中的数据一边读入内存,一边进行归并输出到输出文件。显然,该排序算法需要对每个整数做2次磁盘读和2次磁盘写。(如果根据初始外部文件的个数设置归并的路数,则会对每个整数做多次读/写,具体次数可参考严蔚敏书籍)
本算法的流程图如下:
3、算法实现:
下面是算法的具体实现,这是针对JULY算法的修改,因为在JULY的算法中,并没有利用败者树来进行归并排序,在每次选择多路文件在数组中的元素的最小值的时候,只是简单地遍历数组来进行选择,没有利用败者树来进行选择,因此在效率上会有一定的差别(对磁盘的访问效率是一样的,不同的是内部选择的时候)。
View Code1 //copyright@ 纯净的天空 && yansha
2 //5、July,updated,2010.05.28。
3 //harryshayne,update again。2011.6.30
4 #include <iostream>
5 #include <ctime>
6 #include <fstream>
7 //#include "ExternSort.h"
8 using namespace std;
9
10 //使用多路归并进行外排序的类
11 //ExternSort.h
12
13 /*
14 * 大数据量的排序
15 * 多路归并排序
16 * 以千万级整数从小到大排序为例
17 * 一个比较简单的例子,没有建立内存缓冲区
18 */
19
20 #ifndef EXTERN_SORT_H
21 #define EXTERN_SORT_H
22
23 #include <cassert>
24 //#define k 5
25 #define MIN -1//这里开始的时候出现了一个BUG,如果定义的MIN大于等于待排序的数,则会是算法出现错误
26 #define MAX 10000000//最大值,附加在归并文件结尾
27 typedef int* LoserTree;
28 typedef int* External;
29
30 class ExternSort
31 {
32 public:
33 void sort()
34 {
35 time_t start = time(NULL);
36
37 //将文件内容分块在内存中排序,并分别写入临时文件
38 k = memory_sort(); //
39
40 //归并临时文件内容到输出文件
41 //merge_sort(file_count);
42 ls=new int[k];
43 b=new int[k+1];
44 K_Merge();
45 delete []ls;
46 delete []b;
47
48 time_t end = time(NULL);
49 printf("total time:%f\n", (end - start) * 1000.0/ CLOCKS_PER_SEC);
50 }
51
52 //input_file:输入文件名
53 //out_file:输出文件名
54 //count: 每次在内存中排序的整数个数
55 ExternSort(const char *input_file, const char * out_file, int count)
56 {
57 m_count = count;
58 m_in_file = new char[strlen(input_file) + 1];
59 strcpy(m_in_file, input_file);
60 m_out_file = new char[strlen(out_file) + 1];
61 strcpy(m_out_file, out_file);
62 }
63 virtual ~ExternSort()
64 {
65 delete [] m_in_file;
66 delete [] m_out_file;
67 }
68
69 private:
70 int m_count; //数组长度
71 char *m_in_file; //输入文件的路径
72 char *m_out_file; //输出文件的路径
73 int k;//归并数,此数必须要内排序之后才能得到,所以下面的ls和b都只能定义为指针(注意和书上区别)
74 LoserTree ls;//定义成为指针,之后动态生成数组
75 External b;//定义成为指针,在成员函数中可以把它当成数组使用
76 //int External[k];
77 protected:
78 int read_data(FILE* f, int a[], int n)
79 {
80 int i = 0;
81 while(i < n && (fscanf(f, "%d", &a[i]) != EOF)) i++;
82 printf("read:%d integer\n", i);
83 return i;
84 }
85 void write_data(FILE* f, int a[], int n)
86 {
87 for(int i = 0; i < n; ++i)
88 fprintf(f, "%d ", a[i]);
89 fprintf(f,"%d",MAX);//在最后写上一个最大值
90 }
91 char* temp_filename(int index)
92 {
93 char *tempfile = new char[100];
94 sprintf(tempfile, "temp%d.txt", index);
95 return tempfile;
96 }
97 static int cmp_int(const void *a, const void *b)
98 {
99 return *(int*)a - *(int*)b;
100 }
101
102 int memory_sort()
103 {
104 FILE* fin = fopen(m_in_file, "rt");
105 int n = 0, file_count = 0;
106 int *array = new int[m_count];
107
108 //每读入m_count个整数就在内存中做一次排序,并写入临时文件
109 while(( n = read_data(fin, array, m_count)) > 0)
110 {
111 qsort(array, n, sizeof(int), cmp_int);
112 //这里,调用了库函数阿,在第四节的c实现里,不再调用qsort。
113 char *fileName = temp_filename(file_count++);
114 FILE *tempFile = fopen(fileName, "w");
115 free(fileName);
116 write_data(tempFile, array, n);
117 fclose(tempFile);
118 }
119
120 delete [] array;
121 fclose(fin);
122
123 return file_count;
124 }
125
126 void Adjust(int s)
127 {//沿从叶子节点b[s]到根节点ls[0]的路径调整败者树
128 int t=(s+k)/2;//ls[t]是b[s]的双亲节点
129 while(t>0)
130 {
131 if(b[s]>b[ls[t]])//如果失败,则失败者位置s留下,s指向新的胜利者
132 {
133 int tmp=s;
134 s=ls[t];
135 ls[t]=tmp;
136 }
137 t=t/2;
138 }
139 ls[0]=s;//ls[0]存放调整后的最大值的位置
140 }
141
142 void CreateLoserTree()
143 {
144 b[k]=MIN;//额外的存储一个最小值
145 for(int i=0;i<k;i++)ls[i]=k;//先初始化为指向最小值,这样后面的调整才是正确的
146 //这样能保证非叶子节点都是子树中的“二把手”
147 for(i=k-1;i>=0;i--)
148 Adjust(i);//依次从b[k-1],b[k-2]...b[0]出发调整败者树
149 }
150
151 void K_Merge()
152 {//利用败者数把k个输入归并段归并到输出段中
153 //b中前k个变量存放k个输入段中当前记录的元素
154 //归并临时文件
155 FILE *fout = fopen(m_out_file, "wt");
156 FILE* *farray = new FILE*[k];
157 int i;
158 for(i = 0; i < k; ++i) //打开所有k路输入文件
159 {
160 char* fileName = temp_filename(i);
161 farray[i] = fopen(fileName, "rt");
162 free(fileName);
163 }
164
165 for(i = 0; i < k; ++i) //初始读取
166 {
167 if(fscanf(farray[i], "%d", &b[i]) == EOF)//读每个文件的第一个数到data数组
168 {
169 printf("there is no %d file to merge!",k);
170 return;
171 }
172 }
173 // for(int i=0;i<k;i++)input(b[i]);
174
175 CreateLoserTree();
176 int q;
177 while(b[ls[0]]!=MAX)//
178 {
179 q=ls[0];//q用来存储b中最小值的位置,同时也对应一路文件
180 //output(q);
181 fprintf(fout,"%d ",b[q]);
182 //input(b[q],q);
183 fscanf(farray[q],"%d",&b[q]);
184 Adjust(q);
185 }
186 //output(ls[0]);
187 fprintf(fout,"%d ",b[ls[0]]);
188 //delete [] hasNext;
189 //delete [] data;
190
191 for(i = 0; i < k; ++i) //清理工作
192 {
193 fclose(farray[i]);
194 }
195 delete [] farray;
196 fclose(fout);
197 }
198 /*
199 void merge_sort(int file_count)
200 {
201 if(file_count <= 0) return;
202
203 //归并临时文件
204 FILE *fout = fopen(m_out_file, "wt");
205 FILE* *farray = new FILE*[file_count];
206 int i;
207 for(i = 0; i < file_count; ++i)
208 {
209 char* fileName = temp_filename(i);
210 farray[i] = fopen(fileName, "rt");
211 free(fileName);
212 }
213
214 int *data = new int[file_count];//存储每个文件当前的一个数字
215 bool *hasNext = new bool[file_count];//标记文件是否读完
216 memset(data, 0, sizeof(int) * file_count);
217 memset(hasNext, 1, sizeof(bool) * file_count);
218
219 for(i = 0; i < file_count; ++i) //初始读取
220 {
221 if(fscanf(farray[i], "%d", &data[i]) == EOF)//读每个文件的第一个数到data数组
222 hasNext[i] = false;
223 }
224
225 while(true) //循环读取和输出,选择最小数的方法是简单遍历选择法
226 {
227 //求data中可用的最小的数字,并记录对应文件的索引
228 int min = data[0];
229 int j = 0;
230
231 while (j < file_count && !hasNext[j]) //顺序跳过已读取完毕的文件
232 j++;
233
234 if (j >= file_count) //没有可取的数字,终止归并
235 break;
236
237
238 for(i = j +1; i < file_count; ++i) //选择最小数,这里应该是i=j吧!但结果是一样的!
239 {
240 if(hasNext[i] && min > data[i])
241 {
242 min = data[i];
243 j = i;
244 }
245 }
246
247 if(fscanf(farray[j], "%d", &data[j]) == EOF) //读取文件的下一个元素
248 hasNext[j] = false;
249 fprintf(fout, "%d ", min);
250
251 }
252
253 delete [] hasNext;
254 delete [] data;
255
256 for(i = 0; i < file_count; ++i)
257 {
258 fclose(farray[i]);
259 }
260 delete [] farray;
261 fclose(fout);
262 }
263 */
264 };
265
266 #endif
267
268
269 //测试主函数文件
270 /*
271 * 大文件排序
272 * 数据不能一次性全部装入内存
273 * 排序文件里有多个整数,整数之间用空格隔开
274 */
275
276 const unsigned int count = 10000000; // 文件里数据的行数
277 const unsigned int number_to_sort = 100000; //在内存中一次排序的数量
278 const char *unsort_file = "unsort_data.txt"; //原始未排序的文件名
279 const char *sort_file = "sort_data.txt"; //已排序的文件名
280 void init_data(unsigned int num); //随机生成数据文件
281
282 int main(int argc, char* *argv)
283 {
284 srand(time(NULL));
285 init_data(count);
286 ExternSort extSort(unsort_file, sort_file, number_to_sort);
287 extSort.sort();
288 system("pause");
289 return 0;
290 }
291
292 void init_data(unsigned int num)
293 {
294 FILE* f = fopen(unsort_file, "wt");
295 for(int i = 0; i < num; ++i)
296 fprintf(f, "%d ", rand());
297 fclose(f);
298 }下面再献上JULY算法的原版(有些我修改的注释部分没删,目的是为了看代码的时候可以比较一下),读者可以对照关键部分的区别:
View Code1 //copyright@ 纯净的天空 && yansha
2 //5、July,updated,2010.05.28。
3 #include <iostream>
4 #include <ctime>
5 #include <fstream>
6 //#include "ExternSort.h"
7 using namespace std;
8
9 //使用多路归并进行外排序的类
10 //ExternSort.h
11
12 /*
13 * 大数据量的排序
14 * 多路归并排序
15 * 以千万级整数从小到大排序为例
16 * 一个比较简单的例子,没有建立内存缓冲区
17 */
18
19 #ifndef EXTERN_SORT_H
20 #define EXTERN_SORT_H
21
22 #include <cassert>
23 //#define MIN -1//这里开始的时候出现了一个BUG,如果定义的MIN大于等于待排序的数,则会是算法出现错误
24 //#define MAX 10000000//最大值,附加在归并文件结尾
25 //typedef int* LoserTree;
26 //typedef int* External;
27
28 class ExternSort
29 {
30 public:
31 void sort()
32 {
33 time_t start = time(NULL);
34
35 //将文件内容分块在内存中排序,并分别写入临时文件
36 int file_count = memory_sort(); //
37
38 //归并临时文件内容到输出文件
39 merge_sort(file_count);
40 //ls=new int[k];
41 //b=new int[k+1];
42 //K_Merge();
43 //delete []ls;
44 //delete []b;
45
46 time_t end = time(NULL);
47 printf("total time:%f\n", (end - start) * 1000.0/ CLOCKS_PER_SEC);
48 }
49
50 //input_file:输入文件名
51 //out_file:输出文件名
52 //count: 每次在内存中排序的整数个数
53 ExternSort(const char *input_file, const char * out_file, int count)
54 {
55 m_count = count;
56 m_in_file = new char[strlen(input_file) + 1];
57 strcpy(m_in_file, input_file);
58 m_out_file = new char[strlen(out_file) + 1];
59 strcpy(m_out_file, out_file);
60 }
61 virtual ~ExternSort()
62 {
63 delete [] m_in_file;
64 delete [] m_out_file;
65 }
66
67 private:
68 int m_count; //数组长度
69 char *m_in_file; //输入文件的路径
70 char *m_out_file; //输出文件的路径
71 // int k;//归并数,此数必须要内排序之后才能得到,所以下面的ls和b都只能定义为指针
72 // LoserTree ls;//定义成为指针
73 // External b;//定义成为指针,在成员函数中可以把它当成数组使用
74 //int External[k];
75 protected:
76 int read_data(FILE* f, int a[], int n)
77 {
78 int i = 0;
79 while(i < n && (fscanf(f, "%d", &a[i]) != EOF)) i++;
80 printf("read:%d integer\n", i);
81 return i;
82 }
83 void write_data(FILE* f, int a[], int n)
84 {
85 for(int i = 0; i < n; ++i)
86 fprintf(f, "%d ", a[i]);
87 //fprintf(f,"%d",MAX);//在最后写上一个最大值
88 }
89 char* temp_filename(int index)
90 {
91 char *tempfile = new char[100];
92 sprintf(tempfile, "temp%d.txt", index);
93 return tempfile;
94 }
95 static int cmp_int(const void *a, const void *b)
96 {
97 return *(int*)a - *(int*)b;
98 }
99
100 int memory_sort()
101 {
102 FILE* fin = fopen(m_in_file, "rt");
103 int n = 0, file_count = 0;
104 int *array = new int[m_count];
105
106 //每读入m_count个整数就在内存中做一次排序,并写入临时文件
107 while(( n = read_data(fin, array, m_count)) > 0)
108 {
109 qsort(array, n, sizeof(int), cmp_int);
110 //这里,调用了库函数阿,在第四节的c实现里,不再调用qsort。
111 char *fileName = temp_filename(file_count++);
112 FILE *tempFile = fopen(fileName, "w");
113 free(fileName);
114 write_data(tempFile, array, n);
115 fclose(tempFile);
116 }
117
118 delete [] array;
119 fclose(fin);
120
121 return file_count;
122 }
123 /*
124 void Adjust(int s)
125 {
126 int t=(s+k)/2;
127 while(t>0)
128 {
129 if(b[s]>b[ls[t]])//如果失败,则失败者位置s留下,s指向新的胜利者
130 {
131 int tmp=s;
132 s=ls[t];
133 ls[t]=tmp;
134 }
135 t=t/2;
136 }
137 ls[0]=s;
138 }
139
140 void CreateLoserTree()
141 {
142 b[k]=MIN;//额外的存储一个最小值
143 for(int i=0;i<k;i++)ls[i]=k;//先初始化为指向最小值,这样后面的调整才是正确的
144 //这样能保证非叶子节点都是子树中的“二把手”
145 for(i=k-1;i>=0;i--)
146 Adjust(i);//依次从b[k-1],b[k-2]...b[0]出发调整败者树
147 }
148
149 void K_Merge()
150 {//利用败者数把k个输入归并段归并到输出段中
151 //b中前k个变量存放k个输入段中当前记录的元素
152 //归并临时文件
153 FILE *fout = fopen(m_out_file, "wt");
154 FILE* *farray = new FILE*[k];
155 int i;
156 for(i = 0; i < k; ++i) //打开所有k路输入文件
157 {
158 char* fileName = temp_filename(i);
159 farray[i] = fopen(fileName, "rt");
160 free(fileName);
161 }
162
163 // int *data = new int[file_count];//存储每个文件当前的一个数字
164 //bool *hasNext = new bool[k];//标记文件是否读完
165 // memset(data, 0, sizeof(int) * file_count);
166 // memset(hasNext, 1, sizeof(bool) * file_count);
167
168 for(i = 0; i < k; ++i) //初始读取
169 {
170 if(fscanf(farray[i], "%d", &b[i]) == EOF)//读每个文件的第一个数到data数组
171 // hasNext[i] = false;
172 {
173 printf("there is no %d file to merge!");
174 return;
175 }
176 }
177 // for(int i=0;i<k;i++)input(b[i]);
178
179 CreateLoserTree();
180 int q;
181 while(b[ls[0]]!=MAX)//
182 {
183 q=ls[0];//q用来存储b中最小值的位置,同时也对应一路文件
184 //output(q);
185 fprintf(fout,"%d ",b[q]);
186 //input(b[q],q);
187 fscanf(farray[q],"%d",&b[q]);
188 Adjust(q);
189 }
190 //output(ls[0]);
191 fprintf(fout,"%d ",b[ls[0]]);
192 //delete [] hasNext;
193 //delete [] data;
194
195 for(i = 0; i < k; ++i) //清理工作
196 {
197 fclose(farray[i]);
198 }
199 delete [] farray;
200 fclose(fout);
201 }
202 */
203 void merge_sort(int file_count)
204 {
205 if(file_count <= 0) return;
206
207 //归并临时文件
208 FILE *fout = fopen(m_out_file, "wt");
209 FILE* *farray = new FILE*[file_count];
210 int i;
211 for(i = 0; i < file_count; ++i)
212 {
213 char* fileName = temp_filename(i);
214 farray[i] = fopen(fileName, "rt");
215 free(fileName);
216 }
217
218 int *data = new int[file_count];//存储每个文件当前的一个数字
219 bool *hasNext = new bool[file_count];//标记文件是否读完
220 memset(data, 0, sizeof(int) * file_count);
221 memset(hasNext, 1, sizeof(bool) * file_count);
222
223 for(i = 0; i < file_count; ++i) //初始读取
224 {
225 if(fscanf(farray[i], "%d", &data[i]) == EOF)//读每个文件的第一个数到data数组
226 hasNext[i] = false;
227 }
228
229 while(true) //循环读取和输出,选择最小数的方法是简单遍历选择法
230 {
231 //求data中可用的最小的数字,并记录对应文件的索引
232 int min = data[0];
233 int j = 0;
234
235 while (j < file_count && !hasNext[j]) //顺序跳过已读取完毕的文件
236 j++;
237
238 if (j >= file_count) //没有可取的数字,终止归并
239 break;
240
241
242 for(i = j +1; i < file_count; ++i) //选择最小数,这里应该是i=j吧!但结果是一样的!
243 {
244 if(hasNext[i] && min > data[i])
245 {
246 min = data[i];
247 j = i;
248 }
249 }
250
251 if(fscanf(farray[j], "%d", &data[j]) == EOF) //读取文件的下一个元素
252 hasNext[j] = false;
253 fprintf(fout, "%d ", min);
254
255 }
256
257 delete [] hasNext;
258 delete [] data;
259
260 for(i = 0; i < file_count; ++i)
261 {
262 fclose(farray[i]);
263 }
264 delete [] farray;
265 fclose(fout);
266 }
267
268 };
269
270 #endif
271
272
273 //测试主函数文件
274 /*
275 * 大文件排序
276 * 数据不能一次性全部装入内存
277 * 排序文件里有多个整数,整数之间用空格隔开
278 */
279
280 const unsigned int count = 10000000; // 文件里数据的行数
281 const unsigned int number_to_sort = 100000; //在内存中一次排序的数量
282 const char *unsort_file = "unsort_data.txt"; //原始未排序的文件名
283 const char *sort_file = "sort_data.txt"; //已排序的文件名
284 void init_data(unsigned int num); //随机生成数据文件
285
286 int main(int argc, char* *argv)
287 {
288 srand(time(NULL));
289 init_data(count);
290 ExternSort extSort(unsort_file, sort_file, number_to_sort);
291 extSort.sort();
292 system("pause");
293 return 0;
294 }
295
296 void init_data(unsigned int num)
297 {
298 FILE* f = fopen(unsort_file, "wt");
299 for(int i = 0; i < num; ++i)
300 fprintf(f, "%d ", rand());
301 fclose(f);
302 }4、测试分析:
先在分别测试上述两个代码,第一组测试的参数如下:
- const unsigned int count = 10000000; // 待排序文件里数据的个数 (JULY的源文件中为文件里数据的行数,有误)
- const unsigned int number_to_sort = 1000000; //在内存中一次排序的数量
- const char *unsort_file = "unsort_data.txt"; //原始未排序的文件名
- const char *sort_file = "sort_data.txt"; //已排序的文件名
- void init_data(unsigned int num); //随机生成数据文件
关键是前两行,一个是待排序数据的总的个数10000000,一个是每次在内存中排序的个数1000000(也就是划分后各个小文件中数据的大小)。于是程序为10路归并排序。
JULY原版代码的运行结果:
败者树版本代码运行结果:
由以上可知,利用了败者树的程序运行速度略高于直接选择版本程序,但是差别不是很大,对于不同的机器来说,差别可能在1到2秒之内,甚至没有差别,读者可以自行测试。
接下来,我们进行第二组测试,测试数据如下
- const unsigned int count = 10000000; // 待排序文件里数据的个数 (JULY的源文件中为文件里数据的行数,有误)
- const unsigned int number_to_sort = 100000; //在内存中一次排序的数量 (注意这里改为十万)
- const char *unsort_file = "unsort_data.txt"; //原始未排序的文件名
- const char *sort_file = "sort_data.txt"; //已排序的文件名
- void init_data(unsigned int num); //随机生成数据文件
关键是前两行,一个是待排序数据的总的个数10000000,一个是每次在内存中排序的个数100000(也就是划分后各个小文件中数据的大小)。于是程序为100路归并排序。
JULY原版代码的运行结果:
败者树版本代码运行结果:
由以上可知,两者的差别明显地体现了出来,败者树版本程序显然快于直接选择版本。
另外还可以看出,10路和100路对于败者树版本来说,耗时差不多,说明败者树版本随路数的增加,耗时的增加相对较缓;而对于直接选择版本来说,耗时会随着路数的增加而增加,至于是线性的还是指数型的读者可以自行验证~~。
5、结论
当多路归并的路数比较小时,败者树的优势体现不出来,但是当路数达到一定规模时,败者树可以显著地减少排序时间,当然,由于败者树只作用于内存的最小关键字选择,所以直接提高的也只是内存的速度而已。但是别忘了,外排时所需读写外存的次数是和归并的次数成正比的,路数越多,归并的次数越少,也就可以间接减少外存读写次数了,所以说,败者树的优势是相当强大的~~
与学习海量数据处理同学共勉之~!