  • There are multiple input files, and every line of each file contains a single integer. The task is to read the integers from all of the files, sort them in ascending order, and write the result to a new file. Each output line holds two integers: the first is the sorted rank of the second, and the second is one of the original integers.
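    A hypothetical illustration (not part of the original post): if one input file
    contains 33, 37, 12, 40 and another contains 4, 16, 39, 5, the expected output
    of the job is:

        1 4
        2 5
        3 12
        4 16
        5 33
        6 37
        7 39
        8 40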

    package org.apache.hadoop.examples;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class B_sortInt {
    	// Rank counter; valid only because the job runs with a single reducer.
    	private static int numsum = 0;
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		conf.set("fs.defaultFS", "hdfs://localhost:9000");
    		// Input and output HDFS paths, hard-coded for this example.
    		String[] otherArgs = new String[]{"input", "output"};
    
    		Job job = Job.getInstance(conf, "sort");
    		job.setJarByClass(B_sortInt.class);
    		job.setMapperClass(TokenizerMapper.class);
    		job.setMapOutputKeyClass(IntWritable.class);
    		job.setMapOutputValueClass(Text.class);
    		job.setReducerClass(IntSumReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		// A global ordering requires a single reduce task (Hadoop's default,
    		// made explicit here).
    		job.setNumReduceTasks(1);
    
    		for(int i = 0; i < otherArgs.length - 1; ++i) {
    			FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    		}
    
    		FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    		System.exit(job.waitForCompletion(true)?0:1);
    	}
    
    	public static class IntSumReducer extends Reducer<IntWritable, Text, Text, Text> {
    		private Text word = new Text();

    		public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    			this.word.set(key.toString());
    			// A duplicated integer arrives as several values under one key,
    			// so emit one (rank, integer) line per occurrence.
    			for (Text val : values) {
    				numsum += 1;
    				context.write(new Text(Integer.toString(numsum)), word);
    			}
    		}
    	}
    
    
    	public static class TokenizerMapper extends Mapper<Object, Text, IntWritable, Text> {
    		// Holds the parsed integer that becomes the map output key.
    		private IntWritable data = new IntWritable();

    		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    			// Each input line holds one integer; whitespace-separated tokens
    			// are handled for robustness.
    			StringTokenizer itr = new StringTokenizer(value.toString());
    			while (itr.hasMoreTokens()) {
    				this.data.set(Integer.parseInt(itr.nextToken()));
    				// The value is a placeholder; sorting happens on the key
    				// during the shuffle phase.
    				context.write(this.data, new Text("a"));
    			}
    		}
    	}
    }
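
    How the sorting happens: the MapReduce framework sorts map output by key during
    the shuffle, and IntWritable keys compare numerically, so the reducer receives
    the integers already in ascending order. With a single reduce task, that order
    is also the global order, and the reducer only has to assign ranks. To run the
    job, the class would typically be packaged into a jar and submitted, for example
    (assuming a jar named B_sortInt.jar, a name not given in the original post):

        hadoop jar B_sortInt.jar org.apache.hadoop.examples.B_sortInt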
    
  • Original post: https://www.cnblogs.com/MiraculousB/p/14106847.html