zoukankan      html  css  js  c++  java
  • Mapreduce之mappper_join

      Mapreduce的mapper方法里的多表联查。

      首先要确定一个大表和一个小表,然后将小表放在内存的缓冲区之中。

    job.addCacheFile(new URI("hdfs://master:9000/mapreduce2/in/product.txt"));

      然后在mapper方法之中,执行的时候要将本地缓冲区的数据写入到一个map集合。是重写一个setup方法。

      放入map集合的时候选取一个字段作为key。

      大表是要放在map方法中来进行操作的。将大表的字段作为key,取出value的值。这个key和之前放入map的key是同一属性值。

      看个例子:

      

    package MapJoin;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    
    public class Join_Mapper extends Mapper<LongWritable, Text,Text,Text> {
        private HashMap<String,String> map=new HashMap<String, String>();
        //一、将本地缓存区小表的数据读取到Map集合(只需要做一次)
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //获取分布式文件缓存列表
            URI[] cacheFiles = context.getCacheFiles();
            //获取分布式缓存文件的文件系统FileSystem
            FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
            //获取文件的输入流
            FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
            //读取文件内容,并将数据存入Map集合
                //将字节输入流转为字符缓冲流
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
                //读取小表的内容,并将读取的数据存入Map集合
            String line = null;
    
            while((line = bufferedReader.readLine()) != null){
                String[] split = line.split(",");
                map.put(split[0],line);
            }
            //关闭流
    
            bufferedReader.close();
    //        fileSystem.close();
        }
        //二、将达标的数据和小表的数据进行join
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            String name = split[2];
            //将name作为map的key,获取到value,然后进行拼接;得到V2
            String productLine = map.get(name);
            String valueLine = productLine+"	"+value.toString();
            context.write(new Text(name),new Text(valueLine));
    
    
        }
    }
    package MapJoin;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.net.URI;
    
    public class MapJoinJob extends Configured implements Tool {
        public int run(String[] strings) throws Exception {
            Job job = Job.getInstance(super.getConf(), "Map_job");
            //将小表放在分布式缓存中
            job.addCacheFile(new URI("hdfs://master:9000/mapreduce2/in/product.txt"));
    
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job,new Path("hdfs://master:9000/mapreduce2/in/order.txt"));
    
            job.setMapperClass(Join_Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/mapreduce2/Map_Join_out"));
            boolean b = job.waitForCompletion(true);
            return b?0:1;
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            int run = ToolRunner.run(configuration, new MapJoinJob(), args);
            System.exit(run);
    
        }
    }

      

      

  • 相关阅读:
    #include <NOIP2009 Junior> 细胞分裂 ——using namespace wxl;
    【NOIP合并果子】uva 10954 add all【贪心】——yhx
    NOIP2010普及组T4 三国游戏——S.B.S.
    NOIP2010普及组T3 接水问题 ——S.B.S.
    NOIP2011提高组 聪明的质监员 -SilverN
    NOIP2010提高组 关押罪犯 -SilverN
    uva 1471 defence lines——yhx
    json2的基本用法
    获取对象的属性个数
    替换指定规则的字符串
  • 原文地址:https://www.cnblogs.com/moxihuishou/p/14008747.html
Copyright © 2011-2022 走看看