概括:
combine和partition都是函数。中间的步骤应该仅仅有shuffle!
1.combine
combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,能够自己定义的。
combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一个新的<key2,value2>.将新的<key2,value2>作为输入到reduce函数中
这个value2亦可称之为values,由于有多个。这个合并的目的是为了降低网络传输。
详细实现是由Combine类。
实现combine函数,该类的主要功能是合并同样的key键。通过job.setCombinerClass()方法设置。默觉得null,不合并中间结果。实现map函数
详细调用:(下图是调用reduce,合并map的个数)
难点:不知道这个reduce和mapreduce中的reduce差别是什么?
以下简单说一下:后面慢慢琢磨:
在mapreduce中。map多,reduce少。
在reduce中因为数据量比較多。所以干脆。我们先把自己map里面的数据归类,这样到了reduce的时候就减轻了压力。
这里举个样例:
map与reduce的样例
map理解为销售人员,reduce理解为销售经理。
每一个人(map)仅仅管销售,赚了多少钱销售人员不统计。也就是说这个销售人员没有Combine,那么这个销售经理就累垮了。由于每一个人都没有统计,它须要统计全部人员卖了多少件。赚钱了多少钱。
这样是不行的。所以销售经理(reduce)为了减轻压力,每一个人(map)都必须统计自己卖了多少钱,赚了多少钱(Combine),然后经理所做的事情就是统计每一个人统计之后的结果。这样经理就轻松多了。所以Combine在map所做的事情。减轻了reduce的事情。
(这就是为什么说map的Combine干reduce的事情。相信你应该明确了)
public static void main(String[] args)throws IOException {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Mapper.class);
job.setCombinerClass(reduce.class);
job.setPartitionerClass(HashPartitioner.class);
job.setReducerClass(Reducer.class);
job.setOutputFormatClass(TextOutFormat.class);
}
}
2.partition
partition是切割map每一个节点的结果,依照key分别映射给不同的reduce。也是能够自己定义的。这里事实上能够理解归类。
我们对于错综复杂的数据归类。比方在动物园里有牛羊鸡鸭鹅。他们都是混在一起的。可是到了晚上他们就各自牛回牛棚。羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。仅仅只是在敲代码的时候,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也能够自己定义。
HashPartitioner是mapreduce的默认partitioner。计算方法是
which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。得到当前的目的reducer。
以下在看该怎样自己定义,该怎样调用:(以下便是自己定义了一个Partition函数。红字部分是算法的核心,也就是分区的核心)
public static class Partition extends Partitioner<IntWritable, IntWritable> {
@Override
public int getPartition(IntWritable key, IntWritable value,
int numPartitions) {
int Maxnumber = 65223;
int bound = Maxnumber / numPartitions + 1;
int keynumber = key.get();
for (int i = 0; i < numPartitions; i++) {
if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
return i - 1;
}
}
return 0;
}
}
那么我们该怎样调用:(以下调用之后,你的分区函数就生效了)
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "sort");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");
FileOutputFormat
.setOutputPath(job, new Path("/home/asheng/hadoop/out"));
job.waitForCompletion(true);
}
}
3.shuffle
shuffle就是map和reduce之间的过程。包括了两端的combine和partition。
它比較难以理解,由于我们摸不着。看不到它。它仅仅是理论存在的。并且确实存在,它属于mapreduce的框架。编程的时候。我们用不到它,它属于mapreduce框架。具体能够看通过实例让你真正明确mapreduce---填空式、分布(切割)编程。
3.1shuffle的作用是
Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后。通过OutputFormat,进行输出
shuffle阶段的主要函数是fetchOutputs(),这个函数的功能就是将map阶段的输出,copy到reduce 节点本地。
转载:https://www.cnblogs.com/brucemengbm/p/7053723.html