  • Reduce-side join implementation

    Background: we have two tables, customer and order, which share a common field cid. The goal is to join the two tables on cid, and to group and sort the joined records by cid. A hypothetical sample of the two input files is shown below.
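
    For concreteness, here is a hypothetical pair of input files (not from the original post); the field layout matches what the mapper's parsing below assumes, i.e. customer lines start with cid, and order lines start with oid and end with cid:

    customer.txt (cid,name,age):
    1,tom,12
    2,tomas,13
    3,tomson,14

    order.txt (oid,item,price,cid):
    1,no1,12.23,1
    2,no2,12.23,1
    3,no3,12.23,2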

    Approach: in the mapper, obtain the input split from the context and read its file path to decide whether the current line comes from customer.txt or from order.txt.

    Each line is then read and tagged: 0 marks a customer record, 1 marks an order record. The tag, the cid, and the line's contents are assembled into a new composite key.

    A sort comparator then orders the keys ascending by cid; within one customer, the customer record comes first and that customer's orders follow in ascending oid order.

    In the reducer, each customer's info is concatenated with each of that customer's orders. A grouping comparator groups keys by cid, so all records of one customer form a single group, still in ascending cid order.

    Finally, the records are partitioned by cid (a sketch of the resulting key order follows).
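
    With the hypothetical sample input above, the sort and grouping comparators arrange the mapper's composite keys like this (a hand-drawn sketch, not program output):

    (cid=1, type=0)            customer 1
    (cid=1, type=1, oid=1)     order 1 of customer 1
    (cid=1, type=1, oid=2)     order 2 of customer 1
    (cid=2, type=0)            customer 2
    (cid=2, type=1, oid=3)     order 3 of customer 2
    (cid=3, type=0)            customer 3 (no orders)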

    comkey

    package com.cr.reduceJoin;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class comkey_1 implements WritableComparable<comkey_1>{
        //0 = customer record, 1 = order record
        private int type;
        private int cid;
        private int oid;
        private String customerInfo = "";
        private String orderInfo = "";
    
    
        public int getType() {
            return type;
        }
    
        public void setType(int type) {
            this.type = type;
        }
    
        public int getCid() {
            return cid;
        }
    
        public void setCid(int cid) {
            this.cid = cid;
        }
    
        public int getOid() {
            return oid;
        }
    
        public void setOid(int oid) {
            this.oid = oid;
        }
    
        public String getCustomerInfo() {
            return customerInfo;
        }
    
        public void setCustomerInfo(String customerInfo) {
            this.customerInfo = customerInfo;
        }
    
        public String getOrderInfo() {
            return orderInfo;
        }
    
        public void setOrderInfo(String orderInfo) {
            this.orderInfo = orderInfo;
        }
    
        @Override
        public int compareTo(comkey_1 o) {
            int type1 = o.type;
            int cid1 = o.cid;
            int oid1 = o.oid;

            //first check whether the two keys belong to the same customer
            if(cid1 == cid){
                //same customer: are both keys the same kind of record?
                if(type1 == type){
                    //two orders of the same customer: ascending by oid
                    //(Integer.compare avoids the overflow risk of plain subtraction)
                    return Integer.compare(oid, oid1);
                }else {
                    //one is the customer record, the other an order: customer first
                    if(type == 0){
                        return -1;
                    }else {
                        return 1;
                    }
                }
            }else {
                //different customers: ascending by cid
                return Integer.compare(cid, cid1);
            }
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(type);
            out.writeInt(cid);
            out.writeInt(oid);
            out.writeUTF(customerInfo);
            out.writeUTF(orderInfo);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.type = in.readInt();
            this.cid = in.readInt();
            this.oid = in.readInt();
            this.customerInfo = in.readUTF();
            this.orderInfo = in.readUTF();
        }
    }
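
    A quick local sanity check of this ordering, not from the original post (the class name and the sample values are made up for illustration):

    package com.cr.reduceJoin;

    import java.util.Arrays;

    public class ComkeySortCheck {
        public static void main(String[] args) {
            //three hypothetical keys: customer 1, one order of customer 1, customer 2
            comkey_1 c1 = new comkey_1();
            c1.setType(0);
            c1.setCid(1);
            comkey_1 o1 = new comkey_1();
            o1.setType(1);
            o1.setCid(1);
            o1.setOid(2);
            comkey_1 c2 = new comkey_1();
            c2.setType(0);
            c2.setCid(2);

            comkey_1[] keys = {c2, o1, c1};
            //Arrays.sort falls back to comkey_1.compareTo
            Arrays.sort(keys);
            for (comkey_1 k : keys) {
                System.out.println("cid=" + k.getCid() + " type=" + k.getType() + " oid=" + k.getOid());
            }
            //prints customer 1, then order oid=2 of customer 1, then customer 2
        }
    }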
    

    mapper

    package com.cr.reduceJoin;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class ReduceJoinMapper extends Mapper<LongWritable,Text,comkey_1,NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            //get the input split from the context, and the file path from the split
    
            InputSplit inputSplit = context.getInputSplit();
            FileSplit fileSplit = (FileSplit)inputSplit;
            String path = fileSplit.getPath().toString();
            comkey_1 comkey1 = new comkey_1();
            //assemble the line into the user-defined comkey format
            if(path.contains("customer")){
                //customer line: cid is the first comma-separated field
                String cid = line.substring(0, line.indexOf(","));
                String cusInfo = line;
                comkey1.setType(0);
                comkey1.setCid(Integer.parseInt(cid));
                comkey1.setCustomerInfo(cusInfo);
            }else {
                //order line: oid is the first field and cid is the last field
                String cid = line.substring(line.lastIndexOf(",") + 1);
                String oInfo = line.substring(0, line.lastIndexOf(","));
                String oid = line.substring(0, line.indexOf(","));
                comkey1.setType(1);
                comkey1.setCid(Integer.parseInt(cid));
                comkey1.setOid(Integer.parseInt(oid));
                comkey1.setOrderInfo(oInfo);
            }
    
            //write the composite key to the context
            context.write(comkey1,NullWritable.get());
    
        }
    }
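
    To make the substring parsing above concrete, a tiny standalone sketch (the class and the sample line are hypothetical):

    package com.cr.reduceJoin;

    public class OrderLineParseCheck {
        public static void main(String[] args) {
            //hypothetical order line: oid,item,price,cid
            String line = "1,no1,12.23,1";
            String cid = line.substring(line.lastIndexOf(",") + 1);   //"1"  (last field)
            String oInfo = line.substring(0, line.lastIndexOf(","));  //"1,no1,12.23"
            String oid = line.substring(0, line.indexOf(","));        //"1"  (first field)
            System.out.println("cid=" + cid + " orderInfo=" + oInfo + " oid=" + oid);
        }
    }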
    

    Composite-key sort comparator

    package com.cr.reduceJoin;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    
    /**
     * Sort comparator for the composite key
     */
    public class Comkey2Comparator extends WritableComparator {
    
        protected Comkey2Comparator(){
            //the second argument, true, tells WritableComparator to create key
            //instances so that compare() receives deserialized objects
            super(comkey_1.class,true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            comkey_1 a1 = (comkey_1) a;
            comkey_1 b1 = (comkey_1) b;
            return a1.compareTo(b1);
        }
    }
    

    reducer

    package com.cr.reduceJoin;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    public class JoinReducer  extends Reducer<comkey_1,NullWritable,Text,NullWritable>{
        @Override
        protected void reduce(comkey_1 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //get the iterator over the grouped values
            Iterator<NullWritable> iterator = values.iterator();
            //thanks to the sort comparator, the first record of each cid group is the customer record
            iterator.next();
            String customerInfo = key.getCustomerInfo();
            //every record from the second one on is an order; Hadoop refills the same
            //key object as the iterator advances, so key.getOrderInfo() always reflects
            //the current record
            while (iterator.hasNext()){
                iterator.next();
                String orderInfo = key.getOrderInfo();
                //concatenate customer info + order info
                context.write(new Text(customerInfo + "," + orderInfo),NullWritable.get());
            }
        }
    }
    

    cid grouping comparator

    package com.cr.reduceJoin;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    
    /*
    CID grouping comparator: groups composite keys by cid only
     */
    public class CIDgroupComparator extends WritableComparator {
    
        protected CIDgroupComparator(){
            super(comkey_1.class,true);
        }
    
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            comkey_1 a1 = (comkey_1) a;
            comkey_1 b1 = (comkey_1) b;
            //group only by cid, ignoring type and oid
            return Integer.compare(a1.getCid(), b1.getCid());
        }
    }
    

    Partitioning by cid

    package com.cr.reduceJoin;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class CIDpartition extends Partitioner<comkey_1,NullWritable>{
        //custom partitioner: all records of one customer go to the same reducer
        @Override
        public int getPartition(comkey_1 key, NullWritable nullWritable, int numPartitions) {
            return key.getCid() % numPartitions;
        }
    }
    

    app

    package com.cr.reduceJoin;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class ReduceJoinApp {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
            //run the job against the local file system
            conf.set("fs.defaultFS","file:///");
            Job job = Job.getInstance(conf);
    
            job.setJobName("ReduceJoinApp");
            job.setJarByClass(ReduceJoinApp.class);
    
            FileInputFormat.addInputPath(job,new Path(args[0]));
    
            Path out = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            //delete the output directory if it already exists
            if (fs.exists(out)) {
                fs.delete(out, true);
            }
            FileOutputFormat.setOutputPath(job,out);
    
            job.setMapperClass(ReduceJoinMapper.class);
            job.setReducerClass(JoinReducer.class);
    
            job.setMapOutputKeyClass(comkey_1.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            job.setPartitionerClass(CIDpartition.class);
            job.setGroupingComparatorClass(CIDgroupComparator.class);
            job.setSortComparatorClass(Comkey2Comparator.class);
    
            job.setNumReduceTasks(2);
            job.waitForCompletion(true);
    
        }
    }
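
    A hypothetical invocation (the jar name and paths are placeholders; the input directory is assumed to contain customer.txt and order.txt, and since fs.defaultFS is forced to file:/// the paths refer to the local file system):

    hadoop jar reduceJoin.jar com.cr.reduceJoin.ReduceJoinApp /data/joinInput /data/joinOutput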
    

    With two reduce tasks the job writes two output partitions, part-r-00000 and part-r-00001 (the original post showed screenshots of both files here).
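
    For the hypothetical sample input above, the joined output would look as follows. Customer 3 produces no line because it has no orders, and with getPartition = cid % 2, customer 2 lands in part-r-00000 while customer 1 lands in part-r-00001:

    part-r-00000:
    2,tomas,13,3,no3,12.23

    part-r-00001:
    1,tom,12,1,no1,12.23
    1,tom,12,2,no2,12.23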
