zoukankan      html  css  js  c++  java
  • 多线程外排序解决大数据排序问题2(最小堆并行k路归并)

    转自:AIfred

    事实证明外排序的效率主要依赖于磁盘,归并阶段采用K路归并可以显著减少IO量,最小堆并行k路归并,效率倍增。

    二路归并的思路会导致非常多冗余的磁盘访问,两组两组合并确定的是当前的相对位置并不能一次确定最终的位置。

    K路归并,每一轮归并直接确定的是最终的位置,不用重复访问,减少IO。该排序算法需要对每个整数做2次磁盘读和2次磁盘写。

    摘自维基百科:

    外排序的一个例子是外归并排序(External merge sort),它读入一些能放在内存内的数据量,在内存中排序后输出为一个顺串(即是内部数据有序的临时文件),处理完所有的数据后再进行归并。比如,要对900MB 的数据进行排序,但机器上只有100 MB的可用内存时,外归并排序按如下方法操作:

    1. 读入100 MB的数据至内存中,用某种常规方式(如快速排序堆排序归并排序等方法)在内存中完成排序。
    2. 将排序完成的数据写入磁盘。
    3. 重复步骤1和2直到所有的数据都存入了不同的100 MB的块(临时文件)中。在这个例子中,有900 MB数据,单个临时文件大小为100 MB,所以会产生9个临时文件。
    4. 读入每个临时文件(顺串)的前10 MB( = 100 MB / (9块 + 1))的数据放入内存中的输入缓冲区,最后的10 MB作为输出缓冲区。(实践中,将输入缓冲适当调小,而适当增大输出缓冲区能获得更好的效果。)
    5. 执行九路归并算法,将结果输出到输出缓冲区。一旦输出缓冲区满,将缓冲区中的数据写出至目标文件,清空缓冲区。一旦9个输入缓冲区中的一个变空,就从这个缓冲区关联的文件,读入下一个10M数据,除非这个文件已读完。这是“外归并排序”能在主存外完成排序的关键步骤 -- 因为“归并算法”(merge algorithm)对每一个大块只是顺序地做一轮访问(进行归并),每个大块不用完全载入主存。

    算法思路:

    1. 二分文件位置,选取每一个文件的枢轴,将每一个文件划分为thread个片段,使得每一个thread处理所有文件片段和相对均衡。

    2. 然后用每一个线程各自处理属于他们的K个文件片段,规模为K的最小堆维护K路归并,构造一个大小为k的堆,先将k个节点的头元素插入到堆中,然后每次取出头结点,取出来的元素属于哪个子数组,再添加这个子数组的下一个元素进入堆中,来维护这个堆。这里的排序结果也是最后的排序结果,直接输出到文件。多线程并行处理。

    用到知识点:

    1. 数据并行拆分: (partition_and_sort)

    • 切分的大小符合内存大小限制。
    • 禁止拆分数据线程间的依赖。
    • 汇总时处理并发冲突,原子操作。

    2. k路归并堆排序:(heapsort)

    构造一个大小为k的最小二叉堆,先将k个节点的头元素插入到堆中,然后每次取出头结点,取出来的元素属于哪个子数组,再添加这个子数组的下一个元素进入堆中,来维护这个堆。这里的排序结果也是最后的排序结果,减少IO操作

    3. 缓冲区(buffer)

    如果将每个节点的最小值放入内存,例如2,1,3,4放入内存,但是把最小值1拿掉之后需要补充一个元素,将外部内存的2拿到内存里来,可是外部内存可能在硬盘或网络,此过程相比内存操作会慢很多,不断读取外部内存效率很低,所以采用缓冲区,每次读取k个节点前部分数据到内存缓冲区(几k或几M)。

      1 #include <stdio.h>
      2 #include <cstring>
      3 #include <string>
      4 #include <atomic>
      5 #include <queue>
      6 #include <vector>
      7 #include <Windows.h>
      8 #include <ppl.h>
      9 #include <functional>
     10 #include <io.h>
     11 #include <time.h>
     12 #define MAX_THREADS 4
     13 #define MAX_K 100
     14 using namespace std;
     15 using namespace concurrency;
     16 const int dx = 20;//并行快速排序的dx优化
     17 const long long PARTITION_SIZE = 100000000;
     18 const long long BUFFER_SIZE = 200000;
     19 const long long EACH_NUM = (PARTITION_SIZE / MAX_THREADS);
     20 int parts, heapsize[MAX_THREADS];
     21 long long data_size;
     22 mutex m;
     23 typedef pair<int, int> node; // (int,文件id)
     24 // 每一个线程维护的最小堆,堆大小是文件数,K路归并
     25 node heap[MAX_THREADS][MAX_K + 10]; 
     26 
     27 void parallel_qsort(int *begin, int *end) {//并行快速排序
     28     if (begin >= end - 1) return;
     29     int *key = rand() % (end - begin) + begin;
     30     swap(*key, *begin);
     31     int *i = begin, *j = begin;
     32     for (key = begin; j < end; j++) {
     33         if (*j < *key) {
     34             i++;
     35             swap(*i, *j);
     36         }
     37     }
     38     swap(*begin, *i);
     39     if (i - begin > dx && end - i > dx) {//dx优化
     40         parallel_for(0, 2, [&](int x) {
     41             if (x) parallel_qsort(begin, i);
     42             else parallel_qsort(i + 1, end);
     43         });
     44     } else {
     45         parallel_qsort(begin, i);
     46         parallel_qsort(i + 1, end);
     47     }
     48 }
     49 
     50 // 添加新元素,向上找到合适的插入位置
     51 inline void up(int idx, int x) {
     52     int fa = x >> 1; node tmp = heap[idx][x];
     53     while (fa) {
     54         if (tmp < heap[idx][fa])//cmp
     55             heap[idx][x] = heap[idx][fa];
     56         else break;
     57         x = fa; fa = x >> 1;
     58     }
     59     heap[idx][x] = tmp;
     60 }
     61 
     62 // 向下找到合适的插入位置
     63 inline void down(int idx, int x) {
     64     int ch = x << 1; node tmp = heap[idx][x];
     65     while (ch <= heapsize[idx]) {
     66         if (ch < heapsize[idx] && heap[idx][ch + 1] < heap[idx][ch]) ch++;//cmp
     67         if (heap[idx][ch] < tmp)//cmp
     68             heap[idx][x] = heap[idx][ch];
     69         else break;
     70         x = ch; ch = x << 1;
     71     }
     72     heap[idx][x] = tmp;
     73 }
     74 
     75 inline void push(int idx, node val) { // 向最小堆插入元素
     76     heap[idx][++heapsize[idx]] = val;
     77     up(idx, heapsize[idx]);
     78 }
     79 inline node top(int idx) { return heap[idx][1]; }
     80 inline void pop(int idx) { // pop堆顶最小元素
     81     heap[idx][1] = heap[idx][heapsize[idx]--];
     82     down(idx, 1);
     83 }
     84 
     85 inline void ch_size(string file_name, fpos_t size) {
     86     FILE *fout = fopen(file_name.c_str(), "wb");
     87     _chsize_s(fileno(fout), size * sizeof(int));
     88     fclose(fout);
     89 }
     90 inline int seek_dat(FILE* &f, fpos_t pos) {
     91     int *get = new int;
     92     pos *= sizeof(int);
     93     fsetpos(f, &pos);
     94     fread(get, sizeof(int), 1, f);
     95     int tmp = *get; delete get;
     96     return tmp;
     97 }
     98 
     99 void partition_and_sort(string in_file) {
    100     int *arr = new int[PARTITION_SIZE];
    101     for (long long i = 0; i < (data_size - 1) / PARTITION_SIZE + 1; i++) {
    102         atomic_int each_get[MAX_THREADS + 1] = {};
    103         string tmp_file = "temp\part" + to_string(i) + ".dat";
    104         clock_t start = clock();
    105         cout << "Reading part " << i << "...";
    106         parallel_for(0, MAX_THREADS, [&](long long x) {
    107             FILE* fin = fopen(in_file.c_str(), "rb");
    108             fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
    109             if (fsetpos(fin, &pos) == 0)
    110                 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
    111             each_get[MAX_THREADS] += each_get[x];
    112             fclose(fin);
    113         });
    114         cout << "
    Sorting part " << i << "...";
    115         parallel_qsort(arr, arr + each_get[MAX_THREADS]); // 并行快速排序
    116         cout << "
    Writing part " << i << "...";
    117         ch_size(tmp_file, each_get[MAX_THREADS]);
    118         parallel_for(0, MAX_THREADS, [&](long long x) {
    119             FILE* fout = fopen(tmp_file.c_str(), "rb+");
    120             fpos_t pos = EACH_NUM * x * sizeof(int);
    121             if (fsetpos(fout, &pos) == 0)
    122                 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
    123             fclose(fout);
    124         });
    125         clock_t end = clock();
    126         cout << "
    Part " << i << " established. Time usage = " << end - start << "ms.
    ";
    127     }
    128     delete[] arr;
    129 }
    130 
    131 void merge_file() {
    132     FILE* fin[MAX_K] = {};
    133     fpos_t size[MAX_K] = {}, seek_pos[MAX_THREADS + 1][MAX_K + 1] = {};
    134     for (int i = 0; i < parts; i++) {
    135         fin[i] = fopen(("temp\part" + to_string(i) + ".dat").c_str(), "rb");
    136         fseek(fin[i], 0, SEEK_END);
    137         fgetpos(fin[i], &size[i]);
    138         size[i] /= sizeof(int); // 有多少数
    139         seek_pos[MAX_THREADS][parts] += (seek_pos[MAX_THREADS][i] = size[i]); // seek_pos[线程id][文件id] = 文件位置
    140     }
    141     cout << "
    Initializing merging operation...
    ";
    142     for (long long i = 1; i < MAX_THREADS; i++) {
    143         fpos_t l0 = 0, r0 = size[0] - 1;
    144         while (r0 - l0 > 1) {  // 二分文件0的位置
    145             seek_pos[i][parts] = seek_pos[i][0] = (l0 + r0) / 2;
    146             int get0 = seek_dat(fin[0], seek_pos[i][0]);
    147             for (int idx = 1; idx < parts; idx++) {
    148                 fpos_t l = 0, r = size[idx];
    149                 while (r - l > 0) { // 二分其他文件的位置,找到get0
    150                     seek_pos[i][idx] = (l + r) / 2;
    151                     int get = seek_dat(fin[idx], seek_pos[i][idx]);
    152                     if (get0 <= get) r = seek_pos[i][idx];
    153                     else l = seek_pos[i][idx] + 1;
    154                 }
    155                 seek_pos[i][parts] += (seek_pos[i][idx] = r);
    156             }
    157             // 二分文件0位置的目的是使得分治的较为均衡,所有文件相对片段长度之和接近于 data_size / MAX_THREADS
    158             if (seek_pos[i][parts] * MAX_THREADS < data_size * i) l0 = seek_pos[i][0] + 1;
    159             else r0 = seek_pos[i][0] - 1;
    160         }
    161     }
    162     for (int i = 0; i < parts; i++) fclose(fin[i]);
    163     clock_t start = clock(); atomic_llong all_write = 0;
    164     parallel_for(0, MAX_THREADS, [&](int x) { // 线程处理外循环
    165         FILE *fin[MAX_K] = {}, *fout = fopen("ans.dat", "rb+");
    166         fpos_t fpos = seek_pos[x][parts] * sizeof(int);
    167         fsetpos(fout, &fpos);
    168         int **buf = new int*[MAX_K + 1];// 开文件数个buffer,K路归并
    169         for (int i = 0; i <= MAX_K; i++) buf[i] = new int[BUFFER_SIZE];
    170         int pos[MAX_K + 1] = {};//buffer pos
    171         fpos_t all[MAX_K] = {};
    172         for (int i = 0; i < parts; i++) {
    173             fin[i] = fopen(("temp\part" + to_string(i) + ".dat").c_str(), "rb");
    174             fpos = seek_pos[x][i] * sizeof(int);
    175             fsetpos(fin[i], &fpos);
    176             all[i] = seek_pos[x + 1][i] - seek_pos[x][i]; // 记录每一个文件一个线程处理的长度
    177             fread(buf[i], sizeof(int), BUFFER_SIZE, fin[i]);
    178         }
    179         for (int i = 0; i < parts; i++) {
    180             // 向最小堆中读入所有文件属于该线程处理的部分的第一个元素
    181             push(x, node(buf[i][0], i)); //(线程id,pair(buffer,文件id))
    182             pos[i] = 1; all[i]--;
    183         }
    184         while (heapsize[x]) {
    185             // buf[parts]: k路归并排好序的缓冲区
    186             if (pos[parts] == BUFFER_SIZE) {
    187                 fwrite(buf[parts], sizeof(int), BUFFER_SIZE, fout);
    188                 all_write += BUFFER_SIZE;
    189                 if (all_write % 1000000 == 0) {
    190                     m.lock();
    191                     cout << "
    Start merging... " << (all_write * 100) / data_size
    192                          << "% completed.";
    193                     m.unlock();
    194                 }
    195                 pos[parts] = 0;
    196             }
    197             int bel = top(x).second;
    198             buf[parts][pos[parts]++] = top(x).first;
    199             if (all[bel]) {
    200                 heap[x][1] = node(buf[bel][pos[bel]], bel); down(x, 1);// 该buffer的新元素替换heap的顶部最小元素
    201                 if ((++pos[bel]) == BUFFER_SIZE) {
    202                     fread(buf[bel], sizeof(int), BUFFER_SIZE, fin[bel]);
    203                     pos[bel] = 0;
    204                 }
    205                 all[bel]--;
    206             } else pop(x); // 该文件属于线程x的部分全部处理完了,就直接pop
    207         }
    208         fwrite(buf[parts], sizeof(int), pos[parts], fout); // 把余下排好序的buffer写入文件
    209         cout << "
    Start merging... 100% completed.";
    210         for (int i = 0; i < parts; i++) fclose(fin[i]); fclose(fout);
    211         for (int i = 0; i < MAX_K; i++) delete[] buf[i]; delete[] buf;
    212     });
    213     clock_t end = clock();
    214     cout << "
    Merging finished. Time usage = " << end - start << "ms.
    ";
    215 }
    216 
    217 
    218 int main() {
    219     string in_file;
    220     cout << "Enter data file name: ";
    221     cin >> in_file;
    222     FILE* fin = fopen(in_file.c_str(), "rb");
    223     if (fin == NULL) {
    224         cout << "Could not open that file.
    ";
    225         main();
    226     }
    227     clock_t start_time = clock();
    228     fseek(fin, 0, SEEK_END);
    229     fgetpos(fin, &data_size);
    230     data_size /= sizeof(int);
    231     parts = (data_size - 1) / PARTITION_SIZE + 1;
    232     fclose(fin);
    233     cout << "
    Partitioning " << data_size << " elements(int)...
    ";
    234     system("mkdir temp");
    235     parallel_for(0, 2, [&](int x) {
    236         if (x) partition_and_sort(in_file);
    237         else ch_size("ans.dat", data_size);
    238     });
    239     merge_file();
    240     clock_t end_time = clock();
    241     system("rd /s/q temp");
    242     cout << "
    External sorting complete, result saved to "ans.dat".
    "
    243          << "Time usage = " << end_time - start_time << "ms.
    ";
    244     system("pause");
    245     return 0;
    246 }

    参考:

    简单无堆无缓冲区单线程K路归并版本: https://www.cnblogs.com/this-543273659/archive/2011/07/30/2122083.html

    wiki:外排序

  • 相关阅读:
    Ubuntu 16.04下SecureCRT无法输入中文的解决思路
    Ubuntu 16.04安装SecureCRT替代XShell
    SVN提交时报错:Commit blocked by pre-commit hook (exit code 1) with no output.
    Spring Cloud ZooKeeper集成Feign的坑2,服务调用了一次后第二次调用就变成了500,错误:Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.n
    Spring Cloud ZooKeeper集成Feign的坑1,错误:Consider defining a bean of type 'org.springframework.web.client.RestTemplate' in your configuration.
    Ubuntu 16.04 GNOME下解决Sublime Text3中文输入(ibus)(转)
    Spring Cloud简介/版本选择/ZooKeeper例子搭建简单说明
    Maven奇怪的问题,当找不到Maven输出的提示错误时可以试下这个方法
    Ubuntu 16.04系统启动时卡在:(initramfs)
    Eclipse创建Maven多模块工程
  • 原文地址:https://www.cnblogs.com/demian/p/9593197.html
Copyright © 2011-2022 走看看