MapReduce Examples

    1. WordCount (word counting)

    The classic example of the MapReduce programming model.

    1.1 Description

    Given a sequence of words/records, output the number of occurrences of each word/record.

    1.2 Sample

    a is b is not c
    b is a is not d

    1.3 Output

    a:2
    b:2
    c:1
    d:1
    is:4
    not:2

    1.4 Solution

/**
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  // The map output <key, value> is <word, 1>, i.e. <Text, IntWritable>
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
    // the count 1, boxed as an IntWritable
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken()); // each whitespace-separated token becomes a word
        context.write(word, one);
      }
    }
  }

  // The reduce input <key, value> is <word, list of 1s>; the reducer sums the list,
  // so the output is again <Text, IntWritable>
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    // delete the output directory if it already exists
    // (java.io.File only touches the local filesystem; see the FileSystem sketch after this listing)
    judgeFileExist(otherArgs[1]);
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  // delete a directory and every file under it (local filesystem)
  public static void judgeFileExist(String path){
      File file = new File(path);
      if( file.exists() ){
          deleteFileDir(file);
      }
  }

  public static void deleteFileDir(File path){
      if( path.isDirectory() ){
          String[] files = path.list();
          for( int i=0;i<files.length;i++ ){
              deleteFileDir( new File(path,files[i]) );
          }
      }
      path.delete();
  }

}
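Note that judgeFileExist / deleteFileDir rely on java.io.File, so they can only clear an output directory on the local filesystem. If the output path lives on HDFS, the usual approach is the org.apache.hadoop.fs.FileSystem API. Below is a minimal sketch; the helper name deleteOutputIfExists is illustrative, not part of the original example.

// Sketch: remove an existing output directory through the Hadoop FileSystem API,
// which works for HDFS as well as the local filesystem.
// Requires: import org.apache.hadoop.fs.FileSystem;
public static void deleteOutputIfExists(Configuration conf, String path) throws IOException {
    FileSystem fs = FileSystem.get(conf);   // the filesystem named in the job configuration
    Path out = new Path(path);
    if (fs.exists(out)) {
        fs.delete(out, true);               // true = delete recursively
    }
}

The call judgeFileExist(otherArgs[1]) in main would then become deleteOutputIfExists(conf, otherArgs[1]).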

    2. Data deduplication

    2.1 Description

    Remove duplicates from a given sequence of records and output the distinct records.

    2.2 Sample

    3-1 a
    3-2 b
    3-3 c
    3-4 d
    3-5 a
    3-6 b
    3-7 c
    3-3 c
    3-1 b
    3-2 a
    3-3 b
    3-4 d
    3-5 a
    3-6 c
    3-7 d
    3-3 c

    2.3 Output

    3-1 a
    3-1 b
    3-2 a
    3-2 b
    3-3 b
    3-3 c
    3-4 d
    3-5 a
    3-6 b
    3-6 c
    3-7 c
    3-7 d

    2.4 Solution

/**
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Dedup {

 // The map output is <Text, Text>: the whole input line is the key, and since only
 // the existence of the key matters, the value can be anything (an empty Text here)
 public static class Map extends Mapper<Object,Text,Text,Text>{

     public void map(Object key,Text value,Context context)
             throws IOException, InterruptedException{
         context.write(value, new Text("")); // <line, "">
     }
 }

 // The shuffle groups identical keys, so the reducer writes each distinct line once
 public static class Reduce extends Reducer<Text,Text,Text,Text>{
     public void reduce(Text key,Iterable<Text> values,Context context)
             throws IOException, InterruptedException{
         context.write(key,new Text(""));
     }
 }

 public static void main(String args[])
         throws IOException, ClassNotFoundException, InterruptedException{
     Configuration conf = new Configuration();

     String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
     if( otherArgs.length!=2 ){
         System.err.println("Usage: Data Deduplication <in> <out>");
         System.exit(2);
     }

     // delete the output directory if it already exists (local filesystem only)
     judgeFileExist(otherArgs[1]);
     Job job = new Job(conf,"Data Dedup");
     job.setJarByClass(Dedup.class);
     // set the mapper, combiner and reducer classes
     job.setMapperClass(Map.class);
     job.setCombinerClass(Reduce.class);
     job.setReducerClass(Reduce.class);
     // set the output key/value types
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
     // set the input and output paths
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
     System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

  // delete a directory and every file under it (local filesystem)
  public static void judgeFileExist(String path){
      File file = new File(path);
      if( file.exists() ){
          deleteFileDir(file);
      }
  }

  public static void deleteFileDir(File path){
      if( path.isDirectory() ){
          String[] files = path.list();
          for( int i=0;i<files.length;i++ ){
              deleteFileDir( new File(path,files[i]) );
          }
      }
      path.delete();
  }

}
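Because the value emitted by the mapper and reducer is never read, a common variant, not used in the original post, is to emit NullWritable instead of an empty Text so that no empty strings are serialized at all. The sketch below shows only the mapper under that assumption; the class name NullValueMap is illustrative.

// Sketch: deduplication mapper emitting NullWritable instead of an empty Text.
// Requires: import org.apache.hadoop.io.NullWritable;
public static class NullValueMap extends Mapper<Object, Text, Text, NullWritable> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // the whole input line is the key; the value carries no information
        context.write(value, NullWritable.get());
    }
}

The reducer would likewise write NullWritable.get(), and the driver would call job.setOutputValueClass(NullWritable.class).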

    3. Data sorting

     3.1 Description

    Sort the numeric data from several input files; each value occupies one line.

    3.2 Sample

    1
    12
    31
    123
    21
    31
    12

    3.3 Output

    1       1
    2       12
    3       12
    4       21
    5       31
    6       31
    7       123

    3.4 Solution

/**
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.hadoop.example;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DataSort{

    // map: emit <value, 1>; the framework sorts the map output keys during the shuffle,
    // so the reducer receives the values in ascending order
    public static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{
        private static IntWritable data = new IntWritable();

        public void map(Object key,Text value,Context context)
                throws IOException, InterruptedException{
            data.set(Integer.parseInt(value.toString().trim()));
            context.write(data,new IntWritable(1));
        }
    }

    // reduce: keys arrive sorted; write one <rank, value> line per occurrence
    public static class Reduce extends Reducer<IntWritable, IntWritable,IntWritable,IntWritable>{
        private static IntWritable lineNum = new IntWritable(1);
        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)
                throws IOException, InterruptedException{
            for(IntWritable val:values){
                context.write(lineNum,key);
                lineNum = new IntWritable(lineNum.get()+1);
            }
        }
    }

    public static void main(String args[])
            throws IOException, ClassNotFoundException, InterruptedException{
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if( otherArgs.length!=2 ){
            System.err.println("Usage: Data Sort <in> <out>");
            System.exit(2);
        }

        // delete the output directory if it already exists (local filesystem only)
        judgeFileExist(otherArgs[1]);
        Job job = new Job(conf,"Data Sort");
        job.setJarByClass(DataSort.class);
        // set the mapper and reducer classes
        // (no combiner here: the reducer replaces the keys with ranks, so running it
        //  map-side would corrupt the result)
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // set the output key/value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // set the input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // delete a directory and every file under it (local filesystem)
    public static void judgeFileExist(String path){
        File file = new File(path);
        if( file.exists() ){
            deleteFileDir(file);
        }
    }

    public static void deleteFileDir(File path){
        if( path.isDirectory() ){
            String[] files = path.list();
            for( int i=0;i<files.length;i++ ){
                deleteFileDir( new File(path,files[i]) );
            }
        }
        path.delete();
    }
}
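The sort works because the MapReduce shuffle already sorts the map output keys: with the default single reduce task, the reducer receives the numbers in ascending order and only has to attach a rank. With several reduce tasks, a partitioner is also needed so that reducer i only receives keys smaller than those of reducer i+1. A rough sketch under the assumption of non-negative inputs bounded by MAX_VALUE; both the class name and the bound are illustrative, not part of the original example.

// Sketch: range partitioner that keeps the global key order when more than one reducer is used.
// Requires: import org.apache.hadoop.mapreduce.Partitioner;
public static class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
    private static final int MAX_VALUE = 1000;  // assumed upper bound on the input values

    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int bucketSize = MAX_VALUE / numPartitions + 1;           // width of each key range
        return Math.min(key.get() / bucketSize, numPartitions - 1);
    }
}

The driver would register it with job.setPartitionerClass(RangePartitioner.class) and job.setNumReduceTasks(n); the concatenated part files then stay globally sorted, although the rank counter written by each reducer restarts at 1.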
Original article: https://www.cnblogs.com/xxx0624/p/4446931.html