zoukankan      html  css  js  c++  java
  • map reduce输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。

    package com.xzy.factory.address;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    /**
     *     多表关联
            多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。下面进入这个实例。
        5.1 实例描述
            输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。
            样例输入如下所示。
            1)factory:
        
        factoryname    addressed
        Beijing Red Star    1
        Shenzhen Thunder    3
        Guangzhou Honda    2
        Beijing Rising    1
        Guangzhou Development Bank    2
        Tencent    3
        Back of Beijing    1
        
            2)address:
        
        addressID    addressname
        1    Beijing
        2    Guangzhou
        3    Shenzhen
        4    Xian
        
            样例输出如下所示。
        
        factoryname                        addressname
        Back of Beijing                          Beijing
        Beijing Red Star                        Beijing
        Beijing Rising                          Beijing
        Guangzhou Development Bank          Guangzhou
        Guangzhou Honda                    Guangzhou
        Shenzhen Thunder                    Shenzhen
        Tencent                            Shenzhen
     *
     *
     */
    public class FactoryAddressJob {
        public static class FactoryAddressMapper extends Mapper<LongWritable, Text, Text, Text>{
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                String[] split = value.toString().split(" ");
                if(split.length == 2){
                    if(split[0].equals("factoryname")){
                        return;
                    }else{
                        context.write(new Text(split[1]), new Text(split[0] + "_factory"));
                    }
                    if(split[0].equals("addressID")){
                        return;
                    }else{
                        context.write(new Text(split[0]), new Text(split[1]));
                    }
                }
            }
        }
        public static class FactoryAddressReducer extends Reducer<Text, Text, Text, Text>{
            @Override
            protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                List<String> k = new ArrayList<String>();
                List<String> val = new ArrayList<String>();
                for (Text v : value) {
                    if(v.toString().endsWith("_factory")){
                        String[] split = v.toString().split("_");
                        k.add(split[0]);
                    }else{
                        val.add(v.toString());
                    }
                }
                if(k.size() > 0 && val.size() > 0){
                    for(int i = 0;i < k.size(); i++){
                        for(int j = 0;j <val.size(); j++){
                            context.write(new Text(k.get(i)), new Text(val.get(j)));
                        }
                    }
                }
            }
        }
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(FactoryAddressJob.class);
            
            job.setMapperClass(FactoryAddressMapper.class);
            job.setReducerClass(FactoryAddressReducer.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            FileInputFormat.setInputPaths(job, args[0]);
            Path path = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(path))
                fs.delete(path,true);
            
            FileOutputFormat.setOutputPath(job, path);
            job.waitForCompletion(true);
        }
    }

  • 相关阅读:
    nfs只能挂载为nobody的解决方法
    Mysql一些记忆
    shell中交互输入自动化
    关闭SElinux
    《Windows核心编程系列》十谈谈同步设备IO与异步设备IO之异步IO
    《Windows核心编程系列》九谈谈同步设备IO与异步设备IO之同步设备IO
    《Windows核心编程系列》八谈谈用内核对象进行线程同步
    《windows核心编程系列》七谈谈用户模式下的线程同步
    《windows核心编程系列 》六谈谈线程调度、优先级和关联性
    《windows核心编程系列》五谈谈线程基础
  • 原文地址:https://www.cnblogs.com/xiaozhongyu/p/7418601.html
Copyright © 2011-2022 走看看