1.Partitioner是partitioner的基类,如果需要定制Partitioner也需要继承该类。
2. HashPartitioner是mapreduce的默认partitioner。计算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
3.说明,Partitioner是在Mapper执行完成,Reducer执行前。它有两个参数,就是Mapper的输出参数,在这里,有几个Reducer就有几个Partitioner
4.根据数据分区,将数据传入不同的Reducer中
说明,PrividerPartitioner需要继承Partitioner,并重写getPartition方法,这样我们就可以将数据写入不同的文件中
示例:
public static class ProviderPartitioner extends Partitioner<Text, DataBean>
{
//静态,从上往下执行
private static Map<String, Integer> providerMap = new HashMap<String, Integer>();
//静态块,在执行方法前执行
static {
providerMap.put("135", 1);
providerMap.put("136", 1);
providerMap.put("137", 1);
providerMap.put("138", 1);
providerMap.put("139", 1);
providerMap.put("150", 2);
providerMap.put("159", 2);
providerMap.put("182", 3);
providerMap.put("183", 3);
}
@Override
public int getPartition(Text key, DataBean value, int numPartitions) {
String accountString = key.toString();
String sub_accString = accountString.substring(0, 3);
Integer codeInteger = providerMap.get(sub_accString);
if (codeInteger == null)
{
codeInteger = 0;
}
return codeInteger;
}
}
最后在waitForCompletion前将相关Partitioner设置
//partitioner
job.setPartitionerClass(ProviderPartitioner.class);
//调置启动Reduce的数量
job.setNumReduceTasks(Integer.parseInt(args[2]));
//
job.waitForCompletion(true);
5.生成jar包,不用指定main方法,需要指定需要启动Reducer的数量
hadoop jar /root/mrs.jar cn.itcast.hadoop.mr.dc.DataCount /data.doc /data-p6 6