  • Analysis of Hadoop's Built-in Sort Example

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.*;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.ClusterStatus;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * This is the trivial map/reduce program that does absolutely nothing
     * other than use the framework to fragment and sort the input values.
     *
     * To run: bin/hadoop jar build/hadoop-examples.jar sort
     *            [-r <i>reduces</i>]
     *            [-inFormat <i>input format class</i>]
     *            [-outFormat <i>output format class</i>]
     *            [-outKey <i>output key class</i>]
     *            [-outValue <i>output value class</i>]
     *            [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
     *            <i>in-dir</i> <i>out-dir</i>
     */
    public class Sort<K,V> extends Configured implements Tool {
      public static final String REDUCES_PER_HOST =
        "mapreduce.sort.reducesperhost";
      private Job job = null;
    
      static int printUsage() {
        System.out.println("sort [-r <reduces>] " +
                           "[-inFormat <input format class>] " +
                           "[-outFormat <output format class>] " +
                           "[-outKey <output key class>] " +
                           "[-outValue <output value class>] " +
                           "[-totalOrder <pcnt> <num samples> <max splits>] " +
                           "<input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return 2;
      }
    
      /**
       * The main driver for sort program.
       * Invoke this method to submit the map/reduce job.
       * @throws IOException When there are communication problems with the
       *                     job tracker.
       */
      public int run(String[] args) throws Exception {
    
        Configuration conf = getConf();
        JobClient client = new JobClient(conf);
        ClusterStatus cluster = client.getClusterStatus();
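        // Default: target roughly 90% of the cluster's reduce-slot capacity;
        // if mapreduce.sort.reducesperhost is set, use that many reduces per
        // tracker instead.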
        int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
        String sort_reduces = conf.get(REDUCES_PER_HOST);
        if (sort_reduces != null) {
           num_reduces = cluster.getTaskTrackers() *
                           Integer.parseInt(sort_reduces);
        }
        Class<? extends InputFormat> inputFormatClass =
          SequenceFileInputFormat.class;
        Class<? extends OutputFormat> outputFormatClass =
          SequenceFileOutputFormat.class;
        Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
        Class<? extends Writable> outputValueClass = BytesWritable.class;
        List<String> otherArgs = new ArrayList<String>();
        InputSampler.Sampler<K,V> sampler = null;
        for(int i=0; i < args.length; ++i) {
          try {
            if ("-r".equals(args[i])) {
              num_reduces = Integer.parseInt(args[++i]);
            } else if ("-inFormat".equals(args[i])) {
              inputFormatClass =
                Class.forName(args[++i]).asSubclass(InputFormat.class);
            } else if ("-outFormat".equals(args[i])) {
              outputFormatClass =
                Class.forName(args[++i]).asSubclass(OutputFormat.class);
            } else if ("-outKey".equals(args[i])) {
              outputKeyClass =
                Class.forName(args[++i]).asSubclass(WritableComparable.class);
            } else if ("-outValue".equals(args[i])) {
              outputValueClass =
                Class.forName(args[++i]).asSubclass(Writable.class);
            } else if ("-totalOrder".equals(args[i])) {
              double pcnt = Double.parseDouble(args[++i]);
              int numSamples = Integer.parseInt(args[++i]);
              int maxSplits = Integer.parseInt(args[++i]);
              if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
              sampler =
                new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
            } else {
              otherArgs.add(args[i]);
            }
          } catch (NumberFormatException except) {
            System.out.println("ERROR: Integer expected instead of " + args[i]);
            return printUsage();
          } catch (ArrayIndexOutOfBoundsException except) {
            System.out.println("ERROR: Required parameter missing from " +
                args[i-1]);
            return printUsage(); // exits
          }
        }
        // Set user-supplied (possibly default) job configs
        job = Job.getInstance(conf);
        job.setJobName("sorter");
        job.setJarByClass(Sort.class);
    
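        // Identity Mapper and Reducer: the job does no per-record work of its
        // own; the sorting happens entirely in the framework's shuffle/merge.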
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
    
        job.setNumReduceTasks(num_reduces);
    
        job.setInputFormatClass(inputFormatClass);
        job.setOutputFormatClass(outputFormatClass);
    
        job.setOutputKeyClass(outputKeyClass);
        job.setOutputValueClass(outputValueClass);
    
        // Make sure there are exactly 2 parameters left.
        if (otherArgs.size() != 2) {
          System.out.println("ERROR: Wrong number of parameters: " +
              otherArgs.size() + " instead of 2.");
          return printUsage();
        }
        FileInputFormat.setInputPaths(job, otherArgs.get(0));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
    
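        // With -totalOrder: sample the input to pick reducer boundary keys,
        // write them to a partition file next to the input, and publish that
        // file to every task through the distributed cache.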
        if (sampler != null) {
          System.out.println("Sampling input to effect total-order sort...");
          job.setPartitionerClass(TotalOrderPartitioner.class);
          Path inputDir = FileInputFormat.getInputPaths(job)[0];
          inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
          Path partitionFile = new Path(inputDir, "_sortPartitioning");
          TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
          InputSampler.<K,V>writePartitionFile(job, sampler);
          URI partitionUri = new URI(partitionFile.toString() +
                                     "#" + "_sortPartitioning");
          DistributedCache.addCacheFile(partitionUri, conf);
        }
    
        System.out.println("Running on " +
            cluster.getTaskTrackers() +
            " nodes to sort from " +
            FileInputFormat.getInputPaths(job)[0] + " into " +
            FileOutputFormat.getOutputPath(job) +
            " with " + num_reduces + " reduces.");
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        int ret = job.waitForCompletion(true) ? 0 : 1;
        Date end_time = new Date();
        System.out.println("Job ended: " + end_time);
        System.out.println("The job took " +
            (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
        return ret;
      }
    
    
    
      public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Sort(), args);
        System.exit(res);
      }
    
      /**
       * Get the last job that was run using this instance.
       * @return the results of the last job that was run
       */
      public Job getResult() {
        return job;
      }
    }

    My first impression after reading the source: when will the MapReduce code I write be this clean, this badass...
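    A few things are worth calling out. The job registers the framework's
    identity Mapper and Reducer base classes, so it performs no per-record
    computation at all: the sorted output is produced entirely by MapReduce's
    shuffle, which sort-merges map output by key before handing it to each
    reducer. Without -totalOrder, each reducer's output file is internally
    sorted but the files are not ordered relative to one another; -totalOrder
    fixes this by sampling the input with InputSampler.RandomSampler, writing
    reducer boundary keys to a partition file, and routing keys through
    TotalOrderPartitioner so that every key sent to reducer i sorts before
    every key sent to reducer i+1.

    Here is a minimal, self-contained sketch of that same total-order pattern
    (the class name TotalOrderSketch, the paths, the four reducers, and the
    sampling parameters are illustrative assumptions, not taken from the
    example above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    public class TotalOrderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "total-order-sketch");
        job.setJarByClass(TotalOrderSketch.class);

        // Identity map and reduce, exactly as in the Sort example above:
        // the framework's shuffle does all of the actual sorting.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setNumReduceTasks(4);                   // illustrative count

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);          // assumes Text keys
        job.setOutputValueClass(Text.class);        // assumes Text values

        FileInputFormat.setInputPaths(job, new Path("/data/in"));      // hypothetical
        FileOutputFormat.setOutputPath(job, new Path("/data/sorted")); // hypothetical

        // Sample ~1% of the keys (at most 10000 samples drawn from at most
        // 10 splits) and write numReduces-1 boundary keys to the partition
        // file; TotalOrderPartitioner then sends each key to the reducer
        // whose range contains it.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            new Path("/tmp/_partitions"));          // hypothetical path
        InputSampler.writePartitionFile(job,
            new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

    With that in place, concatenating part-r-00000 through part-r-00003 yields
    one globally sorted sequence. The original example also publishes the
    partition file via DistributedCache under the #_sortPartitioning fragment,
    which is how the file was traditionally made visible to tasks; the
    mapreduce-API TotalOrderPartitioner can also read it directly from the
    path stored in the configuration, which is what this sketch assumes.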

  • Original post: https://www.cnblogs.com/tuhooo/p/7844997.html