package com.huhu.day03;

// NOTE(review): the java.io / org.apache.hadoop.io imports and the ToolRunner.run(...) call
// were missing from the source text and have been restored from how the names are used.
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word count that emits only the ten most frequent words, sorted by count
 * (i.e. sorted on the value rather than the key), e.g. {@code a 80, b 78, r 70 ...}.
 *
 * @author huhu_k
 */
public class Top10_1 extends ToolRunner implements Tool {

    private Configuration conf;

    /** Emits {@code (token, 1)} for every whitespace-separated token of the input line. */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                context.write(new Text(st.nextToken()), new IntWritable(1));
            }
        }
    }

    /**
     * Sums the counts per word and keeps the ten largest in memory; the result is written
     * in {@link #cleanup}, highest count first and alphabetically within equal counts.
     */
    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private static final int TOP_N = 10;

        // count -> words with that count, highest count first. Several words may share a
        // count, so each bucket is a set; a plain count -> word map would overwrite ties.
        private final TreeMap<Long, TreeSet<String>> map =
                new TreeMap<>(Collections.<Long>reverseOrder());

        // Total number of words currently held across all buckets.
        private int retained;

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }

            Long count = Long.valueOf(sum);
            TreeSet<String> words = map.get(count);
            if (words == null) {
                words = new TreeSet<>();
                map.put(count, words);
            }
            if (words.add(key.toString())) {
                retained++;
            }

            // Evict from the lowest-count bucket once more than TOP_N words are held.
            if (retained > TOP_N) {
                Map.Entry<Long, TreeSet<String>> lowest = map.lastEntry();
                lowest.getValue().pollLast();
                if (lowest.getValue().isEmpty()) {
                    map.remove(lowest.getKey());
                }
                retained--;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Map.Entry<Long, TreeSet<String>> bucket : map.entrySet()) {
                IntWritable count = new IntWritable(bucket.getKey().intValue());
                for (String word : bucket.getValue()) {
                    context.write(new Text(word), count);
                }
            }
        }
    }

    /** Returns the configuration, creating and caching a default one on first use. */
    @Override
    public Configuration getConf() {
        if (conf == null) {
            conf = new Configuration();
        }
        return conf;
    }

    /** Stores the configuration handed over by {@code ToolRunner} so {@link #run} can use it. */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }

    /**
     * Configures and runs the job.
     *
     * @param other {@code other[0]} is the input path, {@code other[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] other) throws Exception {
        // Use the configuration that already carries the parsed generic options.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(Top10_1.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // The top-N is kept in reducer memory, so it is only global with a single reducer.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Top10_1 t = new Top10_1();
        Configuration conf = t.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("your input args number is fail,you need input <in> and <out>");
            // A usage error is a failure; do not report success to the shell.
            System.exit(2);
        }
        System.exit(ToolRunner.run(conf, t, other));
    }
}
package com.huhu.day03;

// NOTE(review): the java.io / org.apache.hadoop.io imports, the iterator.next() argument and
// the ToolRunner.run(...) call were missing from the source text and have been restored
// from how the names are used.
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word count that emits only the ten most frequent words, ordered on the value with a
 * {@link TreeSet} of {@link WCWritable}, e.g. {@code a 80, b 78, r 70 ...}.
 *
 * @author huhu_k
 */
public class Top10_2 extends ToolRunner implements Tool {

    private Configuration conf;

    /** Emits {@code (word, 1)} for every non-empty space-separated token of the input line. */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String s : value.toString().split(" ")) {
                // Blank lines and repeated spaces produce empty tokens; they are not words.
                if (s.isEmpty()) {
                    continue;
                }
                context.write(new Text(s), new IntWritable(1));
            }
        }
    }

    /** Sums the counts per word, keeps the ten best and writes them in {@link #cleanup}. */
    public static class MyReduce extends Reducer<Text, IntWritable, WCWritable, NullWritable> {

        // Number of entries to keep (the original constant was 11, one too many).
        private static final int TOP_N = 10;

        // Ordered by WCWritable.compareTo, so the worst retained entry is last().
        private TreeSet<WCWritable> set;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            set = new TreeSet<WCWritable>();
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }

            WCWritable w = new WCWritable();
            w.setWord(key.toString());
            w.setCount(sum);
            set.add(w);

            if (set.size() > TOP_N) {
                set.remove(set.last());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write every retained entry, not just the first one.
            Iterator<WCWritable> iterator = set.iterator();
            while (iterator.hasNext()) {
                context.write(iterator.next(), NullWritable.get());
            }
        }
    }

    /** Returns the configuration, creating and caching a default one on first use. */
    @Override
    public Configuration getConf() {
        if (conf == null) {
            conf = new Configuration();
        }
        return conf;
    }

    /** Stores the configuration handed over by {@code ToolRunner} so {@link #run} can use it. */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }

    /**
     * Configures and runs the job.
     *
     * @param other {@code other[0]} is the input path, {@code other[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] other) throws Exception {
        // Use the configuration that already carries the parsed generic options.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(Top10_2.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(WCWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // The top-N is kept in reducer memory, so it is only global with a single reducer.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Top10_2 t = new Top10_2();
        Configuration conf = t.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("your input args number is fail,you need input <in> and <out>");
            // A usage error is a failure; do not report success to the shell.
            System.exit(2);
        }
        System.exit(ToolRunner.run(conf, t, other));
    }
}
package com.huhu.day03;

// NOTE(review): these four imports were missing from the source text and have been
// restored from the types the class uses.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A (word, count) pair that can be serialized by Hadoop and sorts by count descending,
 * then by word in dictionary order.
 */
public class WCWritable implements WritableComparable<WCWritable> {

    private String word;
    private int count;

    /** No-arg constructor; fields are filled later by setters or {@link #readFields}. */
    public WCWritable() {
        super();
    }

    public WCWritable(String word, int count) {
        super();
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "WCWritable [word=" + word + ", count=" + count + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + count;
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        WCWritable other = (WCWritable) obj;
        if (count != other.count) {
            return false;
        }
        return (word == null) ? other.word == null : word.equals(other.word);
    }

    /** Reads the fields in the order {@link #write} emits them: word, then count. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readInt();
    }

    /** Writes the word as a UTF string followed by the count as an int. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    /**
     * Orders by count, largest first; equal counts fall back to dictionary order of the word.
     */
    @Override
    public int compareTo(WCWritable o) {
        if (this.count == o.count) {
            return this.word.compareTo(o.word);
        }
        // Integer.compare cannot overflow the way a subtraction of two ints can.
        return Integer.compare(o.count, this.count);
    }
}
package com.huhu.day03;

// NOTE(review): the java.io / org.apache.hadoop.io imports, the iterator.next() argument and
// the ToolRunner.run(...) call were missing from the source text and have been restored
// from how the names are used.
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word count that emits only the ten most frequent words, ordered on the value with a
 * {@link TreeSet} of {@link WCWritable}, e.g. {@code a 80, b 78, r 70 ...}.
 *
 * @author huhu_k
 */
public class Top10_2 extends ToolRunner implements Tool {

    private Configuration conf;

    /** Emits {@code (word, 1)} for every non-empty space-separated token of the input line. */
    static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String s : value.toString().split(" ")) {
                // Blank lines and repeated spaces produce empty tokens; they are not words.
                if (s.isEmpty()) {
                    continue;
                }
                context.write(new Text(s), new IntWritable(1));
            }
        }
    }

    /** Sums the counts per word, keeps the ten best and writes them in {@link #cleanup}. */
    static class MyReducer extends Reducer<Text, IntWritable, WCWritable, NullWritable> {

        // Number of entries to keep.
        private static final int TOP_N = 10;

        // Ordered by WCWritable.compareTo, so the worst retained entry is last().
        private TreeSet<WCWritable> set;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            set = new TreeSet<WCWritable>();
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }

            WCWritable w = new WCWritable();
            w.setWord(key.toString());
            w.setCount(sum);
            set.add(w);

            if (set.size() > TOP_N) {
                set.remove(set.last());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Iterator<WCWritable> iterator = set.iterator();
            while (iterator.hasNext()) {
                // next() both supplies the record and advances the loop.
                context.write(iterator.next(), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Top10_2 t = new Top10_2();
        Configuration con = t.getConf();
        String[] other = new GenericOptionsParser(con, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("number is fail");
            // Stop here: run() would otherwise fail on the missing path arguments.
            System.exit(2);
        }
        // ToolRunner parses the generic options itself, so it gets the raw args.
        int run = ToolRunner.run(con, t, args);
        System.exit(run);
    }

    /** Returns the configuration, creating and caching a default one on first use. */
    @Override
    public Configuration getConf() {
        if (conf == null) {
            conf = new Configuration();
        }
        return conf;
    }

    /** Stores the configuration handed over by {@code ToolRunner} so {@link #run} can use it. */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }

    /**
     * Configures and runs the job.
     *
     * @param other {@code other[0]} is the input path, {@code other[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] other) throws Exception {
        Configuration con = getConf();
        Job job = Job.getInstance(con);
        job.setJarByClass(Top10_2.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Default partitioner, set explicitly.
        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(WCWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // The top-N is kept in reducer memory, so it is only global with a single reducer.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package util;

// NOTE(review): the java.io / org.apache.hadoop.io imports and the ToolRunner.run(...) call
// were missing from the source text and have been restored from how the names are used.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Skeleton MapReduce driver: an empty mapper and reducer plus the job wiring, meant to be
 * copied and filled in.
 */
public class Frame extends ToolRunner implements Tool {

    private Configuration conf;

    /** Mapper stub: splits the line on spaces and emits nothing yet. */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Placeholder for the real mapping logic.
            String[] line = value.toString().split(" ");
        }
    }

    /** Reducer stub with empty lifecycle hooks. */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
        }
    }

    public static void main(String[] args) throws Exception {
        Frame t = new Frame();
        Configuration conf = t.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("number is fail");
            // Stop here: run() would otherwise fail on the missing path arguments.
            System.exit(2);
        }
        // ToolRunner parses the generic options itself, so it gets the raw args.
        int run = ToolRunner.run(conf, t, args);
        System.exit(run);
    }

    /** Returns the configuration, creating and caching a default one on first use. */
    @Override
    public Configuration getConf() {
        if (conf == null) {
            conf = new Configuration();
        }
        return conf;
    }

    /** Stores the configuration handed over by {@code ToolRunner} so {@link #run} can use it. */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }

    /**
     * Configures and runs the job.
     *
     * @param other {@code other[0]} is the input path, {@code other[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] other) throws Exception {
        Configuration con = getConf();
        Job job = Job.getInstance(con);
        job.setJarByClass(Frame.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Default partitioner, set explicitly.
        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package com.huhu.day03;

// NOTE(review): the java.io / org.apache.hadoop.io imports and the ToolRunner.run(...) call
// were missing from the source text and have been restored from how the names are used.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Word count whose reducer splits the result by hand into four named outputs, chosen by
 * the first character of each word: {@code az}, {@code AZ}, {@code 09} and {@code default}.
 *
 * @author huhu_k
 */
public class ManualPartition extends ToolRunner implements Tool {

    private Configuration conf;

    /** Emits {@code (word, 1)} for every non-empty space-separated token of the input line. */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String s : value.toString().split(" ")) {
                // Blank lines and repeated spaces produce empty tokens; they are not words.
                if (s.isEmpty()) {
                    continue;
                }
                context.write(new Text(s), new IntWritable(1));
            }
        }
    }

    /** Sums the counts per word and routes each total to the matching named output. */
    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private MultipleOutputs<Text, IntWritable> mos;

        @Override
        protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            mos = new MultipleOutputs<>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            mos.write(namedOutputFor(key.toString()), key, new IntWritable(sum));
        }

        /**
         * Picks the named output from the first character of the word. An empty word, or
         * any first character outside a-z, A-Z and 0-9, goes to {@code default}.
         */
        private static String namedOutputFor(String word) {
            if (word.isEmpty()) {
                return "default";
            }
            char first = word.charAt(0);
            if (first >= 'a' && first <= 'z') {
                return "az";
            }
            if (first >= 'A' && first <= 'Z') {
                return "AZ";
            }
            if (first >= '0' && first <= '9') {
                return "09";
            }
            return "default";
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Important: the named outputs must be closed here, otherwise what they have
            // buffered may never reach the output files.
            mos.close();
        }
    }

    public static void main(String[] args) throws Exception {
        ManualPartition t = new ManualPartition();
        Configuration conf = t.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("number is fail");
            // Stop here: run() would otherwise fail on the missing path arguments.
            System.exit(2);
        }
        // ToolRunner parses the generic options itself, so it gets the raw args.
        int run = ToolRunner.run(conf, t, args);
        System.exit(run);
    }

    /** Returns the configuration, creating and caching a default one on first use. */
    @Override
    public Configuration getConf() {
        if (conf == null) {
            conf = new Configuration();
        }
        return conf;
    }

    /** Stores the configuration handed over by {@code ToolRunner} so {@link #run} can use it. */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }

    /**
     * Configures and runs the job, registering the four named outputs the reducer writes to.
     *
     * @param other {@code other[0]} is the input path, {@code other[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] other) throws Exception {
        Configuration con = getConf();
        Job job = Job.getInstance(con);
        job.setJarByClass(ManualPartition.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Default partitioner, set explicitly.
        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));

        // Manual partitioning: one named output per first-character class.
        MultipleOutputs.addNamedOutput(job, "az", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "AZ", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "09", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "default", TextOutputFormat.class, Text.class, IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package com.huhu.day03.partitioner; import; import; import org.apache.hadoop.mapreduce.Partitioner; public class WordCountAUTOPartitioner extends Partitioner<Text, IntWritable> { @Override public int getPartition(Text key, IntWritable value, int numPartitioner) { String firstChar = key.toString().substring(0, 1); if (firstChar.matches("[a-g]")) { //返回 return 0 % numPartitioner; } else if (firstChar.matches("[h-z]")) { return 1 % numPartitioner; } else if (firstChar.matches("[0-5]")) { return 2 % numPartitioner; } else if (firstChar.matches("[6-9]")) { return 3 % numPartitioner; } else if (firstChar.matches("[A-G]")) { return 0 % numPartitioner; } else if (firstChar.matches("[H-Z]")) { return 5 % numPartitioner; } else { return 6 % numPartitioner; } } }
package com.huhu.day03;

// NOTE(review): the java.io / org.apache.hadoop.io imports and the ToolRunner.run(...) call
// were missing from the source text and have been restored from how the names are used.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.huhu.day03.partitioner.WordCountAUTOPartitioner;

/**
 * Word count whose map output is distributed to the reducers by the custom
 * {@link WordCountAUTOPartitioner} instead of the default hash partitioner.
 */
public class AutomaticPartition extends ToolRunner implements Tool {

    private Configuration conf;

    /** Emits {@code (word, 1)} for every non-empty space-separated token of the input line. */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String s : value.toString().split(" ")) {
                // Blank lines and repeated spaces produce empty tokens; they are not words.
                if (s.isEmpty()) {
                    continue;
                }
                context.write(new Text(s), new IntWritable(1));
            }
        }
    }

    /** Sums the counts of each word and writes {@code (word, total)}. */
    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        AutomaticPartition t = new AutomaticPartition();
        Configuration conf = t.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("number is fail");
            // Stop here: run() would otherwise fail on the missing path arguments.
            System.exit(2);
        }
        // ToolRunner parses the generic options itself, so it gets the raw args.
        int run = ToolRunner.run(conf, t, args);
        System.exit(run);
    }

    /** Returns the configuration, creating and caching a default one on first use. */
    @Override
    public Configuration getConf() {
        if (conf == null) {
            conf = new Configuration();
        }
        return conf;
    }

    /** Stores the configuration handed over by {@code ToolRunner} so {@link #run} can use it. */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }

    /**
     * Configures and runs the job with the custom partitioner and 40 reduce tasks.
     *
     * @param other {@code other[0]} is the input path, {@code other[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] other) throws Exception {
        Configuration con = getConf();
        Job job = Job.getInstance(con);
        job.setJarByClass(AutomaticPartition.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Custom partitioner in place of the default HashPartitioner.
        job.setPartitionerClass(WordCountAUTOPartitioner.class);
        // Use 40 partitions (one per reduce task).
        job.setNumReduceTasks(40);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package com.huhu.day03;

// NOTE(review): the java.io / org.apache.hadoop.io imports and the ToolRunner.run(...) call
// were missing from the source text and have been restored from how the names are used.
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// NOTE(review): the package of ClassGroupSort was missing from the source text;
// com.huhu.day03.group is an assumption - confirm against the real file location.
import com.huhu.day03.group.ClassGroupSort;
import com.huhu.day03.pojo.Student;

/**
 * Reads lines of the form {@code name group score} and writes the total score per group.
 */
public class StudentAutoGroup extends ToolRunner implements Tool {

    private Configuration conf;

    /** Parses each line into a {@link Student} and emits it as both key and value. */
    public static class MyMapper extends Mapper<LongWritable, Text, Student, Student> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split(" ");
            // A short or non-numeric line would otherwise kill the task; count and skip it.
            if (line.length < 3) {
                context.getCounter("StudentAutoGroup", "MALFORMED_LINES").increment(1);
                return;
            }
            int score;
            try {
                score = Integer.parseInt(line[2]);
            } catch (NumberFormatException e) {
                context.getCounter("StudentAutoGroup", "MALFORMED_LINES").increment(1);
                return;
            }
            Student s = new Student(line[0], line[1], score);
            context.write(s, s);
        }
    }

    /** Sums the scores of all students that arrive under one key and writes (group, total). */
    public static class MyReduce extends Reducer<Student, Student, Text, IntWritable> {

        @Override
        protected void reduce(Student key, Iterable<Student> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Student s : values) {
                sum += s.getSccore();
            }
            context.write(new Text(key.getGroup()), new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        StudentAutoGroup t = new StudentAutoGroup();
        Configuration conf = t.getConf();
        String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (other.length != 2) {
            System.err.println("number is fail");
            // Stop here: run() would otherwise fail on the missing path arguments.
            System.exit(2);
        }
        // ToolRunner parses the generic options itself, so it gets the raw args.
        int run = ToolRunner.run(conf, t, args);
        System.exit(run);
    }

    /** Returns the configuration, creating and caching a default one on first use. */
    @Override
    public Configuration getConf() {
        if (conf == null) {
            conf = new Configuration();
        }
        return conf;
    }

    /** Stores the configuration handed over by {@code ToolRunner} so {@link #run} can use it. */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }

    /**
     * Configures and runs the job with a single reduce task.
     *
     * @param other {@code other[0]} is the input path, {@code other[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] other) throws Exception {
        Configuration con = getConf();
        Job job = Job.getInstance(con);
        job.setJarByClass(StudentAutoGroup.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Student.class);
        job.setMapOutputValueClass(Student.class);

        // Grouping.
        // NOTE(review): this sets the comparator used to group keys for a combiner, and
        // this job configures no combiner, so it presumably has no effect; reducer-side
        // grouping then follows Student.compareTo. job.setGroupingComparatorClass(...) is
        // the reducer-side setter - verify ClassGroupSort's byte-level compare before
        // switching to it.
        job.setCombinerKeyGroupingComparatorClass(ClassGroupSort.class);

        // Default partitioner, set explicitly.
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(other[0]));
        FileOutputFormat.setOutputPath(job, new Path(other[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package com.huhu.day03.pojo;

// NOTE(review): these four imports were missing from the source text and have been
// restored from the types the class uses.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A student record (name, group, score) that Hadoop can serialize and use as a key.
 *
 * <p>The class does not override {@code equals}/{@code hashCode}, so a hash-based
 * partitioner sees the identity hash of each instance.
 */
public class Student implements WritableComparable<Student> {

    private String name;
    private String group;
    private int sccore;

    /** No-arg constructor; fields are filled later by setters or {@link #readFields}. */
    public Student() {
        super();
    }

    public Student(String name, String group, int sccore) {
        super();
        this.name = name;
        this.group = group;
        this.sccore = sccore;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public int getSccore() {
        return sccore;
    }

    public void setSccore(int sccore) {
        this.sccore = sccore;
    }

    @Override
    public String toString() {
        return "Student [name=" + name + ", group=" + group + ", sccore=" + sccore + "]";
    }

    /** Reads the fields in the order {@link #write} emits them: name, group, score. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.group = in.readUTF();
        this.sccore = in.readInt();
    }

    /** Writes name and group as UTF strings followed by the score as an int. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(group);
        out.writeInt(sccore);
    }

    /**
     * Orders students by group.
     *
     * <p>NOTE(review): the expression after {@code return} was missing from the source
     * text. Ordering by group is an assumption; it would make students of one group
     * compare as equal. Confirm against the original file.
     */
    @Override
    public int compareTo(Student o) {
        return this.group.compareTo(o.group);
    }
}
package; import; import; import com.huhu.day03.pojo.Student; public class ClassGroupSort implements RawComparator<Student> { @Override public int compare(Student o1, Student o2) { return (int) (o1.getGroup().equals(o2.getGroup()) ? 0 : 1); } @Override public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) { return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8); } }