本身是一个常见简单的需求,目的是得到一个权值流量的分布情况。数据原型是几T的日志数据,每条记录都有很多字段,其中有一个字段为该记录的权重。每一条记录是一个流量。
A B C
Tanx FourthView 0
Tencent Na 20
Allyes FirstView 200
Adx OtherView 5
Amx FirstView 133
Miaozhen Na 12
Baidu SecondView 0
Adx OtherView 5
Tanx Na 0
Adx OtherView 5
Allyes FirstView 31
Adx OtherView 5
Tanx Na 0
Adx OtherView 192
从原始日志中抽取三个字段来处理,目的是统计AB字段下字段C的分布,做法是将具有相同AB字段的记录聚合到一起,在每个聚合中,按C字段从小到大排序,然后对排序后的记录平均分成100份,每份的流量为总流量的百分之一。计算每份流量的C字段均值,就得到了基于AB字段的C字段在100个区间内的分布情况。
将键字段AB作为聚合键聚合数据,并以字段C做排序键排序所有记录是轻松的,pig和hive都有高效的实现。但如果把聚合排序好的记录平均分到100个区间,并在每个区间求均值的话,如果想用pig实现逻辑上会略显复杂。
具体的解决思路是:
1.先用pig整合数据,从原始日志中提取相应的字段,存入临时文件。
2.使用MapReduce程序,读取整合好的数据,在map函数中,拆分记录,以AB字段(聚合键)为键,C字段为值,输出。reduce函数中,对相同聚合键AB字段的记录排序,并对排序后的每条记录添加一个索引标识其排序后的位置。如排序后C字段为 55,77,88,添加索引后为 55 1,77 2,88 3。
3.经过MapReduce程序,每条记录有四个字段,最后一个字段是索引字段。索引字段按排序顺序给每条记录编号,这样一来,实现自动切分100个区间将极为方便。做法是:使用pig,先COUNT出每个聚合组的记录总数N,每个区间的记录条数为n=N/100,通过自定义一个公式就可以得到每条记录应属的区间。例如记每条记录的索引为 x,它所属的区间为 x/n+1。这里的除法都是取整,例如有999条记录,分为100个区间,每个区间n=999/100=99条。索引x为1-98的记录根据公式得到的区间号为1,99-197的记录区间为2,依次类推。这之间会有一些小BUG,可以通过调节分区公式来调节。最后得到每条记录所属的分区号后,使用pig的group操作,以AB字段和区间号作为聚合键,在聚合组内对C字段使用AVG函数即可。
先给出第三步的pig程序:
1 --flow_distrib_v2.pig 2 3 data = load '$input' as (platform:chararray,location:chararray,price:long,tag:long); 4 5 g_data = group data by (platform,location); 6 7 format_data = foreach g_data{ 8 generate (COUNT(data)/100) as sectionnum,flatten(data) ; 9 } 10 11 format_line = foreach format_data generate platform,location,price,sectionnum,(tag/sectionnum+1) as sectionid; 12 filter_format = filter format_line by sectionid <= 100; 13 14 g_section = group filter_format by (platform,location,sectionid,sectionnum); 15 16 cal_avg = foreach g_section { 17 cnt = filter_format.price; 18 generate 'null','0','$Date' as date,group.platform,group.location,'$agent','$log_type' as type,group.sectionid,group.sectionnum,AVG(cnt) as avg; 19 } 20 21 store cal_avg into '$output';
接下来就剩下第二步的MapReduce程序,如何给每条记录按顺序编号索引。这个部分也是在实践中关键,也是本文主要想记录的地方。
第二步MapReduce程序主要要达到功能是基于AB键聚合排序后的记录顺序编号,主要就是三个操作,分组聚合,排序,编号。下面是简单实现:
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ProduceTag extends Configured implements Tool{ public static class TagMapper extends Mapper<Object,Text,Text,Text>{ private Text priceKey = new Text(); private Text priceValue = new Text(); public void map(Object key,Text value,Context con) throws IOException, InterruptedException{ String[] str = (value.toString()).split(" "); String keystr = str[0]+" "+str[1]; priceKey.set(keystr); priceValue.set(str[2]); con.write(priceKey, priceValue); } } public static class TagReducer extends Reducer<Text,Text,Text,Text>{ private Text outKey = new Text(); private Text outTag = new Text(); private long counter = 0; private String keystr = ""; private String valuestr = ""; public void reduce(Text key,Iterable<Text> pricesgroup,Context con) throws IOException, InterruptedException{ List<Integer> fprices = new ArrayList<Integer>(); for(Text price:pricesgroup){ int format_price = Integer.parseInt(price.toString()); if(format_price < 200000) fprices.add(format_price); } java.util.Collections.sort(fprices); int index = 1; for(int i : fprices){ valuestr = i+" "+index; index++; outTag.set(valuestr); con.write(key, outTag); } } } public static class PlatformPatition extends Partitioner<Text,Text>{ private String[] platform = {"Adx","Amx","Allyes","Baidu","Inmobi","Iqiyi","Mogo","Miaozhen","Tanx","Tencent","Sina","Youku"}; private String[] location = {"Na","FirstView","SecondView","ThirdView","FourthView","FifthView","SixthView","SeventhView","EighthView","NinthView","TenthView","OtherView"}; @Override public int getPartition(Text key, Text value, int numreduce) { // TODO Auto-generated method stub String[] str = key.toString().split(" "); int pla = 0; for(int i = 0;i<platform.length;i++){ if(str[0].equals(platform[i])){ pla = i; break; } } int loc = 0; for(int j = 0;j<location.length;j++){ if(str[1].equals(location[j])){ loc = j+1; break; } } int par = (pla*12+loc)%numreduce; return par; } } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); Job job = new Job(conf,"Produce_Tag"); job.setJarByClass(ProduceTag.class); job.setMapperClass(TagMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(TagReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setPartitionerClass(PlatformPatition.class); job.setNumReduceTasks(28); FileInputFormat.addInputPath(job,new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args){ try{ if(args.length != 2){ System.err.println("usage error"); System.exit(0); }else{ int ret = ToolRunner.run(new ProduceTag(), args); System.exit(ret); } } catch (Exception e){ e.printStackTrace(); } } }
以上程序,map过程中将需要聚合的键作为KEY,其余字段作为Value 作为Map的输出。并自定义Partition类,将Map输出定制洗牌方式,最后具有相同AB字段的键被发送到一个Reduce下,并且每个Reduce只处理一种AB组合,Reduce的个数可以选择AB组合的所有情况,但实际数据处理中有些组合并没有值,故而取一个较小的值足以满足需求。
设计使每个Reduce只处理一种AB组合的初衷是希望提高并行度,分散内存消耗。同时也希望得到每个Reduce的输出文件是对一种组合的处理的结果。每种组合独立生成一个文件。事实上,后序的实践表明这完全是没有必要的。
Reduce过程中,对相同AB组合聚合的结果按C字段排序,采用的是JAVA语言定制的排序函数。先将C字段都插入到Arraylist中,插入中去掉了离群点,即权重大于200000的记录。这个离群点的去除在此处对程序的影响不大,毕竟离群点的个数和记录整体相比是微不足道的。但在后序的优化中,离群点的去除可以带来很大的性能提升。
使用JAVA排序函数排序后,按顺序遍历列表,并打算编号。这个过程是时间复杂度是O(n),排序的时间复杂度是O(nlgn),所以貌似整个Reduce过程在时间性能上是可接受的。
上述程序在逻辑上应该没有问题,对小数据集的测试也是通过的,但事实上这样的程序无法用在大数据处理上。不幸的是,本人一开始并未认识到错误的存在,在几十个结点上运行此程序处理几T的数据时,几乎所有Reduce结点都发生了内存堆满溢出的情况。
罪魁祸首在于排序,JAVA定制的排序函数是把所有待排序元素一次性加入内存排序,对于几T的数据,即使切分多个Reduce后,数据量仍然是非常大的,这必定导致运行该程序的结点内存爆满。
解决问题的方向是如何在尽量小的内存内对大数据排序。这个问题的解决取决于待排序数据的结构性质。对于整体数据的统计发现,大于2000000的点就可以认为是离群点,即每个数据大小集中在0~2000000。虽然数据的总是可能后又几十亿,但会有很多的重复值。
对这种类型数据排序有一个很好的方法,该方法不仅可以将排序内存限制在很小的内存以内,而且时间复杂度是线性的O(n),突破了比较排序的极限。这就是统计排序-键索引计数法。
下面是新的Reduce过程:
1 public static class TagReducer extends Reducer<Text,Text,Text,Text>{ 2 private Text outKey = new Text(); 3 private Text outTag = new Text(); 4 private String keystr = ""; 5 private String valuestr = ""; 6 public void reduce(Text key,Iterable<Text> prices,Context con) throws IOException, InterruptedException{ 7 long counter = 0; 8 int[] statis = new int[200001]; 9 int sub = 0; 10 for(Text price:prices){ 11 try{ 12 sub = Integer.parseInt(price.toString()); 13 if(sub < 200000){ 14 statis[sub]++; 15 }else{ 16 throw new NumberFormatException(); 17 } 18 }catch(NumberFormatException e){ 19 statis[200000]++; 20 } 21 } 22 23 for(int i = 0;i<statis.length-1;i++){ 24 int j = statis[i]; 25 while(j>0){ 26 counter++; 27 keystr = key.toString()+" "+i; 28 valuestr = counter+""; 29 outKey.set(keystr); 30 outTag.set(valuestr); 31 con.write(outKey, outTag); 32 j--; 33 } 34 } 35 } 36 }
新的reduce过程中,使用一个整型的statis数组保存所有数据,数组元素下标为数据的值,数组元素的值为 那些数据的值为该元素下标的 数据的个数。因为经过先验的统计分析表明非离群点的值都在0~2000000,所以大小为200 0000的数组足以表示所有数据的值。而且大小为200 0000的整型数组只需8M内存。接下来的顺序编号操作与原来的Reduce过程类似,也是线性操作。
经过修改的Reduce过程在处理大数据时也不会在有内存爆满的现象。并且即使数据量增加,也不会有问题。程序的性能取决于离群点的阈值。而在本实践中,离群点的阈值变动的幅度不会很大。
新的Reduce过程虽然能在大数据处理时也能保证正确了,但一开始基于自定义Partition以希望分散Reduce数据量,提高并行度的初衷并没有起到作用。事实上有些AB组合的数据量远远大于其他组合,这种现象称之为数据倾斜。Pig语言在处理大规模数据JOIN时有完善的处理数据倾斜方案。但在这里只能另谋他路。对程序在集群上运行的观察发现。某些Reduce因为非散列到的数据量过于庞大,使得Map到Reduce的COPY过程的IO延迟非常大。往往其他Reduce完成所有任务了,几个Reduce任务还缓慢的停留在COPY阶段。并且大量数据的COPY占用IO,也将导致其他人的任务变的缓慢。
需要加速COPY的IO效率,很容易想到的是使用Combiner。Combiner的使用局限在线性计算的任务。重新审视Reduce过程是Map数据的散列统计。从数据的值得到应该散列到的数组元素的下标,并对数组元素进行加一操作,表示数据值为x的元素又多了一个。事实上可以把加一操作变为加n操作。因为如果Map端输出数据是:
A1 B1 10
A1 B1 10
... ...省略1000条
A1 B1 10
这种形式,首先Map端没有必要把这样的数据存1000条到中间文件让Reduce去copy,其次,Reduce端也不需要对这样的数据做1000次累加,只需要对下标为10的数组元素加1000就可以了。
那么,Map端对这种形式的输出应该是 A1 B1 10 1000。最后一个字段表征这样的数据有多少条。
鉴于A1 B1是聚合后的结果,要知道这样的数据有多少条也需要一个统计过程这些过程完全可以交给Combiner来完成。Combiner对Map输出结果到Reduce之前做一个组合压缩处理。在这里正好可以对Map输出的多条相同数据进行一个合并格式化处理。
修改后的MapReduce程序加入了Combiner,并稍微修改Reduce处理数据的格式,完整代码如下:
1 public class ProduceTag extends Configured implements Tool{ 2 3 public static class TagMapper extends Mapper<Object,Text,Text,Text>{ 4 private Text priceKey = new Text(); 5 private Text priceValue = new Text(); 6 public void map(Object key,Text value,Context con) throws IOException, InterruptedException{ 7 String[] str = (value.toString()).split(" "); 8 String keystr = str[0]+" "+str[1]; 9 priceKey.set(keystr); 10 priceValue.set(str[2]); 11 con.write(priceKey, priceValue); 12 } 13 } 14 15 public static class TagCombiner extends Reducer<Text,Text,Text,Text>{ 16 private Text outTag = new Text(); 17 private String valueStr = ""; 18 public void reduce(Text key,Iterable<Text> prices,Context con) throws IOException,InterruptedException{ 19 int sub = 0; 20 int[] statis = new int[200001]; 21 try{ 22 for(Text price:prices){ 23 sub = Integer.parseInt(price.toString()); 24 if(sub < 200000){ 25 statis[sub]++; 26 } 27 } 28 }catch(NumberFormatException e){ 29 statis[200000]++; 30 } 31 32 for(int i = 0;i<statis.length-1;i++){ 33 int j = statis[i]; 34 valueStr = i + "#"+j; 35 outTag.set(valueStr); 36 if( j > 0){ 37 con.write(key, outTag); 38 } 39 } 40 } 41 } 42 43 44 public static class TagReducer extends Reducer<Text,Text,Text,Text>{ 45 private Text outKey = new Text(); 46 private Text outTag = new Text(); 47 private String keystr = ""; 48 private String valuestr = ""; 49 public void reduce(Text key,Iterable<Text> pricesgroup,Context con) throws IOException, InterruptedException{ 50 long counter = 0; 51 int[] statis = new int[200001]; 52 int sub = 0; 53 String[] combiner = null; 54 for(Text price:pricesgroup){ 55 try{ 56 String combiners= price.toString(); 57 combiner = combiners.split("#"); 58 sub = Integer.parseInt(combiner[0]); 59 int nums = Integer.parseInt(combiner[1]); 60 if(sub < 20000 && nums>0){ 61 statis[sub]+= nums; 62 }else{ 63 throw new NumberFormatException(); 64 } 65 }catch(NumberFormatException e){ 66 statis[200000]++; 67 } 68 } 69 70 for(int i = 0;i<statis.length-1;i++){ 71 int j = statis[i]; 72 while(j>0){ 73 counter++; 74 keystr = key.toString()+" "+i; 75 valuestr = counter+""; 76 outKey.set(keystr); 77 outTag.set(valuestr); 78 con.write(outKey, outTag); 79 j--; 80 } 81 } 82 } 83 } 84 85 @Override 86 public int run(String[] args) throws Exception { 87 // TODO Auto-generated method stub 88 Configuration conf = new Configuration(); 89 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 90 Job job = new Job(conf,"Produce_Tag"); 91 job.setJarByClass(ProduceTag.class); 92 job.setMapperClass(TagMapper.class); 93 job.setMapOutputKeyClass(Text.class); 94 job.setMapOutputValueClass(Text.class); 95 job.setReducerClass(TagReducer.class); 96 job.setOutputKeyClass(Text.class); 97 job.setOutputValueClass(Text.class); 98 job.setCombinerClass(TagCombiner.class); 99 FileInputFormat.addInputPath(job,new Path(otherArgs[0])); 100 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 101 return (job.waitForCompletion(true) ? 0 : 1); 102 } 103 104 105 public static void main(String[] args){ 106 try{ 107 if(args.length != 2){ 108 System.err.println("usage error"); 109 System.exit(0); 110 }else{ 111 int ret = ToolRunner.run(new ProduceTag(), args); 112 System.exit(ret); 113 } 114 } 115 catch (Exception e){ 116 e.printStackTrace(); 117 } 118 } 119 }
修改后的MapReduce程序,去掉了自定以Partition过程,而且没有定制Reduce个数,全部以自动方式运行。在集群上的运行表明,整个过程只需要起一个Reduce,而且COPY过程飞快,比PIG程序都要快好多。整个过程输入上万个Map,在很短的时间内就完成了所有的处理,反而后序的pig程序影响脱了后腿。
总结:
本篇博客记录了实习以来遇到的一个大数据处理问题,和自己找到的解决对策。总结经验教训,对于大型数据并在集群上并行处理的任务,永远不能掉以轻心,常规的方式往往会带来很多意想不到的问题。