zoukankan      html  css  js  c++  java
  • 每日博客

    04Mapreduce实例——单表join

    实验目的

    1.准确理解MapReduce单表连接的设计原理

    2.熟练掌握MapReduce单表连接程序的编写

    3.了解单表连接的运用场景

    4.学会编写MapReduce单表连接程序代码解决问题

    实验原理

    以本实验的buyer1(buyer_id,friends_id)表为例来阐述单表连接的实验原理。单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,在map阶段将读入数据分割成buyer_id和friends_id之后,会将buyer_id设置成key,friends_id设置成value,直接输出并将其作为左表;再将同一对buyer_id和friends_id中的friends_id设置成key,buyer_id设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"buyer_idfriends_id--friends_idbuyer_id"关系。取出每个key的value-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。

    实验环境

    Linux Ubuntu 14.04

    jdk-7u75-linux-x64

    hadoop-2.6.0-cdh5.4.5

    hadoop-2.6.0-eclipse-cdh5.4.5.jar

    eclipse-java-juno-SR2-linux-gtk-x86_64

     

    实验内容

    现有某电商的用户好友数据文件,名为 buyer1,buyer1中包含(buyer_id,friends_id)两个字段,内容是以"\t"分隔,编写MapReduce进行单表连接,查询出用户的间接好友关系。例如:10001的好友是10002,而10002的好友是10005,那么10001和10005就是间接好友关系。

    buyer1(buyer_id,friends_id)

    1. 10001   10002  
    2. 10002   10005  
    3. 10003   10002  
    4. 10004   10006  
    5. 10005   10007  
    6. 10006   10022  
    7. 10007   10032  
    8. 10009   10006  
    9. 10010   10005  
    10. 10011   10013  

    统计结果数据如下:

    1. 好友id  用户id  
    2. 10005   10001  
    3. 10005   10003  
    4. 10007   10010  
    5. 10007   10002  
    6. 10022   10004  
    7. 10022   10009  
    8. 10032   10005  

    实验步骤

    1.切换到/apps/hadoop/sbin目录下,开启hadoop。

    1. cd /apps/hadoop/sbin  
    2. ./start-all.sh  

    2.在Linux本地新建/data/mapreduce7目录。

    1. mkdir -p /data/mapreduce7  

    3.在Linux中切换到/data/mapreduce7目录下,用wget命令从http://192.168.1.100:60000/allfiles/mapreduce7/buyer1网址上下载文本文件buyer1。

    1. cd /data/mapreduce7  
    2. wget http://192.168.1.100:60000/allfiles/mapreduce7/buyer1  

    然后在当前目录下用wget命令从http://192.168.1.100:60000/allfiles/mapreduce7/hadoop2lib.tar.gz网址上下载项目用到的依赖包。

    1. wget http://192.168.1.100:60000/allfiles/mapreduce7/hadoop2lib.tar.gz  

    将hadoop2lib.tar.gz解压到当前目录下。

    1. tar zxvf hadoop2lib.tar.gz  

    4.首先在hdfs上新建/mymapreduce7/in目录,然后将Linux本地/data/mapreduce7目录下的buyer1文件导入到hdfs的/mymapreduce7/in目录中。

    1. hadoop fs -mkdir -p /mymapreduce7/in  
    2. hadoop fs -put /data/mapreduce7/buyer1 /mymapreduce7/in  

    5.新建Java Project项目,项目名为mapreduce7。

     

     

    在mapreduce7项目里新建包,包名为mapreduce。

     

     

    在mapreduce包下新建类,类名为DanJoin。

     

     

    6.添加项目所需依赖的jar包,右键单击mapreduce7,新建一个文件夹,用于存放项目所需的jar包。

     

    将/data/mapreduce7目录下,hadoop2lib目录中的jar包,拷贝到eclipse中mapreduce7项目的hadoop2lib目录下。

     

    选中所有项目hadoop2lib目录下所有jar包,并添加到Build Path中。

     

    7.编写Java代码,并描述其设计思路

    Map代码

    1. public static class Map extends Mapper<Object,Text,Text,Text>{  
    2.    //实现map函数  
    3. public void map(Object key,Text value,Context context)  
    4.                 throws IOException,InterruptedException{  
    5.                 String line = value.toString();  
    6.                 String[] arr = line.split("\t");   //按行截取  
    7.                 String mapkey=arr[0];  
    8.                 String mapvalue=arr[1];  
    9.                 String relationtype=new String();  //左右表标识  
    10.                 relationtype="1";  //输出左表  
    11.                 context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
    12.                 //System.out.println(relationtype+"+"+mapvalue);  
    13.                 relationtype="2";  //输出右表  
    14.                 context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
    15.                 //System.out.println(relationtype+"+"+mapvalue);  
    16.   
    17.         }  
    18.     }  

    Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key/value>对提供给map函数使用。map函数中用split("\t")方法把每行数据进行截取,并把数据存入到数组arr[],把arr[0]赋值给mapkey,arr[1]赋值给mapvalue。用两个context的write()方法把数据输出两份,再通过标识符relationtype为1或2对两份输出数据的value打标记。

    Reduce代码

    1. public static class Reduce extends Reducer<Text, Text, Text, Text>{  
    2.  //实现reduce函数  
    3. public void reduce(Text key,Iterable<Text> values,Context context)  
    4.     throws IOException,InterruptedException{  
    5.     int buyernum=0;  
    6.     String[] buyer=new String[20];  
    7.     int friendsnum=0;  
    8.     String[] friends=new String[20];  
    9.     Iterator ite=values.iterator();  
    10.     while(ite.hasNext()){  
    11.     String record=ite.next().toString();  
    12.     int len=record.length();  
    13.     int i=2;  
    14.     if(0==len){  
    15.     continue;  
    16.     }  
    17.     //取得左右表标识  
    18.     char relationtype=record.charAt(0);  
    19.     //取出record,放入buyer  
    20.     if('1'==relationtype){  
    21.     buyer [buyernum]=record.substring(i);  
    22.     buyernum++;  
    23.     }  
    24.     //取出record,放入friends  
    25.     if('2'==relationtype){  
    26.     friends[friensnum]=record.substring(i);  
    27.     friendsnum++;  
    28.     }  
    29.     }  
    30.     buyernum和friendsnum数组求笛卡尔积  
    31.     if(0!=buyernum&&0!=friendsnum){  
    32.     for(int m=0;m<buyernum;m++){  
    33.     for(int n=0;n<friendsnum;n++){  
    34.     if(buyer[m]!=friends[n]){  
    35.     //输出结果  
    36.     context.write(new Text(buyer[m]),new Text(frinds[n]));  
    37.     }  
    38.     }  
    39.     }  
    40.     }  
    41.     }  

    reduce端在接收map端传来的数据时已经把相同key的所有value都放到一个Iterator容器中values。reduce函数中,首先新建两数组buyer[]和friends[]用来存放map端的两份输出数据。然后Iterator迭代中hasNext()和Next()方法加while循环遍历输出values的值并赋值给record,用charAt(0)方法获取record第一个字符赋值给relationtype,用if判断如果relationtype为1则把用substring(2)方法从下标为2开始截取record将其存放到buyer[]中,如果relationtype为2时将截取的数据放到frindes[]数组中。然后用三个for循环嵌套遍历输出<key,value>,其中key=buyer[m],value=friends[n]。

    完整代码

    1. package mapreduce;  
    2. import java.io.IOException;  
    3. import java.util.Iterator;  
    4. import org.apache.hadoop.conf.Configuration;  
    5. import org.apache.hadoop.fs.Path;  
    6. import org.apache.hadoop.io.Text;  
    7. import org.apache.hadoop.mapreduce.Job;  
    8. import org.apache.hadoop.mapreduce.Mapper;  
    9. import org.apache.hadoop.mapreduce.Reducer;  
    10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    12. public class DanJoin {  
    13.     public static class Map extends Mapper<Object,Text,Text,Text>{  
    14.         public void map(Object key,Text value,Context context)  
    15.                 throws IOException,InterruptedException{  
    16.                 String line = value.toString();  
    17.                 String[] arr = line.split("\t");  
    18.                 String mapkey=arr[0];  
    19.                 String mapvalue=arr[1];  
    20.                 String relationtype=new String();  
    21.                 relationtype="1";  
    22.                 context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
    23.                 //System.out.println(relationtype+"+"+mapvalue);  
    24.                 relationtype="2";  
    25.                 context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
    26.                 //System.out.println(relationtype+"+"+mapvalue);  
    27.         }  
    28.     }  
    29.     public static class Reduce extends Reducer<Text, Text, Text, Text>{  
    30.         public void reduce(Text key,Iterable<Text> values,Context context)  
    31.     throws IOException,InterruptedException{  
    32.     int buyernum=0;  
    33.     String[] buyer=new String[20];  
    34.     int friendsnum=0;  
    35.     String[] friends=new String[20];  
    36.     Iterator ite=values.iterator();  
    37.     while(ite.hasNext()){  
    38.     String record=ite.next().toString();  
    39.     int len=record.length();  
    40.     int i=2;  
    41.     if(0==len){  
    42.     continue;  
    43.     }  
    44.     char relationtype=record.charAt(0);  
    45.     if('1'==relationtype){  
    46.     buyer [buyernum]=record.substring(i);  
    47.     buyernum++;  
    48.     }  
    49.     if('2'==relationtype){  
    50.     friends[friendsnum]=record.substring(i);  
    51.     friendsnum++;  
    52.     }  
    53.     }  
    54.     if(0!=buyernum&&0!=friendsnum){  
    55.     for(int m=0;m<buyernum;m++){  
    56.     for(int n=0;n<friendsnum;n++){  
    57.     if(buyer[m]!=friends[n]){  
    58.     context.write(new Text(buyer[m]),new Text(friends[n]));  
    59.     }  
    60.     }  
    61.     }  
    62.     }  
    63.     }  
    64.     }  
    65.     public static void main(String[] args) throws Exception{  
    66.   
    67.     Configuration conf=new Configuration();  
    68.     String[] otherArgs=new String[2];  
    69.     otherArgs[0]="hdfs://localhost:9000/mymapreduce7/in/buyer1";  
    70.     otherArgs[1]="hdfs://localhost:9000/mymapreduce7/out";  
    71.     Job job=new Job(conf," Table join");  
    72.     job.setJarByClass(DanJoin.class);  
    73.     job.setMapperClass(Map.class);  
    74.     job.setReducerClass(Reduce.class);  
    75.     job.setOutputKeyClass(Text.class);  
    76.     job.setOutputValueClass(Text.class);  
    77.     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
    78.     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
    79.     System.exit(job.waitForCompletion(true)?0:1);  
    80.   
    81.     }  
    82.     }  

    8.在DanJoin类文件中,右键并点击=>Run As=>Run on Hadoop选项,将MapReduce任务提交到Hadoop中。

    9.待执行完毕后,进入命令模式下,在hdfs上从Java代码指定的输出路径中查看实验结果。

    1. hadoop fs -ls /mymapreduce7/out  
    2. hadoop fs -cat /mymapreduce7/out/part-r-00000  

     

     

  • 相关阅读:
    反转链表 16
    CodeForces 701A Cards
    hdu 1087 Super Jumping! Jumping! Jumping!(动态规划)
    hdu 1241 Oil Deposits(水一发,自我的DFS)
    CodeForces 703B(容斥定理)
    poj 1067 取石子游戏(威佐夫博奕(Wythoff Game))
    ACM 马拦过河卒(动态规划)
    hdu 1005 Number Sequence
    51nod 1170 1770 数数字(数学技巧)
    hdu 2160 母猪的故事(睡前随机水一发)(斐波那契数列)
  • 原文地址:https://www.cnblogs.com/hfy717/p/15563532.html
Copyright © 2011-2022 走看看