zoukankan      html  css  js  c++  java
  • 关于Hadoop中的采样器

    1.为什么要使用采样器

    在这个网页上有一段描述比较靠谱 http://www.philippeadjiman.com/blog/2009/12/20/hadoop-tutorial-series-issue-2-getting-started-with-customized-partitioning/

     简单的来说就是解决"How to automatically find “good” partitioning function",因为很多时候无法直接制订固定的partitioner策略,所以需要知道实际的数据分布.糟糕的策略导致的结果就是每个reduce节点得到的数据部均匀,对效率影响挺大

    2.如何使用采样器

     

      conf.setPartitionerClass(TotalOrderPartitioner.class);//关于partitioner可以参考这个实现 使用采样器产生的文件

      InputSampler.RandomSampler<IntWritable, NullWritable> sampler =
       new InputSampler.RandomSampler<IntWritable, NullWritable>(0.1,10000,10);
      
      Path partitionFile = new Path(input,”_partitions”);
      TotalOrderPartitioner.setPartitionFile(conf, partitionFile);////
      InputSampler.writePartitionFile(conf, sampler);
      
    //一般都将该文件做distribute cache处理
      URI partitionURI = new URI(partitionFile.toString() + “#_partitions”);
      DistributedCache.addCacheFile(partitionURI, conf);
      DistributedCache.createSymlink(conf);
     
    //从上面可以看出 采样器是在map阶段之前进行的 在提交job的client端完成的
     

    3.常用的采样器介绍

    http://blog.csdn.net/andyelvis/article/details/7294811

    Hadoop中采样是由org.apache.hadoop.mapred.lib.InputSampler类来实现的。


    InputSampler类实现了三种采样方法:RandomSampler,SplitSampler和IntervalSampler。//RandomSampler最耗时

    RandomSamplerSplitSampler、RandomSampler和IntervalSampler都是InputSampler的静态内部类,它们都实现了InputSampler的内部接口Sampler接口

    public interface Sampler<K,V>{
          K[] getSample(InputFormat<K,V> inf,JobConf job) throws IOException;
    }

    getSample方法根据job的配置信息以及输入格式获得抽样结果,三个采样类各自有不同的实现。


    RandomSampler随机地从输入数据中抽取Key,是一个通用的采样器。RandomSampler类有三个属性:freq(一个Key被选中的概率),numSamples(从所有被选中的分区中获得的总共的样本数目),maxSplitsSampled(需要检查扫描的最大分区数目)。
    RandomSampler中getSample方法的实现如下: 

        public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
          InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
          ArrayList<K> samples = new ArrayList<K>(numSamples);
          int splitsToSample = Math.min(maxSplitsSampled, splits.length);

          Random r = new Random();
          long seed = r.nextLong();
          r.setSeed(seed);
          LOG.debug("seed: " + seed);
          // shuffle splits
          for (int i = 0; i < splits.length; ++i) {
            InputSplit tmp = splits[i];
            int j = r.nextInt(splits.length);
            splits[i] = splits[j];
            splits[j] = tmp;
          }
          // our target rate is in terms of the maximum number of sample splits,
          
    // but we accept the possibility of sampling additional splits to hit
          
    // the target sample keyset
          for (int i = 0; i < splitsToSample ||
                         (i < splits.length && samples.size() < numSamples); ++i) {
            RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
                Reporter.NULL);
            K key = reader.createKey();
            V value = reader.createValue();
            while (reader.next(key, value)) {
              if (r.nextDouble() <= freq) {
                if (samples.size() < numSamples) {
                  samples.add(key);
                } else {
                  // When exceeding the maximum number of samples, replace a
                  
    // random element with this one, then adjust the frequency
                  
    // to reflect the possibility of existing elements being
                  
    // pushed out
                  int ind = r.nextInt(numSamples);
                  if (ind != numSamples) {
                    samples.set(ind, key);
                  }
                  freq *= (numSamples - 1) / (double) numSamples;
                }
                key = reader.createKey();
              }
            }
            reader.close();
          }
          return (K[])samples.toArray();
        }

    首先通过InputFormat的getSplits方法得到所有的输入分区;然后确定需要抽样扫描的分区数目,取输入分区总数与用户输入的maxSplitsSampled两者的较小的值得到splitsToSample;然后对输入分区数组shuffle排序,打乱其原始顺序;然后循环逐个扫描每个分区中的记录进行采样,循环的条件是当前已经扫描的分区数小于splitsToSample或者当前已经扫描的分区数超过了splitsToSample但是小于输入分区总数并且当前的采样数小于最大采样数numSamples。

    每个分区中记录采样的具体过程如下:

    从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq,如果大于则放弃这条记录,然后判断当前的采样数是否小于最大采样数,如果小于则这条记录被选中,被放进采样集合中,否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。然后依次遍历分区中的其它记录。

    SplitSampler从s个分区中采样前n个记录,是采样随机数据的一种简便方式。SplitSampler类有两个属性:numSamples(最大采样数),maxSplitsSampled(最大分区数)。其getSample方法实现如下:

        public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
          InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
          ArrayList<K> samples = new ArrayList<K>(numSamples);
          int splitsToSample = Math.min(maxSplitsSampled, splits.length);
          int splitStep = splits.length / splitsToSample;
          int samplesPerSplit = numSamples / splitsToSample;
          long records = 0;
          for (int i = 0; i < splitsToSample; ++i) {
            RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
                job, Reporter.NULL);
            K key = reader.createKey();
            V value = reader.createValue();
            while (reader.next(key, value)) {
              samples.add(key);
              key = reader.createKey();
              ++records;
              if ((i+1) * samplesPerSplit <= records) {
                break;
              }
            }
            reader.close();
          }
          return (K[])samples.toArray();
        }

    首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商;然后确定每个分区的采样数samplesPerSplit为最大采样数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。

    对于每一个分区,读取一条记录,将这条记录添加到样本集合中,如果当前样本数大于当前的采样分区所需要的样本数,则停止对这个分区的采样。如此循环遍历完这个分区的所有记录。

    IntervalSampler根据一定的间隔从s个分区中采样数据,非常适合对排好序的数据采样。IntervalSampler类有两个属性:freq(哪一条记录被选中的概率),maxSplitsSampled(采样的最大分区数)。其getSample方法实现如下:

    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
          InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
          ArrayList<K> samples = new ArrayList<K>();
          int splitsToSample = Math.min(maxSplitsSampled, splits.length);
          int splitStep = splits.length / splitsToSample;
          long records = 0;
          long kept = 0;
          for (int i = 0; i < splitsToSample; ++i) {
            RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
                job, Reporter.NULL);
            K key = reader.createKey();
            V value = reader.createValue();
            while (reader.next(key, value)) {
              ++records;
              if ((double) kept / records < freq) {
                ++kept;
                samples.add(key);
                key = reader.createKey();
              }
            }
            reader.close();
          }
          return (K[])samples.toArray();
        }

    首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。

    对于每一个分区,读取一条记录,如果当前样本数与已经读取的记录数的比值小于freq,则将这条记录添加到样本集合,否则读取下一条记录。这样依次循环遍历完这个分区的所有记录。

    4.采样器在实际中的使用

      常见的例子是terasort

     http://blog.csdn.net/scutshuxue/article/details/5915697

    排序的基本思想是利用了mapreduce的自动排序功能,在hadoop中,从map到reduce阶段,map出来的结构会按照各个key按照hash值分配到各个reduce中,其中,在reduce中所有的key都是有序的了。如果使用一个reduce,那么我们直接将他output出来就行了,但是这不能够体现分布式的好处,所以,我们还是要用多个reduce来跑。

          比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要cat所有的输出文件,变成一个大的文件,就都是有序的了。

           基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取10000个数据采样,然后对采样数据分区间。在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用hadoop的几种采样工具,RandomSampler,InputSampler,IntervalSampler。

     

     

  • 相关阅读:
    SQL SERVER2017 安装程序无法与下载服务器联系。无法安装机器学习服务的问题解决方式
    Kali Linux无法访问网络的问题
    Vue的冒泡事件
    记录阿里云ECS(Centos7.4)安装mysql 8.0.X服务
    沧桑巨变中焕发青春活力-记极1s HC5661A 打怪升级之路
    Asp.Net MVC过滤器小试牛刀
    C# Windows Service调用IBM Lotus Notes发送邮件
    记录一些js框架用途
    vc14(vs2015) 编译php7 记录
    C++ API方式连接mysql数据库实现增删改查
  • 原文地址:https://www.cnblogs.com/xuxm2007/p/2379143.html
Copyright © 2011-2022 走看看