
                                        MapReduce (Part 3)

    MapReduce (Part 3):

        1. Top-10 descending sort

                1) TreeMap: sorted by key

                2) TreeSet: pass in objects; ordering follows the class's compareTo method

          2. A MapReduce job template

          3. MapReduce partitioning

                 1) Manual partitioning

                 2) Automatic partitioning

          4. Custom grouping


    ----------------------------------------------------------------------------------------------------------------------------------

    1. Top-10 descending sort

    Sort the words of a document by occurrence count in descending order and keep only the top ten.
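    Both versions below rely on the same trimming trick: keep a bounded, ordered structure and evict the smallest entry as soon as it holds more than ten. As a minimal plain-Java sketch, with made-up word counts and no Hadoop involved:

    import java.util.*;
    
    public class Top10Sketch {
    	public static void main(String[] args) {
    		// hypothetical (word, count) pairs standing in for the reducer's input
    		Map<String, Integer> counts = new HashMap<>();
    		counts.put("a", 80);
    		counts.put("b", 78);
    		counts.put("r", 70); // ... imagine many more words
    
    		// 1) TreeMap keyed by count, in reverse (descending) order.
    		// Caveat: words with equal counts overwrite each other.
    		TreeMap<Integer, String> byCount = new TreeMap<>(Collections.reverseOrder());
    		for (Map.Entry<String, Integer> e : counts.entrySet()) {
    			byCount.put(e.getValue(), e.getKey());
    			if (byCount.size() > 10) {
    				byCount.remove(byCount.lastKey()); // lastKey() is the smallest count here
    			}
    		}
    
    		// 2) TreeSet ordered the way WCWritable.compareTo does later:
    		// descending count, ties broken by the word, so ties are kept.
    		TreeSet<Map.Entry<String, Integer>> set = new TreeSet<>((e1, e2) -> {
    			int byCnt = e2.getValue() - e1.getValue();
    			return byCnt != 0 ? byCnt : e1.getKey().compareTo(e2.getKey());
    		});
    		for (Map.Entry<String, Integer> e : counts.entrySet()) {
    			set.add(e);
    			if (set.size() > 10) {
    				set.remove(set.last()); // evict the current smallest
    			}
    		}
    
    		System.out.println(byCount); // {80=a, 78=b, 70=r}
    		System.out.println(set); // [a=80, b=78, r=70]
    	}
    }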

    1) TreeMap

    package com.huhu.day03;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Map;
    import java.util.StringTokenizer;
    import java.util.TreeMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Sort by value (e.g. a 80, b 78, r 70, ...), keeping the top 10.
     * 
     * @author huhu_k
     *
     */
    public class Top10_1 extends ToolRunner implements Tool {
    
    	private Configuration conf;
    
    	public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			StringTokenizer st = new StringTokenizer(value.toString());
    			while (st.hasMoreTokens()) {
    				context.write(new Text(st.nextToken()), new IntWritable(1));
    			}
    		}
    	}
    
    	public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    		// TreeMap in descending order (keys are counts)
    		private TreeMap<Long, String> map = new TreeMap<>(Collections.reverseOrder());
    
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {
    			int sum = 0;
    			for (IntWritable v : values) {
    				sum += v.get(); // sum the counts (robust even if a combiner ran)
    			}
    			// NOTE: words with equal counts overwrite each other in this TreeMap
    			map.put(Long.valueOf(sum), key.toString());
    			if (map.size() > 10) {
    				// reverse order: lastKey() holds the smallest count, so evict it
    				map.remove(map.lastKey());
    			}
    		}
    
    		@Override
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    			for (Map.Entry<Long, String> m : map.entrySet()) {
    				context.write(new Text(m.getValue()), new IntWritable(m.getKey().intValue()));
    			}
    		}
    	}
    
    	@Override
    	public Configuration getConf() {
    		if (conf != null) {
    			return conf;
    		}
    		return new Configuration();
    	}
    
    	@Override
    	public void setConf(Configuration arg0) {
    		this.conf = arg0;
    	}
    
    	@Override
    	public int run(String[] other) throws Exception {
    		Configuration conf = getConf();
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(Top10_1.class);
    		job.setMapperClass(MyMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		job.setReducerClass(MyReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(other[0]));
    		FileOutputFormat.setOutputPath(job, new Path(other[1]));
    
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    		Top10_1 t = new Top10_1();
    		String[] other = new GenericOptionsParser(t.getConf(), args).getRemainingArgs();
    		if (other.length != 2) {
    			System.out.println("your input args number is fail,you need input <in> and <out>");
    			System.exit(0);
    		}
    		System.exit(ToolRunner.run(t.getConf(), t, other));
    	}
    }
    

    Run the job on the cluster as before; if it fails, check the logs.


    2) TreeSet

    package com.huhu.day03;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class WCWritable implements WritableComparable<WCWritable> {
    
    	private String word;
    	private int count;
    
    	public WCWritable() {
    		super();
    	}
    
    	public WCWritable(String word, int count) {
    		super();
    		this.word = word;
    		this.count = count;
    	}
    
    	public String getWord() {
    		return word;
    	}
    
    	public void setWord(String word) {
    		this.word = word;
    	}
    
    	public int getCount() {
    		return count;
    	}
    
    	public void setCount(int count) {
    		this.count = count;
    	}
    
    	@Override
    	public String toString() {
    		return "WCWritable [word=" + word + ", count=" + count + "]";
    	}
    
    	@Override
    	public int hashCode() {
    		final int prime = 31;
    		int result = 1;
    		result = prime * result + count;
    		result = prime * result + ((word == null) ? 0 : word.hashCode());
    		return result;
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		if (this == obj)
    			return true;
    		if (obj == null)
    			return false;
    		if (getClass() != obj.getClass())
    			return false;
    		WCWritable other = (WCWritable) obj;
    		if (count != other.count)
    			return false;
    		if (word == null) {
    			if (other.word != null)
    				return false;
    		} else if (!word.equals(other.word))
    			return false;
    		return true;
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.word = in.readUTF();
    		this.count = in.readInt();
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(word);
    		out.writeInt(count);
    	}
    
    	@Override
    	public int compareTo(WCWritable o) {
    		if (this.count == o.count) {
    			// tie-break: lexicographic order of the word
    			return this.word.compareTo(o.word);
    			// return this.word.length() - o.word.length();
    		}
    		// larger counts first (descending)
    		return o.count - this.count;
    	}
    }

    package com.huhu.day03;
    
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.TreeSet;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Sort by value (e.g. a 80, b 78, r 70, ...) with a TreeSet.
     * 
     * @author huhu_k
     *
     */
    public class Top10_2 extends ToolRunner implements Tool {
    	private Configuration conf;
    
    	static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			String[] line = value.toString().split(" ");
    			for (String s : line) {
    				context.write(new Text(s), new IntWritable(1));
    			}
    		}
    	}
    
    	static class MyReducer extends Reducer<Text, IntWritable, WCWritable, NullWritable> {
    		private TreeSet<WCWritable> set;
    		// keep at most the top 10 elements
    		private final int KEY = 10;
    
    		@Override
    		protected void setup(Context context) throws IOException, InterruptedException {
    			set = new TreeSet<WCWritable>();
    		}
    
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {
    			WCWritable w = new WCWritable();
    			int sum = 0;
    			for (IntWritable v : values) {
    				sum += v.get();
    			}
    			w.setWord(key.toString());
    			w.setCount(sum);
    
    			set.add(w);
    
    			// ordered by WCWritable.compareTo (descending count), so last()
    			// is the smallest element: evict it once we exceed the top 10
    			if (KEY < set.size()) {
    				set.remove(set.last());
    			}
    		}
    
    		@Override
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    			Iterator<WCWritable> iterator = set.iterator();
    			while (iterator.hasNext()) {
    				context.write(iterator.next(), NullWritable.get());
    			}
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		Top10_2 t = new Top10_2();
    		Configuration con = t.getConf();
    		String[] other = new GenericOptionsParser(con, args).getRemainingArgs();
    		if (other.length != 2) {
    			System.err.println("wrong number of arguments: expected <in> <out>");
    			System.exit(2);
    		}
    		int run = ToolRunner.run(con, t, args);
    		System.exit(run);
    	}
    
    	@Override
    	public Configuration getConf() {
    		if (conf != null) {
    			return conf;
    		}
    		return new Configuration();
    	}
    
    	@Override
    	public void setConf(Configuration arg0) {
    		this.conf = arg0;
    	}
    
    	@Override
    	public int run(String[] other) throws Exception {
    		Configuration con = getConf();
    		Job job = Job.getInstance(con);
    		job.setJarByClass(Top10_2.class);
    		job.setMapperClass(MyMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// default partitioner
    		job.setPartitionerClass(HashPartitioner.class);
    
    		job.setReducerClass(MyReducer.class);
    		job.setOutputKeyClass(WCWritable.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(other[0]));
    		FileOutputFormat.setOutputPath(job, new Path(other[1]));
    
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    }



    2. A MapReduce job template

    package util;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class Frame extends ToolRunner implements Tool {
    
    	private Configuration conf;
    
    	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			// split the input line; put your map logic here
    			String[] line = value.toString().split(" ");
    		}
    	}
    
    	public static class MyReduce extends Reducer<Text, Text, Text, Text> {
    
    		@Override
    		protected void setup(Context context) throws IOException, InterruptedException {
    		}
    
    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    		}
    
    		@Override
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		Frame t = new Frame();
    		Configuration conf = t.getConf();
    		String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (other.length != 2) {
    			System.err.println("number is fail");
    		}
    		int run = ToolRunner.run(conf, t, args);
    		System.exit(run);
    	}
    
    	@Override
    	public Configuration getConf() {
    		if (conf != null) {
    			return conf;
    		}
    		return new Configuration();
    	}
    
    	@Override
    	public void setConf(Configuration arg0) {
    		this.conf = arg0;
    	}
    
    	@Override
    	public int run(String[] other) throws Exception {
    		Configuration con = getConf();
    		Job job = Job.getInstance(con);
    		job.setJarByClass(Frame.class);
    		job.setMapperClass(MyMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    
    		// default partitioner
    		job.setPartitionerClass(HashPartitioner.class);
    		
    		job.setReducerClass(MyReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    
    		FileInputFormat.addInputPath(job, new Path(other[0]));
    		FileOutputFormat.setOutputPath(job, new Path(other[1]));
    
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    }

    3. MapReduce partitioning

    1) Manual partitioning
    package com.huhu.day03;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Manual partitioning: route output to named files with MultipleOutputs.
     * 
     * @author huhu_k
     *
     */
    public class ManualPartition extends ToolRunner implements Tool {
    
    	private Configuration conf;
    
    	public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			String[] line = value.toString().split(" ");
    			for (String s : line) {
    				context.write(new Text(s), new IntWritable(1));
    			}
    		}
    	}
    
    	public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    		private MultipleOutputs<Text, IntWritable> mos;
    
    		@Override
    		protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			mos = new MultipleOutputs<>(context);
    		}
    
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {
    			int sum = 0;
    			for (IntWritable v : values) {
    				sum += v.get();
    			}
    
    			if (key.toString().substring(0, 1).matches("[a-z]")) {
    				mos.write("az", key.toString(), new IntWritable(sum));
    			} else if (key.toString().substring(0, 1).matches("[A-Z]")) {
    				mos.write("AZ", key.toString(), new IntWritable(sum));
    			} else if (key.toString().substring(0, 1).matches("[0-9]")) {
    				mos.write("09", key.toString(), new IntWritable(sum));
    			} else {
    				mos.write("default", key.toString(), new IntWritable(sum));
    			}
    		}
    
    		@Override
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    			// important: mos holds open output streams; close it so everything is flushed to HDFS
    			mos.close();
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		ManualPartition t = new ManualPartition();
    		Configuration conf = t.getConf();
    		String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (other.length != 2) {
    			System.err.println("number is fail");
    		}
    		int run = ToolRunner.run(conf, t, args);
    		System.exit(run);
    	}
    
    	@Override
    	public Configuration getConf() {
    		if (conf != null) {
    			return conf;
    		}
    		return new Configuration();
    	}
    
    	@Override
    	public void setConf(Configuration arg0) {
    		this.conf = arg0;
    	}
    
    	@Override
    	public int run(String[] other) throws Exception {
    		Configuration con = getConf();
    		Job job = Job.getInstance(con);
    		job.setJarByClass(ManualPartition.class);
    		job.setMapperClass(MyMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// default partitioner
    		job.setPartitionerClass(HashPartitioner.class);
    
    		job.setReducerClass(MyReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(other[0]));
    		FileOutputFormat.setOutputPath(job, new Path(other[1]));
    		// manual partitioning: register one named output per character class
    		MultipleOutputs.addNamedOutput(job, "az", TextOutputFormat.class, Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "AZ", TextOutputFormat.class, Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "09", TextOutputFormat.class, Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "default", TextOutputFormat.class, Text.class, IntWritable.class);
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    }
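    If the job runs as written, each named output should show up as its own file in the output directory (e.g. az-r-00000, AZ-r-00000, 09-r-00000, default-r-00000); the usual part-r-* files are still created but stay empty, since the reducer writes only through MultipleOutputs.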


    2) Automatic partitioning
    package com.huhu.day03.partitioner;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class WordCountAUTOPartitioner extends Partitioner<Text, IntWritable> {
    
    	@Override
    	public int getPartition(Text key, IntWritable value, int numPartitioner) {
    		String firstChar = key.toString().substring(0, 1);
    		if (firstChar.matches("[a-g]")) {
    			//返回
    			return 0 % numPartitioner;
    		} else if (firstChar.matches("[h-z]")) {
    			return 1 % numPartitioner;
    		} else if (firstChar.matches("[0-5]")) {
    			return 2 % numPartitioner;
    		} else if (firstChar.matches("[6-9]")) {
    			return 3 % numPartitioner;
    		} else if (firstChar.matches("[A-G]")) {
    			return 0 % numPartitioner;
    		} else if (firstChar.matches("[H-Z]")) {
    			return 5 % numPartitioner;
    		} else {
    			return 6 % numPartitioner;
    		}
    	}
    
    }
    

    package com.huhu.day03;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.huhu.day03.partitioner.WordCountAUTOPartitioner;
    
    public class AutomaticPartition extends ToolRunner implements Tool {
    
    	private Configuration conf;
    
    	public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			String[] line = value.toString().split(" ");
    			for (String s : line) {
    				context.write(new Text(s), new IntWritable(1));
    			}
    		}
    	}
    
    	public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    		@Override
    		protected void setup(Context context) throws IOException, InterruptedException {
    		}
    
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {
    			int sum = 0;
    			for (IntWritable v : values) {
    				sum += v.get();
    			}
    			context.write(key, new IntWritable(sum));
    		}
    
    		@Override
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		AutomaticPartition t = new AutomaticPartition();
    		Configuration conf = t.getConf();
    		String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (other.length != 2) {
    			System.err.println("number is fail");
    		}
    		int run = ToolRunner.run(conf, t, args);
    		System.exit(run);
    	}
    
    	@Override
    	public Configuration getConf() {
    		if (conf != null) {
    			return conf;
    		}
    		return new Configuration();
    	}
    
    	@Override
    	public void setConf(Configuration arg0) {
    		this.conf = arg0;
    	}
    
    	@Override
    	public int run(String[] other) throws Exception {
    		Configuration con = getConf();
    		Job job = Job.getInstance(con);
    		job.setJarByClass(AutomaticPartition.class);
    		job.setMapperClass(MyMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// default partitioner
    		// job.setPartitionerClass(HashPartitioner.class);
    
    		// custom partitioner
    		job.setPartitionerClass(WordCountAUTOPartitioner.class);
    		// 40 reduce tasks => 40 partitions and 40 output files
    		job.setNumReduceTasks(40);
    		
    		job.setReducerClass(MyReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(other[0]));
    		FileOutputFormat.setOutputPath(job, new Path(other[1]));
    
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    }
    

    Of the 40 output partitions, only the ones assigned in the partitioner class (indexes 0 through 6) contain data; the rest are empty, because no key is ever routed to them.

    4. Custom grouping
    package com.huhu.day03;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.huhu.day03.group.ClassGroupSort;
    import com.huhu.day03.pojo.Student;
    
    public class StudentAutoGroup extends ToolRunner implements Tool {
    
    	private Configuration conf;
    
    	public static class MyMapper extends Mapper<LongWritable, Text, Student, Student> {
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			String[] line = value.toString().split(" ");
    			Student s = new Student(line[0], line[1], Integer.parseInt(line[2]));
    			context.write(s, s);
    		}
    	}
    
    	public static class MyReduce extends Reducer<Student, Student, Text, IntWritable> {
    
    		@Override
    		protected void reduce(Student key, Iterable<Student> values, Context context)
    				throws IOException, InterruptedException {
    			int sum = 0;
    			for (Student s : values) {
    				sum += s.getSccore();
    			}
    			context.write(new Text(key.getGroup()), new IntWritable(sum));
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		StudentAutoGroup t = new StudentAutoGroup();
    		Configuration conf = t.getConf();
    		String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
    		if (other.length != 2) {
    			System.err.println("number is fail");
    		}
    		int run = ToolRunner.run(conf, t, args);
    		System.exit(run);
    	}
    
    	@Override
    	public Configuration getConf() {
    		if (conf != null) {
    			return conf;
    		}
    		return new Configuration();
    	}
    
    	@Override
    	public void setConf(Configuration arg0) {
    		this.conf = arg0;
    	}
    
    	@Override
    	public int run(String[] other) throws Exception {
    		Configuration con = getConf();
    		Job job = Job.getInstance(con);
    		job.setJarByClass(StudentAutoGroup.class);
    		job.setMapperClass(MyMapper.class);
    		job.setMapOutputKeyClass(Student.class);
    		job.setMapOutputValueClass(Student.class);
    
    		// grouping: feed all students of the same group to a single reduce() call
    		job.setGroupingComparatorClass(ClassGroupSort.class);
    		// default partitioner
    		job.setPartitionerClass(HashPartitioner.class);
    		job.setNumReduceTasks(1);
    
    		job.setReducerClass(MyReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(other[0]));
    		FileOutputFormat.setOutputPath(job, new Path(other[1]));
    
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    }
    
    package com.huhu.day03.pojo;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class Student implements WritableComparable<Student> {
    
    	private String name;
    	private String group;
    	private int sccore;
    
    	public Student() {
    		super();
    	}
    
    	public Student(String name, String group, int sccore) {
    		super();
    		this.name = name;
    		this.group = group;
    		this.sccore = sccore;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public String getGroup() {
    		return group;
    	}
    
    	public void setGroup(String group) {
    		this.group = group;
    	}
    
    	public int getSccore() {
    		return sccore;
    	}
    
    	public void setSccore(int sccore) {
    		this.sccore = sccore;
    	}
    
    	@Override
    	public String toString() {
    		return "Student [name=" + name + ", group=" + group + ", sccore=" + sccore + "]";
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.name = in.readUTF();
    		this.group = in.readUTF();
    		this.sccore = in.readInt();
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(name);
    		out.writeUTF(group);
    		out.writeInt(sccore);
    	}
    
    	@Override
    	public int compareTo(Student o) {
    		return this.group.compareTo(o.group);
    	}
    
    }
    
    package com.huhu.day03.group;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    import com.huhu.day03.pojo.Student;
    
    public class ClassGroupSort extends WritableComparator {
    
    	public ClassGroupSort() {
    		// true: instantiate Student objects so compare() receives deserialized keys
    		super(Student.class, true);
    	}
    
    	@Override
    	public int compare(WritableComparable a, WritableComparable b) {
    		// compare only the group field: students of the same group are equal here
    		return ((Student) a).getGroup().compareTo(((Student) b).getGroup());
    	}
    }
    



    To summarize:

        Partitioning in MapReduce splits records by key, e.g. into [a-z], [A-Z], [0-9], and so on. It is computed on the map output key and decides which reduce task receives each record.

        Grouping decides which of the sorted keys are fed into the same reduce() call. Rather than requiring whole keys to be identical, the grouping comparator can compare just the chosen attributes of the key object: distinct Student objects that share the same group value count as one group, so a single reduce() call sums all of their scores.

        Partitioning and grouping together serve two purposes: cleaning the data, and ordering it.
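    As a Hadoop-free recap with made-up records: partitioning maps a key to a reduce-task index, while grouping merges sorted keys whose chosen attribute compares equal. A minimal sketch of what the two mechanisms compute:

    import java.util.*;
    
    public class PartitionVsGroup {
    	public static void main(String[] args) {
    		// hypothetical records: name, group, score
    		List<String[]> records = Arrays.asList(
    				new String[] { "huhu", "g1", "90" },
    				new String[] { "haha", "g1", "80" },
    				new String[] { "hehe", "g2", "70" });
    
    		// partitioning: key -> reduce-task index, as in Partitioner.getPartition()
    		int numReduceTasks = 2;
    		for (String[] r : records) {
    			int partition = (r[1].hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    			System.out.println(r[0] + " -> reducer " + partition);
    		}
    
    		// grouping: records whose group field compares equal share one
    		// reduce() call, so their scores are summed together
    		Map<String, Integer> grouped = new TreeMap<>();
    		for (String[] r : records) {
    			grouped.merge(r[1], Integer.parseInt(r[2]), Integer::sum);
    		}
    		System.out.println(grouped); // {g1=170, g2=70}
    	}
    }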
