Hadoop MapReduce Programming, API Primer Series: Secondary Sort (Part 16)

       

    Not much to say; straight to the run log and the code. The job implements a secondary sort: both columns of each input line are packed into a composite key (IntPair), records are partitioned and grouped by the first column, and the shuffle's sort phase orders the second column within each group. Below is the log from a local run (Windows, under MyEclipse), followed by the two source files, IntPair.java and SecondarySort.java.

     

    2016-12-12 17:04:32,012 INFO [org.apache.hadoop.metrics.jvm.JvmMetrics] - Initializing JVM Metrics with processName=JobTracker, sessionId=
    2016-12-12 17:04:33,056 WARN [org.apache.hadoop.mapreduce.JobSubmitter] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    2016-12-12 17:04:33,059 WARN [org.apache.hadoop.mapreduce.JobSubmitter] - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
    2016-12-12 17:04:33,083 INFO [org.apache.hadoop.mapreduce.lib.input.FileInputFormat] - Total input paths to process : 1
    2016-12-12 17:04:33,161 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:1
    2016-12-12 17:04:33,562 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - Submitting tokens for job: job_local1173601391_0001
    2016-12-12 17:04:34,242 INFO [org.apache.hadoop.mapreduce.Job] - The url to track the job: http://localhost:8080/
    2016-12-12 17:04:34,244 INFO [org.apache.hadoop.mapreduce.Job] - Running job: job_local1173601391_0001
    2016-12-12 17:04:34,247 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter set in config null
    2016-12-12 17:04:34,264 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
    2016-12-12 17:04:34,371 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Waiting for map tasks
    2016-12-12 17:04:34,373 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Starting task: attempt_local1173601391_0001_m_000000_0
    2016-12-12 17:04:34,439 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux.
    2016-12-12 17:04:34,667 INFO [org.apache.hadoop.mapred.Task] - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@65bb90dc
    2016-12-12 17:04:34,676 INFO [org.apache.hadoop.mapred.MapTask] - Processing split: file:/D:/Code/MyEclipseJavaCode/myMapReduce/data/secondarySort/secondarySort.txt:0+120
    2016-12-12 17:04:34,762 INFO [org.apache.hadoop.mapred.MapTask] - (EQUATOR) 0 kvi 26214396(104857584)
    2016-12-12 17:04:34,763 INFO [org.apache.hadoop.mapred.MapTask] - mapreduce.task.io.sort.mb: 100
    2016-12-12 17:04:34,763 INFO [org.apache.hadoop.mapred.MapTask] - soft limit at 83886080
    2016-12-12 17:04:34,763 INFO [org.apache.hadoop.mapred.MapTask] - bufstart = 0; bufvoid = 104857600
    2016-12-12 17:04:34,763 INFO [org.apache.hadoop.mapred.MapTask] - kvstart = 26214396; length = 6553600
    2016-12-12 17:04:34,771 INFO [org.apache.hadoop.mapred.MapTask] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
    2016-12-12 17:04:34,789 INFO [org.apache.hadoop.mapred.LocalJobRunner] - 
    2016-12-12 17:04:34,789 INFO [org.apache.hadoop.mapred.MapTask] - Starting flush of map output
    2016-12-12 17:04:34,789 INFO [org.apache.hadoop.mapred.MapTask] - Spilling map output
    2016-12-12 17:04:34,789 INFO [org.apache.hadoop.mapred.MapTask] - bufstart = 0; bufend = 216; bufvoid = 104857600
    2016-12-12 17:04:34,790 INFO [org.apache.hadoop.mapred.MapTask] - kvstart = 26214396(104857584); kvend = 26214328(104857312); length = 69/6553600
    2016-12-12 17:04:34,809 INFO [org.apache.hadoop.mapred.MapTask] - Finished spill 0
    2016-12-12 17:04:34,818 INFO [org.apache.hadoop.mapred.Task] - Task:attempt_local1173601391_0001_m_000000_0 is done. And is in the process of committing
    2016-12-12 17:04:34,838 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map
    2016-12-12 17:04:34,838 INFO [org.apache.hadoop.mapred.Task] - Task 'attempt_local1173601391_0001_m_000000_0' done.
    2016-12-12 17:04:34,838 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Finishing task: attempt_local1173601391_0001_m_000000_0
    2016-12-12 17:04:34,839 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map task executor complete.
    2016-12-12 17:04:34,846 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Waiting for reduce tasks
    2016-12-12 17:04:34,846 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Starting task: attempt_local1173601391_0001_r_000000_0
    2016-12-12 17:04:34,864 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux.
    2016-12-12 17:04:34,950 INFO [org.apache.hadoop.mapred.Task] - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@59b59452
    2016-12-12 17:04:34,954 INFO [org.apache.hadoop.mapred.ReduceTask] - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@73d5cf65
    2016-12-12 17:04:34,974 INFO [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] - MergerManager: memoryLimit=1327077760, maxSingleShuffleLimit=331769440, mergeThreshold=875871360, ioSortFactor=10, memToMemMergeOutputsThreshold=10
    2016-12-12 17:04:35,011 INFO [org.apache.hadoop.mapreduce.task.reduce.EventFetcher] - attempt_local1173601391_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
    2016-12-12 17:04:35,048 INFO [org.apache.hadoop.mapreduce.task.reduce.LocalFetcher] - localfetcher#1 about to shuffle output of map attempt_local1173601391_0001_m_000000_0 decomp: 254 len: 258 to MEMORY
    2016-12-12 17:04:35,060 INFO [org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput] - Read 254 bytes from map-output for attempt_local1173601391_0001_m_000000_0
    2016-12-12 17:04:35,123 INFO [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] - closeInMemoryFile -> map-output of size: 254, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->254
    2016-12-12 17:04:35,125 INFO [org.apache.hadoop.mapreduce.task.reduce.EventFetcher] - EventFetcher is interrupted.. Returning
    2016-12-12 17:04:35,126 INFO [org.apache.hadoop.mapred.LocalJobRunner] - 1 / 1 copied.
    2016-12-12 17:04:35,126 INFO [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
    2016-12-12 17:04:35,136 INFO [org.apache.hadoop.mapred.Merger] - Merging 1 sorted segments
    2016-12-12 17:04:35,137 INFO [org.apache.hadoop.mapred.Merger] - Down to the last merge-pass, with 1 segments left of total size: 244 bytes
    2016-12-12 17:04:35,139 INFO [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] - Merged 1 segments, 254 bytes to disk to satisfy reduce memory limit
    2016-12-12 17:04:35,139 INFO [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] - Merging 1 files, 258 bytes from disk
    2016-12-12 17:04:35,140 INFO [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] - Merging 0 segments, 0 bytes from memory into reduce
    2016-12-12 17:04:35,141 INFO [org.apache.hadoop.mapred.Merger] - Merging 1 sorted segments
    2016-12-12 17:04:35,142 INFO [org.apache.hadoop.mapred.Merger] - Down to the last merge-pass, with 1 segments left of total size: 244 bytes
    2016-12-12 17:04:35,143 INFO [org.apache.hadoop.mapred.LocalJobRunner] - 1 / 1 copied.
    2016-12-12 17:04:35,150 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
    2016-12-12 17:04:35,158 INFO [org.apache.hadoop.mapred.Task] - Task:attempt_local1173601391_0001_r_000000_0 is done. And is in the process of committing
    2016-12-12 17:04:35,160 INFO [org.apache.hadoop.mapred.LocalJobRunner] - 1 / 1 copied.
    2016-12-12 17:04:35,160 INFO [org.apache.hadoop.mapred.Task] - Task attempt_local1173601391_0001_r_000000_0 is allowed to commit now
    2016-12-12 17:04:35,166 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - Saved output of task 'attempt_local1173601391_0001_r_000000_0' to file:/D:/Code/MyEclipseJavaCode/myMapReduce/out/secondarySort/_temporary/0/task_local1173601391_0001_r_000000
    2016-12-12 17:04:35,167 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce > reduce
    2016-12-12 17:04:35,167 INFO [org.apache.hadoop.mapred.Task] - Task 'attempt_local1173601391_0001_r_000000_0' done.
    2016-12-12 17:04:35,167 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Finishing task: attempt_local1173601391_0001_r_000000_0
    2016-12-12 17:04:35,168 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce task executor complete.
    2016-12-12 17:04:35,248 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1173601391_0001 running in uber mode : false
    2016-12-12 17:04:35,249 INFO [org.apache.hadoop.mapreduce.Job] - map 100% reduce 100%
    2016-12-12 17:04:35,251 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1173601391_0001 completed successfully
    2016-12-12 17:04:35,271 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 33
    File System Counters
    FILE: Number of bytes read=1186
    FILE: Number of bytes written=394623
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    Map-Reduce Framework
    Map input records=18
    Map output records=18
    Map output bytes=216
    Map output materialized bytes=258
    Input split bytes=145
    Combine input records=0
    Combine output records=0
    Reduce input groups=4
    Reduce shuffle bytes=258
    Reduce input records=18
    Reduce output records=18
    Spilled Records=36
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=0
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=534773760
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters 
    Bytes Read=120
    File Output Format Counters 
    Bytes Written=115

    Code

    The counters above check out against the sample input shown further down: 18 map input records for the 18 input lines, 4 reduce input groups for the 4 distinct first-column keys (20, 30, 40, 50), and 18 reduce output records.

    IntPair.java

    package zhouls.bigdata.myMapReduce.SecondarySort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;
    
    
    //Step 1: define the IntPair class, which packs the key/value columns of the
    //sample data into one composite key; it implements WritableComparable and
    //overrides the required methods.
    /**
    * A custom key class should implement the WritableComparable interface.
    */
    public class IntPair implements WritableComparable<IntPair>{//roughly analogous to TextPair
        int first;//first field
        int second;//second field
        
        public void set(int left, int right){//set both fields
            first = left;
            second = right;
        }
        public int getFirst(){//getter for first
            return first;
        }
        public int getSecond(){//getter for second
            return second;
        }
        
        //Deserialization: rebuild the IntPair from the binary stream.
        public void readFields(DataInput in) throws IOException{
            first = in.readInt();
            second = in.readInt();
        }
        
        //Serialization: write the IntPair to the binary stream.
        public void write(DataOutput out) throws IOException{
            out.writeInt(first);
            out.writeInt(second);
        }
        
        //Key comparison: order by first, then by second.
        public int compareTo(IntPair o){
            if (first != o.first){
                return first < o.first ? -1 : 1;
            }else if (second != o.second){
                return second < o.second ? -1 : 1;
            }else{
                return 0;
            }
        }
        
        @Override
        public int hashCode(){
            return first * 157 + second;
        }
        @Override
        public boolean equals(Object right){
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair){
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            }else{
                return false;
            }
        }
    }
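
    As a quick aside (not in the original post), here is a minimal sketch of how one might smoke-test IntPair's Writable plumbing before wiring it into a job. It assumes the Hadoop client jars are on the classpath; IntPairCheck is a hypothetical helper class, and DataOutputBuffer/DataInputBuffer are Hadoop's in-memory stream buffers.

    package zhouls.bigdata.myMapReduce.SecondarySort;
    
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    
    //Hypothetical smoke test, not part of the original post.
    public class IntPairCheck{
        public static void main(String[] args) throws Exception{
            IntPair a = new IntPair();
            a.set(40, 5);
    
            //Serialize into an in-memory buffer, as the shuffle would.
            DataOutputBuffer out = new DataOutputBuffer();
            a.write(out);
    
            //Deserialize the same bytes into a fresh instance.
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            IntPair b = new IntPair();
            b.readFields(in);
    
            System.out.println(a.equals(b));        //expected: true
    
            IntPair c = new IntPair();
            c.set(40, 20);
            System.out.println(a.compareTo(c) < 0); //expected: true (same first, 5 < 20)
        }
    }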

    SecondarySort.java

    package zhouls.bigdata.myMapReduce.SecondarySort;
    
    import java.io.IOException;
    
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    
    /*
    The input file secondarySort.txt contains:
    40    20
    40    10
    40    30
    40    5
    40    1
    30    30
    30    20
    30    10
    30    1
    20    20
    20    10
    20    1
    50    50
    50    40
    50    30 
    50    20
    50    10
    50    1
    */
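    
    /*
    Expected output, derived by hand from the sample input above (not shown in
    the original post): a single reducer, records grouped by the first column,
    second column sorted ascending within each group. The 18 lines match the
    counter "Reduce output records=18" in the log:
    20    1
    20    10
    20    20
    30    1
    30    10
    30    20
    30    30
    40    1
    40    5
    40    10
    40    20
    40    30
    50    1
    50    10
    50    20
    50    30
    50    40
    50    50
    */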
    
    
    public class SecondarySort extends Configured implements Tool{
        // The custom mapper: parses each line into key IntPair(first, second) and value second.
        public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{
            private final IntPair intkey = new IntPair();
            private final IntWritable intvalue = new IntWritable();
            
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                int left = 0;
                int right = 0;
                if (tokenizer.hasMoreTokens()){
                    left = Integer.parseInt(tokenizer.nextToken());
                    if (tokenizer.hasMoreTokens())
                        right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);//k2: the composite key (first, second)
                intvalue.set(right);//v2: the second column
                context.write(intkey, intvalue);//emit (k2, v2)
                //Reusing the same Writable instances across map() calls is the standard
                //MapReduce idiom; the framework serializes them as soon as write() returns.
                    
                }
            }
        }
        
        
      //Step 2: the custom partitioner, FirstPartitioner, which assigns a partition
      //based on the first field of the IntPair.
        /**
        * Partitioner class: the partition is determined by first.
        */
        public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{
                @Override
                public int getPartition(IntPair key, IntWritable value, int numPartitions){
                    //Caveat: Math.abs(Integer.MIN_VALUE) is still negative, so this
                    //breaks if first * 127 overflows to exactly MIN_VALUE; it is fine
                    //for the small sample values used here.
                    return Math.abs(key.getFirst() * 127) % numPartitions;
                }
        }
        
        
      //Step 3: a custom SortComparator could order IntPair by first and then second;
      //this post does not use one and relies on IntPair.compareTo() instead (a sketch
      //of such a comparator is given after the listing).
      //Step 4: the custom GroupingComparator, which groups data within a partition.
      /**
      * Extends WritableComparator.
      */
      public static class GroupingComparator extends WritableComparator{
              protected GroupingComparator(){
                  super(IntPair.class, true);
              }
              @Override
              //Compare two WritableComparables.
              public int compare(WritableComparable w1, WritableComparable w2){
                  IntPair ip1 = (IntPair) w1;
                  IntPair ip2 = (IntPair) w2;
                  int l = ip1.getFirst();
                  int r = ip2.getFirst();
                  return l == r ? 0 : (l < r ? -1 : 1);
              }
      }
    
      
        // The custom reducer: emits first as Text and each second as IntWritable.
        public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>{
            private final Text left = new Text();      
            public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ 
            left.set(Integer.toString(key.getFirst()));//k3: the first field rendered as text
            for (IntWritable val : values){
                //Thanks to the grouping comparator, every pair sharing `first` arrives
                //in this one reduce call, with values already sorted by second.
                context.write(left, val);//emit (k3, v3)
            }
            }
        }
       
        
        public int run(String[] args) throws Exception{
            //Use the Configuration that ToolRunner has already populated; creating a
            //fresh one here is what triggers the "Hadoop command-line option parsing
            //not performed" warning seen in the log above.
            Configuration conf = getConf();
            Path mypath = new Path(args[1]);
            FileSystem hdfs = mypath.getFileSystem(conf);
            if (hdfs.isDirectory(mypath)){//delete a stale output directory, if present
                hdfs.delete(mypath, true);
            }
            
            Job job = Job.getInstance(conf, "secondarysort");//replaces the deprecated new Job(conf, name)
            job.setJarByClass(SecondarySort.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));//input path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));//output path
    
            job.setMapperClass(Map.class);//mapper
            job.setReducerClass(Reduce.class);//reducer
            //job.setNumReduceTasks(3);
            
            job.setPartitionerClass(FirstPartitioner.class);//partitioner
            //job.setSortComparatorClass(KeyComparator.class);//no custom SortComparator here; IntPair.compareTo() drives the sort (see the sketch after the listing)
            job.setGroupingComparatorClass(GroupingComparator.class);//grouping comparator
    
    
            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
           
           
            return job.waitForCompletion(true) ? 0 : 1;
        }
        
        /**
         * @param args
         * @throws Exception 
         */
        public static void main(String[] args) throws Exception{
            
    //        String[] args0={"hdfs://HadoopMaster:9000/secondarySort/secondarySort.txt",
    //                        "hdfs://HadoopMaster:9000/out/secondarySort"};
        
            String[] args0={"./data/secondarySort/secondarySort.txt",
                             "./out/secondarySort"};
            
            
            int ec = ToolRunner.run(new Configuration(), new SecondarySort(), args0);
            System.exit(ec);
        }
    }
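
    For completeness, here is a sketch of the optional Step 3 sort comparator that the post mentions but does not use. It orders keys by first and then second, which is exactly what IntPair.compareTo() already does; the class name KeyComparator matches the commented-out job.setSortComparatorClass call above. This is an illustration under those assumptions, not part of the original code.

    package zhouls.bigdata.myMapReduce.SecondarySort;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    //Hypothetical Step 3 comparator; the post relies on IntPair.compareTo() instead.
    public class KeyComparator extends WritableComparator{
        protected KeyComparator(){
            super(IntPair.class, true);//true: create key instances for compare()
        }
    
        @Override
        public int compare(WritableComparable w1, WritableComparable w2){
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int cmp = Integer.compare(ip1.getFirst(), ip2.getFirst());//primary: first
            if (cmp != 0){
                return cmp;
            }
            return Integer.compare(ip1.getSecond(), ip2.getSecond());//secondary: second
        }
    }

    With job.setSortComparatorClass(KeyComparator.class) enabled, the shuffle would use this comparator instead of IntPair.compareTo(); the grouping comparator is still required either way, so that each reduce call sees one distinct first with its seconds in order.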
Original article: https://www.cnblogs.com/zlslch/p/6165256.html