zoukankan      html  css  js  c++  java
  • hadoop实验

    实验原理

    以本实验的buyer1(buyer_id,friends_id)表为例来阐述单表连接的实验原理。单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,在map阶段将读入数据分割成buyer_idfriends_id之后,会将buyer_id设置成keyfriends_id设置成value,直接输出并将其作为左表;再将同一对buyer_idfriends_id中的friends_id设置成keybuyer_id设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在valueString最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个keyvalue-list就包含了"buyer_idfriends_id--friends_idbuyer_id"关系。取出每个keyvalue-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。

    Map代码
    1.public static class Map extends Mapper<Object,Text,Text,Text>{  
    2.   //实现map函数  
    3.public void map(Object key,Text value,Context context)  
    4.                throws IOException,InterruptedException{  
    5.                String line = value.toString();  
    6.                String[] arr = line.split("	");   //按行截取  
    7.                String mapkey=arr[0];  
    8.                String mapvalue=arr[1];  
    9.                String relationtype=new String();  //左右表标识  
    10.                relationtype="1";  //输出左表  
    11.                context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
    12.                //System.out.println(relationtype+"+"+mapvalue);  
    13.                relationtype="2";  //输出右表  
    14.                context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
    15.                //System.out.println(relationtype+"+"+mapvalue);  
    16.  
    17.        }  
    18.    }  
    Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key/value>对提供给map函数使用。map函数中用split("	")方法把每行数据进行截取,并把数据存入到数组arr[],把arr[0]赋值给mapkey,arr[1]赋值给mapvalue。用两个context的write()方法把数据输出两份,再通过标识符relationtype为1或2对两份输出数据的value打标记。
    Reduce代码
    1.public static class Reduce extends Reducer<Text, Text, Text, Text>{  
    2//实现reduce函数  
    3.public void reduce(Text key,Iterable<Text> values,Context context)  
    4.    throws IOException,InterruptedException{  
    5.    int buyernum=0;  
    6.    String[] buyer=new String[20];  
    7.    int friendsnum=0;  
    8.    String[] friends=new String[20];  
    9.    Iterator ite=values.iterator();  
    10.    while(ite.hasNext()){  
    11.    String record=ite.next().toString();  
    12.    int len=record.length();  
    13.    int i=2;  
    14.    if(0==len){  
    15.    continue;  
    16.    }  
    17.    //取得左右表标识  
    18.    char relationtype=record.charAt(0);  
    19.    //取出record,放入buyer  
    20.    if('1'==relationtype){  
    21.    buyer [buyernum]=record.substring(i);  
    22.    buyernum++;  
    23.    }  
    24.    //取出record,放入friends  
    25.    if('2'==relationtype){  
    26.    friends[friensnum]=record.substring(i);  
    27.    friendsnum++;  
    28.    }  
    29.    }  
    30.    buyernum和friendsnum数组求笛卡尔积  
    31.    if(0!=buyernum&&0!=friendsnum){  
    32.    for(int m=0;m<buyernum;m++){  
    33.    for(int n=0;n<friendsnum;n++){  
    34.    if(buyer[m]!=friends[n]){  
    35.    //输出结果  
    36.    context.write(new Text(buyer[m]),new Text(frinds[n]));  
    37.    }  
    38.    }  
    39.    }  
    40.    }  
    41.    }  
    reduce端在接收map端传来的数据时已经把相同key的所有value都放到一个Iterator容器中values。reduce函数中,首先新建两数组buyer[]和friends[]用来存放map端的两份输出数据。然后Iterator迭代中hasNext()和Next()方法加while循环遍历输出values的值并赋值给record,用charAt(0)方法获取record第一个字符赋值给relationtype,用if判断如果relationtype为1则把用substring(2)方法从下标为2开始截取record将其存放到buyer[]中,如果relationtype为2时将截取的数据放到frindes[]数组中。然后用三个for循环嵌套遍历输出<key,value>,其中key=buyer[m],value=friends[n]。
    完整代码
    1.package mapreduce;  
    2.import java.io.IOException;  
    3.import java.util.Iterator;  
    4.import org.apache.hadoop.conf.Configuration;  
    5.import org.apache.hadoop.fs.Path;  
    6.import org.apache.hadoop.io.Text;  
    7.import org.apache.hadoop.mapreduce.Job;  
    8.import org.apache.hadoop.mapreduce.Mapper;  
    9.import org.apache.hadoop.mapreduce.Reducer;  
    10.import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    11.import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    12.public class DanJoin {  
    13.    public static class Map extends Mapper<Object,Text,Text,Text>{  
    14.        public void map(Object key,Text value,Context context)  
    15.                throws IOException,InterruptedException{  
    16.                String line = value.toString();  
    17.                String[] arr = line.split("	");  
    18.                String mapkey=arr[0];  
    19.                String mapvalue=arr[1];  
    20.                String relationtype=new String();  
    21.                relationtype="1";  
    22.                context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
    23.                //System.out.println(relationtype+"+"+mapvalue);  
    24.                relationtype="2";  
    25.                context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
    26.                //System.out.println(relationtype+"+"+mapvalue);  
    27.        }  
    28.    }  
    29.    public static class Reduce extends Reducer<Text, Text, Text, Text>{  
    30.        public void reduce(Text key,Iterable<Text> values,Context context)  
    31.    throws IOException,InterruptedException{  
    32.    int buyernum=0;  
    33.    String[] buyer=new String[20];  
    34.    int friendsnum=0;  
    35.    String[] friends=new String[20];  
    36.    Iterator ite=values.iterator();  
    37.    while(ite.hasNext()){  
    38.    String record=ite.next().toString();  
    39.    int len=record.length();  
    40.    int i=2;  
    41.    if(0==len){  
    42.    continue;  
    43.    }  
    44.    char relationtype=record.charAt(0);  
    45.    if('1'==relationtype){  
    46.    buyer [buyernum]=record.substring(i);  
    47.    buyernum++;  
    48.    }  
    49.    if('2'==relationtype){  
    50.    friends[friendsnum]=record.substring(i);  
    51.    friendsnum++;  
    52.    }  
    53.    }  
    54.    if(0!=buyernum&&0!=friendsnum){  
    55.    for(int m=0;m<buyernum;m++){  
    56.    for(int n=0;n<friendsnum;n++){  
    57.    if(buyer[m]!=friends[n]){  
    58.    context.write(new Text(buyer[m]),new Text(friends[n]));  
    59.    }  
    60.    }  
    61.    }  
    62.    }  
    63.    }  
    64.    }  
    65.    public static void main(String[] args) throws Exception{  
    66.  
    67.    Configuration conf=new Configuration();  
    68.    String[] otherArgs=new String[2];  
    69.    otherArgs[0]="hdfs://localhost:9000/mymapreduce7/in/buyer1";  
    70.    otherArgs[1]="hdfs://localhost:9000/mymapreduce7/out";  
    71.    Job job=new Job(conf," Table join");  
    72.    job.setJarByClass(DanJoin.class);  
    73.    job.setMapperClass(Map.class);  
    74.    job.setReducerClass(Reduce.class);  
    75.    job.setOutputKeyClass(Text.class);  
    76.    job.setOutputValueClass(Text.class);  
    77.    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
    78.    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
    79.    System.exit(job.waitForCompletion(true)?0:1);  
    80.  
    81.    }  
    82.    }  
    View Code

     

  • 相关阅读:
    JavaScript -- 条件语句和循环语句
    xpath的|
    Pythonic
    4k图片爬取+中文乱码
    xpath-房价爬取
    (.*?)实验室
    模块的循环导入
    bs4-爬取小说
    糗图-图片爬取
    re实战记录
  • 原文地址:https://www.cnblogs.com/zhjvvvvvv/p/14206594.html
Copyright © 2011-2022 走看看