zoukankan      html  css  js  c++  java
  • 利用hadoop mapreduce进行 并行计算

    如果想利用mapreduce并行计算的话,不能指望所有都是hadoop来跟你做,hadoop不可能跟你做很多事情。看下面一个问题:

    x1 / 1 + x2 / 2 + x3 / 3 + …… + xn / n = 1

    求出当n = 10时的所有正整数解。

    先确定我们要采用的办法,就是搜索穷举。搜出n个未知数的解。

    例如我们想用四个mapper来执行,就将输入文件划分为四个split就行了。例如,输入文件为:

     1 1
     2 2
     3 3
     4 4
     5 5
     6 6
     7 7
     8 8
     9 9
    10 10

    这10个值,为x1可能的10个值。也就是说,我们将搜索空间,划分为这10个,然后来用mapper来并行计算。但是,如果直接将这个文件扔到hdfs里,hadoop是不会跟我们开四个mapper来并行计算的,因为这个文件太小了。那么我们可以重写InputSplit,对输入文件进行划分,达到“并行计算”的效果。

     那么,我们将这个文件按行进行划分,每三行为一个块,这样就把输入文件划分为四个块,对应会有四个mapper来处理。下面看一下代码:

     1 package seven.ili;
     2  
     3  import org.apache.hadoop.classification.InterfaceAudience;
     4  import org.apache.hadoop.conf.Configuration;
     5  import org.apache.hadoop.fs.FSDataInputStream;
     6  import org.apache.hadoop.fs.FileStatus;
     7  import org.apache.hadoop.fs.FileSystem;
     8  import org.apache.hadoop.fs.Path;
     9  import org.apache.hadoop.io.LongWritable;
    10  import org.apache.hadoop.io.Text;
    11  import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    12  import org.apache.hadoop.mapred.JobConf;
    13  import org.apache.hadoop.mapreduce.InputSplit;
    14  import org.apache.hadoop.mapreduce.JobContext;
    15  import org.apache.hadoop.mapreduce.RecordReader;
    16  import org.apache.hadoop.mapreduce.TaskAttemptContext;
    17  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    18  import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    19  import org.apache.hadoop.util.LineReader;
    20  
    21  import java.io.FileInputStream;
    22  import java.io.FileNotFoundException;
    23  import java.io.IOException;
    24  import java.util.ArrayList;
    25  import java.util.List;
    26  
    27  /**
    28   * Created with IntelliJ IDEA.
    29   * User: Isaac Li
    30   * Date: 12/20/12
    31   * Time: 1:25 PM
    32   * To change this template use File | Settings | File Templates.
    33   */
    34  public class SplitByLine extends FileInputFormat<LongWritable, Text> {
    35      private int N = 3;
    36  
    37      @Override
    38      public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    39          return new LineRecordReader();
    40      }
    41  
    42      public List<InputSplit> getSplits(JobContext job) throws IOException {
    43          List<InputSplit> splits = new ArrayList<InputSplit>();
    44          for (FileStatus file: listStatus(job)){
    45              Path path = file.getPath();
    46              FileSystem fs = path.getFileSystem(job.getConfiguration());
    47              LineReader lr = null;
    48              try{
    49                  FSDataInputStream in = fs.open(path);
    50                  Configuration conf = job.getConfiguration();
    51                  lr = new LineReader(in, conf);
    52                  //N = conf.getInt("mapred.line.input.format.linespermap", 3);
    53                  Text line = new Text();
    54                  int numLines = 0;
    55                  long begin = 0;
    56                  long length = 0;
    57                  int num = -1;
    58                  while((num = lr.readLine(line)) > 0){
    59                      numLines++;
    60                      length += num;
    61                      if (numLines == N){
    62                          splits.add(new FileSplit(path, begin, length, new String[]{}));
    63                          begin += length;
    64                          length = 0;
    65                          numLines = 0;
    66                      }
    67                  }
    68                  if (numLines != 0) {
    69                      splits.add(new FileSplit(path, begin, length, new String[]{}));
    70                  }
    71              }finally {
    72                  if (lr != null){
    73                      lr.close();
    74                  }
    75              }
    76          }
    77          System.out.println("Total # of splits: " + splits.size());
    78          return splits;
    79      }
    80  }

    然后在Job class的main方法中,加上一句设置语句,

    job.setInputFormatClass(SplitByLine.class);

    到此,我们的第一阶段的任务完成了,将原始问题的解空间,划分成了四个,来进来并行计算。下面,在子空间中的搜索就交给mapper来执行了。这个方法就自己去写吧,一个搜索的算法。我就不写出了。对了,你可以用:

    job.setNumReduceTasks(0);

    这一条语句,先将reduce task num 设置为0, 你就可以看到mapper的输出,是不是有四个文件?一共有四个mapper?

    自己去验证吧。

    参考:

    http://www.hadoopor.com/viewthread.php?tid=1242

    揭秘InputFormat:掌控Map Reduce任务执行的利器:     http://www.infoq.com/cn/articles/HadoopInputFormat-map-reduce

  • 相关阅读:
    [JSOI2015]最小表示
    [洛谷2002]消息扩散
    [洛谷1726]上白泽慧音
    [CodeVS2822]爱在心中
    [POJ2186]Popular Cows
    [洛谷1991]无线通讯网
    [CQOI2009]跳舞
    [洛谷1342]请柬
    [USACO07JAN]Balanced Lineup
    [NOIp2003提高组]神经网络
  • 原文地址:https://www.cnblogs.com/hengli/p/2822119.html
Copyright © 2011-2022 走看看