
    04. MapReduce Example: Single-Table Join

    Experiment Principle

    Take this experiment's buyer1(buyer_id, friends_id) table as an example. A single-table join connects the buyer_id column of the left table with the friends_id column of the right table, where the left table and the right table are the same table. In the map phase, each input record is split into buyer_id and friends_id; the mapper first emits buyer_id as the key and friends_id as the value, which serves as the left table, and then emits the same pair again with friends_id as the key and buyer_id as the value, which serves as the right table. To tell the two apart in the output, a tag is prepended to each value: the character '1' marks a left-table record and '2' marks a right-table record. The map output thus contains both the left and the right table, and the shuffle phase completes the join by grouping all records that share a key. The value list the reducer receives for each key therefore holds both "buyer_id, friends_id" and "friends_id, buyer_id" relations. The reducer parses each value list, collects the left-table values into one array and the right-table values into another, and the Cartesian product of the two arrays is the final result.
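    For concreteness, here is a small worked trace with two hypothetical rows (the IDs are made up for illustration). The mapper emits each row twice, once per table, and the join happens on the shared key 10002:

         input row        map output (key, value)
         10001,10002      (10001, "1+10002")   (10002, "2+10001")
         10002,10003      (10002, "1+10003")   (10003, "2+10002")

         reduce for key 10002 receives ["2+10001", "1+10003"]:
             left-table array  = [10003]   (from the "1+" record)
             right-table array = [10001]   (from the "2+" record)
             Cartesian product → output pair (10003, 10001)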

    Experiment Steps

    1. Start the Hadoop services:

         start-all.sh
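       As an optional check that is not part of the original steps, the jps tool that ships with the JDK lists the running Java processes; after start-all.sh finishes, Hadoop daemons such as NameNode, DataNode, and ResourceManager should appear:

         jps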

    2. Create a local data directory:

                mkdir -p /data/mapreduce7

    3. Upload the buyer1 file to that directory.
    4. Upload the hadoop2lib archive and unzip it:

             unzip hadoop2lib.zip

    5. Create the /mymapreduce7/in directory on HDFS, then import the buyer1 file from the local Linux directory /data/mapreduce7 into it:

             hadoop fs -mkdir -p /mymapreduce7/in 

             hadoop fs -put /data/mapreduce7/buyer1 /mymapreduce7/in
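       To confirm the upload before running the job, the HDFS directory can be listed:

         hadoop fs -ls /mymapreduce7/in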

    6. Write the Java code in IDEA:

      package mapreduce4;
      import java.io.IOException;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      public class DanJoin {
          public static class Map extends Mapper<Object,Text,Text,Text>{
              @Override
              public void map(Object key,Text value,Context context)
                      throws IOException,InterruptedException{
                  // Each input line has the form "buyer_id,friends_id".
                  String line = value.toString();
                  String[] arr = line.split(",");
                  String mapkey=arr[0];
                  String mapvalue=arr[1];
                  // Left table: key = buyer_id, value = friends_id tagged with "1".
                  context.write(new Text(mapkey),new Text("1+"+mapvalue));
                  // Right table: key = friends_id, value = buyer_id tagged with "2".
                  context.write(new Text(mapvalue),new Text("2+"+mapkey));
              }
          }
          public static class Reduce extends Reducer<Text, Text, Text, Text>{
              @Override
              public void reduce(Text key,Iterable<Text> values,Context context)
                      throws IOException,InterruptedException{
                  // The fixed-size arrays assume at most 20 records per key on each side.
                  int buyernum=0;
                  String[] buyer=new String[20];
                  int friendsnum=0;
                  String[] friends=new String[20];
                  for(Text value:values){
                      String record=value.toString();
                      if(record.isEmpty()){
                          continue;
                      }
                      char relationtype=record.charAt(0);
                      if('1'==relationtype){
                          // Left-table record: the payload after "1+" is a friends_id.
                          buyer[buyernum]=record.substring(2);
                          buyernum++;
                      }
                      if('2'==relationtype){
                          // Right-table record: the payload after "2+" is a buyer_id.
                          friends[friendsnum]=record.substring(2);
                          friendsnum++;
                      }
                  }
                  // The Cartesian product of the two arrays is the join result for this key.
                  if(0!=buyernum&&0!=friendsnum){
                      for(int m=0;m<buyernum;m++){
                          for(int n=0;n<friendsnum;n++){
                              // Compare with equals(): != on Strings tests references, not contents.
                              if(!buyer[m].equals(friends[n])){
                                  context.write(new Text(buyer[m]),new Text(friends[n]));
                              }
                          }
                      }
                  }
              }
          }
          public static void main(String[] args) throws Exception{
              Configuration conf=new Configuration();
              String[] otherArgs=new String[2];
              otherArgs[0]="hdfs://192.168.149.10:9000/mymapreduce7/in/buyer1";
              otherArgs[1]="hdfs://192.168.149.10:9000/mymapreduce7/out";
              // Job.getInstance replaces the deprecated Job(conf,name) constructor.
              Job job=Job.getInstance(conf,"Table join");
              job.setJarByClass(DanJoin.class);
              job.setMapperClass(Map.class);
              job.setReducerClass(Reduce.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(Text.class);
              FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
              FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
              System.exit(job.waitForCompletion(true)?0:1);
          }
      }
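       The input and output paths are hard-coded in main(), so no command-line arguments are needed. If the job is run outside IDEA, one option is to package the class into a jar (the jar name below is a hypothetical choice) and submit it with hadoop jar:

         hadoop jar DanJoin.jar mapreduce4.DanJoin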
    7. Copy the jar packages from the hadoop2lib directory into the project's hadoop2lib directory and add them to the project as dependencies.
    8. Copy the log4j.properties file into the project.
    9. Run the program and view the results.
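       Once the job completes successfully, the joined pairs can be viewed directly from HDFS; part-r-00000 is the default output file name for a job with a single reducer:

         hadoop fs -cat /mymapreduce7/out/part-r-00000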

     

     
