Solving data skew with a custom random partitioner

    1. Stage 1: word-count three text files, with the number of reduce partitions set to 3

    package com.cr.skew;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class SkewMapper extends Mapper<LongWritable,Text, Text,IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("entering mapper");
            String line = value.toString();
            String[] arr = line.split(" ");
            Text keyOut = new Text();
            IntWritable valueOut = new IntWritable();
    
            for (String s : arr){
                keyOut.set(s);
                valueOut.set(1);
                context.write(keyOut,valueOut);
            }
        }
    }
    

    package com.cr.skew;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class SkewReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
           for(IntWritable iw : values){
               count += iw.get();
           }
           context.write(key,new IntWritable(count));
        }
    }
    

    package com.cr.skew;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class SkewApp {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            //create the job (local mode)
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS","file:///");
            Job job = Job.getInstance(conf);
            System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5");

            //configure the job
            job.setJobName("SkewApp");                       //job name
            job.setJarByClass(SkewApp.class);                //class used to locate the jar
            job.setInputFormatClass(TextInputFormat.class);

            //input path
            FileInputFormat.addInputPath(job, new Path("D:\\skew"));
            //output path (delete it first if it already exists)
            Path path = new Path("D:\\skew\\out");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            job.setMapperClass(SkewMapper.class);            //mapper class
            job.setReducerClass(SkewReducer.class);          //reducer class

            job.setMapOutputKeyClass(Text.class);            //map output key type
            job.setMapOutputValueClass(IntWritable.class);   //map output value type

            job.setOutputKeyClass(Text.class);               //job output key type
            job.setOutputValueClass(IntWritable.class);      //job output value type

            job.setNumReduceTasks(3);                        //three reducers -> three output partitions
            job.waitForCompletion(true);
    
        }
    
    }
    Output (note the skew: the hot key hello, with a total count of 15, lands entirely in part-r-00002):
    part-r-00000

    world3	3
    

    part-r-00001

    world1	3
    world4	3
    
    part-r-00002
    hello	15
    world2	3
    world5	3
    

    2. Stage 2: define a random partitioner and register it on the job

    package com.cr.skew1;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    import java.util.Random;
    
    //custom partitioner: assign each record to a random reduce partition
    public class RandomPartition extends Partitioner<Text,IntWritable>{
        @Override
        public int getPartition(Text text, IntWritable intWritable, int numPartitioners) {
            //return a random partition index in [0, numPartitioners)
            //(a shared Random instance would avoid allocating one per record)
            return new Random().nextInt(numPartitioners);
        }
    }
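
    The post does not show the driver for this stage, but the partitioner only takes effect once it is registered on the job. A minimal sketch, assuming the stage-1 SkewApp driver is reused with one extra configuration line:

        //hypothetical addition to the driver: replace the default
        //HashPartitioner with the random partitioner defined above
        job.setPartitionerClass(RandomPartition.class);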
    

    Output of the three partitions (the 15 hello records are now spread across all of them: 7 + 4 + 4 = 15):
    hello	7
    world1	2
    world2	1
    world3	1
    world5	1
    
    hello	4
    world2	2
    world3	2
    
    hello	4
    world1	1
    world4	3
    world5	2
    

    3. Stage 3: run a second map/reduce job to re-aggregate the partial counts produced above

    package com.cr.skew1_stage2;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class SkewMapper2 extends Mapper<LongWritable,Text, Text,IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("entering mapper");
            //each line of the previous output looks like "word<TAB>count"
            String line = value.toString();
            String[] arr = line.split("\t");

            context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
    
        }
    }
    
    package com.cr.skew1_stage2;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class SkewReducer1 extends Reducer<Text,IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
           for(IntWritable iw : values){
               count += iw.get();
           }
           context.write(key,new IntWritable(count));
        }
    }
    
    package com.cr.skew1_stage2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class SkewApp2 {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            //create the job (local mode)
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS","file:///");
            Job job = Job.getInstance(conf);
            System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5");

            //configure the job
            job.setJobName("SkewApp2");                      //job name
            job.setJarByClass(SkewApp2.class);               //class used to locate the jar
            job.setInputFormatClass(TextInputFormat.class);

            //input paths: the three partition files written by the previous job
            FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
            FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
            FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));
            //output path (delete it first if it already exists)
            Path path = new Path("D:\\skew\\out2");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            job.setMapperClass(SkewMapper2.class);           //mapper class
            job.setReducerClass(SkewReducer1.class);         //reducer class

            job.setMapOutputKeyClass(Text.class);            //map output key type
            job.setMapOutputValueClass(IntWritable.class);   //map output value type

            job.setOutputKeyClass(Text.class);               //job output key type
            job.setOutputValueClass(IntWritable.class);      //job output value type

            job.setNumReduceTasks(3);
            job.waitForCompletion(true);
    
        }
    
    }
    
    world3	3
    
    world1	3
    world4	3
    
    hello	15
    world2	3
    world5	3
    The result matches the stage-1 result produced without the random partitioner: the partial counts from the three stage-2 partitions sum back to the original totals (hello: 7 + 4 + 4 = 15; world1: 2 + 1 = 3; world2: 1 + 2 = 3; world3: 1 + 2 = 3; world4: 3; world5: 1 + 2 = 3).

    4. Switching the re-aggregation job's input format to KeyValueTextInputFormat

    With KeyValueTextInputFormat, the previous stage's output is read directly as key-value pairs, so the mapper no longer has to split each line itself.
    package com.cr.skew1_stage_version2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class SkewApp2 {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            //create the job (local mode)
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS","file:///");
            Job job = Job.getInstance(conf);
            System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5");

            //configure the job
            job.setJobName("SkewApp2");                      //job name
            job.setJarByClass(SkewApp2.class);               //class used to locate the jar
            job.setInputFormatClass(KeyValueTextInputFormat.class);

            //input paths: the three partition files written by the previous job
            FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00000"));
            FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00001"));
            FileInputFormat.addInputPath(job, new Path("D:\\skew\\out\\part-r-00002"));
            //output path (delete it first if it already exists)
            Path path = new Path("D:\\skew\\out2");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            job.setMapperClass(SkewMapper2.class);           //mapper class
            job.setReducerClass(SkewReducer1.class);         //reducer class

            job.setMapOutputKeyClass(Text.class);            //map output key type
            job.setMapOutputValueClass(IntWritable.class);   //map output value type

            job.setOutputKeyClass(Text.class);               //job output key type
            job.setOutputValueClass(IntWritable.class);      //job output value type

            job.setNumReduceTasks(3);
            job.waitForCompletion(true);
    
        }
    
    }
    Looking at the KeyValueTextInputFormat source shows:
    public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
        public KeyValueTextInputFormat() {
        }
    so the mapper's input key/value types here are <Text, Text>: each line is split at the first tab, with the word as the key and the count as the value.
    package com.cr.skew1_stage_version2;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class SkewMapper2 extends Mapper<Text,Text, Text,IntWritable> {
    
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("entering mapper");

            //the key is already the word; the value is its partial count as text
            context.write(key, new IntWritable(Integer.parseInt(value.toString())));
    
        }
    }
    The reducer is unchanged.
    The result is again exactly the same, so switching the job's input format to KeyValueTextInputFormat saves a good deal of work.
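
    KeyValueTextInputFormat splits each input line at the first separator byte, which defaults to a tab, so it matches the tab-separated part-r-* files above. If a different separator were ever needed, it can be set on the configuration through the standard Hadoop 2.x property; a minimal sketch:

        //assumption: only needed if the input files are not tab-separated
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");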