zoukankan      html  css  js  c++  java
  • java写hadoop全局排序

    前言:

    一直不会用java,都是streaming的方式用C或者python写mapper或者reducer的可执行程序。但是有些情况,如全排序等等用streaming的方式往往不好处理,于是乎用原生语言来写map-reduce;

    开发环境eclipse,windows,把hadoop相关的jar附加到程序中,打包后放回linux虚机执行;

    输入数据

     1 haha    10
      2 haha    9
      3 haha    100
      4 haha    1
      5 haha    1
      6 haha    2
      7 haha    3
      8 haha    1000
      9 haha    1000
     10 haha    999
     11 haha    888
     12 haha    10000

    输出数据 cat part*-*>o.txt

    1 haha    1                                                                                                        
      2 haha    1
      3 haha    2
      4 haha    3
      5 haha    9
      6 haha    10
      7 haha    100
      8 haha    888
      9 haha    999
    10 haha    1000
    11 haha    1000
    12 haha    10000

    代码 MyMapper

    package com.globalsort;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
     
    
    public class MyMapper extends
     
       Mapper<LongWritable, Text, LongWritable, Text> {
     
            @Override
            protected void map(LongWritable  key, Text value, Context context)
                            throws IOException, InterruptedException {
            		String temp=value.toString();
            		String[] segs = temp.split("	"); 
            		if (segs.length!=2)
            		{
            			return;
            		}
            		int newval = Integer.parseInt(segs[1]);
                    context.write(new LongWritable(newval),
                                    new Text(segs[0]));
     
    
            }
     
    
    }
    

    重写reducer

    package com.globalsort;
    
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.util.Iterator;  
    
    public class MyReducer extends
     
                    Reducer<LongWritable, Text,Text,LongWritable > {
    
            @Override
     
            protected void reduce(LongWritable key, Iterable<Text> values,
     
                            Context context) throws IOException, InterruptedException {
     
            	Iterator<Text> it = values.iterator(); 
            	while (it.hasNext()) 
            	{
            		String data = it.next().toString();
                    context.write(new Text(data),key);
     
            	}
            }
     
    
    }
    

     重写patitioner

    package com.globalsort;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    public class MyPartitioner extends Partitioner<LongWritable, Text> {
     
    
            @Override
     
            public int getPartition(LongWritable key, Text value, int numPartitions) {
                    long tmp = key.get();
                    if (tmp <= 100) { 
                            return 0 % numPartitions;
     
                    } else if (tmp <= 1000) { 
                            return 1 % numPartitions;
     
                    } else {
                            return 2 % numPartitions;
     
                    }  
     
            }
           
     
    
    }
    

      runer

    package com.globalsort;
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;  
    
    public class GlobalSortMain implements Tool {
    	
    	private Configuration conf;
    	
    	@Override 
    	public Configuration getConf() {
    		return conf;
    	}
    	
        @Override
        public void setConf (Configuration conf){
        	this.conf=conf;
        }
        @Override
        public int run(String[] args) throws Exception {
        	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        	if (otherArgs.length != 3) {                                                    
        		 System.err.println("Usage:  must contain <in> <out>");  
        	 }
        		Job job = configureJob(otherArgs);
        	 return (job.waitForCompletion(true) ? 0 : 1);
        }
        
        private Job configureJob(String[] args) throws IOException {
        	
        	conf.set("mapred.job.priority", "VERY_HIGH");
        //	conf.setBoolean("mapred.compress.map.output", true);
        	//conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
         //	conf.setBoolean("mapred.compress.reduce.output", true);
        	//conf.setClass("mapred.reduce.output.compression.codec", GzipCodec.class, CompressionCodec.class);
            Job job = new Job(conf, "global sort liuyu");
            job.setJarByClass(GlobalSortMain.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(3);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));  
        	return job;
        }
        
    
            public static void main(String[] args) throws Exception {
     
                    Configuration conf = new Configuration();
                    ToolRunner.run(conf, new GlobalSortMain(), args);
            }
     
    }
    

      

  • 相关阅读:
    spring学习记录_Spring中的新注解
    spring学习记录_spring的 注解
    spring学习记录_spring的 ioc核心容器
    关于myeclipse项目运行报错:Access denied for user 'root'@'localhost' (using password: YES)
    vue项目中实现多语言切换
    OC中限制UITextView的最大字数的实现
    简单瀑布流的实现
    仿购物车的实现
    仿QQ好友列表界面的实现
    类似QQ侧滑菜单功能实现
  • 原文地址:https://www.cnblogs.com/finallyliuyu/p/5091693.html
Copyright © 2011-2022 走看看