zoukankan      html  css  js  c++  java
  • MapReduce的自定义排序

      1 package com.mengyao.hadoop.mapreduce;
      2 
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 import java.text.SimpleDateFormat;
      7 import java.util.Date;
      8 
      9 import org.apache.hadoop.conf.Configuration;
     10 import org.apache.hadoop.conf.Configured;
     11 import org.apache.hadoop.fs.Path;
     12 import org.apache.hadoop.io.LongWritable;
     13 import org.apache.hadoop.io.Text;
     14 import org.apache.hadoop.io.WritableComparable;
     15 import org.apache.hadoop.mapreduce.Job;
     16 import org.apache.hadoop.mapreduce.Mapper;
     17 import org.apache.hadoop.mapreduce.Reducer;
     18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     20 import org.apache.hadoop.util.Tool;
     21 import org.apache.hadoop.util.ToolRunner;
     22 
     23 public class SortApp extends Configured implements Tool {
     24 
     25     static class Num2 implements WritableComparable<Num2> {
     26 
     27         private long c1;
     28         private long c2;
     29         
     30         public Num2(){
     31             
     32         }
     33         
     34         public Num2(long c1, long c2) {
     35             this.c1 = c1;
     36             this.c2 = c2;
     37         }
     38 
     39         @Override
     40         public void write(DataOutput out) throws IOException {
     41             out.writeLong(this.c1);
     42             out.writeLong(this.c2);
     43         }
     44 
     45         @Override
     46         public void readFields(DataInput in) throws IOException {
     47             this.c1 = in.readLong();
     48             this.c2 = in.readLong();
     49         }
     50 
     51         @Override
     52         public int compareTo(Num2 num2) {
     53             long min = this.c1 - num2.c1;
     54             if (min != 0) {
     55                 return (int)min;
     56             }
     57             
     58             return (int)(this.c2 - num2.c2);
     59         }
     60 
     61         public void set(long c1, long c2) {
     62             this.c1 = c1;
     63             this.c2 = c2;
     64         }
     65         
     66     }
     67     
     68     static class SortMapper extends Mapper<LongWritable, Text, Num2, LongWritable> {
     69         
     70         private Num2 k = null;
     71         private LongWritable v = null;
     72         
     73         @Override
     74         protected void setup(Mapper<LongWritable, Text, Num2, LongWritable>.Context context)
     75                 throws IOException, InterruptedException {
     76             k = new Num2();
     77             v = new LongWritable();
     78         }
     79 
     80         @Override
     81         protected void map(LongWritable key, Text value, Context context)
     82                 throws IOException, InterruptedException {
     83             String[] fields = value.toString().split("\t");
     84             if (fields != null && fields.length == 2) {
     85                 k.set(Long.parseLong(fields[0]), Long.parseLong(fields[1]));
     86                 v.set(Long.parseLong(fields[0]));
     87                 context.write(k, v);                
     88             }
     89         }
     90     }
     91     
     92     static class SortReducer extends Reducer<Num2, LongWritable, LongWritable, LongWritable> {
     93         @Override
     94         protected void reduce(Num2 key, Iterable<LongWritable> value, Context context)
     95                 throws IOException, InterruptedException {
     96             context.write(new LongWritable(key.c1), new LongWritable(key.c2));
     97         }
     98     }
     99     
    100     @Override
    101     public int run(String[] args) throws Exception {
    102         Configuration conf = getConf();
    103         conf.set("mapreduce.job.jvm.numtasks", "-1");        
    104         conf.set("mapreduce.map.speculative", "false");        
    105         conf.set("mapreduce.reduce.speculative", "false");    
    106         conf.set("mapreduce.map.maxattempts", "4");            
    107         conf.set("mapreduce.reduce.maxattempts", "4");        
    108         conf.set("mapreduce.map.skip.maxrecords", "0");    
    109         Job job = Job.getInstance(conf, SortApp.class.getSimpleName());
    110         job.setJarByClass(SortApp.class);
    111         
    112         FileInputFormat.addInputPath(job, new Path(args[0]));
    113         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    114         
    115         job.setMapperClass(SortMapper.class);
    116         job.setMapOutputKeyClass(Num2.class);
    117         job.setMapOutputValueClass(LongWritable.class);
    118         
    119         job.setReducerClass(SortReducer.class);
    120         job.setOutputKeyClass(LongWritable.class);
    121         job.setOutputValueClass(LongWritable.class);
    122         
    123         return job.waitForCompletion(true)?0:1;
    124     }
    125 
    126     public static int createJob(String[] params) {
    127         Configuration conf = new Configuration();
    128         int status = 1;
    129         try {
    130             status = ToolRunner.run(conf, new SortApp(), params);
    131         } catch (Exception e) {
    132             e.printStackTrace();
    133             throw new RuntimeException(e);
    134         }
    135         
    136         return status;
    137     }
    138     
    139     public static void main(String[] args) throws Exception {
    140         args = new String[]{"/testdata/sortdata", "/job/mapreduce/"+SortApp.class.getSimpleName()+"_"+new SimpleDateFormat("yyyyMMddhhMMss").format(new Date())};
    141         if (args.length != 2) {
    142             System.err.println("Usage: "+SortApp.class.getSimpleName()+" <in> <out>");
    143             System.exit(2);
    144         } else {
    145             int status = createJob(args);
    146             System.exit(status);
    147         }
    148     }
    149 
    150 }
  • 相关阅读:
    SpringBoot 第一篇:HelloWorld 跑起来
    CentOS 7.X 静默安装Oracle 12C数据库
    CentOS7 服务器连接超时自动断开问题解决
    Nexus3.0搭建私服上传JAR包 Windows10
    JQuery-FullCalendar 多数据源实现日程展示
    Maven构建 SpringMVC+Spring+MyBatis 环境整合
    Android内存越界检测工具ASAN
    Cookie、Session、Token 的区别
    Java线程池详解
    @Autowired 与@Resource的区别(详细) 转载
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865599.html
Copyright © 2011-2022 走看看