  • MapReduce data skew

    The InputFormat decides how input files are read, and split computation is done by the input format. The default is the text input format, TextInputFormat, which extends the abstract class FileInputFormat. FileInputFormat defines a getSplits() method, shown below:

    public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
      StopWatch sw = new StopWatch().start();
      FileStatus[] files = listStatus(job);

      // Save the number of input files for metrics/loadgen
      job.setLong(NUM_INPUT_FILES, files.length);
      long totalSize = 0;                          // compute total size
      for (FileStatus file: files) {               // check we have valid files
        if (file.isDirectory()) {
          throw new IOException("Not a file: " + file.getPath());
        }
        totalSize += file.getLen();
      }

      long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
      long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

      // generate splits
      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
      NetworkTopology clusterMap = new NetworkTopology();
      for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
          FileSystem fs = path.getFileSystem(job);
          BlockLocation[] blkLocations;
          if (file instanceof LocatedFileStatus) {
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();
          } else {
            blkLocations = fs.getFileBlockLocations(file, 0, length);
          }
          if (isSplitable(fs, path)) {
            long blockSize = file.getBlockSize();
            long splitSize = computeSplitSize(goalSize, minSize, blockSize);

            long bytesRemaining = length;
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                  length-bytesRemaining, splitSize, clusterMap);
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                  splitHosts[0], splitHosts[1]));
              bytesRemaining -= splitSize;
            }

            if (bytesRemaining != 0) {
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                  - bytesRemaining, bytesRemaining, clusterMap);
              splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                  splitHosts[0], splitHosts[1]));
            }
          } else {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
            splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
          }
        } else {
          // Create empty hosts array for zero length files
          splits.add(makeSplit(path, 0, length, new String[0]));
        }
      }
      sw.stop();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
            + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
      }
      return splits.toArray(new FileSplit[splits.size()]);
    }
    This code decides how a file is cut into splits; TextInputFormat then works with its record reader (LineRecordReader) to turn each split into the key/value pairs that are fed to the mapper.
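
    The split size used in the loop above comes from computeSplitSize(). In this branch of FileInputFormat it amounts to clamping the per-split goal between the configured minimum and the block size; the method body below is a sketch of that logic as it appears in the Hadoop 2.x source, and the 300 MB file in the note underneath is a made-up example:

    // splitSize = max(minSize, min(goalSize, blockSize))
    // goalSize  = totalSize / numSplits; minSize comes from
    // mapreduce.input.fileinputformat.split.minsize (default 1)
    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
      return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    With a 128 MB block size and a hypothetical 300 MB file this produces splits of 128 MB, 128 MB and 44 MB: the while loop keeps cutting full splitSize chunks as long as the remainder exceeds SPLIT_SLOP (1.1) times splitSize, and the tail is emitted as one last split.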
    Partitioning and pre-aggregation can both be done on the map side before anything goes over the network. The combiner is ordinary reducer logic that runs on the map side as a local "mini-reduce"; note that the partitioner runs first and the combiner afterwards. The map output here is (word, 1) pairs, and sending them straight to the reducers would put a very heavy load on network bandwidth, so doing a reduce on the map side first cuts the shuffled data down sharply. A sketch of the driver wiring follows, then the custom partitioner and a combiner.
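
    How these plug into a job is shown in the driver sketch below. It assumes a word-count style job: WordCountMapper and WordCountReducer are illustrative class names that do not appear in this post, while MyPartitioner and MyCombiner are the classes shown further down.

    package com.heima.hdfs.mr2;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);     // emits (word, 1) -- illustrative mapper
        job.setPartitionerClass(MyPartitioner.class);  // custom partitioner, defined below
        job.setCombinerClass(MyCombiner.class);        // map-side "mini-reduce", sketched below
        job.setReducerClass(WordCountReducer.class);   // illustrative reducer
        job.setNumReduceTasks(3);                      // partition count == number of reduce tasks
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // TextInputFormat is the default
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }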
    Custom partitioner
    ---------------------
      
    package com.heima.hdfs.mr2;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    /**
     * Custom partitioner: decides which reduce task (partition) each
     * map-output (key, value) pair is sent to.
     */
    public class MyPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        // Always returning 0 routes every record to a single reducer --
        // the extreme form of data skew.
        return 0;
      }
    }
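
    For contrast, a partitioner that does not pile everything onto one reducer usually spreads keys by hash, exactly as Hadoop's default HashPartitioner does; the variant below is a sketch, not part of the original post:

      @Override
      public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        // Mask off the sign bit, then take the key's hash modulo the reducer count.
        return (text.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }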
    Combiner
    -----------------
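    A minimal combiner for the (word, 1) map output described above sums the counts of each word locally before the shuffle. It is an ordinary Reducer; the class name MyCombiner is illustrative:

    package com.heima.hdfs.mr2;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * Combiner: reducer logic run on the map side so that far less data
     * is shuffled across the network to the reducers.
     */
    public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        context.write(key, new IntWritable(sum));   // e.g. (hello, [1, 1, 1]) -> (hello, 3)
      }
    }

    It is registered with job.setCombinerClass(MyCombiner.class). Because the framework may run a combiner zero, one, or several times, the operation must be safe to apply repeatedly (commutative and associative), which summing counts is.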
      