Test data:

hadoop a
spark a
hive a
hbase a
tachyon a
storm a
redis a
Custom grouping
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyGroup {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyGroup <inputpath> <outputpath>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, MyGroup.class.getSimpleName() + "1");
        job.setJarByClass(MyGroup.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper1.class);
        // Plug in the custom grouping comparator: it decides which map output
        // keys count as "the same key" when records are handed to reduce().
        job.setGroupingComparatorClass(MyGroupComparator.class);
        job.setReducerClass(MyReducer1.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
    }

    public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Each input line looks like "hadoop a"; emit the word as the key.
            String[] spl = value.toString().split(" ");
            context.write(new Text(spl[0].trim()), new Text(spl[1].trim()));
        }
    }

    public static class MyReducer1 extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Long count = 0L;
            // Hadoop reuses the key object while iterating over the values,
            // so inside the loop k2 always shows the key of the record
            // currently being consumed.
            for (@SuppressWarnings("unused") Text v2 : v2s) {
                count++;
                context.write(new Text("in--" + k2), new Text(count.toString()));
            }
            context.write(new Text("out--" + k2), new Text(count.toString()));
        }
    }

    public static class MyGroupComparator extends WritableComparator {
        public MyGroupComparator() {
            super(Text.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            Text p1 = (Text) a;
            Text p2 = (Text) b;
            p1.compareTo(p2); // result deliberately discarded
            // Returning 0 makes every pair of keys "equal" for grouping,
            // so all records end up in a single reduce call.
            return 0;
        }
    }
}
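The effect of the comparator can also be checked in isolation before running the job. The following is a minimal sketch (the ComparatorCheck class name is made up for illustration): it confirms that any two keys, identical or not, compare as 0 for grouping purposes.

import org.apache.hadoop.io.Text;

public class ComparatorCheck {
    public static void main(String[] args) {
        MyGroup.MyGroupComparator cmp = new MyGroup.MyGroupComparator();
        // 0 for any pair of keys: the framework treats them as one group.
        System.out.println(cmp.compare(new Text("hadoop"), new Text("spark"))); // prints 0
        System.out.println(cmp.compare(new Text("hive"), new Text("hive")));    // prints 0
    }
}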
Result:

in--hadoop 1
in--hbase 2
in--hive 3
in--redis 4
in--spark 5
in--storm 6
in--tachyon 7
out--tachyon 7

Because the comparator reports every pair of keys as equal, all seven records arrive in a single reduce call. Hadoop reuses the key object while iterating, so inside the loop k2 shows the word of the record currently being read, and after the loop only the last word (tachyon) is left for the out-- line.
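Why a single group? During the reduce phase the framework walks the sorted keys and starts a new reduce call wherever two adjacent keys compare as unequal. A minimal sketch of that decision (an illustration, not Hadoop's actual code):

import org.apache.hadoop.io.Text;

public class GroupWalk {
    public static void main(String[] args) {
        // The seven map output keys, already in Text's natural sort order.
        Text[] sorted = { new Text("hadoop"), new Text("hbase"), new Text("hive"),
                          new Text("redis"), new Text("spark"), new Text("storm"),
                          new Text("tachyon") };
        MyGroup.MyGroupComparator cmp = new MyGroup.MyGroupComparator();
        int groups = 1;
        for (int i = 1; i < sorted.length; i++) {
            // A new group begins wherever adjacent keys compare as unequal.
            if (cmp.compare(sorted[i - 1], sorted[i]) != 0) {
                groups++;
            }
        }
        System.out.println("reduce calls: " + groups); // 1 with the custom comparator
    }
}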
Now let's look at the default grouping:
public static class MyGroupComparator extends WritableComparator {
    public MyGroupComparator() {
        super(Text.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Text p1 = (Text) a;
        Text p2 = (Text) b;
        // Delegate to Text's natural ordering: distinct words compare as
        // unequal, so every word forms its own group.
        return p1.compareTo(p2);
    }
}
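This version simply reproduces Text's natural ordering, so it behaves the same as not calling job.setGroupingComparatorClass at all: Hadoop then falls back to the comparator registered for the map output key class. A quick standalone check of that registered comparator (a sketch; the DefaultCheck class name is made up for illustration):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;

public class DefaultCheck {
    public static void main(String[] args) {
        // The comparator Hadoop registers for Text: plain lexicographic order.
        WritableComparator cmp = WritableComparator.get(Text.class);
        System.out.println(cmp.compare(new Text("hadoop"), new Text("hbase"))); // negative
        System.out.println(cmp.compare(new Text("hive"), new Text("hive")));    // 0
    }
}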
Result:

in--hadoop 1
out--hadoop 1
in--hbase 1
out--hbase 1
in--hive 1
out--hive 1
in--redis 1
out--redis 1
in--spark 1
out--spark 1
in--storm 1
out--storm 1
in--tachyon 1
out--tachyon 1

This time each word forms its own group, so reduce() is called seven times and every in-- line is paired with its own out-- line, each with a count of 1.
Comparing the two runs makes custom grouping easy to understand: the grouping comparator alone decides how many reduce calls there are and which records are handed to reduce() together.