zoukankan      html  css  js  c++  java
  • MapReduce 实现排序

      1 import java.io.IOException;
      2 
      3 import org.apache.hadoop.conf.Configuration;
      4 import org.apache.hadoop.conf.Configured;
      5 import org.apache.hadoop.fs.Path;
      6 import org.apache.hadoop.io.IntWritable;
      7 import org.apache.hadoop.io.LongWritable;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.Job;
     10 import org.apache.hadoop.mapreduce.Mapper;
     11 import org.apache.hadoop.mapreduce.Partitioner;
     12 import org.apache.hadoop.mapreduce.Reducer;
     13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     17 import org.apache.hadoop.util.Tool;
     18 import org.apache.hadoop.util.ToolRunner;
     19 public class Sort extends Configured implements Tool {
     20     /*
     21      * 排序
     22      * 输入格式:每个数据占一行
     23      * 输出格式:
     24      * 1 21
     25      * 2 32
     26      * 3 62
     27      * 设计思路:
     28      * 使用reduce自带的默认排序规则。MapReduce按照key值进行排序。如果Key值为Intwritable类型,则按照数字大小排序
     29      * 如果key值为Text类型,则按照字典顺序对字符串进行排序。
     30      * 注意:要重写Partition函数。Reduce排序只能保证自己局部的数据顺序,并不能保证全局的。
     31      * */
     32     public static class Map extends Mapper<LongWritable,Text,IntWritable,IntWritable>{
     33         private IntWritable line=new IntWritable();
     34         public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
     35             line.set(Integer.parseInt(value.toString()));
     36             context.write(line, new IntWritable(1));            
     37         }
     38         
     39     }
     40     
     41     public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
     42         private IntWritable num=new IntWritable(1);
     43         public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
     44             for(IntWritable var:values){
     45             context.write(num, key);
     46             num=new IntWritable(num.get()+1);
     47             }
     48         }
     49         
     50     }
     51     
     52     public static class Partition extends Partitioner<IntWritable ,IntWritable>{
     53 
     54         @Override
     55         public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
     56             // TODO Auto-generated method stub
     57             System.out.println(numPartitions);
     58             int maxnum=65223;
     59             int bound=maxnum/numPartitions+1;
     60             for(int i=0;i<numPartitions;i++)
     61             {
     62                 if(key.get()>=bound*(i-1)&&key.get()<=bound*i)
     63                 {
     64                     return i;
     65                 }
     66             }
     67             return 0;
     68         }
     69         
     70     }
     71     
     72     public int run(String[] args)throws Exception{
     73         Configuration conf=new Configuration();
     74         Job job=new Job(conf,"Sort");
     75         job.setJarByClass(Sort.class);
     76         
     77         job.setOutputKeyClass(IntWritable.class);
     78         job.setOutputValueClass(IntWritable.class);
     79         
     80         
     81         job.setMapperClass(Map.class);
     82         job.setReducerClass(Reduce.class);
     83         job.setPartitionerClass(Partition.class);
     84         
     85         job.setInputFormatClass(TextInputFormat.class);
     86         job.setOutputFormatClass(TextOutputFormat.class);
     87         
     88         FileInputFormat.addInputPath(job, new Path(args[0]));
     89         FileOutputFormat.setOutputPath(job, new Path(args[1]));
     90         
     91         boolean success=job.waitForCompletion(true);
     92         return success?0:1;
     93     }
     94     
     95     public static void main(String[] args)throws Exception{
     96         int ret=ToolRunner.run(new Sort(), args);
     97         System.exit(ret);
     98     }
     99 
    100 }
  • 相关阅读:
    第一次冲刺02
    第一次冲刺01
    Android源码分析(十四)PackageManagerService服务分析
    Android源码分析(十三)ActivityManagerService服务分析
    Android源码分析(十二)ServiceManager服务分析
    Serializable和Parcelable之序列化
    ViewPager 相关使用
    AIDL介绍以及简单使用
    Android 四大组件 (五) 第五大组件
    Android 源码分析(十一) 事件传递机制
  • 原文地址:https://www.cnblogs.com/6tian/p/3829169.html
Copyright © 2011-2022 走看看