zoukankan      html  css  js  c++  java
  • MapReduce之Reduce Join

    一 介绍

    Reduce Join其主要思想如下:
     在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag), 比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积),即:reduce阶段进行实际的连接操作。

    在这个例子中我们假设有两个数据文件如下:
    存储客户信息的文件:customers.csv

    1,stephaie leung,555-555-5555
    2,edward kim,123-456-7890
    3,jose madriz,281-330-8004
    4,david storkk,408-55-0000
    

    存储订单信息的文件:orders.csv

    3,A,12.95,02-Jun-2008
    1,B,88.25,20-May-2008
    2,C,32.00,30-Nov-2007
    3,D,25.02,22-Jan-2009

    要求最终的输出结果为:

    1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
    2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
    3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
    3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009

    二 代码部分

    自定义数据类型:用于对不同文件数据打标签

     1 package mapreduce.reducejoin;
     2 
     3 import java.io.DataInput;
     4 import java.io.DataOutput;
     5 import java.io.IOException;
     6 import org.apache.hadoop.io.Writable;
     7 
     8 public class DataJoinWritable implements Writable {
     9 
    10     // mark ,customer / order
    11     private String tag;
    12 
    13     // info
    14     private String data;
    15 
    16     public DataJoinWritable() {
    17 
    18     }
    19 
    20     public DataJoinWritable(String tag, String data) {
    21         this.set(tag, data);
    22     }
    23 
    24     public void set(String tag, String data) {
    25         this.setTag(tag);
    26         this.setData(data);
    27     }
    28 
    29     public String getTag() {
    30         return tag;
    31     }
    32 
    33     public void setTag(String tag) {
    34         this.tag = tag;
    35     }
    36 
    37     public String getData() {
    38         return data;
    39     }
    40 
    41     public void setData(String data) {
    42         this.data = data;
    43     }
    44 
    45     public void write(DataOutput out) throws IOException {
    46         out.writeUTF(this.getTag());
    47         out.writeUTF(this.getData());
    48     }
    49 
    50     public void readFields(DataInput in) throws IOException {
    51         this.setTag(in.readUTF());
    52         this.setData(in.readUTF());
    53     }
    54 
    55     @Override
    56     public int hashCode() {
    57         final int prime = 31;
    58         int result = 1;
    59         result = prime * result + ((data == null) ? 0 : data.hashCode());
    60         result = prime * result + ((tag == null) ? 0 : tag.hashCode());
    61         return result;
    62     }
    63 
    64     @Override
    65     public boolean equals(Object obj) {
    66         if (this == obj)
    67             return true;
    68         if (obj == null)
    69             return false;
    70         if (getClass() != obj.getClass())
    71             return false;
    72         DataJoinWritable other = (DataJoinWritable) obj;
    73         if (data == null) {
    74             if (other.data != null)
    75                 return false;
    76         } else if (!data.equals(other.data))
    77             return false;
    78         if (tag == null) {
    79             if (other.tag != null)
    80                 return false;
    81         } else if (!tag.equals(other.tag))
    82             return false;
    83         return true;
    84     }
    85 
    86     @Override
    87     public String toString() {
    88         return tag + "," + data;
    89     }
    90 }

    MapReduce代码部分

      1 package mapreduce.reducejoin;
      2 
      3 import java.io.IOException;
      4 import java.util.ArrayList;
      5 import java.util.List;
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.conf.Configured;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.LongWritable;
     10 import org.apache.hadoop.io.NullWritable;
     11 import org.apache.hadoop.io.Text;
     12 import org.apache.hadoop.mapreduce.Job;
     13 import org.apache.hadoop.mapreduce.Mapper;
     14 import org.apache.hadoop.mapreduce.Reducer;
     15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     17 import org.apache.hadoop.util.Tool;
     18 import org.apache.hadoop.util.ToolRunner;
     19 
     20 public class DataJoinMapReduce extends Configured implements Tool {
     21 
     22     // step 1: Mapper
     23     public static class DataJoinMapper extends
     24             Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {
     25 
     26         // map output key
     27         private LongWritable mapOutputKey = new LongWritable();
     28 
     29         // map output value
     30         private DataJoinWritable mapOutputValue = new DataJoinWritable();
     31 
     32         @Override
     33         public void setup(Context context) throws IOException,
     34                 InterruptedException {
     35         }
     36 
     37         @Override
     38         public void map(LongWritable key, Text value, Context context)
     39                 throws IOException, InterruptedException {
     40 
     41             // line value
     42             String lineValue = value.toString();
     43 
     44             // split
     45             String[] vals = lineValue.split(",");
     46 
     47             int length = vals.length;
     48 
     49             if ((3 != length) && (4 != length)) {
     50                 return;
     51             }
     52 
     53             // get cid
     54             Long cid = Long.valueOf(vals[0]);
     55 
     56             // get name
     57             String name = vals[1];
     58 
     59             // set customer
     60             if (3 == length) {
     61                 String phone = vals[2];
     62 
     63                 // set
     64                 mapOutputKey.set(cid);
     65                 mapOutputValue.set("customer", name + "," + phone);
     66             }
     67 
     68             // set order
     69             if (4 == length) {
     70                 String price = vals[2];
     71                 String date = vals[3];
     72 
     73                 // set
     74                 mapOutputKey.set(cid);
     75                 mapOutputValue.set("order", name + "," + price + "," + date);
     76             }
     77 
     78             // output
     79             context.write(mapOutputKey, mapOutputValue);
     80 
     81         }
     82 
     83         @Override
     84         public void cleanup(Context context) throws IOException,
     85                 InterruptedException {
     86         }
     87     }
     88 
     89     // step 2: Reducer
     90     public static class DataJoinReducer extends
     91             Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {
     92 
     93         private Text outputValue = new Text();
     94 
     95         @Override
     96         protected void setup(Context context) throws IOException,
     97                 InterruptedException {
     98         }
     99 
    100         @Override
    101         protected void reduce(LongWritable key,
    102                 Iterable<DataJoinWritable> values, Context context)
    103                 throws IOException, InterruptedException {
    104             String customerInfo = null;
    105             List<String> orderList = new ArrayList<String>();
    106 
    107             for (DataJoinWritable value : values) {
    108                 if ("customer".equals(value.getTag())) {
    109                     customerInfo = value.getData();
    110                 } else if ("order".equals(value.getTag())) {
    111                     orderList.add(value.getData());
    112                 }
    113             }
    114 
    115             // output
    116             for (String order : orderList) {
    117 
    118                 // ser outout value
    119                 outputValue.set(key.get() + "," + customerInfo + "," + order);
    120 
    121                 // output
    122                 context.write(NullWritable.get(), outputValue);
    123             }
    124         }
    125 
    126         @Override
    127         protected void cleanup(Context context) throws IOException,
    128                 InterruptedException {
    129         }
    130     }
    131 
    132     /**
    133      * Execute the command with the given arguments.
    134      * 
    135      * @param args
    136      *            command specific arguments.
    137      * @return exit code.
    138      * @throws Exception
    139      */
    140 
    141     // step 3: Driver
    142     public int run(String[] args) throws Exception {
    143 
    144         Configuration configuration = this.getConf();
    145         
    146         // set job
    147         Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    148         job.setJarByClass(DataJoinMapReduce.class);
    149         
    150         // input
    151         Path inpath = new Path(args[0]);
    152         FileInputFormat.addInputPath(job, inpath);
    153 
    154         // output
    155         Path outPath = new Path(args[1]);
    156         FileOutputFormat.setOutputPath(job, outPath);
    157 
    158         // Mapper
    159         job.setMapperClass(DataJoinMapper.class);
    160         job.setMapOutputKeyClass(LongWritable.class);
    161         job.setMapOutputValueClass(DataJoinWritable.class);
    162 
    163         // Reducer
    164         job.setReducerClass(DataJoinReducer.class);
    165         job.setOutputKeyClass(NullWritable.class);
    166         job.setOutputValueClass(Text.class);
    167 
    168         // submit job -> YARN
    169         boolean isSuccess = job.waitForCompletion(true);
    170         return isSuccess ? 0 : 1;
    171 
    172     }
    173 
    174     public static void main(String[] args) throws Exception {
    175 
    176         Configuration configuration = new Configuration();
    177 
    178         args = new String[] {
    179                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/reducejoin",
    180                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/output" };
    181 
    182         // run job
    183         int status = ToolRunner.run(configuration, new DataJoinMapReduce(),
    184                 args);
    185 
    186         // exit program
    187         System.exit(status);
    188     }
    189 }

    执行代码后查询结果

    [hadoop@beifeng01 hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -text /user/beifeng01/mapreduce/output/p*
    1,stephaie leung,555-555-5555,B,88.25,20-May-2008
    2,edward kim,123-456-7890,C,32.00,30-Nov-2007
    3,jose madriz,281-330-8004,D,25.02,22-Jan-2009
    3,jose madriz,281-330-8004,A,12.95,02-Jun-2008
  • 相关阅读:
    HTTPHelper
    C# 捕获全局亲测可用
    C# Excel的读写
    C# combox 绑定数据
    MySQL在CenterOS和Ubuntu的安装
    VMware虚拟机里的Centos7的IP
    Docker安装和部署
    linux安装git方法(转)
    mysql安装最后一步 Apply Security Settings 出错
    Linux下安装MySQL
  • 原文地址:https://www.cnblogs.com/perfectdata/p/10125120.html
Copyright © 2011-2022 走看看