zoukankan      html  css  js  c++  java
  • MapReduce的Partitioner案例

     

     

    项目简介

    这里给出一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。

    
    
    SparkHBase
    HiveFlinkStormHadoopHBaseSpark
    Flink
    HBaseStorm
    HBaseHadoopHiveFlink
    HBaseFlinkHiveStorm
    HiveFlinkHadoop
    HBaseHive
    HadoopSparkHBaseStorm
    HBaseHadoopHiveFlink
    HBaseFlinkHiveStorm
    HiveFlinkHadoop
    HBaseHive
     
     
     
    13
     
     
     
    1
    SparkHBase
    2
    HiveFlinkStormHadoopHBaseSpark
    3
    Flink
    4
    HBaseStorm
    5
    HBaseHadoopHiveFlink
    6
    HBaseFlinkHiveStorm
    7
    HiveFlinkHadoop
    8
    HBaseHive
    9
    HadoopSparkHBaseStorm
    10
    HBaseHadoopHiveFlink
    11
    HBaseFlinkHiveStorm
    12
    HiveFlinkHadoop
    13
    HBaseHive
     
     
    
    

    项目依赖

    想要进行 MapReduce 编程,需要导入 hadoop-client 依赖:

    
    
    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
    </dependency>
     
     
     
    5
     
     
     
    1
    <dependency>
    2
       <groupId>org.apache.hadoop</groupId>
    3
       <artifactId>hadoop-client</artifactId>
    4
       <version>${hadoop.version}</version>
    5
    </dependency>
     
     
    
    

     WordCountMapper

    将每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable接口。

    
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
       @Override
       protected void map(LongWritable key, Text value, Context context) throws IOException, 
                                                                         InterruptedException {
           String[] words = value.toString().split("	");
           for (String word : words) {
               context.write(new Text(word), new IntWritable(1));
          }
      }
    }
     
     
     
    11
     
     
     
    1
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    2
    
    
    3
       @Override
    4
       protected void map(LongWritable key, Text value, Context context) throws IOException, 
    5
                                                                         InterruptedException {
    6
           String[] words = value.toString().split("	");
    7
           for (String word : words) {
    8
               context.write(new Text(word), new IntWritable(1));
    9
          }
    10
      }
    11
    }
     
     
    
    

    WordCountMapper 对应下图的 Mapping 操作:

    WordCountMapper继承自 Mappe类,这是一个泛型类,定义如下:

    
    
    WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
    
    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
      
    }
     
     
     
    5
     
     
     
    1
    WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
    2
    
    
    3
    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    4
      
    5
    }
     
     
    
    
    • KEYIN : mapping输入 key 的类型,即每行的偏移量 (每行第一个字符在整个文本中的位置),Long类型,对应 Hadoop 中的 LongWritable类型;
    • VALUEIN : mapping输入 value 的类型,即每行数据;String类型,对应 Hadoop 中 Text类型;
    • KEYOUTmapping输出的 key 的类型,即每个单词;String类型,对应 Hadoop 中 Text类型;
    • VALUEOUTmapping输出 value 的类型,即每个单词出现的次数;这里用 int 类型,对应 IntWritable类型。

     

    WordCountReducer

    在 Reduce 中进行单词出现次数的统计:

    
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
       @Override
       protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, 
                                                                                     InterruptedException {
           int count = 0;
           for (IntWritable value : values) {
               count += value.get();
          }
           context.write(key, new IntWritable(count));
      }
    }
     
     
     
    12
     
     
     
    1
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    2
    
    
    3
       @Override
    4
       protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, 
    5
                                                                                     InterruptedException {
    6
           int count = 0;
    7
           for (IntWritable value : values) {
    8
               count += value.get();
    9
          }
    10
           context.write(key, new IntWritable(count));
    11
      }
    12
    }
     
     
    
    

    如下图,shuffling 的输出是 reduce 的输入。这里的 key 是每个单词,values 是一个可迭代的数据类型,类似 (1,1,1,...)

    WordCountApp

    组装 MapReduce 作业,并提交到服务器运行,代码如下:

    
    
    /**
    * 组装作业 并提交到集群运行
    */
    public class WordCountApp {
       // 这里为了直观显示参数 使用了硬编码,实际开发中可以通过外部传参
       private static final String HDFS_URL = "hdfs://192.168.100.1";
       private static final String HADOOP_USER_NAME = "root";
    
       public static void main(String[] args) throws Exception {
    
           // 文件输入路径和输出路径由外部传参指定
           if (args.length < 2) {
               System.out.println("Input and output paths are necessary!");
               return;
          }
    
           // 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
           System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
    
           Configuration configuration = new Configuration();
           // 指明 HDFS 的地址
           configuration.set("fs.defaultFS", HDFS_URL);
    
           // 创建一个 Job
           Job job = Job.getInstance(configuration);
    
           // 设置运行的主类
           job.setJarByClass(WordCountApp.class);
    
           // 设置 Mapper 和 Reducer
           job.setMapperClass(WordCountMapper.class);
           job.setReducerClass(WordCountReducer.class);
    
           // 设置 Mapper 输出 key 和 value 的类型
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(IntWritable.class);
    
           // 设置 Reducer 输出 key 和 value 的类型
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(IntWritable.class);
    
           // 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
           FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
           Path outputPath = new Path(args[1]);
           if (fileSystem.exists(outputPath)) {
               fileSystem.delete(outputPath, true);
          }
    
           // 设置作业输入文件和输出文件的路径
           FileInputFormat.setInputPaths(job, new Path(args[0]));
           FileOutputFormat.setOutputPath(job, outputPath);
    
           // 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
           boolean result = job.waitForCompletion(true);
    
           // 关闭之前创建的 fileSystem
           fileSystem.close();
    
           // 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
           System.exit(result ? 0 : -1);
    
      }
    }
     
     
     
    63
     
     
     
    1
    /**
    2
    * 组装作业 并提交到集群运行
    3
    */
    4
    public class WordCountApp {
    5
       // 这里为了直观显示参数 使用了硬编码,实际开发中可以通过外部传参
    6
       private static final String HDFS_URL = "hdfs://192.168.100.1";
    7
       private static final String HADOOP_USER_NAME = "root";
    8
    
    
    9
       public static void main(String[] args) throws Exception {
    10
    
    
    11
           // 文件输入路径和输出路径由外部传参指定
    12
           if (args.length < 2) {
    13
               System.out.println("Input and output paths are necessary!");
    14
               return;
    15
          }
    16
    
    
    17
           // 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
    18
           System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
    19
    
    
    20
           Configuration configuration = new Configuration();
    21
           // 指明 HDFS 的地址
    22
           configuration.set("fs.defaultFS", HDFS_URL);
    23
    
    
    24
           // 创建一个 Job
    25
           Job job = Job.getInstance(configuration);
    26
    
    
    27
           // 设置运行的主类
    28
           job.setJarByClass(WordCountApp.class);
    29
    
    
    30
           // 设置 Mapper 和 Reducer
    31
           job.setMapperClass(WordCountMapper.class);
    32
           job.setReducerClass(WordCountReducer.class);
    33
    
    
    34
           // 设置 Mapper 输出 key 和 value 的类型
    35
           job.setMapOutputKeyClass(Text.class);
    36
           job.setMapOutputValueClass(IntWritable.class);
    37
    
    
    38
           // 设置 Reducer 输出 key 和 value 的类型
    39
           job.setOutputKeyClass(Text.class);
    40
           job.setOutputValueClass(IntWritable.class);
    41
    
    
    42
           // 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
    43
           FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
    44
           Path outputPath = new Path(args[1]);
    45
           if (fileSystem.exists(outputPath)) {
    46
               fileSystem.delete(outputPath, true);
    47
          }
    48
    
    
    49
           // 设置作业输入文件和输出文件的路径
    50
           FileInputFormat.setInputPaths(job, new Path(args[0]));
    51
           FileOutputFormat.setOutputPath(job, outputPath);
    52
    
    
    53
           // 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
    54
           boolean result = job.waitForCompletion(true);
    55
    
    
    56
           // 关闭之前创建的 fileSystem
    57
           fileSystem.close();
    58
    
    
    59
           // 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
    60
           System.exit(result ? 0 : -1);
    61
    
    
    62
      }
    63
    }
     
     
    
    

    需要注意的是:如果不设置 Mapper 操作的输出类型,则程序默认它和 Reducer操作输出的类型相同。

    提交到服务器运行

    在实际开发中,可以在本机配置 hadoop 开发环境,直接在 IDE 中启动进行测试。这里主要介绍一下打包提交到服务器运行。由于本项目没有使用除 Hadoop 外的第三方依赖,直接打包即可:

    # mvn clean package
     
     
     
    1
     
     
     
    1
    # mvn clean package
     
     

    使用以下命令提交作业:

    
    
    hadoop jar /usr/appjar/hadoop-word-count-1.0.jar 
    com.heibaiying.WordCountApp 
    /wordcount/input.txt /wordcount/output/WordCountApp
     
     
     
    3
     
     
     
    1
    hadoop jar /usr/appjar/hadoop-word-count-1.0.jar 
    2
    com.heibaiying.WordCountApp 
    3
    /wordcount/input.txt /wordcount/output/WordCountApp
     
     
    
    

    作业完成后查看 HDFS 上生成目录:

    
    
    # 查看目录
    hadoop fs -ls /wordcount/output/WordCountApp
    
    # 查看统计结果
    hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000
     
     
     
    5
     
     
     
    1
    # 查看目录
    2
    hadoop fs -ls /wordcount/output/WordCountApp
    3
    
    
    4
    # 查看统计结果
    5
    hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000
     
     
    
    
     

    默认的Partitioner

    这里假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品种类进行拆分。要实现这个功能,就需要用到自定义 Partitioner

    这里先介绍下 MapReduce 默认的分类规则:在构建 job 时候,如果不指定,默认的使用的是 HashPartitioner:对 key 值进行哈希散列并对 numReduceTasks取余。其实现如下:

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
    
      public int getPartition(K key, V value,
                              int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
    }
     
     
     
     
     
     
     
     
    1
    public class HashPartitioner<K, V> extends Partitioner<K, V> {
    2
    
    
    3
      public int getPartition(K key, V value,
    4
                              int numReduceTasks) {
    5
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    6
    }
    7
    }
     
     
    
    

    自定义Partitioner

    这里我们继承 Partitioner自定义分类规则,这里按照单词进行分类:

    
    
    public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    
       public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
           return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
      }
    }
     
     
     
     
     
     
     
     
     
    1
    public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    2
    
    
    3
       public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
    4
           return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
    5
      }
    6
    }
     
     
    
    

    在构建 job时候指定使用我们自己的分类规则,并设置 reduce的个数:

    执行结果

    执行结果如下,分别生成 6 个文件,每个文件中为对应单词的统计结果:

     



  • 相关阅读:
    h5-news_index
    h5-爆料view
    h5-列表
    h5-注册
    h5-登录
    h5-弹出层layer,提示,顶部横条,
    jquery 弹窗插件 layer
    jQuery幻灯片插件Owl Carousel
    display:block jquery.sort()
    Android 开源框架ViewPageIndicator 和 ViewPager 仿网易新闻客户端Tab标签
  • 原文地址:https://www.cnblogs.com/TiePiHeTao/p/a04e4beefea19a0825097a42eab6878f.html
Copyright © 2011-2022 走看看