zoukankan      html  css  js  c++  java
  • MapReduce:输入是两个文件,file1代表工厂表,包含工厂名列和地址编号列;file2代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名----地址名"表

    文件如下:

    file1:

    Beijing Red Star    1
    Shenzhen Thunder    3
    Guangzhou Honda    2
    Beijing Rising    1
    Guangzhou Development Bank    2
    Tencent    3
    Back of Beijing    1

    file2:

    1    Beijing
    2    Guangzhou
    3    Shenzhen
    4    Xian

    代码如下(由于水平有限,不保证完全正确,如果发现错误欢迎指正):

    package com;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Test {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration config = new  Configuration();
            config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
            config.set("yarn.resourcemanager.hostname", "192.168.0.100");
            
            FileSystem fs = FileSystem.get(config);
            
            Job job = Job.getInstance(config);
            
            job.setJarByClass(Test.class);
            
            //设置所用到的map类
            job.setMapperClass(myMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            //设置用到的reducer类
            job.setReducerClass(myReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            //设置s输入输出地址
            FileInputFormat.addInputPath(job, new Path("/FactoryName/"));
            
            Path path = new Path("/output2/");
            
            if(fs.exists(path)){
                fs.delete(path, true);
            }
            
            //指定文件的输出地址
            FileOutputFormat.setOutputPath(job, path);
            
            //启动处理任务job
            boolean completion = job.waitForCompletion(true);
            if(completion){
                System.out.println("Job Success!");
            }
        }
        
        public static class myMapper extends Mapper<Object, Text, Text, Text> {
    
            // 实现map函数
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String temp=new String();// 左右表标识
                String values=value.toString();
                String words[]=values.split("	");
                
                String mapkey = new String();
                String mapvalue = new String(); 
               
                //右表:1  Beijing
                if (words[0].charAt(0) >= '0' && words[0].charAt(0) <= '4') {
                    mapkey = words[0];
                    mapvalue =words[1];
                    temp = "2";
                    
                }else{
                    //左表:Beijing Red Star  1
                    mapkey = words[1];
                    mapvalue =words[0];
                    temp = "1";
                }
                
                // 输出左右表
                //左表:(1,1+Beijing Red Star)
                //右表:(1,2+Beijing)
                context.write(new Text(mapkey), new Text(temp + "+"+ mapvalue));
            }
        }
    
        //reduce解析map输出,将value中数据按照左右表分别保存
        public static class myReducer extends Reducer<Text, Text, Text, Text> {
            // 实现reduce函数
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                
                List<String> factory = new ArrayList<String>();
                List<String> address = new ArrayList<String>();
                
                for (Text value : values) {
                    // 取得左右表标识
                    char temp=(char) value.charAt(0);
                    String words[]=value.toString().split("[+]");//1,Beijing Red Star
                    
                    if(temp=='1'){
                        factory.add(words[1]);// 左表
                    }
                    
                    if(temp=='2'){
                        address.add(words[1]);// 右表
                    }
                }
                
                //求出笛卡尔积,并输出
                for (String f : factory) {
                    for (String a : address) {
                        context.write(new Text(f), new Text(a));
                    }
                }
            }
        } 
    }

     输出结果如下:

    如果您认为这篇文章还不错或者有所收获,您可以通过右边的“打赏”功能 打赏我一杯咖啡【物质支持】,也可以点击下方的【好文要顶】按钮【精神支持】,因为这两种支持都是使我继续写作、分享的最大动力!

  • 相关阅读:
    人才培养的金字塔模型
    3D建模技术-讲稿-5-基础
    3D建模技术-讲稿-4-基础
    3D建模技术-讲稿-3-基础
    3D建模技术-讲稿-1-入门
    3D建模技术-讲稿-2-基础
    3D建模技术-讲稿-1-方凳
    the latest Ext2Fsd(ext2fsd-0.69) doesn't work after windows 10 --- solved using ext2fsd-0.53 or 0.68
    欢迎同学们参加新乡学院2019年3D打印暑期夏令营
    (OK) 自己动手构建Linux发行版---简版
  • 原文地址:https://www.cnblogs.com/supiaopiao/p/7268756.html
Copyright © 2011-2022 走看看