实习生活告一段落,我正式从一名.NET程序员转入Java阵营,不得不说刚开始用Java的东西是多么的不习惯,但是经过三个月的使用与开发,我也发现了Java的优势:不在于语言,而在于开源。这意味着有更多免费可用的东西,直接复用,但是研究它的人也可以通过代码深造自己的技术水平。
题外话说到这吧,很简单的一个问题,读取一个大型文件(可能超过内存),分析其中英文单词的词频,并输出结果。简化起见,我们假定编码不是Unicode,而是UTF-8或者ANSI,最快速度,榨干磁盘IO是关键所在。
1、最开始
一般来说,遇到这个问题,我们可能想法都是这样:
-------------- ---------
| File | --> | Buffer | --> 分析 --> 结果
-------------- ---------
2、多线程
再下来就可以开辟多个线程,分段读取文件,并以同步的方式将分析结果保存起来:
---------
| Buffer | --> 分析
/ ---------
-------------- ---------
| File | --> | Buffer | --> 分析 --> 结果
-------------- --------- /
.... --> ...
这时有一个问题,如果保证分段时单词不被割断,如段落:...i love you...,如果正好选取在v上,那么love将会被拆成两个单词。对应的,我的解决办法是后搜机制:分割时,往后读char,直到遇到非字母、数字时认为分割完成。
另外一个问题,结果的保存有两种方式,一种是同步机制,这会影响性能,但占用的内存空间小;另一种是各个线程维护一个结果集,然后在全部完成后结算,这种方式下速度更快,但会占用N倍于第一种的内存空间(N是线程数)。在内存允许的情况下,我更倾向于第二种解决方案。
3、单IO线程
目前来说,程序各个线程都会有IO操作,无疑,这在磁盘IO只有100Mb/s左右的时候,增加了线程切换、IO中断的开销,于是设计应该是统用一个大Buffer(大小取决于内存大小),然后各个线程再在Buffer的[start,end]区间分段进行分析:
---------
| s1, e1| --> 分析 --> 结果1
/ ---------
-------------- IO ------------- ---------
| File | ------> | Big Buffer | --> | s2, e2 | --> 分析 --> 结果2 --> 结果
---------------- ------------- ---------
.... --> ... --> ... /
跟前面一样,[start, end]形式的分段分析也存在割断单词的情况,所以也有后搜机制来保证单词不被截断。所幸分割数取决于线程数N,而且由于单词长度有限,在内存内的后搜操作也非常迅速。总之与IO比起来,可以完全忽略了。
4、双缓冲
经过上面步骤的调整,IO已经完全独立出来了,但是在读取一个Buffer后,IO便会等待分析完成才会继续读入,有什么方法可以让IO线程在分析时也不停歇么?有,这便是双缓冲:
---------
| s1, e1| --> 分析 --> 结果1
/ ---------
--------------- ---------
| Big Buffer 1 | -----|| ---> | s2, e2 | --> 分析 --> 结果2 --> 结果
-------------- / --------------- 切 ---------
| File | IO --------------- 换 ..... --> ... --> ... /
-------------- | Big Buffer 2 | -----||
---------------
这种方式的优势在于,Buffer 1在读入完成时,马上会进行分析,然后Buffer 2继续读入;当分析一个Buffer 1完成后,切换到另一个Buffer 2进行分析,然后Buffer 1继续进行读入。这就在一定程度上保证了IO的连贯性,充分利用IO资源(分析操作在内存中是相当快的)。
最后,我会附上我的代码,大家可以自己试着写下,其实跟算法没什么关系,主要是练习下多线程、IO方面的基础知识,蛮有意思的。另外,我这份代码需要支持C++ 11标准的编译器才能编译的哦~~

1 #include <iostream> 2 #include <fstream> 3 #include <unordered_map> 4 #include <time.h> 5 #include <thread> 6 #include <cstring> 7 using namespace std; 8 9 struct CharCmp 10 { 11 bool operator()(const char *str1,const char * str2) const 12 { 13 return strcmp(str1,str2) == 0; 14 } 15 }; 16 17 struct WordHash 18 { 19 // BKDR hash algorithm 20 int operator()(char * str) const 21 { 22 int seed = 131; // 31 131 1313 131313 etc.. 23 int hash = 0; 24 while(*str) 25 hash = hash * seed + (*str++); 26 return hash & (0x7FFFFFFF); 27 } 28 }; 29 30 typedef unordered_map<char*, unsigned int, WordHash, CharCmp> HashMap; 31 typedef unordered_map<char*, unsigned int, WordHash, CharCmp>::iterator KeySet; 32 33 bool words[128]; 34 int threadCount = 4; 35 streamsize loadsize = 536870912; // 1024*1024*1024 1879048192 1610612736 1073741824 536870912 268435456 36 char* loadedFile[2]; 37 HashMap* wordMaps; 38 39 // 声明 40 void readBlock(int,int,streamoff,streamsize); 41 streamsize inline getRealSize(ifstream*,streamoff,streamsize); 42 void inline readLoad(int,ifstream*,streamoff,streamsize); 43 streamsize inline getBlockSize(int,streamoff,streamsize); 44 45 int main(int argc, char *argv[]){ 46 47 ios::sync_with_stdio(false); 48 if (argc==1) 49 { 50 cout<<"WordCount多线程统计词频程序 参数: Path必需,ThreadNum可选(默认为4) BufferSize可选(双缓冲,实际占用双倍,1879048192 1610612736 1073741824 536870912 268435456,默认512M) Usage: WordCount [Path] [ThreadNum] [BufferSize] Example: WordCount input.txt"<<endl; 51 exit(0); 52 } 53 if(argc>2) 54 threadCount = atoi(argv[2]); 55 if(argc>3) 56 loadsize = atol(argv[3]); 57 wordMaps = new HashMap[threadCount]; 58 char *filename = argv[1]; 59 // 双缓冲 60 streamsize maxsize = loadsize+256; 61 loadedFile[0] = new char[maxsize]; 62 loadedFile[1] = new char[maxsize]; 63 64 cout<<"Starting to calculate with "<< threadCount <<" threads..."<<endl; 65 time_t t_start,t_end; 66 t_start = time(NULL); 67 68 // 初始化可识别字符 69 memset(words,false,128); 70 for (char c=97;c!=123;++c) 71 words[c] = true; 72 for (char c=65;c!=91;++c) 73 words[c] = true; 74 for (char c=48;c!=58;++c) 75 words[c] = true; 76 77 // 读取文件 78 ifstream file; 79 file.open(filename,ios::binary|ios::in); 80 if (!file) 81 { 82 cout<<"Error: file ""<<filename<<"" do not exist!"<<endl; // 失败 83 exit(1); 84 } 85 else 86 { 87 // 确认文件大小 88 streamoff start=0; 89 file.seekg(0,ios::end); 90 streamoff size,len = file.tellg(); 91 if (len>3) 92 { 93 // 确认有无BOM 94 char bom[3]; 95 file.seekg(0); 96 file.read(bom,3); 97 if (bom[0]==-17&&bom[1]==-69&&bom[2]==-65){ 98 start = 3; 99 size = len - 3; 100 }else 101 size = len; 102 }else 103 size = len; 104 // 读入文件数据到缓存 105 thread* threads = new thread[threadCount]; 106 streamsize realsize; 107 streamoff index,part; 108 bool step = 0,needWait = false; 109 while (size) 110 { 111 // 缓冲 112 realsize = size>maxsize ? getRealSize(&file,start,loadsize) : size; 113 readLoad(step,&file,start,realsize); 114 start+=realsize; 115 size-=realsize; 116 // 等待 117 if(needWait) 118 for (int i=0;i<threadCount;++i) threads[i].join(); 119 else 120 needWait = true; 121 // 多线程计算 122 index=0,part = realsize/threadCount; 123 for (int i=1;i<threadCount;++i) 124 { 125 len = getBlockSize(step,index,part); 126 // 开算 127 threads[i] = thread(readBlock,step,i,index,len); 128 index+=len; 129 } 130 threads[0] = thread(readBlock,step,0,index,realsize-index); 131 // 转换 132 step = !step; 133 } 134 // 清理 135 for (int i=0;i<threadCount;++i) threads[i].join(); 136 delete loadedFile[0]; 137 delete loadedFile[1]; 138 file.close(); // 关闭 139 // 结算累加 140 HashMap* map = wordMaps; 141 for (int i=1;i<threadCount;++i) 142 { 143 KeySet p=(wordMaps+i)->begin(),end=(wordMaps+i)->end(); 144 for (; p!=end; ++p) 145 (*map)[p->first] += p->second; 146 } 147 // 输出 148 cout<<"Done. Different words: "<< map->size()<<endl; 149 KeySet p=map->begin(),end=map->end(); 150 long total = 0; 151 for (; p!=end; ++p) 152 total+=p->second; 153 cout<<"Total words:"<<total<<endl; 154 cout<<" Each words count:"<<endl; 155 for (KeySet i = map->begin(); i!=map->end(); ++i) 156 cout << i->first << " = " << i->second << endl; 157 //out.close(); 158 } 159 t_end = time(NULL); 160 // 结束 161 cout<<" All completed in "<<difftime(t_end,t_start) <<"s."<<endl; 162 return 0; 163 } 164 165 // 文件获取临界不截断的真正大小 166 streamsize inline getRealSize(ifstream* file,streamoff start,streamsize size) 167 { 168 file->seekg(start+size); 169 while (words[file->get()]) 170 ++size; 171 return size; 172 } 173 174 // 文件读入到堆 175 void inline readLoad(int step,ifstream* file,streamoff start,streamsize size){ 176 file->seekg(start); 177 file->read(loadedFile[step],size); 178 } 179 180 // 分块读取 181 void readBlock(int step,int id,streamoff start,streamsize size){ 182 char c = '