Implementing a Multi-Table Join in MapReduce

    Multi-table join

    The input consists of two files. One is a factory table with a factory-name column and an address-ID column; the other is an address table with an address-name column and an address-ID column. The task is to derive the correspondence between factory names and address names from the input and produce a "factoryname — addressname" table.

    factory:

    factoryname                    addressID
    Beijing Red Star                    1
    Shenzhen Thunder                3
    Guangzhou Honda                2
    Beijing Rising                       1
    Guangzhou Development Bank      2
    Tencent                        3
    Bank of Beijing                     1

    address:

    addressID    addressname
    1            Beijing
    2            Guangzhou
    3            Shenzhen
    4            Xian

    Design approach

    Take the column the two tables share (the address ID) as the key emitted by map, and tag each value with the table it came from, so that reduce can split the values back into the two tables.
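    For example, the factory row "Beijing Red Star  1" and the address row "1  Beijing" both produce the intermediate key 1, so after the shuffle the reducer for that key sees every record, from either table, that belongs to address 1:

    key = 1
    values = [Beijing Red Star1, Beijing Rising1, Bank of Beijing1, Beijing2]

    A trailing 1 marks a factory record and a trailing 2 an address record; the order of values within one key is not guaranteed.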

    Implementation

    Mapper class

    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    
        private static Text k = new Text();
        private static Text v = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The file name tells us which table this record comes from.
            String path = ((FileSplit) context.getInputSplit()).getPath().getName();
            String line = value.toString();
            // Columns are separated by runs of two or more spaces;
            // factory names themselves contain only single spaces.
            String[] tmp = line.split("\\s{2,}");
            if (tmp.length == 2) {
                String first = tmp[0];
                String second = tmp[1];
                if (path.equals("factory")) {
                    if (first.equals("factoryname")) return; // skip the header row
                    k.set(second);       // key: address ID
                    v.set(first + "1");  // value: factory name, tagged "1"
                } else if (path.equals("address")) {
                    if (second.equals("addressname")) return; // skip the header row
                    k.set(first);        // key: address ID
                    v.set(second + "2"); // value: address name, tagged "2"
                }
                context.write(k, v);
            }
        }
    }
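    Stripping the last character to recover the tag works, but it is easy to misread and relies on every value carrying exactly one tag character at the end. A tab-separated tag makes the scheme explicit. The sketch below is not from the original post, and the "F"/"A" tag letters are made up; it only demonstrates the alternative encoding:

    // Minimal, self-contained illustration of a tab-separated tag scheme
    // (hypothetical alternative; the original code appends "1"/"2" instead).
    public class TagSchemeDemo {
        public static void main(String[] args) {
            String factoryValue = "F\t" + "Beijing Red Star"; // tagged factory record
            String addressValue = "A\t" + "Beijing";          // tagged address record
            for (String val : new String[]{factoryValue, addressValue}) {
                String[] tagged = val.split("\t", 2);         // [tag, payload]
                System.out.println(tagged[0] + " -> " + tagged[1]);
            }
        }
    }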

    Reducer class

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReducer extends Reducer<Text, Text, Text, Text> {
    
        private Text k = new Text();
        private Text v = new Text();
    
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Write the header row once per reduce task
            // (correct as long as there is a single reducer).
            context.write(new Text("factoryname"), new Text("addressname"));
        }
    
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            List<String> factory = new ArrayList<String>();
            List<String> address = new ArrayList<String>();
            for (Text val : value) {
                String str = val.toString();
                // The last character is the table tag; the rest is the payload.
                String stf = str.substring(str.length() - 1);
                String con = str.substring(0, str.length() - 1);
                int flag = Integer.parseInt(stf);
                if (flag == 1) {
                    factory.add(con);  // factory name
                } else if (flag == 2) {
                    address.add(con);  // address name
                }
            }
            // Emit the cross product: every factory at this address ID
            // paired with every address name (normally exactly one).
            for (int i = 0; i < factory.size(); i++) {
                k.set(factory.get(i));
                for (int j = 0; j < address.size(); j++) {
                    v.set(address.get(j));
                    context.write(k, v);
                }
            }
        }
    }
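    For key 1, for example, the loop splits the four tagged values into the two lists, and the nested loops then emit one joined row per factory:

    key     = 1
    factory = [Beijing Red Star, Beijing Rising, Bank of Beijing]
    address = [Beijing]
    emitted:
        Beijing Red Star    Beijing
        Beijing Rising      Beijing
        Bank of Beijing     Beijing

    An address with no matching factory (addressID 4, Xian) leaves the factory list empty, so the nested loops emit nothing for it.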

    Job driver class

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MTJoin {
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
            Job job = Job.getInstance(conf, "multi table join");
            job.setJarByClass(MTJoin.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/mtinput"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/mtoutput"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
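    Assuming the three classes are packaged into a jar (the jar name below is made up) and the two input files sit in the mtinput directory, the job can be submitted with the standard hadoop jar command. With the default single reducer, the output file should contain the header row followed by the joined pairs; the order of factories within one address is not guaranteed:

    hadoop jar mtjoin.jar MTJoin

    factoryname    addressname
    Beijing Red Star    Beijing
    Beijing Rising    Beijing
    Bank of Beijing    Beijing
    Guangzhou Honda    Guangzhou
    Guangzhou Development Bank    Guangzhou
    Shenzhen Thunder    Shenzhen
    Tencent    Shenzhen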