zoukankan      html  css  js  c++  java
  • 第八篇:经典案例

    前言

           在计算机领域,排序的重要性不用多说。而排序的算法,效率分析等也一直是研究的热点。

           本文将给出使用Hadoop分布式方案进行排序的例子,这能极大提高排序的速度,是需要重点掌握的一个案例。

    需求

           对输入文件中的数据进行排序。

           输入文件中的每行内容都是一个数字,要求在输出文件中每行有两个数字,第一个数字代表位次,第二个数字为原始数据。

           比如文件1包含以下数据:

           1

           3

           5

           2

           4

           6

           文件2包含以下数据:

           2

           4

           6

           3

           1

           5

           那么输出文件应当为:

           1  1

           2  1

           3  2

           4  2

           ...

    方案制定

           表面上看,这是一个非常简单的例子 - Hadoop中存放的键值对本身就是有序的,直接将输入存放进来然后再取出来就完成排序了。

           但事实上,直接这样做行不通。为何?因为默认的排序过程是在单个的节点上完成的。也就是说,每个reduce节点收到键值对是在该节点局部有序,而不是在所有reduce节点里全局有序。

           解决之道是重写Partition方法,请仔细阅读以下内容:

           在shuffle阶段之后(或者说是shuffle最后),将根据map中间输出键值对中的key值来决定将此键值对划分给哪个Partition区间,或者说哪个reduce节点。

           可以根据数据的最大最小值将数据划分为多个区间,这样,每个reduce节点就能获取到某个数据段的完整的数据,而且根据hadoop特性,这些数据在单个的reduce节点之内都是有序存放的。

           因此每个reduce节点的任务很简单,输出结果就可以了。

           至于说位次,只需要在reduce类中声明一个static变量,让这个static变量在不同的reduce调用之间共享就可以了。

           要说明的是这里统计的只是数据在每个reduce节点之内的位次,如果要获得全局位次,则需要再遍历一次所有reduce输出文件。时间复杂度仅为O(n)。

    代码实现

      1 package org.apache.hadoop.examples;
      2 
      3 import java.io.IOException;
      4 
      5 //导入各种Hadoop包
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.IntWritable;
      9 import org.apache.hadoop.io.Text;
     10 import org.apache.hadoop.mapreduce.Job;
     11 import org.apache.hadoop.mapreduce.Mapper;
     12 import org.apache.hadoop.mapreduce.Partitioner;
     13 import org.apache.hadoop.mapreduce.Reducer;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     16 import org.apache.hadoop.util.GenericOptionsParser;
     17 
     18 // 主类
     19 public class Sort {
     20         
     21     // Mapper类
     22     public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{
     23         
     24         // new一个值为1的IntWritable对象
     25         private static IntWritable data = new IntWritable(1);
     26                 
     27         // 实现map函数
     28         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
     29             
     30             // 将切分后的value作为中间输出的key,然后value值为1。
     31             String line = value.toString();
     32             data.set(Integer.parseInt(line));
     33             context.write(data, new IntWritable(1));
     34         }
     35     }
     36         
     37     // Reducer类
     38     public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
     39     
     40         // new一个值为空的IntWritable对象
     41         private static IntWritable linenum = new IntWritable();
     42                 
     43         // 实现reduce函数
     44         public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
     45                 
     46             // 写入结果键值对
     47             for (IntWritable val : values) {
     48                 context.write(linenum, key);
     49                 linenum = new IntWritable(linenum.get()+1);
     50             }
     51         }
     52     }
     53 
     54     // 重写Partitioner类
     55     public static class Partition extends Partitioner <IntWritable, IntWritable> {
     56         
     57         // 重载getPartition方法。下面的三个参数分别为map中间输出的键,值,以及分割区间的个数。
     58         public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
     59             
     60             // 依次将键值对分配到各个分割区间
     61             int MaxNumber = 65223;
     62             int bound = MaxNumber/numPartitions + 1;
     63             int keynumber = key.get();
     64             
     65             for (int i=0; i<numPartitions; i++) {
     66                 if (keynumber < bound * (i+1) && keynumber >= bound*i) {
     67                     
     68                     // 返回的 i 就是分配到的区间号
     69                     return i;
     70                 }
     71             }
     72             
     73             return -1;
     74         }
     75     }
     76     
     77     // 主函数
     78     public static void main(String[] args) throws Exception {
     79     
     80         // 获取配置参数
     81         Configuration conf = new Configuration();
     82         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     83                 
     84         // 检查命令语法
     85         if (otherArgs.length != 2) {
     86             System.err.println("Usage: Dedup <in> <out>");
     87             System.exit(2);
     88         }
     89 
     90         // 定义作业对象
     91         Job job = new Job(conf, "Sort");
     92         // 注册分布式类
     93         job.setJarByClass(Sort.class);
     94         // 注册Mapper类
     95         job.setMapperClass(Map.class);
     96         // 注册Reducer类
     97         job.setReducerClass(Reduce.class);
     98         // 注册Partition类
     99         job.setPartitionerClass(Partition.class);
    100         // 注册输出格式类
    101         job.setOutputKeyClass(IntWritable.class);
    102         job.setOutputValueClass(IntWritable.class);
    103         // 设置输入输出路径
    104         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    105         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    106                 
    107         // 运行程序
    108         System.exit(job.waitForCompletion(true) ? 0 : 1);
    109     }
    110 }

    运行结果

           输入文件1,2分别为:

           

           

    小结

    1. 掌握Partitioner方法的重写技巧,这是本程序最核心的部分。

    2. 熟悉hadoop的key默认有序的性质。

    3. 本文采取的是伪分布式,故只有1个reduce节点,体现不出hadoop的优越性。当对海量数据进行排序的时候,它的速度价值才能真正体现出来。

  • 相关阅读:
    PBOC规范研究之九---静态数据认证(转)
    PBOC规范研究之五、安全相关的PKI基础知识(转)
    PBOC规范研究之三、TypeB协议(转)
    PBOC规范研究之二、PBOC规范中,对于通讯速率的约定(转)
    PBOC规范研究之一、ISO14443协议和PBOC关于CID的约定(转)
    qml js
    qml 信号与信号 信号与方法链接使用 带参数会报错
    调试bug的几种方法
    CDN方式使用iview
    iView--3
  • 原文地址:https://www.cnblogs.com/muchen/p/6881607.html
Copyright © 2011-2022 走看看