zoukankan      html  css  js  c++  java
  • Mapreduce中的join操作

    一、背景

        MapReduce提供了表连接操作其中包括Map端join、Reduce端join还有半连接,现在我们要讨论的是Map端join,Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。

    二、具体join

       1、join的例子

        比如我们有两个文件,分别存储 订单信息:products.txt,和 商品信息:orders.txt ,详细数据如下:

      • products.txt:
        //商品ID,商品名称,商品类型(数字表示,我们假设有一个数字和具体类型的映射)
        p0001,xiaomi,001
        p0002,chuizi,001
      • orders.txt:
        //订单号,时间,商品id,购买数量 
        1001,20170710,p0001,1 
        1002,20170710,p0001,3 
        1003,20170710,p0001,3 
        1004,20170710,p0002,1

        我们想象有多个商品,并有海量的订单信息,并且存储在多个 HDFS 块中。

        xiaomi,7
        chuizi,1

        该怎么处理? 我们分析上面我们想要的结果,商品名称和销量,这两个属性分别存放到不同的文件中,那我们就要考虑 在一个地方(mapper)读取这两个文件的数据,并把数据在一个地方(reducer)进行结合。这就是 MapReduce 中的 Join 了。

      • 代码如下:
        • Mapper:
          public class joinMapper extends Mapper<LongWritable,Text,Text,Text> {
          
              private Text outKey=new Text();
              private Text outValue=new Text();
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  String line = value.toString();
                  String[] split = line.split(",");
                  FileSplit inputSplit = (FileSplit) context.getInputSplit();
                  String name = inputSplit.getPath().getName();
                  //两个文件 在一个 mapper 中处理
                  //通过文件名判断是那种数据
                  if(name.startsWith("a")){
                      //取商品ID 作为 输出key 和 商品名称 作为 输出value,即 第0、1 的数据
                      outKey.set(split[0]);
                      outValue.set("product#" + split[1]);
                      context.write(outKey, outValue);
                  }else{
                      //取商品ID 作为 输出key 和 购买数量 作为 输出value,即 第2、3 的数据
                      outKey.set(split[2]);
                      outValue.set("order#" + split[3]);
                      context.write(outKey, outValue);
                  }
              }
          }
        • Reducer

          public class joinReducer extends Reducer<Text,Text,Text,Text> {
              private Text outValue = new Text();
              @Override
              protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                  //用来存放:商品ID、商品名称
                  List<String> productsList = new ArrayList<String>();
                  //用来存放:商品ID、购买数量
                  List<Integer> ordersList = new ArrayList<Integer>();
          
                  for (Text text:values){
                      String value = text.toString();
                      if(value.startsWith("product#")) {
                          productsList.add(value.split("#")[1]); //取出 商品名称
                      } else if(value.startsWith("order#")){
                          ordersList.add(Integer.parseInt(text.toString().split("#")[1].trim())); //取出商品的销量
                      }
                  }
                  int totalOrders = 0;
                  for (int i=0; i < productsList.size(); i++) {
                      System.out.println(productsList.size());
          
                      for (int j=0; j < ordersList.size(); j++) {
                          System.out.println(ordersList.size());
                          totalOrders += ordersList.get(j);
                      }
                      outValue.set(productsList.get(i) + "	" + totalOrders );
                      //最后的输出是:商品ID、商品名称、购买数量
                      context.write(key, outValue);
                  }
          
              }
          }
        • App:

          public class App  {
              public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                  Configuration conf = new Configuration();
                  conf.set("fs.defaultFS", "file:///");
          
                  Path path = new Path("F:\mr\join\out");
                  FileSystem fileSystem = path.getFileSystem(conf);
                  if(fileSystem.isDirectory(path)){
                      fileSystem.delete(path,true);
                  }
                  Job job = Job.getInstance(conf);
                  //设置job的各种属性
                  job.setJobName("App");                        //作业名称
                  job.setJarByClass(App.class);                 //搜索类
                  job.setInputFormatClass(TextInputFormat.class); //设置输入格式
          
                  job.setMapperClass(joinMapper.class);
                  job.setReducerClass(joinReducer.class);
                  //添加输入路径
                  FileInputFormat.addInputPath(job,new Path("F:\mr\join\map"));
                  //设置输出路径
                  FileOutputFormat.setOutputPath(job,new Path("F:\mr\join\out"));
                  //map输出类型
                  job.setOutputKeyClass(Text.class);           //
                  job.setOutputValueClass(Text.class);        //
                  job.waitForCompletion(true);
          
              }
          }
        • 输出结果

          p0001    xiaomi    7
          p0002    chuizi    1
                  

    2、 Map Join   

        一个数据集很大,另一个数据集很小(能够被完全放进内存中),MAPJION会把小表全部读入内存中,把小表拷贝多份分发到大表数据所在实例上的内存里,在map阶段直接 拿另 外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率会高很多;

        适用于关联表中有小表的情形;可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度。并用distributedcache机制将小表的数据分发到每一个maptask执行节点,从而每一个maptask节点可以从本地加载到小表的数据,进而在本地即可实现join

      • left outer join的左表必须是大表    
      • right outer join的右表必须是大表
      • inner join左表或右表均可以作为大表
      • full outer join不能使用mapjoin;
      •  mapjoin支持小表为子查询,使用mapjoin时需要引用小表或是子查询时,需要引用别名;在mapjoin中,可以使用不等值连接或者使用or连接多个条件;    

      1.2、 Map Join事例

        • product表

          p0001,xiaomi,001
          p0002,chuizi,001

        • orders表

          1001,20170710,p0001,1
          1002,20170710,p0001,3
          1003,20170710,p0001,3
          1004,20170710,p0002,1

        • 期望输出

          xiaomi 1001,20170710,p0001,1
          xiaomi 1002,20170710,p0001,3
          xiaomi 1003,20170710,p0001,3
          chuizi 1004,20170710,p0002,1

        • 代码实现
          • Mapper
            /**
             * 链接操作  map端链接
             */
            public class MapJoinMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
            
                private Map<String,String> pdInfoMap =new HashMap<String,String>();
                private Text keyOut=new Text();
                /**
                 * 通过阅读父类Mapper的源码,发现 setup方法是在maptask处理数据之前调用一次 可以用来做一些初始化工作
                 */
                @Override
                protected void setup(Context context) {
                    try {
                        Configuration conf = context.getConfiguration();
                        FileSystem fs= null;
                        fs = FileSystem.get(conf);
                        FSDataInputStream fis = fs.open(new Path("file:/F:/mr/join/map/input/a.txt"));
                        //得到缓冲区阅读器
                        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
                        String line=null;
                        while((line=br.readLine())!=null){
                            String[] fields = line.split(",");
                            pdInfoMap.put(fields[0],fields[1]);
                        }
                        fis.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                // 由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了
                @Override
                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                    //订单信息
                    String orderline = value.toString();
                    String[] fields = orderline.split(",");
                    String pName = pdInfoMap.get(fields[2]);
                    keyOut.set(pName+"	"+orderline);
                    context.write(keyOut,NullWritable.get());
                }
            }
          • App
            public class MapJoinApp {
                public static void main(String[] args) throws Exception {
                    Configuration conf = new Configuration();
                    conf.set("fs.defaultFS", "file:///");
                    Job job = Job.getInstance(conf);
                    //设置job的各种属性
                    job.setJobName("MapJoinApp");                        //作业名称
                    job.setJarByClass(MapJoinApp.class);                 //搜索类
                    //添加输入路径
                    FileInputFormat.addInputPath(job,new Path("F:/mr/join/map/input/b.txt"));
                    //设置输出路径
                    FileOutputFormat.setOutputPath(job,new Path("F:/mr/join/map/output"));
                    job.setMapperClass(MapJoinMapper.class);             //mapper类
                    //没有reduce
                    job.setNumReduceTasks(0);
                    job.setMapOutputKeyClass(Text.class);           //
                    job.setMapOutputValueClass(NullWritable.class);  //
            
                    job.waitForCompletion(true);
                }
            }
          • 输出和期望输出一致

    3、Reduce端Join

      • Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程。
      • 基本思路:
        1. Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的。
        2. 在reduce处理函数中,按照标识对数据进行处理
        3. 然后根据Key去join来求出结果直接输出。
      • 例子
        • 数据如上
        • 计算过程:
          • 在Map阶段,把所有数据标记成<key,value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于products表的记录,value的值为"products#"+name;来源于orders的记录,value的值为"orders#"+score。
          • 在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终的结果。
        • 代码如下:
          • Mapper
            /**
             * map阶段打标记
             */
            public class reduceMapper extends Mapper<LongWritable,Text,Text,Text> {
                @Override
                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                    String line = value.toString();
                    String[] fields = line.split(",");
            FileSplit fileSplit
            = (FileSplit)context.getInputSplit(); String pathName = fileSplit.getPath().toString(); pathName=pathName.substring(27); //通过文件名判断是那种数据 if (pathName.startsWith("a")){//product数据 //System.out.println(keyOut+" "+valueOut); context.write(new Text(fields[0]),new Text("product#"+fields[1])); }else if (pathName.startsWith("b")){ context.write(new Text(fields[2]),new Text("order#"+fields[0]+" "+fields[1]+" "+fields[3])); } } }
          • Reducer
            public class reduceReducer extends Reducer<Text,Text,Text,Text> {
                @Override
                protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                    //存放产品信息
                    List<String> proInfo = new ArrayList<String>();
                    //存放订单信息
                    List<String> ordInfo = new ArrayList<String>();
                    for (Text text:values){
                        System.out.println("key="+key+"  value="+text);
                        //将数组中的数据添加到对应的数组中去
                        if (text.toString().startsWith("product")){
                            proInfo.add(text.toString().split("#")[1]);
                        }else if(text.toString().startsWith("order")){
                            ordInfo.add(text.toString().split("#")[1]);
                        }
                    }
                    //获取两个数组的大小
                    int sizePro = proInfo.size();
                    int sizeOrd = ordInfo.size();
                    //遍历两个数组将结果写出去
                    for (int i=0;i<sizePro;i++){
                        for (int j=0;j<sizeOrd;j++){
                            context.write(key,new Text(proInfo.get(i)+" "+ordInfo.get(j)));
                        }
                    }
                }
            }
          • App
            public class ReduceApp {
                public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                    Configuration conf = new Configuration();
                    conf.set("fs.defaultFS", "file:///");
                    Job job = Job.getInstance(conf);
            
                    Path path = new Path("F:\mr\join\map/output1");
                    FileSystem fileSystem = path.getFileSystem(conf);
                    if(fileSystem.isDirectory(path)){
                        fileSystem.delete(path,true);
                    }
            
                    //设置job的各种属性
                    job.setJobName("ReduceApp");                        //作业名称
                    job.setJarByClass(ReduceApp.class);                 //搜索类
                    //添加输入路径
                    FileInputFormat.addInputPath(job,new Path("F:\mr\join\map\input"));
                    //设置输出路径
                    FileOutputFormat.setOutputPath(job,new Path("F:\mr\join\map/output1"));
            
                    job.setMapperClass(reduceMapper.class);             //mapper类
                    job.setReducerClass(reduceReducer.class);         //reducer类
            
                    job.setMapOutputKeyClass(Text.class);           //
                    job.setMapOutputValueClass(Text.class);  //
            
                    job.waitForCompletion(true);
                }
            }
          • 输出结果
            p0001    xiaomi 1003    20170710    3 
            p0001    xiaomi 1002    20170710    3 
            p0001    xiaomi 1001    20170710    1 
            p0002    chuizi 1004    20170710    1

             细节:

        • 当map读取源文件时,如何区分出是file1还是file2 
          FileSplit fileSplit = (FileSplit)context.getInputSplit();
          String path =  fileSplit.getPath().toString();

                 根据path就可以知道文件的来源咯。

  • 相关阅读:
    开源一个常用的小软件的源码——系统数据库服务管理软件
    MySql Windws 下自动备份脚本
    ubuntu-14.04-server配置Jexus --安装步骤记录
    Jumony快速抓取网页 --- Jumony使用笔记--icode
    视频教程--ASP.NET MVC 使用 Petapoco 微型ORM框架+NpgSql驱动连接 PostgreSQL数据库
    收录.NET跨平台及跨数据库的博文...
    ASP.NET MVC 使用 Petapoco 微型ORM框架+NpgSql驱动连接 PostgreSQL数据库
    Windbg程序调试系列
    QCY蓝牙耳机 左右两只耳机配对 方法
    wpf 的 Window或UserControl绑定自己后台属性
  • 原文地址:https://www.cnblogs.com/tongxupeng/p/10417527.html
Copyright © 2011-2022 走看看