  • DataJoin: Reduce-side join

    First, three concepts:

    1. Data source: analogous to a table in a relational database (a single file or multiple files).
    2. Tag: tagging a record ensures that specific metadata always travels with the record; here the tag identifies the data source the record came from.
    3. Group key: analogous to a join key in a relational database.

      Two data sources:

          Customers                           Orders
          1,Stephanie Leung,555-555-5555      3,A,12.95,02-Jun-2008
          2,Edward Kim,123-456-7890           1,B,88.25,20-May-2008
          3,Jose Madriz,281-330-8004          2,C,32.00,30-Nov-2007
          4,David Stork,408-555-0000          3,D,25.02,22-Jan-2009

     

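    The purpose of the map phase is to wrap every record of every data source into a tagged record that carries its tag and its group key, as described above.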

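    MapClass extends DataJoinMapperBase, which already implements map(). We need to implement three methods:

    1. Text generateInputTag(String inputFile);  // called in configure(); the return value is stored in DataJoinMapperBase's inputTag field
    2. Text generateGroupKey(TaggedMapOutput aRecord);
    3. TaggedMapOutput generateTaggedMapOutput(Object value);

      map() calls methods 2 and 3 to wrap each record and emit it to reduce() (the original post illustrates this with a figure).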
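    The wrapping step can be sketched without Hadoop. The following is a plain-Java simulation, not the DataJoinMapperBase API: the class TaggingSketch, its Tagged holder, and the file names Customers-part0.txt and Orders-part0.txt are all hypothetical names chosen for illustration. It only mirrors the three rules used later in this post: the tag is the file name up to the first "-", the group key is the first comma-separated field, and the original line is carried along unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of map-side tagging; no Hadoop classes are used.
class TaggingSketch {

    /** Hypothetical holder for one wrapped record: group key, tag, original line. */
    static final class Tagged {
        final String groupKey;
        final String tag;
        final String data;

        Tagged(String groupKey, String tag, String data) {
            this.groupKey = groupKey;
            this.tag = tag;
            this.data = data;
        }
    }

    /** Tag = file name up to the first "-" (same rule as generateInputTag in this post). */
    static String inputTag(String inputFile) {
        return inputFile.split("-")[0];
    }

    /** Group key = first comma-separated field (same rule as generateGroupKey in this post). */
    static String groupKey(String line) {
        return line.split(",")[0];
    }

    /** Wraps one input line; the line itself is kept untouched. */
    static Tagged wrap(String inputFile, String line) {
        return new Tagged(groupKey(line), inputTag(inputFile), line);
    }

    public static void main(String[] args) {
        List<Tagged> emitted = new ArrayList<>();
        // Assumed file names; any name of the form "<source>-<suffix>" would do.
        emitted.add(wrap("Customers-part0.txt", "3,Jose Madriz,281-330-8004"));
        emitted.add(wrap("Orders-part0.txt", "3,A,12.95,02-Jun-2008"));
        emitted.add(wrap("Orders-part0.txt", "3,D,25.02,22-Jan-2009"));
        for (Tagged t : emitted) {
            System.out.println(t.groupKey + " [" + t.tag + "] " + t.data);
        }
    }
}
```

    All three records share the group key 3, so in a real job they would reach the same reduce() call, where the tag tells the customer record apart from the two order records.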

     

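    Reduce extends DataJoinReducerBase, which already implements reduce(). We need to implement combine().

    reduce() takes the records it receives (multiple TaggedMapOutput values sharing the same group key), builds their cross product, and calls combine() on each combination.

    combine() must be implemented by us; it performs the actual join.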
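    To make the cross product concrete, here is a plain-Java sketch of what happens for a single group key. It is a simulation under stated assumptions, not the DataJoinReducerBase source: CrossProductSketch, joinGroup and expand are hypothetical names, and records are plain strings grouped by tag in a map. Each combination contains exactly one record per tag, and combine() follows the same rule as the combine() in this post: drop the key from every record and concatenate what is left.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the reduce-side cross product for one group key.
class CrossProductSketch {

    /** Drops the join key from each record and joins the remainders with commas. */
    static String combine(List<String> tags, List<String> values) {
        if (tags.size() < 2) {
            return null; // key seen in only one data source: inner join drops it
        }
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                joined.append(',');
            }
            joined.append(values.get(i).split(",", 2)[1]);
        }
        return joined.toString();
    }

    /** Picks one record per tag in every possible way and combines each pick. */
    static List<String> joinGroup(TreeMap<String, List<String>> byTag) {
        List<String> tags = new ArrayList<>(byTag.keySet());
        List<String> out = new ArrayList<>();
        expand(tags, byTag, 0, new ArrayList<String>(), out);
        return out;
    }

    private static void expand(List<String> tags, Map<String, List<String>> byTag,
                               int pos, List<String> picked, List<String> out) {
        if (pos == tags.size()) {
            String joined = combine(tags, picked);
            if (joined != null) {
                out.add(joined);
            }
            return;
        }
        for (String record : byTag.get(tags.get(pos))) {
            picked.add(record);
            expand(tags, byTag, pos + 1, picked, out);
            picked.remove(picked.size() - 1); // backtrack before trying the next record
        }
    }

    public static void main(String[] args) {
        TreeMap<String, List<String>> group3 = new TreeMap<>();
        group3.put("Customers", Arrays.asList("3,Jose Madriz,281-330-8004"));
        group3.put("Orders", Arrays.asList("3,A,12.95,02-Jun-2008", "3,D,25.02,22-Jan-2009"));
        // Prints:
        // Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
        // Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
        for (String row : joinGroup(group3)) {
            System.out.println(row);
        }
    }
}
```

    Customer 4 from the sample data has no orders, so its group would hold a single tag and combine() would return null for it, which is why this is an inner join.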

    The code is as follows.

    The jar contrib\datajoin\hadoop-datajoin-*.jar must be on the classpath.

    The old API (org.apache.hadoop.mapred) must be used.

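    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
    import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
    import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class DataJoin extends Configured implements Tool {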

        public static class MapClass extends DataJoinMapperBase {

            // The tag is the part of the input file path before the first "-",
            // so every record of one data source carries the same tag.
            protected Text generateInputTag(String inputFile) {
                String datasource = inputFile.split("-")[0];
                return new Text(datasource);
            }

            // The group (join) key is the first comma-separated field of the record.
            protected Text generateGroupKey(TaggedMapOutput aRecord) {
                String line = ((Text) aRecord.getData()).toString();
                String[] tokens = line.split(",");
                String groupKey = tokens[0];
                return new Text(groupKey);
            }

            // Wrap the raw input line and stamp it with this mapper's input tag.
            protected TaggedMapOutput generateTaggedMapOutput(Object value) {
                TaggedWritable retv = new TaggedWritable((Text) value);
                retv.setTag(this.inputTag);
                return retv;
            }
        }

       

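        public static class Reduce extends DataJoinReducerBase {

            // Called once per combination of the cross product (one record per tag).
            protected TaggedMapOutput combine(Object[] tags, Object[] values) {
                // A key that appears in only one data source has no match: drop it (inner join).
                if (tags.length < 2) return null;

                StringBuilder joinedStr = new StringBuilder();
                for (int i = 0; i < values.length; i++) {
                    if (i > 0) joinedStr.append(",");
                    TaggedWritable tw = (TaggedWritable) values[i];
                    String line = ((Text) tw.getData()).toString();
                    // Split off the join key; it is already emitted as the output key.
                    String[] tokens = line.split(",", 2);
                    joinedStr.append(tokens[1]);
                }

                TaggedWritable retv = new TaggedWritable(new Text(joinedStr.toString()));
                retv.setTag((Text) tags[0]);
                return retv;
            }
        }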

       

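        public static class TaggedWritable extends TaggedMapOutput {

            private Writable data;

            // Hadoop creates Writable instances reflectively and then calls readFields(),
            // so a no-argument constructor is required. The payload is assumed to be
            // Text, which is the only type this job ever wraps.
            public TaggedWritable() {
                this.tag = new Text("");
                this.data = new Text();
            }

            public TaggedWritable(Writable data) {
                this.tag = new Text("");
                this.data = data;
            }

            public Writable getData() {
                return data;
            }

            public void write(DataOutput out) throws IOException {
                this.tag.write(out);
                this.data.write(out);
            }

            // Fields must be read in the same order write() wrote them.
            public void readFields(DataInput in) throws IOException {
                this.tag.readFields(in);
                this.data.readFields(in);
            }
        }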

       

        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            JobConf job = new JobConf(conf, DataJoin.class);
            job.setJobName("DataJoin");

            // args[0]: input path containing the files of both data sources
            // args[1]: output path
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);

            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormat(TextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(TaggedWritable.class);
            // Separate the output key and value with a comma instead of the default tab.
            job.set("mapred.textoutputformat.separator", ",");

            JobClient.runJob(job);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
            System.exit(res);
        }

    }
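    One detail of TaggedWritable deserves a closer look. Hadoop creates Writable values reflectively through a no-argument constructor and only then calls readFields(), so the class must be constructible while empty and must read its fields in exactly the order write() wrote them. The sketch below shows that round trip with plain java.io only; RoundTripSketch and TaggedLine are hypothetical stand-ins that hold two strings, not Hadoop types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java illustration of the write()/readFields() contract; no Hadoop classes.
class RoundTripSketch {

    /** Hypothetical stand-in for TaggedWritable: a tag plus a payload line. */
    static final class TaggedLine {
        String tag = "";
        String data = "";

        // Lets a framework create an empty instance first and fill it afterwards.
        TaggedLine() {
        }

        TaggedLine(String tag, String data) {
            this.tag = tag;
            this.data = data;
        }

        void write(DataOutput out) throws IOException {
            out.writeUTF(tag);
            out.writeUTF(data);
        }

        void readFields(DataInput in) throws IOException {
            tag = in.readUTF();  // same order as write()
            data = in.readUTF();
        }
    }

    /** Serializes a record to bytes and rebuilds it from an empty instance. */
    static TaggedLine roundTrip(TaggedLine original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            TaggedLine copy = new TaggedLine();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TaggedLine copy = roundTrip(new TaggedLine("Orders", "3,A,12.95,02-Jun-2008"));
        System.out.println(copy.tag + " -> " + copy.data);
    }
}
```

    If the empty instance had no payload object to read into, readFields() would fail, which is the situation a TaggedWritable without a usable no-argument constructor runs into.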

  • Original post: https://www.cnblogs.com/liangzh/p/2508358.html