  • Data Skew in Hadoop

    Data skew

    1. Cause: a large number of records with the same key (or a few hot keys) are all routed to one or a small number of reducers, so those reducers become the job's bottleneck while the rest sit idle.

    2. Solutions:

        1. Redesign the key, e.g. by salting it (a sketch follows this list);

        2. Write a custom partitioner that partitions randomly (common; used in the example below);

        3. Customize the shuffle mechanism.
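
    As a minimal sketch of option 1 (the SaltedWordCountMapper name, the "#" separator, and the SALTS constant are illustrative assumptions, not from the original post), the mapper can append a random suffix to each word so one hot key becomes several distinct keys; a follow-up job then strips the suffix and re-aggregates, analogous to the two-job pipeline shown below.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Random;

/**
 * Hypothetical salting mapper: "hadoop" is emitted as "hadoop#0" ..
 * "hadoop#9", so the default hash partitioner spreads one hot word
 * across up to SALTS reducers.
 */
public class SaltedWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int SALTS = 10;   // shards per hot key; tune to the reducer count (assumption)
    private final Random r = new Random();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            context.write(new Text(word + "#" + r.nextInt(SALTS)), new IntWritable(1));
        }
    }
}

    A second job would map each "word#n" key back to "word" (e.g. key.substring(0, key.lastIndexOf('#'))) and reuse the same summing reducer.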

    Example: word count with a random partitioner, followed by a second job that merges the partial counts.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver: chains two jobs. The first job spreads records over the
 * reducers with a random partitioner (its output holds partial counts);
 * the second job merges those partial counts into final totals.
 */
public class App {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // run in local mode against the local file system
        conf.set("fs.defaultFS", "file:///");
        conf.set("mapreduce.framework.name", "local");
        Job job = Job.getInstance(conf);
        job.setJobName("WordCount");
        job.setJarByClass(App.class);

        FileInputFormat.addInputPath(job, new Path("d:/mr/wc.txt"));
        FileOutputFormat.setOutputPath(job, new Path("d:/mr/out"));

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setNumReduceTasks(3);

        // output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // random partitioner: breaks up hot keys across all reducers
        job.setPartitionerClass(RandomPartitioner.class);
        boolean b = job.waitForCompletion(true);

        // second job: merge the partial counts produced by the first job
        if (b) {
            conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            conf.set("mapreduce.framework.name", "local");
            job = Job.getInstance(conf);
            job.setJobName("WordCount2");
            job.setJarByClass(App.class);
            FileInputFormat.addInputPath(job, new Path("d:/mr/out/part-r*"));
            FileOutputFormat.setOutputPath(job, new Path("d:/mr/out2"));

            // the first job's output lines are "word<TAB>count"
            job.setInputFormatClass(KeyValueTextInputFormat.class);

            job.setMapperClass(WordCountMapper2.class);
            job.setReducerClass(WordCountReducer.class);

            job.setNumReduceTasks(3);

            // output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.waitForCompletion(true);
        }
    }
}
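
    With RandomPartitioner, occurrences of the same word are scattered over all three reducers, so the first job produces partial counts. For example (the numbers are illustrative), d:/mr/out might contain:

        part-r-00000:  hadoop	3
        part-r-00001:  hadoop	5
        part-r-00002:  hadoop	2

    The second job reads these tab-separated lines back in and sums them, so d:/mr/out2 ends up with the true total (hadoop	10). Since waitForCompletion(true) blocks until the first job finishes, the simple if(b) chaining above is safe.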
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for the first job: splits each line on spaces and emits (word, 1).
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] arr = value.toString().split(" ");
        for (String word : arr) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for the second job: KeyValueTextInputFormat already splits each
 * line into (word, partialCount), so this just parses the count and
 * re-emits the pair.
 */
public class WordCountMapper2 extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, new IntWritable(Integer.parseInt(value.toString())));
    }
}
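
    This pairing works because the first job's default TextOutputFormat writes each record as key<TAB>value (mapreduce.output.textoutputformat.separator, tab by default), while KeyValueTextInputFormat splits each line at the first separator character (mapreduce.input.keyvaluelinerecordreader.key.value.separator, also tab by default). If either default is changed, the two must be kept in sync, e.g.:

        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");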
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer shared by both jobs: sums the counts for each word. In the
 * first job the sums are partial (per random partition); in the second
 * job they are the final totals.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        context.write(key, new IntWritable(count));
    }
}
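
    Because this reduce is a plain sum (associative and commutative), the same class could also be registered as a combiner in the driver; a combiner pre-aggregates on the map side and shrinks the per-key record count before the shuffle, which by itself relieves much of the skew in counting workloads. This line is an addition, not part of the original post:

        job.setCombinerClass(WordCountReducer.class);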
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

/**
 * Sends each record to a random reducer regardless of its key. Hot keys
 * are broken up, but the same word then lands in several reducers, which
 * is why the second aggregation job is required.
 */
public class RandomPartitioner extends Partitioner<Text, IntWritable> {
    private final Random r = new Random();

    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        return r.nextInt(numPartitions);
    }
}
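
    Note that this partitioner deliberately breaks the usual contract that the same key always maps to the same partition. That is safe here only because the first job's reducers emit partial sums and the second job re-aggregates them by key; a random partitioner without such a follow-up pass would produce incorrect per-key results.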