MapReduce Programming Series — 6: Multi-Table Join

    1. Project name: multi-table join (MTjoin)

    2. Program code:
    Version 1 (detailed):
    package com.mtjoin;
    
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class MTjoin {
        public static int time = 0;
        public static class Map extends Mapper<Object, Text, Text, Text>{
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                System.out.println("mapper........................");
                String line = value.toString();
                // Skip the header line of either table.
                if (line.contains("factoryname") || line.contains("addressID")) {
                    return;
                }
                // Find the first digit, i.e. the start of the addressID.
                int i = 0;
                while (line.charAt(i) > '9' || line.charAt(i) < '0') {
                    i++;
                }

                if (line.charAt(0) > '9' || line.charAt(0) < '0') {
                    // Factory record: "factoryname addressID" -- the ID is the last token.
                    int j = i - 1;
                    while (line.charAt(j) != ' ') j--;
                    System.out.println("key:" + line.substring(i) + "  value:" + line.substring(0, j));
                    // Tag factory records with "1+" and key them by addressID.
                    context.write(new Text(line.substring(i)), new Text("1+" + line.substring(0, j)));
                } else {
                    // Address record: "addressID addressname" -- the ID is the first token.
                    int j = i + 1;
                    while (line.charAt(j) != ' ') j++;
                    System.out.println("key:" + line.substring(0, j) + "  value:" + line.substring(j + 1));
                    // Tag address records with "2+" and key them by addressID.
                    context.write(new Text(line.substring(0, j)), new Text("2+" + line.substring(j + 1)));
                }
            }
        }
    
        public static class Reduce extends Reducer<Text, Text, Text, Text>{
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                System.out.println("reducer........................");
                // Write the header line once, before the first key.
                if (time == 0) {
                    context.write(new Text("factoryname"), new Text("addressname"));
                    time++;
                }
                // Collect both sides of the join; 10 slots per side is enough for this test data.
                int factorynum = 0;
                String[] factory = new String[10];
                int addressnum = 0;
                String[] address = new String[10];

                Iterator<Text> ite = values.iterator();
                while (ite.hasNext()) {
                    String record = ite.next().toString();
                    // The "1+"/"2+" tag says which table the record came from.
                    char type = record.charAt(0);
                    if (type == '1') {
                        factory[factorynum] = record.substring(2);
                        factorynum++;
                    } else {
                        address[addressnum] = record.substring(2);
                        addressnum++;
                    }
                }
                // Emit the cross product of the factories and addresses sharing this addressID.
                if (factorynum != 0 && addressnum != 0) {
                    for (int m = 0; m < factorynum; m++) {
                        for (int n = 0; n < addressnum; n++) {
                            context.write(new Text(factory[m]), new Text(address[n]));
                            System.out.println("factoryname:" + factory[m] + "  addressname:" + address[n]);
                        }
                    }
                }
            }
        }
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: MTjoin <in> <out>");
                System.exit(2);
            }
            Job job = new Job(conf, "multiple table join");
            job.setJarByClass(MTjoin.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
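
    Note: both versions use the Hadoop 1.x mapreduce API. On Hadoop 2.x and later, the new Job(conf, ...) constructor is deprecated; the driver would create the job through the static factory instead, with the rest of the setup unchanged:

        // Hadoop 2.x+ replacement for the deprecated constructor in main():
        Job job = Job.getInstance(conf, "multiple table join");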

    Version 2 (simplified):

    package com.mtjoin;
    
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class MTjoin {
        public static int time = 0;
        public static class Map extends Mapper<Object, Text, Text, Text>{
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                System.out.println("mapper........................");
                String line = value.toString();
                // Skip the header line of either table.
                if (line.contains("factoryname") || line.contains("addressID")) {
                    return;
                }
                // Simplification: the addressID is assumed to be a single digit.
                int len = line.length();

                if (line.charAt(0) > '9' || line.charAt(0) < '0') {
                    // Factory record: the addressID is the last character of the line.
                    System.out.println("key:" + line.substring(len - 1) + "  value:" + line.substring(0, len - 2));
                    context.write(new Text(line.substring(len - 1)), new Text("1+" + line.substring(0, len - 2)));
                } else {
                    // Address record: the addressID is the first character of the line.
                    System.out.println("key:" + line.substring(0, 1) + "  value:" + line.substring(2));
                    context.write(new Text(line.substring(0, 1)), new Text("2+" + line.substring(2)));
                }
            }
        }
    
        public static class Reduce extends Reducer<Text, Text, Text, Text>{
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                System.out.println("reducer........................");
                // Write the header line once, before the first key.
                if (time == 0) {
                    context.write(new Text("factoryname"), new Text("addressname"));
                    time++;
                }
                // Collect both sides of the join; 10 slots per side is enough for this test data.
                int factorynum = 0;
                String[] factory = new String[10];
                int addressnum = 0;
                String[] address = new String[10];

                Iterator<Text> ite = values.iterator();
                while (ite.hasNext()) {
                    String record = ite.next().toString();
                    // The "1+"/"2+" tag says which table the record came from.
                    char type = record.charAt(0);
                    if (type == '1') {
                        factory[factorynum] = record.substring(2);
                        factorynum++;
                    } else {
                        address[addressnum] = record.substring(2);
                        addressnum++;
                    }
                }
                // Emit the cross product of the factories and addresses sharing this addressID.
                if (factorynum != 0 && addressnum != 0) {
                    for (int m = 0; m < factorynum; m++) {
                        for (int n = 0; n < addressnum; n++) {
                            context.write(new Text(factory[m]), new Text(address[n]));
                            System.out.println("factoryname:" + factory[m] + "  addressname:" + address[n]);
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: MTjoin <in> <out>");
                System.exit(2);
            }
            Job job = new Job(conf, "multiple table join");
            job.setJarByClass(MTjoin.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
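
    For readers who want to check the tag-and-join logic without a Hadoop cluster, here is a minimal plain-Java sketch (the class name MTJoinSketch and the hard-coded rows are ours, copied from the test data in the next section) that reproduces the same per-addressID cross product:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MTJoinSketch {
        public static void main(String[] args) {
            String[] factoryLines = {"Beijing Red Star 1", "Shenzhen Thunder 3",
                    "Guangzhou Honda 2", "Beijing Rising 1",
                    "Guangzhou Development Bank 2", "Tencent 3", "Bank of Beijing 1"};
            String[] addressLines = {"1 Beijing", "2 Guangzhou", "3 Shenzhen", "4 Xian"};

            // Group factory names by addressID (the ID is the last token).
            Map<String, List<String>> factories = new HashMap<String, List<String>>();
            for (String line : factoryLines) {
                int sp = line.lastIndexOf(' ');
                String id = line.substring(sp + 1);
                if (!factories.containsKey(id)) {
                    factories.put(id, new ArrayList<String>());
                }
                factories.get(id).add(line.substring(0, sp));
            }
            // Map each addressID to its addressname (the ID is the first token).
            Map<String, String> addresses = new HashMap<String, String>();
            for (String line : addressLines) {
                int sp = line.indexOf(' ');
                addresses.put(line.substring(0, sp), line.substring(sp + 1));
            }
            // Per-addressID cross product, as the reducer does; an ID with no
            // match on one side (e.g. 4 / Xian) produces no output.
            for (Map.Entry<String, List<String>> e : factories.entrySet()) {
                String addressname = addresses.get(e.getKey());
                if (addressname == null) {
                    continue;
                }
                for (String factoryname : e.getValue()) {
                    System.out.println(factoryname + "\t" + addressname);
                }
            }
        }
    }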
     
    3. Test data:
    address:
    addressID addressname
    1 Beijing
    2 Guangzhou
    3 Shenzhen
    4 Xian
     
    factory:
    factoryname addressID
    Beijing Red Star 1
    Shenzhen Thunder 3
    Guangzhou Honda 2
    Beijing Rising 1
    Guangzhou Development Bank 2
    Tencent 3
    Bank of Beijing 1
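
    Both versions implement a reduce-side join over these two tables: the mapper keys every record by addressID and tags the value with its source ("1+" for factory records, "2+" for address records), so the shuffle groups both tables' rows for one addressID into a single reduce call. For addressID 1 above, the reducer receives something like:

        key:    1
        values: { "1+Beijing Red Star", "1+Beijing Rising", "1+Bank of Beijing", "2+Beijing" }

    and emits the cross product of the two groups, as the run log below shows.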
     
     
    4. Run log:
    14/09/24 09:39:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/09/24 09:39:55 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    14/09/24 09:39:55 INFO input.FileInputFormat: Total input paths to process : 2
    14/09/24 09:39:55 WARN snappy.LoadSnappy: Snappy native library not loaded
    14/09/24 09:39:55 INFO mapred.JobClient: Running job: job_local_0001
    14/09/24 09:39:55 INFO util.ProcessTree: setsid exited with exit code 0
    14/09/24 09:39:55 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@e095722
    14/09/24 09:39:55 INFO mapred.MapTask: io.sort.mb = 100
    14/09/24 09:39:55 INFO mapred.MapTask: data buffer = 79691776/99614720
    14/09/24 09:39:55 INFO mapred.MapTask: record buffer = 262144/327680
    mapper........................
    mapper........................
    key:1  value:Beijing Red Star
    mapper........................
    key:3  value:Shenzhen Thunder
    mapper........................
    key:2  value:Guangzhou Honda
    mapper........................
    key:1  value:Beijing Rising
    mapper........................
    key:2  value:Guangzhou Development Bank
    mapper........................
    key:3  value:Tencent
    mapper........................
    key:1  value:Bank of Beijing
    14/09/24 09:39:55 INFO mapred.MapTask: Starting flush of map output
    14/09/24 09:39:55 INFO mapred.MapTask: Finished spill 0
    14/09/24 09:39:55 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    14/09/24 09:39:56 INFO mapred.JobClient:  map 0% reduce 0%
    14/09/24 09:39:58 INFO mapred.LocalJobRunner:
    14/09/24 09:39:58 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
    14/09/24 09:39:58 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@7dabd20
    14/09/24 09:39:58 INFO mapred.MapTask: io.sort.mb = 100
    14/09/24 09:39:58 INFO mapred.MapTask: data buffer = 79691776/99614720
    14/09/24 09:39:58 INFO mapred.MapTask: record buffer = 262144/327680
    mapper........................
    mapper........................
    key:1  value:Beijing
    mapper........................
    key:2  value:Guangzhou
    mapper........................
    key:3  value:Shenzhen
    mapper........................
    key:4  value:Xian
    14/09/24 09:39:58 INFO mapred.MapTask: Starting flush of map output
    14/09/24 09:39:58 INFO mapred.MapTask: Finished spill 0
    14/09/24 09:39:58 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
    14/09/24 09:39:59 INFO mapred.JobClient:  map 100% reduce 0%
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    14/09/24 09:40:01 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
    14/09/24 09:40:01 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@49fa6f3c
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    14/09/24 09:40:01 INFO mapred.Merger: Merging 2 sorted segments
    14/09/24 09:40:01 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 218 bytes
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    reducer........................
    factoryname:Beijing Red Star  addressname:Beijing
    factoryname:Beijing Rising  addressname:Beijing
    factoryname:Bank of Beijing  addressname:Beijing
    reducer........................
    factoryname:Guangzhou Honda  addressname:Guangzhou
    factoryname:Guangzhou Development Bank  addressname:Guangzhou
    reducer........................
    factoryname:Shenzhen Thunder  addressname:Shenzhen
    factoryname:Tencent  addressname:Shenzhen
    reducer........................
    14/09/24 09:40:01 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    14/09/24 09:40:01 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
    14/09/24 09:40:01 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/mtjoin_output02
    14/09/24 09:40:04 INFO mapred.LocalJobRunner: reduce > reduce
    14/09/24 09:40:04 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
    14/09/24 09:40:05 INFO mapred.JobClient:  map 100% reduce 100%
    14/09/24 09:40:05 INFO mapred.JobClient: Job complete: job_local_0001
    14/09/24 09:40:05 INFO mapred.JobClient: Counters: 22
    14/09/24 09:40:05 INFO mapred.JobClient:   Map-Reduce Framework
    14/09/24 09:40:05 INFO mapred.JobClient:     Spilled Records=22
    14/09/24 09:40:05 INFO mapred.JobClient:     Map output materialized bytes=226
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce input records=11
    14/09/24 09:40:05 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Map input records=13
    14/09/24 09:40:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=238
    14/09/24 09:40:05 INFO mapred.JobClient:     Map output bytes=192
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce shuffle bytes=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce input groups=4
    14/09/24 09:40:05 INFO mapred.JobClient:     Combine output records=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce output records=8
    14/09/24 09:40:05 INFO mapred.JobClient:     Map output records=11
    14/09/24 09:40:05 INFO mapred.JobClient:     Combine input records=0
    14/09/24 09:40:05 INFO mapred.JobClient:     CPU time spent (ms)=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Total committed heap usage (bytes)=813170688
    14/09/24 09:40:05 INFO mapred.JobClient:   File Input Format Counters
    14/09/24 09:40:05 INFO mapred.JobClient:     Bytes Read=216
    14/09/24 09:40:05 INFO mapred.JobClient:   FileSystemCounters
    14/09/24 09:40:05 INFO mapred.JobClient:     HDFS_BYTES_READ=586
    14/09/24 09:40:05 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=122093
    14/09/24 09:40:05 INFO mapred.JobClient:     FILE_BYTES_READ=1658
    14/09/24 09:40:05 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=202
    14/09/24 09:40:05 INFO mapred.JobClient:   File Output Format Counters
    14/09/24 09:40:05 INFO mapred.JobClient:     Bytes Written=202
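
    The counters line up with the test data: Map input records=13 is the 11 data rows plus the two header lines, Map output records=11 because the headers are skipped in the mapper, Reduce input groups=4 is one group per addressID (1-4), and Reduce output records=8 is the header line written by the reducer plus the 7 joined pairs (addressID 4, Xian, has no matching factory and contributes nothing).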
     
    5. Result:
    factoryname    addressname
    Beijing Red Star    Beijing
    Beijing Rising    Beijing
    Bank of Beijing    Beijing
    Guangzhou Honda    Guangzhou
    Guangzhou Development Bank    Guangzhou
    Shenzhen Thunder    Shenzhen
    Tencent    Shenzhen