zoukankan      html  css  js  c++  java
  • java写hadoop全局排序

    前言:

    一直不会用java,都是streaming的方式用C或者python写mapper或者reducer的可执行程序。但是有些情况,如全排序等等用streaming的方式往往不好处理,于是乎用原生语言来写map-reduce;

    开发环境eclipse,windows,把hadoop相关的jar附加到程序中,打包后放回linux虚机执行;

    输入数据

     1 haha    10
      2 haha    9
      3 haha    100
      4 haha    1
      5 haha    1
      6 haha    2
      7 haha    3
      8 haha    1000
      9 haha    1000
     10 haha    999
     11 haha    888
     12 haha    10000

    输出数据 cat part*-*>o.txt

    1 haha    1                                                                                                        
      2 haha    1
      3 haha    2
      4 haha    3
      5 haha    9
      6 haha    10
      7 haha    100
      8 haha    888
      9 haha    999
    10 haha    1000
    11 haha    1000
    12 haha    10000

    代码 MyMapper

    package com.globalsort;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
     
    
    public class MyMapper extends

       Mapper<LongWritable, Text, LongWritable, Text> {

            /**
             * Inverts each "name\tnumber" input line into a (number, name) pair so
             * that the MapReduce shuffle sorts records by the numeric value.
             *
             * @param key     byte offset of the line in the input split (unused)
             * @param value   one raw input line, expected as "name<TAB>number"
             * @param context sink for the (number, name) intermediate pairs
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                            throws IOException, InterruptedException {
                    String line = value.toString();
                    // Split on the "\t" escape rather than a literal tab character,
                    // which is invisible in an editor and easy to corrupt.
                    String[] segs = line.split("\t");
                    if (segs.length != 2) {
                            return; // malformed line: skip instead of emitting garbage
                    }
                    long newval;
                    try {
                            // Long.parseLong matches the LongWritable key type and
                            // accepts values beyond the int range; trim() tolerates
                            // stray whitespace (e.g. a trailing \r).
                            newval = Long.parseLong(segs[1].trim());
                    } catch (NumberFormatException e) {
                            // Non-numeric second field: skip the record rather than
                            // letting the whole map task fail.
                            return;
                    }
                    context.write(new LongWritable(newval), new Text(segs[0]));
            }

    }
    

    重写reducer

    package com.globalsort;
    
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.util.Iterator;  
    
    public class MyReducer extends

                    Reducer<LongWritable, Text,Text,LongWritable > {

            /**
             * Emits one (name, number) output pair per incoming value, flipping
             * the mapper's (number, name) order back for the final output.
             * Values arrive grouped under each numeric key, already sorted by
             * the framework.
             *
             * @param key     the numeric sort key produced by the mapper
             * @param values  all names that shared this numeric value
             * @param context sink for the (name, number) output pairs
             */
            @Override
            protected void reduce(LongWritable key, Iterable<Text> values,
                            Context context) throws IOException, InterruptedException {
                    // Enhanced for-loop over the value iterable in place of an
                    // explicit Iterator; one output record per value preserves
                    // duplicates (e.g. two entries with the same number).
                    for (Text value : values) {
                            context.write(new Text(value.toString()), key);
                    }
            }

    }
    

     重写partitioner

    package com.globalsort;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    public class MyPartitioner extends Partitioner<LongWritable, Text> {

            /**
             * Routes keys into ascending ranges so that every key in partition p
             * is smaller than every key in partition p+1; concatenating the
             * reducer outputs in partition order then yields a globally sorted
             * result.
             *
             * Ranges: bucket 0 for keys <= 100, bucket 1 for (100, 1000],
             * bucket 2 for keys > 1000 (tuned for this example's value
             * distribution).
             *
             * @param key           the numeric sort key
             * @param value         the record value (unused)
             * @param numPartitions number of reducers configured for the job
             * @return partition index in [0, numPartitions)
             */
            @Override
            public int getPartition(LongWritable key, Text value, int numPartitions) {
                    long tmp = key.get();
                    int bucket;
                    if (tmp <= 100) {
                            bucket = 0;
                    } else if (tmp <= 1000) {
                            bucket = 1;
                    } else {
                            bucket = 2;
                    }
                    // The original returned "bucket % numPartitions", which breaks
                    // the global order when numPartitions < 3 (with 2 reducers the
                    // largest keys wrapped around into partition 0). Clamping keeps
                    // the buckets monotonic for any reducer count and is identical
                    // to the original for the configured numPartitions == 3.
                    return Math.min(bucket, numPartitions - 1);
            }

    }
    

      runner

    package com.globalsort;
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;  
    
    public class GlobalSortMain implements Tool {

    	/** Hadoop configuration injected by ToolRunner via setConf. */
    	private Configuration conf;

    	@Override
    	public Configuration getConf() {
    		return conf;
    	}

        @Override
        public void setConf (Configuration conf){
        	this.conf=conf;
        }

        /**
         * Validates the command line and runs the global-sort job.
         * args[0] is not used by this class; args[1] is the input path and
         * args[2] the output path (see configureJob).
         *
         * @return 0 on job success, 1 on job failure, 2 on bad arguments
         */
        @Override
        public int run(String[] args) throws Exception {
        	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        	if (otherArgs.length != 3) {
        		// The original printed the usage but fell through and later
        		// crashed with ArrayIndexOutOfBoundsException in configureJob;
        		// fail fast with a distinct exit code instead.
        		System.err.println("Usage: GlobalSortMain <unused> <in> <out>");
        		return 2;
        	}
        	Job job = configureJob(otherArgs);
        	return (job.waitForCompletion(true) ? 0 : 1);
        }

        /**
         * Builds the sort job: 3 reducers fed by MyPartitioner's disjoint,
         * ascending key ranges, so concatenating the part files gives a total
         * order. args[1] = input path, args[2] = output path.
         */
        private Job configureJob(String[] args) throws IOException {
        	conf.set("mapred.job.priority", "VERY_HIGH");
            Job job = new Job(conf, "global sort liuyu");
            job.setJarByClass(GlobalSortMain.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(3);
            // Intermediate pairs are (number, name); final output flips back
            // to (name, number).
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
        	return job;
        }

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                // Propagate the job status as the process exit code; the
                // original discarded ToolRunner's return value, so failures
                // still exited with status 0.
                System.exit(ToolRunner.run(conf, new GlobalSortMain(), args));
        }

    }
    

      

  • 相关阅读:
    UVA 254 Towers of Hanoi
    UVA 701 The Archeologists' Dilemma
    UVA 185 Roman Numerals
    UVA 10994 Simple Addition
    UVA 10570 Meeting with Aliens
    UVA 306 Cipher
    UVA 10160 Servicing Stations
    UVA 317 Hexagon
    UVA 10123 No Tipping
    UVA 696 How Many Knights
  • 原文地址:https://www.cnblogs.com/finallyliuyu/p/5091693.html
Copyright © 2011-2022 走看看