zoukankan      html  css  js  c++  java
  • 2021.11.20 MapReduce实验

    一、今日学习内容

      单表join

    04Mapreduce实例——单表join

    实验目的

    1.准确理解MapReduce单表连接的设计原理

    2.熟练掌握MapReduce单表连接程序的编写

    3.了解单表连接的运用场景

    4.学会编写MapReduce单表连接程序代码解决问题

    实验原理

    以本实验的buyer1(buyer_id,friends_id)表为例来阐述单表连接的实验原理。单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,在map阶段将读入数据分割成buyer_idfriends_id之后,会将buyer_id设置成keyfriends_id设置成value,直接输出并将其作为左表;再将同一对buyer_idfriends_id中的friends_id设置成keybuyer_id设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在valueString最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个keyvalue-list就包含了"buyer_idfriends_id--friends_idbuyer_id"关系。取出每个keyvalue-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。

    实验环境

    Linux Ubuntu 14.04

    jdk-7u75-linux-x64

    hadoop-2.6.0-cdh5.4.5

    hadoop-2.6.0-eclipse-cdh5.4.5.jar

    eclipse-java-juno-SR2-linux-gtk-x86_64

     

    实验内容

    现有某电商的用户好友数据文件,名为 buyer1buyer1中包含(buyer_id,friends_id)两个字段,内容是以"\t"分隔,编写MapReduce进行单表连接,查询出用户的间接好友关系。例如:10001的好友是10002,而10002的好友是10005,那么1000110005就是间接好友关系。

    buyer1(buyer_id,friends_id)

    1. 10001   10002  
    2. 10002   10005  
    3. 10003   10002  
    4. 10004   10006  
    5. 10005   10007  
    6. 10006   10022  
    7. 10007   10032  
    8. 10009   10006  
    9. 10010   10005  
    10. 10011   10013  

    统计结果数据如下:

    1. 好友id  用户id  
    2. 10005   10001  
    3. 10005   10003  
    4. 10007   10010  
    5. 10007   10002  
    6. 10022   10004  
    7. 10022   10009  
    8. 10032   10005  

    实验步骤

    1.切换到/apps/hadoop/sbin目录下,开启hadoop

    1. cd /apps/hadoop/sbin  
    2. ./start-all.sh  

    2.Linux本地新建/data/mapreduce7目录。

    1. mkdir -p /data/mapreduce7  

    3.Linux中切换到/data/mapreduce7目录下,用wget命令从http://192.168.1.100:60000/allfiles/mapreduce7/buyer1网址上下载文本文件buyer1

    1. cd /data/mapreduce7  
    2. wget http://192.168.1.100:60000/allfiles/mapreduce7/buyer1  

    然后在当前目录下用wget命令从http://192.168.1.100:60000/allfiles/mapreduce7/hadoop2lib.tar.gz网址上下载项目用到的依赖包。

    1. wget http://192.168.1.100:60000/allfiles/mapreduce7/hadoop2lib.tar.gz  

    hadoop2lib.tar.gz解压到当前目录下。

    1. tar zxvf hadoop2lib.tar.gz  

    4.首先在hdfs上新建/mymapreduce7/in目录,然后将Linux本地/data/mapreduce7目录下的buyer1文件导入到hdfs/mymapreduce7/in目录中。

    1. hadoop fs -mkdir -p /mymapreduce7/in  
    2. hadoop fs -put /data/mapreduce7/buyer1 /mymapreduce7/in  

    5.新建maven项目,项目名为mapreduce7

    导入依赖

    6.编写Java代码,并描述其设计思路

    Map代码

    1. public static class Map extends Mapper<Object,Text,Text,Text>{  
    2. //实现map函数  
    3. public void map(Object key,Text value,Context context)  
    4. throws IOException,InterruptedException{  
    5. String line = value.toString();  
    6. String[] arr = line.split("\t");   //按行截取  
    7. String mapkey=arr[0];  
    8. String mapvalue=arr[1];  
    9. String relationtype=new String();  //左右表标识  
    10. relationtype="1";  //输出左表  
    11. context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
    12. //System.out.println(relationtype+"+"+mapvalue);  
    13. relationtype="2";  //输出右表  
    14. context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
    15. //System.out.println(relationtype+"+"+mapvalue);  
    16. }  
    17. }  

    Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key/value>对提供给map函数使用。map函数中用split("\t")方法把每行数据进行截取,并把数据存入到数组arr[],把arr[0]赋值给mapkeyarr[1]赋值给mapvalue。用两个contextwrite()方法把数据输出两份,再通过标识符relationtype12对两份输出数据的value打标记。

    Reduce代码

    1. public static class Reduce extends Reducer<Text, Text, Text, Text>{  
    2. //实现reduce函数  
    3. public void reduce(Text key,Iterable<Text> values,Context context)  
    4. throws IOException,InterruptedException{  
    5. int buyernum=0;  
    6. String[] buyer=new String[20];  
    7. int friendsnum=0;  
    8. String[] friends=new String[20];  
    9. Iterator ite=values.iterator();  
    10. while(ite.hasNext()){  
    11. String record=ite.next().toString();  
    12. int len=record.length();  
    13. int i=2;  
    14. if(0==len){  
    15. continue;  
    16. }  
    17. //取得左右表标识  
    18. char relationtype=record.charAt(0);  
    19. //取出record,放入buyer  
    20. if('1'==relationtype){  
    21. buyer [buyernum]=record.substring(i);  
    22. buyernum++;  
    23. }  
    24. //取出record,放入friends  
    25. if('2'==relationtype){  
    26. friends[friensnum]=record.substring(i);  
    27. friendsnum++;  
    28. }  
    29. }  
    30. buyernumfriendsnum数组求笛卡尔积  
    31. if(0!=buyernum&&0!=friendsnum){  
    32. for(int m=0;m<buyernum;m++){  
    33. for(int n=0;n<friendsnum;n++){  
    34. if(buyer[m]!=friends[n]){  
    35. //输出结果  
    36. context.write(new Text(buyer[m]),new Text(frinds[n]));  
    37. }  
    38. }  
    39. }  
    40. }  
    41. }  

    reduce端在接收map端传来的数据时已经把相同key的所有value都放到一个Iterator容器中valuesreduce函数中,首先新建两数组buyer[]friends[]用来存放map端的两份输出数据。然后Iterator迭代中hasNext()Next()方法加while循环遍历输出values的值并赋值给record,用charAt(0)方法获取record第一个字符赋值给relationtype,用if判断如果relationtype1则把用substring(2)方法从下标为2开始截取record将其存放到buyer[]中,如果relationtype2时将截取的数据放到frindes[]数组中。然后用三个for循环嵌套遍历输出<key,value>,其中key=buyer[m]value=friends[n]

    完整代码

    package mapreduce;  
    import java.io.IOException;  
    import java.util.Iterator;  
    import org.apache.hadoop.conf.Configuration;  
    import org.apache.hadoop.fs.Path;  
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.Job;  
    import org.apache.hadoop.mapreduce.Mapper;  
    import org.apache.hadoop.mapreduce.Reducer;  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    public class DanJoin {  
    public static class Map extends Mapper<Object,Text,Text,Text>{  
    public void map(Object key,Text value,Context context)  
    throws IOException,InterruptedException{  
    String line = value.toString();  
    String[] arr = line.split("\t");  
    String mapkey=arr[0];  
    String mapvalue=arr[1];  
    String relationtype=new String();  
    relationtype="1";  
    context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
    //System.out.println(relationtype+"+"+mapvalue);  
    relationtype="2";  
    context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
    //System.out.println(relationtype+"+"+mapvalue);  
    }  
    }  
    public static class Reduce extends Reducer<Text, Text, Text, Text>{  
    public void reduce(Text key,Iterable<Text> values,Context context)  
    throws IOException,InterruptedException{  
    int buyernum=0;  
    String[] buyer=new String[20];  
    int friendsnum=0;  
    String[] friends=new String[20];  
    Iterator ite=values.iterator();  
    while(ite.hasNext()){  
    String record=ite.next().toString();  
    int len=record.length();  
    int i=2;  
    if(0==len){  
    continue;  
    }  
    char relationtype=record.charAt(0);  
    if('1'==relationtype){  
    buyer [buyernum]=record.substring(i);  
    buyernum++;  
    }  
    if('2'==relationtype){  
    friends[friendsnum]=record.substring(i);  
    friendsnum++;  
    }  
    }  
    if(0!=buyernum&&0!=friendsnum){  
    for(int m=0;m<buyernum;m++){  
    for(int n=0;n<friendsnum;n++){  
    if(buyer[m]!=friends[n]){  
    context.write(new Text(buyer[m]),new Text(friends[n]));  
    }  
    }  
    }  
    }  
    }  
    }  
    public static void main(String[] args) throws Exception{  
    Configuration conf=new Configuration();  
    String[] otherArgs=new String[2];  
    otherArgs[0]="hdfs://localhost:9000/mymapreduce7/in/buyer1";  
    otherArgs[1]="hdfs://localhost:9000/mymapreduce7/out";  
    Job job=new Job(conf," Table join");  
    job.setJarByClass(DanJoin.class);  
    job.setMapperClass(Map.class);  
    job.setReducerClass(Reduce.class);  
    job.setOutputKeyClass(Text.class);  
    job.setOutputValueClass(Text.class);  
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
    System.exit(job.waitForCompletion(true)?0:1);  
    }  
    }  

    7.待执行完毕后,进入命令模式下,在hdfs上从Java代码指定的输出路径中查看实验结果。

    1. hadoop fs -ls /mymapreduce7/out  
    2. hadoop fs -cat /mymapreduce7/out/part-r-00000  

     

     

     

  • 相关阅读:
    HDU 6182 A Math Problem 水题
    HDU 6186 CS Course 位运算 思维
    HDU 6188 Duizi and Shunzi 贪心 思维
    HDU 2824 The Euler function 欧拉函数
    HDU 3037 Saving Beans 多重集合的结合 lucas定理
    HDU 3923 Invoker Polya定理
    FZU 2282 Wand 组合数学 错排公式
    HDU 1452 Happy 2004 数论
    HDU 5778 abs 数论
    欧拉回路【判断连通+度数为偶】
  • 原文地址:https://www.cnblogs.com/wmdww/p/15580839.html
Copyright © 2011-2022 走看看