  • MapReduce Programming Series 6: Multi-Table Join

    1. Project name: multi-table join (MTjoin)

    2. Program code:
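    Both versions implement a reduce-side join. The mapper tags each record with the table it came from ("1+" for factory rows, "2+" for address rows) and keys it by addressID; the reducer then splits each key's values back into the two tables and emits their cross product.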
    Version 1 (detailed):
    package com.mtjoin;
    
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class MTjoin {
        // Counter used to write the header line exactly once; this trick
        // assumes a single reducer, as in the local run below.
        public static int time = 0;

        public static class Map extends Mapper<Object, Text, Text, Text>{
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                System.out.println("mapper........................");
                String line = value.toString();
                // Skip the header line of either table.
                if(line.contains("factoryname") || line.contains("addressID")){
                    return;
                }
                // Advance i to the first digit, i.e. the start of the addressID.
                int i = 0;
                while(line.charAt(i) > '9' || line.charAt(i) < '0'){
                    i++;
                }

                if(line.charAt(0) > '9' || line.charAt(0) < '0'){
                    // Factory record: "factoryname addressID".
                    // Walk back from the addressID to the separating space.
                    int j = i - 1;
                    while(line.charAt(j) != ' ') j--;
                    System.out.println("key:"+line.substring(i)+"  value:"+line.substring(0, j));

                    String values[] = {line.substring(0, j), line.substring(i)};

                    // Tag the factory name with "1+" and key it by addressID.
                    context.write(new Text(values[1]), new Text("1+"+values[0]));
                }
                else {
                    // Address record: "addressID addressname".
                    // Walk forward from the addressID to the separating space.
                    int j = i + 1;
                    while(line.charAt(j) != ' ') j++;
                    System.out.println("key:"+line.substring(0, j)+"  value:"+line.substring(j+1));
                    String values[] = {line.substring(0, j), line.substring(j+1)};
                    // Tag the address name with "2+" and key it by addressID.
                    context.write(new Text(values[0]), new Text("2+"+values[1]));
                }
            }
        }
    
        public static class Reduce extends Reducer<Text, Text, Text, Text>{
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                System.out.println("reducer........................");
                // Write the header row once (see the note on time above).
                if(time == 0){
                    context.write(new Text("factoryname"), new Text("addressname"));
                    time++;
                }
                // Separate the tagged values back into the two tables.
                // The fixed array size assumes at most 10 records per table
                // share one addressID.
                int factorynum = 0;
                String factory[] = new String[10];
                int addressnum = 0;
                String address[] = new String[10];

                Iterator<Text> ite = values.iterator();
                while(ite.hasNext()){
                    String record = ite.next().toString();
                    char type = record.charAt(0);
                    if(type == '1'){
                        factory[factorynum] = record.substring(2);
                        factorynum++;
                    }
                    else{
                        address[addressnum] = record.substring(2);
                        addressnum++;
                    }
                }
                // Inner join: emit the cross product only if the addressID
                // appeared in both tables.
                if(factorynum != 0 && addressnum != 0){
                    for(int m = 0; m < factorynum; m++){
                        for(int n = 0; n < addressnum; n++){
                            context.write(new Text(factory[m]), new Text(address[n]));
                            System.out.println("factoryname:"+factory[m]+"  addressname:"+address[n]);
                        }
                    }
                }
            }
        }
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String otherArgs[] = new GenericOptionsParser(conf, args).getRemainingArgs();
            if(otherArgs.length != 2){
                System.err.println("Usage: MTjoin <in> <out>");
                System.exit(2);
            }
            Job job = new Job(conf, "multiple table join");
            job.setJarByClass(MTjoin.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    Version 2 (simplified; it relies on the addressID being a single character at a fixed position in each line):

    package com.mtjoin;
    
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class MTjoin {
        // Counter used to write the header line exactly once; this trick
        // assumes a single reducer, as in the local run below.
        public static int time = 0;

        public static class Map extends Mapper<Object, Text, Text, Text>{
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                System.out.println("mapper........................");
                String line = value.toString();
                // Skip the header line of either table.
                if(line.contains("factoryname") || line.contains("addressID")){
                    return;
                }
                int len = line.length();

                if(line.charAt(0) > '9' || line.charAt(0) < '0'){
                    // Factory record: "factoryname addressID", where the
                    // addressID is assumed to be the single last character.
                    System.out.println("key:"+line.substring(len-1)+"  value:"+line.substring(0, len-2));

                    String values[] = {line.substring(0, len-2), line.substring(len-1)};

                    // Tag the factory name with "1+" and key it by addressID.
                    context.write(new Text(values[1]), new Text("1+"+values[0]));
                }
                else {
                    // Address record: "addressID addressname", with a
                    // single-character addressID in the first position.
                    System.out.println("key:"+line.substring(0, 1)+"  value:"+line.substring(2));
                    String values[] = {line.substring(0, 1), line.substring(2)};
                    // Tag the address name with "2+" and key it by addressID.
                    context.write(new Text(values[0]), new Text("2+"+values[1]));
                }
            }
        }
    
        public static class Reduce extends Reducer<Text, Text, Text, Text>{
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                System.out.println("reducer........................");
                // Write the header row once (see the note on time above).
                if(time == 0){
                    context.write(new Text("factoryname"), new Text("addressname"));
                    time++;
                }
                // Separate the tagged values back into the two tables.
                // The fixed array size assumes at most 10 records per table
                // share one addressID.
                int factorynum = 0;
                String factory[] = new String[10];
                int addressnum = 0;
                String address[] = new String[10];

                Iterator<Text> ite = values.iterator();
                while(ite.hasNext()){
                    String record = ite.next().toString();
                    char type = record.charAt(0);
                    if(type == '1'){
                        factory[factorynum] = record.substring(2);
                        factorynum++;
                    }
                    else{
                        address[addressnum] = record.substring(2);
                        addressnum++;
                    }
                }
                // Inner join: emit the cross product only if the addressID
                // appeared in both tables.
                if(factorynum != 0 && addressnum != 0){
                    for(int m = 0; m < factorynum; m++){
                        for(int n = 0; n < addressnum; n++){
                            context.write(new Text(factory[m]), new Text(address[n]));
                            System.out.println("factoryname:"+factory[m]+"  addressname:"+address[n]);
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String otherArgs[] = new GenericOptionsParser(conf, args).getRemainingArgs();
            if(otherArgs.length != 2){
                System.err.println("Usage: MTjoin <in> <out>");
                System.exit(2);
            }
            Job job = new Job(conf, "multiple table join");
            job.setJarByClass(MTjoin.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
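
    To see the reducer's tag-and-cross-product step in isolation, here is a minimal plain-Java sketch (no Hadoop) that replays it on the values both mappers would emit for addressID 1; the class name and sample values are illustrative, drawn from the test data below:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class JoinSketch {
        public static void main(String[] args) {
            // Values grouped under addressID "1", as emitted by the two mappers.
            List<String> values = Arrays.asList(
                    "1+Beijing Red Star", "1+Beijing Rising", "1+Bank of Beijing", "2+Beijing");
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();
            for (String record : values) {
                // "1+" marks a factory row, "2+" an address row.
                if (record.charAt(0) == '1') factories.add(record.substring(2));
                else addresses.add(record.substring(2));
            }
            // Cross product: pair every factory name with every address name.
            for (String f : factories)
                for (String a : addresses)
                    System.out.println(f + "\t" + a);
        }
    }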
     
    3. Test data:
    address:
    addressID addressname
    1 Beijing
    2 Guangzhou
    3 Shenzhen
    4 Xian
     
    factory:
    factoryname addressID
    Beijing Red Star 1
    Shenzhen Thunder 3
    Guangzhou Honda 2
    Beijing Rising 1
    Guangzhou Development Bank 2
    Tencent 3
    Bank of Beijing 1
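
    Assuming the two tables are saved as local files named address and factory and the job is packaged into a jar (all file and directory names here are illustrative), it can be launched in the usual way:

    hadoop fs -mkdir mtjoin_input
    hadoop fs -put address factory mtjoin_input
    hadoop jar mtjoin.jar com.mtjoin.MTjoin mtjoin_input mtjoin_output02

    The log below is from a run under the local job runner (job_local_0001), which is why it warns that no job jar was set.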
     
     
    4. Run log:
    14/09/24 09:39:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/09/24 09:39:55 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    14/09/24 09:39:55 INFO input.FileInputFormat: Total input paths to process : 2
    14/09/24 09:39:55 WARN snappy.LoadSnappy: Snappy native library not loaded
    14/09/24 09:39:55 INFO mapred.JobClient: Running job: job_local_0001
    14/09/24 09:39:55 INFO util.ProcessTree: setsid exited with exit code 0
    14/09/24 09:39:55 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@e095722
    14/09/24 09:39:55 INFO mapred.MapTask: io.sort.mb = 100
    14/09/24 09:39:55 INFO mapred.MapTask: data buffer = 79691776/99614720
    14/09/24 09:39:55 INFO mapred.MapTask: record buffer = 262144/327680
    mapper........................
    mapper........................
    key:1  value:Beijing Red Star
    mapper........................
    key:3  value:Shenzhen Thunder
    mapper........................
    key:2  value:Guangzhou Honda
    mapper........................
    key:1  value:Beijing Rising
    mapper........................
    key:2  value:Guangzhou Development Bank
    mapper........................
    key:3  value:Tencent
    mapper........................
    key:1  value:Bank of Beijing
    14/09/24 09:39:55 INFO mapred.MapTask: Starting flush of map output
    14/09/24 09:39:55 INFO mapred.MapTask: Finished spill 0
    14/09/24 09:39:55 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    14/09/24 09:39:56 INFO mapred.JobClient:  map 0% reduce 0%
    14/09/24 09:39:58 INFO mapred.LocalJobRunner:
    14/09/24 09:39:58 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
    14/09/24 09:39:58 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@7dabd20
    14/09/24 09:39:58 INFO mapred.MapTask: io.sort.mb = 100
    14/09/24 09:39:58 INFO mapred.MapTask: data buffer = 79691776/99614720
    14/09/24 09:39:58 INFO mapred.MapTask: record buffer = 262144/327680
    mapper........................
    mapper........................
    key:1  value:Beijing
    mapper........................
    key:2  value:Guangzhou
    mapper........................
    key:3  value:Shenzhen
    mapper........................
    key:4  value:Xian
    14/09/24 09:39:58 INFO mapred.MapTask: Starting flush of map output
    14/09/24 09:39:58 INFO mapred.MapTask: Finished spill 0
    14/09/24 09:39:58 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
    14/09/24 09:39:59 INFO mapred.JobClient:  map 100% reduce 0%
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    14/09/24 09:40:01 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
    14/09/24 09:40:01 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@49fa6f3c
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    14/09/24 09:40:01 INFO mapred.Merger: Merging 2 sorted segments
    14/09/24 09:40:01 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 218 bytes
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    reducer........................
    factoryname:Beijing Red Star  addressname:Beijing
    factoryname:Beijing Rising  addressname:Beijing
    factoryname:Bank of Beijing  addressname:Beijing
    reducer........................
    factoryname:Guangzhou Honda  addressname:Guangzhou
    factoryname:Guangzhou Development Bank  addressname:Guangzhou
    reducer........................
    factoryname:Shenzhen Thunder  addressname:Shenzhen
    factoryname:Tencent  addressname:Shenzhen
    reducer........................
    14/09/24 09:40:01 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
    14/09/24 09:40:01 INFO mapred.LocalJobRunner:
    14/09/24 09:40:01 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
    14/09/24 09:40:01 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/mtjoin_output02
    14/09/24 09:40:04 INFO mapred.LocalJobRunner: reduce > reduce
    14/09/24 09:40:04 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
    14/09/24 09:40:05 INFO mapred.JobClient:  map 100% reduce 100%
    14/09/24 09:40:05 INFO mapred.JobClient: Job complete: job_local_0001
    14/09/24 09:40:05 INFO mapred.JobClient: Counters: 22
    14/09/24 09:40:05 INFO mapred.JobClient:   Map-Reduce Framework
    14/09/24 09:40:05 INFO mapred.JobClient:     Spilled Records=22
    14/09/24 09:40:05 INFO mapred.JobClient:     Map output materialized bytes=226
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce input records=11
    14/09/24 09:40:05 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Map input records=13
    14/09/24 09:40:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=238
    14/09/24 09:40:05 INFO mapred.JobClient:     Map output bytes=192
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce shuffle bytes=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce input groups=4
    14/09/24 09:40:05 INFO mapred.JobClient:     Combine output records=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Reduce output records=8
    14/09/24 09:40:05 INFO mapred.JobClient:     Map output records=11
    14/09/24 09:40:05 INFO mapred.JobClient:     Combine input records=0
    14/09/24 09:40:05 INFO mapred.JobClient:     CPU time spent (ms)=0
    14/09/24 09:40:05 INFO mapred.JobClient:     Total committed heap usage (bytes)=813170688
    14/09/24 09:40:05 INFO mapred.JobClient:   File Input Format Counters
    14/09/24 09:40:05 INFO mapred.JobClient:     Bytes Read=216
    14/09/24 09:40:05 INFO mapred.JobClient:   FileSystemCounters
    14/09/24 09:40:05 INFO mapred.JobClient:     HDFS_BYTES_READ=586
    14/09/24 09:40:05 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=122093
    14/09/24 09:40:05 INFO mapred.JobClient:     FILE_BYTES_READ=1658
    14/09/24 09:40:05 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=202
    14/09/24 09:40:05 INFO mapred.JobClient:   File Output Format Counters
    14/09/24 09:40:05 INFO mapred.JobClient:     Bytes Written=202
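    The counters are consistent with the data: Map input records=13 is the 8 lines of factory plus the 5 lines of address (headers included); Map output records=11 because the two header lines are skipped; Reduce input groups=4 is one group per addressID; and Reduce output records=8 is the 7 joined pairs plus the header row written by the reducer.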
     
    5. Run results:
    factoryname    addressname
    Beijing Red Star    Beijing
    Beijing Rising    Beijing
    Bank of Beijing    Beijing
    Guangzhou Honda    Guangzhou
    Guangzhou Development Bank    Guangzhou
    Shenzhen Thunder    Shenzhen
    Tencent    Shenzhen
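
    Note that Xian (addressID 4) is absent from the result: no factory record references addressID 4, and the reducer emits output only when a key has records from both tables, so the join is an inner join.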
  • Original post: https://www.cnblogs.com/yangyquin/p/5021190.html