9.2.2 Hadoop sampling-based partitioning: source code analysis of SplitSampler, RandomSampler and IntervalSampler

Sampling-based partitioning

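To produce globally sorted output, the temperature data can be divided into groups that are processed by several reducers: the groups are ordered relative to each other and each group is sorted internally, so the output as a whole is sorted. The question is how to choose the groups so that every reducer receives roughly the same amount of data, which keeps the running times of the job's tasks roughly equal. For example, suppose temperature data must be output in sorted order and is split into 4 groups, <-10℃, -10℃~0℃, 0℃~10℃ and >10℃, which hold the following shares of the data: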

Temperature range   <-10℃   -10℃~0℃   0℃~10℃   >10℃
Share of data        1%      10%        29%       60%

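With this partitioning, the amount of data each reducer receives clearly differs a great deal. Adjusting the partitions so that each one receives about the same amount of data, as below, makes the reduce tasks take about the same time. The boundary values of these partitions have to be determined by sampling.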

Temperature range   <4℃     4℃~13℃    13℃~25℃   >25℃
Share of data        23%     24%        27%       26%

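InputSampler samples the input data to obtain the values that separate the sampled intervals and writes these values to a file; TotalOrderPartitioner then reads these boundary values and uses them to partition the data. Sampling-based partitioning therefore takes part of the input data and derives intervals with a roughly even distribution, so that each interval holds about the same amount of data. InputSampler offers three sampling strategies: SplitSampler, which takes the first n records; RandomSampler, which samples at random; and IntervalSampler, which samples at fixed intervals.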
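Once the boundary values exist, assigning a key to a reducer is a search over the sorted boundaries. The following is a minimal sketch of that lookup, assuming int keys and an in-memory array of split points; the class name BoundaryLookup is hypothetical. For keys that are not BinaryComparable, TotalOrderPartitioner performs a binary search over its split points in much the same way, but treat this as an illustration of the idea rather than its source.

```java
import java.util.Arrays;

// Hypothetical sketch: choose a partition for a key from sorted split points.
// n split points define n + 1 partitions; a key equal to a split point goes to the upper partition.
class BoundaryLookup {
    static int partitionFor(int key, int[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // pos >= 0: key equals splitPoints[pos], so it belongs to partition pos + 1.
        // pos < 0: pos == -(insertionPoint) - 1, and insertionPoint is the partition index.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }
}
```

With example split points {4, 13, 25}, the keys -20, 4, 10, 13 and 30 land in partitions 0, 1, 1, 2 and 3.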

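The three samplers compare as follows (class and constructor, sampling method, constructor parameters, efficiency):

SplitSampler(int numSamples, int maxSplitsSampled)
    Sampling method: samples the input splits evenly, taking the first n records of each split
    Parameters: total number of samples, number of splits to sample
    Efficiency: highest

RandomSampler(double freq, int numSamples, int maxSplitsSampled)
    Sampling method: iterates over the data and samples records at random
    Parameters: sampling frequency, total number of samples, maximum number of splits to sample
    Efficiency: lowest

IntervalSampler(double freq, int maxSplitsSampled)
    Sampling method: samples at fixed intervals; well suited to sorted data
    Parameters: sampling frequency, maximum number of splits to sample
    Efficiency: medium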

(4) How InputSampler works

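1) InputSampler is a Hadoop tool class: it extends Configured and implements the Tool interface, main() is the entry point, and run() executes the job. InputSampler also provides writePartitionFile(), which sorts the sampled keys, divides them according to the number of partitions, and writes the resulting boundary values to the partition file. Its definition is as follows: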

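package org.apache.hadoop.mapreduce.lib.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InputSampler<K, V> extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(InputSampler.class);

    static int printUsage() {
        System.out.println("sampler -r <reduces>\n"
            + "      [-inFormat <input format class>]\n"
            + "      [-keyClass <map input & output key class>]\n"
            + "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | "
            + "             // Sample from random splits at random (general)\n"
            + "       -splitSample <numSamples> <maxsplits> | "
            + "             // Sample from first records in splits (random data)\n"
            + "       -splitInterval <double pcnt> <maxsplits>]"
            + "             // Sample from splits at intervals (sorted data)");
        System.out.println("Default sampler: -splitRandom 0.1 10000 10");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public InputSampler(Configuration conf) {
        this.setConf(conf);
    }

    @SuppressWarnings("unchecked")
    public static <K, V> void writePartitionFile(Job job, InputSampler.Sampler<K, V> sampler)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = job.getConfiguration();
        InputFormat inf = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
        // numPartitions reduce tasks mean numPartitions partitions (and numPartitions output files)
        int numPartitions = job.getNumReduceTasks();
        // draw the sample keys
        K[] samples = sampler.getSample(inf, job);
        LOG.info("Using " + samples.length + " samples");
        // sort the sampled keys with the job's sort comparator
        RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator();
        Arrays.sort(samples, comparator);
        // location of the partition file
        Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
        FileSystem fs = dst.getFileSystem(conf);
        // delete any existing partition file
        if (fs.exists(dst)) {
            fs.delete(dst, false);
        }
        // create a writer for the new partition file
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
        NullWritable nullValue = NullWritable.get();
        // step size: after sorting, one key every stepSize samples becomes a boundary value
        float stepSize = samples.length / (float) numPartitions;
        int last = -1;

        // numPartitions partitions need numPartitions - 1 boundary keys
        for (int i = 1; i < numPartitions; ++i) {
            int k = Math.round(stepSize * i);
            // if k has not moved past the previous boundary index and its key equals
            // the previous boundary, advance k so the same key is not written twice
            while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
                ++k;
            }
            writer.append(samples[k], nullValue);
            last = k;
        }

        writer.close();
    }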
    // run() executes the sampling job; the arguments select the sampler type
    public int run(String[] args) throws Exception {
        Job job = new Job(this.getConf());
        ArrayList<String> otherArgs = new ArrayList<String>();
        InputSampler.Sampler<K, V> sampler = null;

        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-r".equals(args[i])) {
                    job.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else if ("-inFormat".equals(args[i])) {
                    job.setInputFormatClass(Class.forName(args[++i]).asSubclass(InputFormat.class));
                } else if ("-keyClass".equals(args[i])) {
                    job.setMapOutputKeyClass(Class.forName(args[++i]).asSubclass(WritableComparable.class));
                } else if ("-splitSample".equals(args[i])) {
                    int numSamples = Integer.parseInt(args[++i]);
                    int maxSplits = Integer.parseInt(args[++i]);
                    if (0 >= maxSplits) {
                        maxSplits = Integer.MAX_VALUE;
                    }
                    // split sampling: the first records of each split
                    sampler = new InputSampler.SplitSampler<K, V>(numSamples, maxSplits);
                } else if ("-splitRandom".equals(args[i])) {
                    double pcnt = Double.parseDouble(args[++i]);
                    int numSamples = Integer.parseInt(args[++i]);
                    int maxSplits = Integer.parseInt(args[++i]);
                    if (0 >= maxSplits) {
                        maxSplits = Integer.MAX_VALUE;
                    }
                    // random sampling
                    sampler = new InputSampler.RandomSampler<K, V>(pcnt, numSamples, maxSplits);
                } else if ("-splitInterval".equals(args[i])) {
                    double pcnt = Double.parseDouble(args[++i]);
                    int maxSplits = Integer.parseInt(args[++i]);
                    if (0 >= maxSplits) {
                        maxSplits = Integer.MAX_VALUE;
                    }
                    // interval sampling
                    sampler = new InputSampler.IntervalSampler<K, V>(pcnt, maxSplits);
                } else {
                    otherArgs.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " + args[i - 1]);
                return printUsage();
            }
        }

        // more than one reduce task is required; with a single reducer, partitioning is pointless
        if (job.getNumReduceTasks() <= 1) {
            System.err.println("Sampler requires more than one reducer");
            return printUsage();
        }
        if (otherArgs.size() < 2) {
            System.out.println("ERROR: Wrong number of parameters: ");
            return printUsage();
        }
        if (null == sampler) {
            // random sampling is the default
            sampler = new InputSampler.RandomSampler<K, V>(0.1D, 10000, 10);
        }

        // the last remaining argument is the partition file; the others are input paths
        Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
        TotalOrderPartitioner.setPartitionFile(this.getConf(), outf);
        for (String s : otherArgs) {
            FileInputFormat.addInputPath(job, new Path(s));
        }
        // sample the input and write the partition file
        writePartitionFile(job, sampler);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        InputSampler<?, ?> sampler = new InputSampler(new Configuration());
        int res = ToolRunner.run(sampler, args);
        System.exit(res);
    }

    // sampling interface implemented by SplitSampler, RandomSampler and IntervalSampler
    public interface Sampler<K, V> {
        K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException;
    }
}
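The boundary selection in writePartitionFile can be replayed on plain integers. The sketch below assumes int keys already held in memory (SplitPoints is a hypothetical name); it mirrors the sort, the stepSize computation and the inner while loop of the code above.

```java
import java.util.Arrays;

// Hypothetical sketch of writePartitionFile's boundary selection, using int keys.
class SplitPoints {
    static int[] choose(int[] samples, int numPartitions) {
        int[] sorted = samples.clone();
        Arrays.sort(sorted);  // stands in for Arrays.sort(samples, comparator)
        float stepSize = sorted.length / (float) numPartitions;
        int[] points = new int[numPartitions - 1];  // n partitions need n - 1 boundaries
        int last = -1;
        for (int i = 1; i < numPartitions; ++i) {
            int k = Math.round(stepSize * i);
            // advance while k has not passed the previous boundary index and holds the same key
            while (last >= k && sorted[last] == sorted[k]) {
                ++k;
            }
            points[i - 1] = sorted[k];
            last = k;
        }
        return points;
    }
}
```

For the keys 0 to 99 and 4 partitions, stepSize is 25 and the boundaries are 25, 50 and 75. With at least as many samples as partitions, stepSize is at least 1, so the rounded indices strictly increase and the inner loop never fires; it only comes into play when there are fewer samples than partitions.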

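2) InputSampler defines a sampling interface, Sampler, with a single method getSample(). SplitSampler, RandomSampler and IntervalSampler all implement this interface, each obtaining sample values in a different way. All three are static nested classes of InputSampler that implement getSample(); each is described below.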

SplitSampler class definition

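The total number of samples is divided by the number of splits used for sampling to get n, the number of samples per split, and the first n records of each split are taken.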

public static class SplitSampler<K, V> implements InputSampler.Sampler<K, V> {
    protected final int numSamples;
    protected final int maxSplitsSampled;

    public SplitSampler(int numSamples) {
        this(numSamples, Integer.MAX_VALUE);
    }

    public SplitSampler(int numSamples, int maxSplitsSampled) {
        this.numSamples = numSamples;             // total number of samples
        this.maxSplitsSampled = maxSplitsSampled; // number of splits to sample, capped at the actual split count
    }

    @SuppressWarnings("unchecked")
    public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
        // get the input splits
        List<InputSplit> splits = inf.getSplits(job);
        // list sized for the total number of samples
        ArrayList<K> samples = new ArrayList<K>(this.numSamples);
        // number of splits actually sampled
        int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());
        // number of records to take from each split
        int samplesPerSplit = this.numSamples / splitsToSample;
        long records = 0L;  // records taken so far, counted across all splits

        for (int i = 0; i < splitsToSample; ++i) {
            TaskAttemptContext samplingContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
            // create a reader for this split's records
            RecordReader<K, V> reader = inf.createRecordReader(splits.get(i), samplingContext);
            reader.initialize(splits.get(i), samplingContext);

            while (reader.nextKeyValue()) {
                // copy the current key into the sample list
                samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), null));
                ++records;
                // stop once (i + 1) * samplesPerSplit records have been taken in total, i.e. about the
                // first samplesPerSplit records of this split (a short split is made up by the next one)
                if ((long) (i + 1) * samplesPerSplit <= records) {
                    break;
                }
            }

            reader.close();
        }

        return (K[]) samples.toArray();
    }
}
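SplitSampler's loop can be tried without Hadoop by modeling each split as an in-memory list of int keys; that modeling and the class name FirstNSampler are assumptions for illustration. The counter records runs across splits exactly as in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of SplitSampler.getSample: each split is an in-memory list of keys.
class FirstNSampler {
    static List<Integer> sample(List<List<Integer>> splits, int numSamples, int maxSplitsSampled) {
        List<Integer> samples = new ArrayList<>(numSamples);
        int splitsToSample = Math.min(maxSplitsSampled, splits.size());
        int samplesPerSplit = numSamples / splitsToSample;
        long records = 0;  // cumulative across splits, as in the Hadoop code
        for (int i = 0; i < splitsToSample; ++i) {
            for (int key : splits.get(i)) {
                samples.add(key);
                ++records;
                // stop this split once (i + 1) * samplesPerSplit keys have been taken overall
                if ((long) (i + 1) * samplesPerSplit <= records) {
                    break;
                }
            }
        }
        return samples;
    }
}
```

Because the counter is cumulative, with splits [1] and [10, 20, 30, 40], numSamples = 4 and two splits sampled, samplesPerSplit is 2 but the result is [1, 10, 20, 30]: the second split contributes three keys to make up for the short first split.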

IntervalSampler class definition

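IntervalSampler iterates over the records of the sampled splits and takes records at equal intervals determined by the sampling frequency: with a frequency of 0.1, for example, one record out of every 10 is taken.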

public static class IntervalSampler<K, V> implements InputSampler.Sampler<K, V> {
    protected final double freq;
    protected final int maxSplitsSampled;

    public IntervalSampler(double freq) {
        this(freq, Integer.MAX_VALUE);
    }

    public IntervalSampler(double freq, int maxSplitsSampled) {
        this.freq = freq;                         // sampling frequency
        this.maxSplitsSampled = maxSplitsSampled; // maximum number of splits to sample
    }

    @SuppressWarnings("unchecked")
    public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
        List<InputSplit> splits = inf.getSplits(job);
        ArrayList<K> samples = new ArrayList<K>();
        int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());
        long records = 0L;  // records read so far, across all sampled splits
        long kept = 0L;     // records sampled so far

        for (int i = 0; i < splitsToSample; ++i) {
            TaskAttemptContext samplingContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
            RecordReader<K, V> reader = inf.createRecordReader(splits.get(i), samplingContext);
            reader.initialize(splits.get(i), samplingContext);

            while (reader.nextKeyValue()) {
                // Example with freq = 0.1: at the 1st record, records = 1 and kept = 0; 0/1 < 0.1, so it is
                // sampled and kept becomes 1. At the 2nd record, records = 2 and kept = 1; 1/2 >= 0.1, so it
                // is skipped. kept/records stays >= 0.1 (1/2, 1/3, ..., 1/10) until the 11th record, where
                // 1/11 < 0.1, so record 11 is sampled and kept becomes 2. The next sample is taken at 2/21,
                // i.e. record 21. One record is therefore taken every 10 records, at equal intervals.
                ++records;
                if ((double) kept / (double) records < this.freq) {
                    samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), null));
                    ++kept;
                }
            }

            reader.close();
        }

        return (K[]) samples.toArray();
    }
}
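The kept/records rule can be checked in isolation. This sketch assumes int keys held in in-memory lists, one per split (IntervalPick is a hypothetical name); with freq = 0.1 over the keys 1 to 30 it keeps 1, 11 and 21, matching the walk-through in the code comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of IntervalSampler.getSample: each split is an in-memory list of keys.
class IntervalPick {
    static List<Integer> sample(List<List<Integer>> splits, double freq, int maxSplitsSampled) {
        List<Integer> samples = new ArrayList<>();
        int splitsToSample = Math.min(maxSplitsSampled, splits.size());
        long records = 0;  // records read, across all sampled splits
        long kept = 0;     // records sampled
        for (int i = 0; i < splitsToSample; ++i) {
            for (int key : splits.get(i)) {
                ++records;
                // sample whenever the kept ratio has fallen below the target frequency
                if ((double) kept / (double) records < freq) {
                    samples.add(key);
                    ++kept;
                }
            }
        }
        return samples;
    }
}
```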

RandomSampler class definition

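RandomSampler takes a sampling frequency, the total number of samples and the maximum number of splits to sample. It iterates over the records of the sampled splits; a record is sampled when a random number is at most the sampling frequency, and it is then either appended to the sample list or, once the list is full, written over a randomly chosen entry. Each such replacement also lowers the sampling frequency, so the further the sampler reads, the smaller the chance that a record is sampled.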

public static class RandomSampler<K, V> implements InputSampler.Sampler<K, V> {
    protected double freq;
    protected final int numSamples;
    protected final int maxSplitsSampled;

    public RandomSampler(double freq, int numSamples) {
        this(freq, numSamples, Integer.MAX_VALUE);
    }

    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
        this.freq = freq;                         // sampling frequency
        this.numSamples = numSamples;             // total number of samples
        this.maxSplitsSampled = maxSplitsSampled; // maximum number of splits to sample
    }

    @SuppressWarnings("unchecked")
    public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
        List<InputSplit> splits = inf.getSplits(job);
        ArrayList<K> samples = new ArrayList<K>(this.numSamples);            // preallocate room for the samples
        int splitsToSample = Math.min(this.maxSplitsSampled, splits.size()); // number of splits to sample
        Random r = new Random();
        long seed = r.nextLong();  // random seed, logged at debug level
        r.setSeed(seed);
        InputSampler.LOG.debug("seed: " + seed);

        // shuffle the splits: swap split i with a randomly chosen split j
        for (int i = 0; i < splits.size(); ++i) {
            InputSplit tmp = splits.get(i);
            int j = r.nextInt(splits.size());
            splits.set(i, splits.get(j));
            splits.set(j, tmp);
        }
        // always read the first splitsToSample (shuffled) splits; keep reading further splits
        // only while fewer than numSamples keys have been collected
        for (int i = 0; i < splitsToSample || (i < splits.size() && samples.size() < this.numSamples); ++i) {
            TaskAttemptContext samplingContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
            RecordReader<K, V> reader = inf.createRecordReader(splits.get(i), samplingContext);
            reader.initialize(splits.get(i), samplingContext);

            while (reader.nextKeyValue()) {
                // accept the record when the random double is <= freq; since acceptance is random,
                // reading every split can still end with fewer than numSamples keys
                if (r.nextDouble() <= this.freq) {
                    if (samples.size() < this.numSamples) {
                        // sample list not full yet: append the key
                        samples.add(ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), null));
                    } else {
                        // sample list full: overwrite a random slot (nextInt(numSamples) is always
                        // below numSamples, so this check always passes)
                        int ind = r.nextInt(this.numSamples);
                        if (ind != this.numSamples) {
                            samples.set(ind, ReflectionUtils.copy(job.getConfiguration(), reader.getCurrentKey(), null));
                        }
                        // each replacement lowers freq, so later records are accepted less often
                        this.freq *= (double) (this.numSamples - 1) / (double) this.numSamples;
                    }
                }
            }

            reader.close();
        }

        return (K[]) samples.toArray();
    }
}
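The accept/replace logic of RandomSampler can be sketched on a single in-memory list of int keys. This is a simplification under stated assumptions: one split, no split shuffling, and a caller-supplied seed; DecayingRandomSampler is a hypothetical name. Two properties are easy to check: the result never holds more than numSamples keys, and with freq = 1.0 and enough room every key is kept in its original order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified sketch of RandomSampler's accept/replace step
// (single in-memory list of keys, no split shuffling, caller-supplied seed).
class DecayingRandomSampler {
    static List<Integer> sample(List<Integer> keys, double freq, int numSamples, long seed) {
        Random r = new Random(seed);
        List<Integer> samples = new ArrayList<>(numSamples);
        for (int key : keys) {
            // nextDouble() is in [0, 1), so the record is accepted with probability freq
            if (r.nextDouble() <= freq) {
                if (samples.size() < numSamples) {
                    samples.add(key);                               // list not full: append
                } else {
                    samples.set(r.nextInt(numSamples), key);        // list full: overwrite a random slot
                    freq *= (double) (numSamples - 1) / numSamples; // and accept later records less often
                }
            }
        }
        return samples;
    }
}
```

The decay factor (numSamples - 1) / numSamples is taken from the Hadoop code above; this sketch makes no claim about the resulting sampling distribution beyond what that code does.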

Original article: https://www.cnblogs.com/bclshuai/p/12315353.html