zoukankan      html  css  js  c++  java
  • Map/Reduce中Join查询实现

    http://www.cnblogs.com/MengYan-LongYou/p/3360613.html

     

    在做这个Join查询的时候,必然涉及数据,我这里设计了2张表,分别较data.txtinfo.txt,字段之间以/t划分。

    data.txt内容如下:

    201001    1003    abc

    201002    1005    def

    201003    1006    ghi

    201004    1003    jkl

    201005    1004    mno

    201006    1005    pqr

    info.txt内容如下:

    1003    kaka

    1004    da

    1005    jue

    1006    zhao

    期望输出结果:

    1003    201001    abc    kaka

    1003    201004    jkl    kaka

    1004    201005    mno    da

    1005    201002    def    jue

    1005    201006    pqr    jue

    1006    201003    ghi    zhao

    四、Map代码

    首先是map的代码,我贴上,然后简要说说

    public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {

            @Override

            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

                // 获取输入文件的全路径和名称

                String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

                if (pathName.contains("data.txt")) {

                    String values[] = value.toString().split("/t");

                    if (values.length < 3) {

                        // data数据格式不规范,字段小于3,抛弃数据

                        return;

                    } else {

                        // 数据格式规范,区分标识为1

                        TextPair tp = new TextPair(new Text(values[1]), new Text("1"));

                        context.write(tp, new Text(values[0] + "/t" + values[2]));

                    }

                }

                if (pathName.contains("info.txt")) {

                    String values[] = value.toString().split("/t");

                    if (values.length < 2) {

                        // data数据格式不规范,字段小于2,抛弃数据

                        return;

                    } else {

                        // 数据格式规范,区分标识为0

                        TextPair tp = new TextPair(new Text(values[0]), new Text("0"));

                        context.write(tp, new Text(values[1]));

                    }

                }

            }

        }

    这里需要注意以下部分:

    ApathName是文件在HDFS中的全路径(例如:hdfs://M1:9000/MengYan/join/data/info.txt),可以以endsWith()的方法来判断。

    B、资料表,也就是这里的info.txt需要放在前面,也就是标识号是0.否则无法输出理想结果。

    CMap执行完成之后,输出的中间结果如下:

    1003,0    kaka

    1004,0    da

    1005,0    jue

    1006,0    zhao

    1003,1    201001    abc

    1003,1    201004    jkl

    1004,1    201005    mon

    1005,1    201002    def

    1005,1    201006    pqr

    1006,1    201003    ghi

    五、分区和分组

    1map之后的输出会进行一些分区的操作,代码贴出来:

    public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {

            @Override

            public int getPartition(TextPair key, Text value, int numParititon) {

                return Math.abs(key.getFirst().hashCode() * 127) % numParititon;

            }

        }

    分区我在以前的文档中写过,这里不做描述了,就说是按照map输出的符合key的第一个字段做分区关键字。分区之后,相同key会划分到一个reduce中去处理(如果reduce设置是1,那么就是分区有多个,但是还是在一个reduce中处理。但是结果会按照分区的原则排序)。分区后结果大致如下:

    同一区:

    1003,0    kaka

    1003,1    201001    abc

    1003,1    201004    jkl

    同一区:

    1004,0    da

    1004,1    201005    mon

    同一区:

    1005,0    jue

    1005,1    201002    def

    1005,1    201006    pqr

    同一区:

    1006,0    zhao

    1006,1    201003    ghi

    2、分组操作,代码如下

    public static class Example_Join_01_Comparator extends WritableComparator {

            public Example_Join_01_Comparator() {

                super(TextPair.class, true);

            }

            @SuppressWarnings("unchecked")

            public int compare(WritableComparable a, WritableComparable b) {

                TextPair t1 = (TextPair) a;

                TextPair t2 = (TextPair) b;

                return t1.getFirst().compareTo(t2.getFirst());

            }

        }

    分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,是按照复合key的第一个字段做分组原则,达到忽略复合key的第二个字段值的目的,从而让数据能够迭代在一个reduce中。输出后结果如下:

    同一组:

    1003,0    kaka

    1003,0    201001    abc

    1003,0    201004    jkl

    同一组:

    1004,0    da

    1004,0    201005    mon

    同一组:

    1005,0    jue

    1005,0    201002    def

    1005,0    201006    pqr

    同一组:

    1006,0    zhao

    1006,0    201003    ghi

    六、reduce操作

    贴上代码如下:

    public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

            protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,

                    InterruptedException {

                Text pid = key.getFirst();

                String desc = values.iterator().next().toString();

                while (values.iterator().hasNext()) {

                    context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));

                }

            }

        }

    1、代码比较简单,首先获取关键的ID值,就是key的第一个字段。

    2、获取公用的字段,通过排组织后可以看到,一些共有字段是在第一位,取出来即可。

    3、遍历余下的结果,输出。

    七、其他的支撑代码

    1、首先是TextPair代码,没有什么可以细说的,贴出来:

    public class TextPair implements WritableComparable<TextPair> {

        private Text first;

        private Text second;

        public TextPair() {

            set(new Text(), new Text());

        }

        public TextPair(String first, String second) {

            set(new Text(first), new Text(second));

        }

        public TextPair(Text first, Text second) {

            set(first, second);

        }

        public void set(Text first, Text second) {

            this.first = first;

            this.second = second;

        }

        public Text getFirst() {

            return first;

        }

        public Text getSecond() {

            return second;

        }

        public void write(DataOutput out) throws IOException {

            first.write(out);

            second.write(out);

        }

        public void readFields(DataInput in) throws IOException {

            first.readFields(in);

            second.readFields(in);

        }

        public int compareTo(TextPair tp) {

            int cmp = first.compareTo(tp.first);

            if (cmp != 0) {

                return cmp;

            }

            return second.compareTo(tp.second);

        }

    }

    2Job的入口函数

    public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {

            Configuration conf = new Configuration();

            GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);

            String[] otherArgs = parser.getRemainingArgs();

            if (agrs.length < 3) {

                System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");

                System.exit(2);

            }

            //conf.set("hadoop.job.ugi", "root,hadoop");

            Job job = new Job(conf, "Example_Join_01");

            // 设置运行的job

            job.setJarByClass(Example_Join_01.class);

            // 设置Map相关内容

            job.setMapperClass(Example_Join_01_Mapper.class);

            // 设置Map的输出

            job.setMapOutputKeyClass(TextPair.class);

            job.setMapOutputValueClass(Text.class);

            // 设置partition

            job.setPartitionerClass(Example_Join_01_Partitioner.class);

            // 在分区之后按照指定的条件分组

            job.setGroupingComparatorClass(Example_Join_01_Comparator.class);

            // 设置reduce

            job.setReducerClass(Example_Join_01_Reduce.class);

            // 设置reduce的输出

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(Text.class);

            // 设置输入和输出的目录

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

            FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

            FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

            // 执行,直到结束就退出

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        }

    八、总结

    1、这是个简单的join查询,可以看到,我在处理输入源的时候是在map端做来源判断。其实在0.19可以用MultipleInputs.addInputPath()的方法,但是它用了JobConf做参数。这个方法原理是多个数据源就采用多个map来处理。方法各有优劣。

    2、对于资源表,如果我们采用01这样的模式来区分,资源表是需要放在前的。例如本例中info.txt就是资源表,所以标识位就是0.如果写为1的话,可以试下,在分组之后,资源表对应的值放在了迭代器最后一位,无法追加在最后所有的结果集合中。

    3、关于分区,并不是所有的map都结束才开始的,一部分数据完成就会开始执行。同样,分组操作在一个分区内执行,如果分区完成,分组将会开始执行,也不是等所有分区完成才开始做分组的操作。

  • 相关阅读:
    Navicat Premium 12 破解汉化64位 windows版本
    Tkinter入门简明教程
    python tkinter-消息框、对话框、文件对话框
    python中ui自动化selenium方法封装分享
    Windows利用EasyWebSvr起web服务
    Windows安装mysql服务端
    北京个人所得税纳税记录打印
    Java并发之ReentrantReadWriteLock源码解析(二)
    Java并发之ReentrantReadWriteLock源码解析(一)
    Java并发之Semaphore源码解析(二)
  • 原文地址:https://www.cnblogs.com/mmcmmc/p/3963508.html
Copyright © 2011-2022 走看看