zoukankan      html  css  js  c++  java
  • hadoop —— teragen & terasort

    这两个类所在目录:

    hadoop-examples-0.20.2-cdh3u6.jar 中:

           

    代码:

    TeraGen.java:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.examples.terasort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.zip.Checksum;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.PureJavaCrc32;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Generate the official GraySort input data set.
     * The user specifies the number of rows and the output directory and this
     * class runs a map/reduce program to generate the data.
     * The format of the data is:
     * <ul>
     * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid) 
     *     (constant 4 bytes) (48 bytes filler) (constant 4 bytes)
     * <li>The rowid is the right justified row id as a hex number.
     * </ul>
     *
     * <p>
     * To run the program: 
     * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
     */
    public class TeraGen extends Configured implements Tool {
      // Log under TeraGen's own class so its messages are not attributed to TeraSort.
      private static final Log LOG = LogFactory.getLog(TeraGen.class);
    
      public static enum Counters {CHECKSUM}
    
      public static final String NUM_ROWS = "mapreduce.terasort.num-rows";
      /**
       * An input format that assigns ranges of longs to each mapper.
       */
      static class RangeInputFormat 
          extends InputFormat<LongWritable, NullWritable> {
        
        /**
         * A split that describes a contiguous run of row numbers instead of a
         * region of a file.
         */
        static class RangeInputSplit extends InputSplit implements Writable {
          /** Number of the first row covered by this split. */
          long firstRow;
          /** How many consecutive rows this split covers. */
          long rowCount;

          /** Creates a split whose fields keep their default value of 0 until set. */
          public RangeInputSplit() {
          }

          /**
           * @param offset the first row of the range
           * @param length the number of rows in the range
           */
          public RangeInputSplit(long offset, long length) {
            this.firstRow = offset;
            this.rowCount = length;
          }

          /** Always reports a length of 0. */
          public long getLength() throws IOException {
            return 0L;
          }

          /** Always reports an empty list of locations. */
          public String[] getLocations() throws IOException {
            return new String[0];
          }

          /** Restores both fields, in the order {@link #write} emitted them. */
          public void readFields(DataInput in) throws IOException {
            this.firstRow = WritableUtils.readVLong(in);
            this.rowCount = WritableUtils.readVLong(in);
          }

          /** Emits the first row and then the row count as variable-length longs. */
          public void write(DataOutput out) throws IOException {
            WritableUtils.writeVLong(out, this.firstRow);
            WritableUtils.writeVLong(out, this.rowCount);
          }
        }
        
        /**
         * A record reader that hands out the row numbers of a
         * {@link RangeInputSplit} one at a time as keys, each paired with the
         * {@link NullWritable} singleton as its value.
         */
        static class RangeRecordReader 
            extends RecordReader<LongWritable, NullWritable> {
          /** First row number of the split being read. */
          long startRow;
          /** Number of rows already handed out by {@link #nextKeyValue()}. */
          long finishedRows;
          /** Total number of rows in the split. */
          long totalRows;
          /** Reused key object; created on the first call to nextKeyValue(). */
          LongWritable key = null;

          public RangeRecordReader() {
          }
          
          /**
           * Copies the range out of the split and resets the progress counter.
           * @param split must be a {@link RangeInputSplit}
           * @param context not used
           */
          public void initialize(InputSplit split, TaskAttemptContext context) 
              throws IOException, InterruptedException {
            RangeInputSplit range = (RangeInputSplit) split;
            startRow = range.firstRow;
            finishedRows = 0;
            totalRows = range.rowCount;
          }

          public void close() throws IOException {
            // NOTHING
          }

          /** @return the current key, or null before the first nextKeyValue() call */
          public LongWritable getCurrentKey() {
            return key;
          }

          public NullWritable getCurrentValue() {
            return NullWritable.get();
          }

          /**
           * @return the fraction of the split's rows handed out so far; an empty
           *         split is reported as complete
           */
          public float getProgress() throws IOException {
            // A split with no rows would otherwise evaluate 0 / 0f, which is NaN.
            if (totalRows <= 0) {
              return 1.0f;
            }
            return finishedRows / (float) totalRows;
          }

          /**
           * Advances to the next row number of the range.
           * @return true if a new key was produced, false once the range is used up
           */
          public boolean nextKeyValue() {
            if (key == null) {
              key = new LongWritable();
            }
            if (finishedRows < totalRows) {
              key.set(startRow + finishedRows);
              finishedRows += 1;
              return true;
            } else {
              return false;
            }
          }
          
        }
    
        /**
         * Supplies a new {@link RangeRecordReader}; neither argument is used here.
         */
        public RecordReader<LongWritable, NullWritable> 
            createRecordReader(InputSplit split, TaskAttemptContext context) 
            throws IOException {
          RecordReader<LongWritable, NullWritable> reader = new RangeRecordReader();
          return reader;
        }
    
        /**
         * Carve the configured number of rows into one consecutive range per
         * map task. The end of each range is the rounded-up share of rows due
         * after that many maps, so the ranges are adjacent and together cover
         * every row.
         */
        public List<InputSplit> getSplits(JobContext job) {
          final long rowsRequested = getNumberOfRows(job);
          final int mapCount =
              job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
          LOG.info("Generating " + rowsRequested + " using " + mapCount);

          final List<InputSplit> ranges = new ArrayList<InputSplit>();
          long rangeStart = 0;
          int rangesMade = 0;
          while (rangesMade < mapCount) {
            rangesMade++;
            // Row at which this range stops (exclusive).
            final long rangeEnd =
                (long) Math.ceil(rowsRequested * (double) rangesMade / mapCount);
            ranges.add(new RangeInputSplit(rangeStart, rangeEnd - rangeStart));
            rangeStart = rangeEnd;
          }
          return ranges;
        }
    
      }
      
      /** Reads the row count stored under {@link #NUM_ROWS}; 0 when it is not set. */
      static long getNumberOfRows(JobContext job) {
        final Configuration jobConf = job.getConfiguration();
        return jobConf.getLong(NUM_ROWS, 0L);
      }
      
      /** Stores the row count in the job's configuration under {@link #NUM_ROWS}. */
      static void setNumberOfRows(Job job, long numRows) {
        final Configuration jobConf = job.getConfiguration();
        jobConf.setLong(NUM_ROWS, numRows);
      }
    
      /**
       * The Mapper class that given a row number, will generate the appropriate 
       * output line.
       * <p>
       * Only the row number passed to the first {@code map} call is read; on
       * later calls the mapper advances its own {@code rand} and {@code rowId}
       * state and ignores the incoming key. A CRC32 of every generated record is
       * added to a running total that is published to the
       * {@link Counters#CHECKSUM} counter in {@link #cleanup}.
       */
      public static class SortGenMapper 
          extends Mapper<LongWritable, NullWritable, Text, Text> {
    
        // Reused output objects, refilled from buffer on every map() call.
        private Text key = new Text();
        private Text value = new Text();
        // Generator state and current row id; both stay null until the first map() call.
        private Unsigned16 rand = null;
        private Unsigned16 rowId = null;
        // Holds the CRC32 of the most recent record before it is added to total.
        private Unsigned16 checksum = new Unsigned16();
        private Checksum crc32 = new PureJavaCrc32();
        // Running sum of the per-record checksums.
        private Unsigned16 total = new Unsigned16();
        private static final Unsigned16 ONE = new Unsigned16(1);
        // Scratch space for one whole record: key bytes followed by value bytes.
        private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH +
                                         TeraInputFormat.VALUE_LENGTH];
        // Looked up on the first map() call; stays null if map() is never called.
        private Counter checksumCounter;
    
        /**
         * Generates one record, writes it out as a key/value pair and folds its
         * CRC32 into the running total.
         * @param row the row number; only read on the first call
         * @param ignored unused
         * @param context receives the generated key and value
         */
        public void map(LongWritable row, NullWritable ignored,
            Context context) throws IOException, InterruptedException {
          // First call: seed the row id from the key and position the generator.
          // NOTE(review): presumably skipAhead jumps the random sequence to this
          // row - confirm against Random16.
          if (rand == null) {
            rowId = new Unsigned16(row.get());
            rand = Random16.skipAhead(rowId);
            checksumCounter = context.getCounter(Counters.CHECKSUM);
          }
          Random16.nextRand(rand);
          GenSort.generateRecord(buffer, rand, rowId);
          // The first KEY_LENGTH bytes of the buffer are the key, the rest the value.
          key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
          value.set(buffer, TeraInputFormat.KEY_LENGTH, 
                    TeraInputFormat.VALUE_LENGTH);
          context.write(key, value);
          // Checksum the whole record (key and value) and accumulate it.
          crc32.reset();
          crc32.update(buffer, 0, 
                       TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
          checksum.set(crc32.getValue());
          total.add(checksum);
          // Advance the row id locally for the next call.
          rowId.add(ONE);
        }
    
        /**
         * Adds the accumulated checksum to the counter, if map() ran at least once.
         */
        @Override
        public void cleanup(Context context) {
          if (checksumCounter != null) {
            // NOTE(review): getLow8() presumably yields the low 8 bytes of the
            // 16-byte total as a long - confirm against Unsigned16.
            checksumCounter.increment(total.getLow8());
          }
        }
      }
    
      /** Prints the one-line command synopsis to standard error. */
      private static void usage() throws IOException {
        final String synopsis = "teragen <num rows> <output dir>";
        System.err.println(synopsis);
      }
    
      /**
       * Parse a number that optionally ends in a one-letter suffix denoting a
       * decimal multiplier: k (10^3), m (10^6), b (10^9) or t (10^12).
       * @param str a decimal integer string with an optional suffix {k,m,b,t}
       * @return the expanded value
       * @throws NumberFormatException if str is empty or its numeric part is
       *         not a valid long
       */
      private static long parseHumanLong(String str) {
        if (str.length() == 0) {
          // Report an empty argument the same way as any other malformed number.
          throw new NumberFormatException("For input string: \"\"");
        }
        char tail = str.charAt(str.length() - 1);
        long base = 1;
        switch (tail) {
        case 't':
          // The product must be computed as a long: as an int, 1000^4 overflows
          // and would turn the multiplier negative.
          base *= 1000L * 1000 * 1000 * 1000;
          break;
        case 'b':
          base *= 1000L * 1000 * 1000;
          break;
        case 'm':
          base *= 1000L * 1000;
          break;
        case 'k':
          base *= 1000L;
          break;
        default:
        }
        // A multiplier other than 1 means a suffix was recognized; strip it.
        if (base != 1) {
          str = str.substring(0, str.length() - 1);
        }
        return Long.parseLong(str) * base;
      }
      
      /**
       * Sets up the map-only generation job and blocks until it finishes.
       * @param args the cli arguments: the row count, then the output directory
       * @return 2 when the argument count is wrong; otherwise 0 if the job
       *         succeeded and 1 if it did not
       * @throws IOException if the output directory already exists
       */
      public int run(String[] args) 
          throws IOException, InterruptedException, ClassNotFoundException {
        final Job job = Job.getInstance(getConf());
        if (args.length != 2) {
          usage();
          return 2;
        }

        final long requestedRows = parseHumanLong(args[0]);
        setNumberOfRows(job, requestedRows);

        final Path outputDir = new Path(args[1]);
        final boolean outputPresent =
            outputDir.getFileSystem(getConf()).exists(outputDir);
        if (outputPresent) {
          throw new IOException("Output directory " + outputDir + 
                                " already exists.");
        }
        FileOutputFormat.setOutputPath(job, outputDir);

        job.setJobName("TeraGen");
        job.setJarByClass(TeraGen.class);
        job.setInputFormatClass(RangeInputFormat.class);
        job.setMapperClass(SortGenMapper.class);
        // No reducers: each mapper's output goes straight to the output format.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TeraOutputFormat.class);

        final boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
          return 0;
        }
        return 1;
      }
    
      /**
       * Command-line entry point: runs the tool through {@link ToolRunner} and
       * exits the JVM with the tool's return code.
       * @param args the cli arguments
       */
      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new TeraGen(), args));
      }
    }
    TeraGen

    TeraSort.java:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.examples.terasort;
    
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.net.URI;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Generates the sampled split points, launches the job, and waits for it to
     * finish. 
     * <p>
     * To run the program: 
     * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
     */
    public class TeraSort extends Configured implements Tool {
      private static final Log LOG = LogFactory.getLog(TeraSort.class);
      static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";
      static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";
    
      /**
       * A partitioner that splits text keys into roughly equal partitions
       * in a global sorted order.
       */
      static class TotalOrderPartitioner extends Partitioner<Text,Text>
          implements Configurable {
        private TrieNode trie;
        private Text[] splitPoints;
        private Configuration conf;
    
        /**
         * Common base of the trie nodes; every node remembers the level it was
         * created at.
         */
        static abstract class TrieNode {
          private final int level;

          TrieNode(int level) {
            this.level = level;
          }

          /** @return the level given at construction */
          int getLevel() {
            return this.level;
          }

          /** @return the partition number the key belongs to */
          abstract int findPartition(Text key);

          /** Writes a textual dump of this node to the stream. */
          abstract void print(PrintStream strm) throws IOException;
        }
    
        /**
         * An interior trie node holding one child slot for each of the 256
         * possible values of the key byte at this node's level.
         */
        static class InnerTrieNode extends TrieNode {
          private TrieNode[] child = new TrieNode[256];
          
          InnerTrieNode(int level) {
            super(level);
          }

          /**
           * Delegates to the child selected by the key byte at this level; a
           * key too short to have such a byte is sent to child 0.
           */
          int findPartition(Text key) {
            final int depth = getLevel();
            final int slot =
                (key.getLength() <= depth) ? 0 : (key.getBytes()[depth] & 0xff);
            return child[slot].findPartition(key);
          }

          void setChild(int idx, TrieNode child) {
            this.child[idx] = child;
          }

          /**
           * Prints one line per slot, indented two spaces per level, followed
           * by the dump of the child in that slot when there is one.
           */
          void print(PrintStream strm) throws IOException {
            final int indentWidth = 2 * getLevel();
            for (int slot = 0; slot < 256; ++slot) {
              for (int col = 0; col < indentWidth; ++col) {
                strm.print(' ');
              }
              strm.println(slot + " ->");
              final TrieNode next = child[slot];
              if (next != null) {
                next.print(strm);
              }
            }
          }
        }
    
        /**
         * A terminal trie node that resolves a key by comparing it against the
         * split points with indices lower..upper-1 in turn.
         */
        static class LeafTrieNode extends TrieNode {
          int lower;
          int upper;
          Text[] splitPoints;

          LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {
            super(level);
            this.lower = lower;
            this.upper = upper;
            this.splitPoints = splitPoints;
          }

          /**
           * @return the index of the first split point in lower..upper-1 that
           *         sorts after the key, or upper when none does
           */
          int findPartition(Text key) {
            int candidate = lower;
            while (candidate < upper) {
              if (splitPoints[candidate].compareTo(key) > 0) {
                return candidate;
              }
              candidate++;
            }
            return upper;
          }

          /** Prints "lower, upper" on one line, indented two spaces per level. */
          void print(PrintStream strm) throws IOException {
            final int indentWidth = 2 * getLevel();
            for (int col = 0; col < indentWidth; ++col) {
              strm.print(' ');
            }
            strm.println(lower + ", " + upper);
          }
        }
    
    
        /**
         * Read the cut points from the given file. The file is expected to
         * hold one serialized {@link Text} per cut point, and one cut point
         * fewer than the configured number of reduces.
         * @param fs the file system
         * @param p the path to read
         * @param conf the job configuration, consulted for the number of reduces
         * @return the strings to split the partitions on
         * @throws IOException if the file cannot be opened or read
         */
        private static Text[] readPartitions(FileSystem fs, Path p,
            Configuration conf) throws IOException {
          int reduces = conf.getInt(MRJobConfig.NUM_REDUCES, 1);
          Text[] result = new Text[reduces - 1];
          DataInputStream reader = fs.open(p);
          try {
            for(int i=0; i < reduces - 1; ++i) {
              result[i] = new Text();
              result[i].readFields(reader);
            }
          } finally {
            // Release the stream even when a read fails part-way through.
            reader.close();
          }
          return result;
        }
    
        /**
         * Given a sorted set of cut points, build a trie that will find the correct
         * partition quickly.
         * <p>
         * Recursion stops with a {@link LeafTrieNode} once the prefix is
         * maxDepth bytes long or the range lower..upper is empty. Otherwise an
         * {@link InnerTrieNode} is created and the range is divided among its
         * 256 children according to the next byte.
         * @param splits the list of cut points
         * @param lower the lower bound of partitions 0..numPartitions-1
         * @param upper the upper bound of partitions 0..numPartitions-1
         * @param prefix the prefix that we have already checked against
         * @param maxDepth the maximum depth we will build a trie for
         * @return the trie node that will divide the splits correctly
         */
        private static TrieNode buildTrie(Text[] splits, int lower, int upper, 
                                          Text prefix, int maxDepth) {
          // The node's level is the number of prefix bytes already fixed.
          int depth = prefix.getLength();
          if (depth >= maxDepth || lower == upper) {
            return new LeafTrieNode(depth, splits, lower, upper);
          }
          InnerTrieNode result = new InnerTrieNode(depth);
          Text trial = new Text(prefix);
          // append an extra byte on to the prefix
          trial.append(new byte[1], 0, 1);
          // Index of the first cut point not yet handed to a child.
          int currentBound = lower;
          for(int ch = 0; ch < 255; ++ch) {
            // Temporarily set the last byte to ch + 1: the first prefix that no
            // longer belongs to child ch.
            trial.getBytes()[depth] = (byte) (ch + 1);
            // The lower parameter is reused as the start of this child's range.
            lower = currentBound;
            // Advance past every cut point that sorts before that prefix.
            while (currentBound < upper) {
              if (splits[currentBound].compareTo(trial) >= 0) {
                break;
              }
              currentBound += 1;
            }
            // Restore the byte to ch so the child is built with its own prefix.
            trial.getBytes()[depth] = (byte) ch;
            result.child[ch] = buildTrie(splits, lower, currentBound, trial, 
                                         maxDepth);
          }
          // pick up the rest
          trial.getBytes()[depth] = (byte) 255;
          result.child[255] = buildTrie(splits, currentBound, upper, trial,
                                        maxDepth);
          return result;
        }
    
        /**
         * Stores the configuration, loads the cut points from the file named
         * {@code TeraInputFormat.PARTITION_FILENAME} on the local file system
         * and builds a lookup trie of depth 2 over them.
         * @throws IllegalArgumentException if the partitions file cannot be read
         */
        public void setConf(Configuration conf) {
          try {
            final FileSystem localFs = FileSystem.getLocal(conf);
            this.conf = conf;
            final Path cutPointFile =
                new Path(TeraInputFormat.PARTITION_FILENAME);
            this.splitPoints = readPartitions(localFs, cutPointFile, conf);
            final int cutPointCount = this.splitPoints.length;
            this.trie =
                buildTrie(this.splitPoints, 0, cutPointCount, new Text(), 2);
          } catch (IOException ie) {
            throw new IllegalArgumentException("can't read partitions file", ie);
          }
        }
    
        /** @return the configuration last stored by setConf, or null if none was */
        public Configuration getConf() {
          return this.conf;
        }
        
        /** Creates a partitioner with no trie yet; setConf builds it. */
        public TotalOrderPartitioner() {
          super();
        }
    
        /**
         * Looks the key up in the trie; the value and the partition count are
         * not consulted.
         */
        public int getPartition(Text key, Text value, int numPartitions) {
          final int partition = trie.findPartition(key);
          return partition;
        }
        
      }
      
      /**
       * A total order partitioner that buckets keys by the integer formed from
       * their first PREFIX_LENGTH bytes, assuming keys are spread evenly over
       * that prefix space.
       */
      public static class SimplePartitioner extends Partitioner<Text, Text>
          implements Configurable {
        /** Number of distinct prefixes assigned to each reduce. */
        int prefixesPerReduce;
        private static final int PREFIX_LENGTH = 3;
        private Configuration conf = null;

        /**
         * Stores the configuration and divides the space of possible prefixes
         * by the configured number of reduces, rounding up.
         */
        public void setConf(Configuration conf) {
          this.conf = conf;
          final int prefixSpace = 1 << (8 * PREFIX_LENGTH);
          final float reduceCount =
              (float) conf.getInt(MRJobConfig.NUM_REDUCES, 1);
          prefixesPerReduce = (int) Math.ceil(prefixSpace / reduceCount);
        }
        
        public Configuration getConf() {
          return this.conf;
        }
        
        /**
         * Packs up to PREFIX_LENGTH leading key bytes, most significant first,
         * into an int and maps it to a partition by integer division. The
         * value and the partition count are not consulted.
         */
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
          final byte[] raw = key.getBytes();
          final int usable = Math.min(PREFIX_LENGTH, key.getLength());
          int packed = 0;
          int pos = 0;
          while (pos < usable) {
            packed = (packed << 8) | (raw[pos] & 0xff);
            pos++;
          }
          return packed / prefixesPerReduce;
        }
      }
    
      /** @return the flag stored under SIMPLE_PARTITIONER; false when it is not set */
      public static boolean getUseSimplePartitioner(JobContext job) {
        final Configuration jobConf = job.getConfiguration();
        return jobConf.getBoolean(SIMPLE_PARTITIONER, false);
      }
    
      /** Stores the flag in the job's configuration under SIMPLE_PARTITIONER. */
      public static void setUseSimplePartitioner(Job job, boolean value) {
        final Configuration jobConf = job.getConfiguration();
        jobConf.setBoolean(SIMPLE_PARTITIONER, value);
      }
    
      /** @return the value stored under OUTPUT_REPLICATION; 1 when it is not set */
      public static int getOutputReplication(JobContext job) {
        final Configuration jobConf = job.getConfiguration();
        return jobConf.getInt(OUTPUT_REPLICATION, 1);
      }
    
      /** Stores the value in the job's configuration under OUTPUT_REPLICATION. */
      public static void setOutputReplication(Job job, int value) {
        final Configuration jobConf = job.getConfiguration();
        jobConf.setInt(OUTPUT_REPLICATION, value);
      }
    
      /**
       * Configures the sort job, writes the partition file unless the simple
       * partitioner is selected, and waits for the job to finish.
       * @param args the cli arguments: the input directory, then the output
       *        directory
       * @return 2 when the argument count is wrong, -1 when the partition file
       *         could not be written, otherwise 0 if the job succeeded and 1 if
       *         it did not
       */
      public int run(String[] args) throws Exception {
        // Reject bad usage up front instead of failing on args[0] / args[1].
        if (args.length != 2) {
          System.err.println("terasort <input dir> <output dir>");
          return 2;
        }
        LOG.info("starting");
        Job job = Job.getInstance(getConf());
        Path inputDir = new Path(args[0]);
        Path outputDir = new Path(args[1]);
        boolean useSimplePartitioner = getUseSimplePartitioner(job);
        TeraInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setJobName("TeraSort");
        job.setJarByClass(TeraSort.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TeraInputFormat.class);
        job.setOutputFormatClass(TeraOutputFormat.class);
        if (useSimplePartitioner) {
          job.setPartitionerClass(SimplePartitioner.class);
        } else {
          long start = System.currentTimeMillis();
          // The partition file is written into the output directory and added
          // as a cache file; the URI fragment carries the bare file name that
          // TotalOrderPartitioner opens.
          Path partitionFile = new Path(outputDir, 
                                        TeraInputFormat.PARTITION_FILENAME);
          URI partitionUri = new URI(partitionFile.toString() +
                                     "#" + TeraInputFormat.PARTITION_FILENAME);
          try {
            TeraInputFormat.writePartitionFile(job, partitionFile);
          } catch (Throwable e) {
            // Log the throwable itself so the stack trace is kept; the message
            // alone can be null and gives no hint of where the failure occurred.
            LOG.error(e.getMessage(), e);
            return -1;
          }
          job.addCacheFile(partitionUri);  
          long end = System.currentTimeMillis();
          System.out.println("Spent " + (end - start) + "ms computing partitions.");
          job.setPartitionerClass(TotalOrderPartitioner.class);
        }
        
        job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
        TeraOutputFormat.setFinalSync(job, true);
        int ret = job.waitForCompletion(true) ? 0 : 1;
        LOG.info("done");
        return ret;
      }
    
      /**
       * Command-line entry point: runs the tool through {@link ToolRunner} and
       * exits the JVM with the tool's return code.
       * @param args the cli arguments
       */
      public static void main(String[] args) throws Exception {
        final int exitCode =
            ToolRunner.run(new Configuration(), new TeraSort(), args);
        System.exit(exitCode);
      }
    
    }
    TeraSort

    作用:

    TeraGen.java:用于产生数据

    TeraSort.java:用于对数据排序

    命令:

    1. 产生数据:

    bin/hadoop jar hadoop-examples-0.20.2-cdh3u6.jar teragen 10000000 /terasort/input1GB

        teragen后的数值单位是行数;因为每行100个字节,所以如果要产生1G的数据量,则这个数值应为1G/100=10000000(7个0)

    2. 排序:

    bin/hadoop jar hadoop-examples-0.20.2-cdh3u6.jar terasort /terasort/input1GB /terasort/output1GB  

    生成数据示例:

          

    数据构成:

           每行记录由3段组成:

                   前10个字节:随机binary code的十个字符,为key

                   中间10个字节:行id

                   后面80个字节:8段,每段10字节相同随机大写字母

    TeraSort原理:

      

    参考:

    1. http://blog.csdn.net/leafy1980/article/details/6633828

    2. http://dongxicheng.org/mapreduce/hadoop-terasort-analyse/

  • 相关阅读:
    新手如何运营自媒体?必看!
    公众号停更,短视频岗位暴增,2020年,新媒体人如何更值钱?
    别再费力讨好,先看看你的标题有没有入这些坑!
    经常反思自己的自媒体账号,为什么还只是几百的阅读量?
    文章发布显示“敏感词汇”怎么办?如何提升文章原创率?
    如何利用标题最大化引流,让属于自己原创、混剪视频的推荐量直线上升?
    【转载】JAVA字符串格式化-String.format()的使用
    【转载】浅谈大型网络入侵检测建设
    渗透测试工具 —— Nmap
    【转载】任意用户密码重置的10种常见姿势
  • 原文地址:https://www.cnblogs.com/-wangjiannan/p/3619373.html
Copyright © 2011-2022 走看看