  • Implementing a Multi-Table Join in MapReduce

    Multi-Table Join

    The input consists of two files. One is the factory table, with a factory-name column and an address-ID column; the other is the address table, with an address-ID column and an address-name column. The task is to derive the factory-to-address correspondence from the input and output a "factoryname - addressname" table.

    factory:

    factoryname                     addressID
    Beijing Red Star                1
    Shenzhen Thunder                3
    Guangzhou Honda                 2
    Beijing Rising                  1
    Guangzhou Development Bank      2
    Tencent                         3
    Back of Beijing                 1

    address:

    addressID    addressname
    1            Beijing
    2            Guangzhou
    3            Shenzhen
    4            Xian

    Design

    Use the column the two tables share (the address ID) as the key of the map output, and tag each value with the table it came from, so that the reducer can split the values back into the two source tables. The job computes the equivalent of the SQL join SELECT factoryname, addressname FROM factory JOIN address ON factory.addressID = address.addressID.

    Implementation

    The Mapper class

    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    
        private static Text k = new Text();
        private static Text v = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the name of the file this split comes from, so we know which table the line belongs to.
            String path = ((FileSplit) context.getInputSplit()).getPath().getName();
            String line = value.toString();
            // Columns are separated by runs of whitespace; factory names may contain single spaces,
            // so split only on two or more consecutive whitespace characters.
            String[] tmp = line.split("\\s{2,}");
            if (tmp.length == 2) {
                String first = tmp[0];
                String second = tmp[1];
                if (path.equals("factory")) {
                    if (first.equals("factoryname")) return; // skip the header line
                    k.set(second);       // join key: addressID
                    v.set(first + "1");  // tag "1" marks a factory record
                } else if (path.equals("address")) {
                    if (second.equals("addressname")) return; // skip the header line
                    k.set(first);        // join key: addressID
                    v.set(second + "2"); // tag "2" marks an address record
                } else {
                    return; // not one of the two input files; ignore
                }
                context.write(k, v);
            }
        }
    }
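
    For the sample input, each data line becomes one intermediate pair keyed by addressID, with the one-character table tag appended to the value. For instance, the rows for addressID 1 produce:

    key    value
    1      Beijing Red Star1    (factory, tag 1)
    1      Beijing Rising1      (factory, tag 1)
    1      Back of Beijing1     (factory, tag 1)
    1      Beijing2             (address, tag 2)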

    The Reducer class

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReducer extends Reducer<Text, Text, Text, Text> {
    
        private Text k = new Text();
        private Text v = new Text();
    
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Emit the header row once, before any keys are processed.
            // Note: this assumes a single reducer; with several reducers each
            // output file would get its own header row.
            context.write(new Text("factoryname"), new Text("addressname"));
        }
    
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            List<String> factory = new ArrayList<String>();
            List<String> address = new ArrayList<String>();
            for (Text val : value) {
                String str = val.toString();
                // The last character is the table tag; everything before it is the payload.
                String stf = str.substring(str.length() - 1);
                String con = str.substring(0, str.length() - 1);
                int flag = Integer.parseInt(stf);
                if (flag == 1) {
                    factory.add(con);  // factory names sharing this addressID
                } else if (flag == 2) {
                    address.add(con);  // address names sharing this addressID
                }
            }
            // Cartesian product of the two lists: pair every factory at this
            // addressID with every address name (normally exactly one).
            for (int i = 0; i < factory.size(); i++) {
                k.set(factory.get(i));
                for (int j = 0; j < address.size(); j++) {
                    v.set(address.get(j));
                    context.write(k, v);
                }
            }
        }
    }
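
    The shuffle delivers all values sharing an addressID to one reduce call. For key 1, the reducer receives the four tagged values shown above in no guaranteed order, strips the tags to build factory = [Beijing Red Star, Beijing Rising, Back of Beijing] and address = [Beijing], and the nested loops emit:

    Beijing Red Star    Beijing
    Beijing Rising      Beijing
    Back of Beijing     Beijing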

    The job driver class

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MTJoin {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
            Job job = Job.getInstance(conf, "multi table join");
            job.setJarByClass(MTJoin.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Both input files (factory and address) live in the same input directory.
            FileInputFormat.addInputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/mtinput"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/mtoutput"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
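
    To run the job, package the three classes into a jar and submit it with hadoop jar; the input directory must contain exactly the two files named factory and address, because the mapper dispatches on the file name. With the default single reducer, the output file part-r-00000 should contain the header row written in setup() followed by the joined pairs, with key and value tab-separated (the relative order of factories under one address is not guaranteed):

    factoryname                   addressname
    Beijing Red Star              Beijing
    Beijing Rising                Beijing
    Back of Beijing               Beijing
    Guangzhou Honda               Guangzhou
    Guangzhou Development Bank    Guangzhou
    Shenzhen Thunder              Shenzhen
    Tencent                       Shenzhen

    Note that addressID 4 (Xian) produces no output: its reduce call has an empty factory list, so the nested loops never write a pair.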
  • Original post: https://www.cnblogs.com/qiaoqianxiong/p/4986940.html