最近的MapReduce端的Partition根据map生成的Key来进行哈希,导致哈希出来的Reduce端处理任务数量非常不均匀,有些Reduce端处理的数据量非常小(几分钟就执行完成,而最后的part-结果显示其输出文件为0,没有处理任何任务),而有些Reduce端需要执行大量的任务(大概1个多小时)
根据下面的这篇大牛所写的文章,字符串hash算法也有很多种:
https://www.byvoid.com/en/blog/string-hash-compare
这些算法使用位运算使得每个字符都对最后的结果产生影响,作者对其展开了一系列评测,最终BKDR函数无论是在实际效果还是编码实现中,效果都是非常突出的,因此本重构也采用这种算法。
文中给出这种算法的C语言实现:
// BKDR Hash Function unsigned int BKDRHash(char *str) { unsigned int seed = 131; // 31 131 1313 13131 131313 etc.. unsigned int hash = 0; while (*str) { hash = hash * seed + (*str++); } return (hash & 0x7FFFFFFF); }
下面需要做的就是将其转换为Java实现,Java中使用long类型作为C语言中无符号整数的替代(避免int*计算的溢出),后面强制转换为int,去掉高位,并纠正“+/-”号
public static int bkdrHash(String hashString) { long seed = 131L; long hash = 0L; for (int i = 0; i < hashString.length(); i++) { char element = hashString.charAt(i); hash = hash * seed + element; } int hashInt = (int) hash; return hashInt & 0x7FFFFFFF; }
算法修改完成后,我们需要根据实际的结果来判断是否已经hash均匀。
为了确保实际情况中的数据能够有效地哈希均匀,我们直接修改Reduce端,让其直接在reduce函数中仅将key值输出,并将所有输出合并到一个文件以便进行分析。(未设置OutputFormat,直接输出Key文本作为一行)
collector.collect(new Text(iReportKey.getPartitionKey()), new Text(""))
进行均匀的简单分析程序如下:
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); String line; int totalCount = 0; while ((line = bufferedReader.readLine()) != null) { int index = Util.hashCode(line, numberPartition); result[index]++; totalCount++; } bufferedReader.close(); System.out.println("---------------------"); System.out.println("Total Count: " + totalCount); for (int i = 0; i < numberPartition; i++) { System.out.println( String.format("partition=%s, count=%s, percentage=%s%%", i, result[i], (double) result[i] * 100d / (double) totalCount)); } System.out.println("---------------------");
默认设置10个Reduce,分别对生成的Key文件的结果进行处理:
当Map生成的Key数据总量为4390398:
Total Count: 4390398 partition=0, count=632297, percentage=14.401815051847235% partition=1, count=410196, percentage=9.343025393142034% partition=2, count=406882, percentage=9.267542487036483% partition=3, count=531126, percentage=12.097445379667173% partition=4, count=569099, percentage=12.962355576874808% partition=5, count=324720, percentage=7.396140395472119% partition=6, count=394503, percentage=8.985586272588499% partition=7, count=343889, percentage=7.832752292616751% partition=8, count=384954, percentage=8.76808890674604% partition=9, count=392732, percentage=8.945248244008857%
数据量提高一个等级,当Map生成的Key数据总量为40976446时:
Total Count: 40976446 partition=0, count=4905825, percentage=11.972304772356294% partition=1, count=5172735, percentage=12.623678978894363% partition=2, count=3850931, percentage=9.397913620912853% partition=3, count=3595419, percentage=8.774355394316043% partition=4, count=3432017, percentage=8.375584842082205% partition=5, count=3625976, percentage=8.848927503375965% partition=6, count=3829224, percentage=9.344939285364084% partition=7, count=3844329, percentage=9.381801925916172% partition=8, count=4410943, percentage=10.76458168187646% partition=9, count=4309047, percentage=10.515911994905561%
可以看出数据量比较符合预期,最终的实际Reduce(设置为5)效果也比较好,Reduce的执行时间变得非常均匀了:
但是经过分析后,直接将long值截取一下并不是一个好的方案,有些暴力:
int hashInt = (int) hash;
考虑将算法中的每一步局部变量都设置成int,这样就不会有截取的麻烦,将&操作放到循环内:
public static int bkdrHash(String hashString) { int seed = 131; int hash = 0; for (int i = 0; i < hashString.length(); i++) { char element = hashString.charAt(i); hash = (hash * seed + element) & 0x7FFFFFFF; } return hash; }
但是我们知道,如果int值执行乘法操作时,是有可能溢出的,表现为结果直接返回一个负数。由于我们每次循环都需要*seed,必须保证hash出来的值*seed要小于Integer.MAX_VALUE。
Integer.MAX_VALUE=2147483647 (Integer.MAX_VALUE & 0x1FFFFFFF) * 131=1610612605
1610612605会加一个char值,不可能超出最大值,于是选择0x1FFFFFFF替代0x7FFFFFFF。
于是,我们最终的hash方法更改为下面的版本:
public static int bkdrHash(String hashString) { int seed = 131; int hash = 0; for (int i = 0; i < hashString.length(); i++) { char element = hashString.charAt(i); hash = (hash * seed + element) & 0x1FFFFFFF; } return hash; }
经过hash均匀测试,也同样满足要求。