  • MapReduce Examples

    1. WordCount (word counting)

    A classic set of examples using the MapReduce programming model.

    1.1 Description

    Given a sequence of words/data items, output the number of occurrences of each one.

    1.2 Sample

    a is b is not c
    b is a is not d

    1.3 Output

    a:2
    b:2
    c:1
    d:1
    is:4
    not:2

    1.4 Solution

    /**
     *  Licensed under the Apache License, Version 2.0 (the "License");
     *  you may not use this file except in compliance with the License.
     *  You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     *  Unless required by applicable law or agreed to in writing, software
     *  distributed under the License is distributed on an "AS IS" BASIS,
     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *  See the License for the specific language governing permissions and
     *  limitations under the License.
     */

    package org.apache.hadoop.examples;

    import java.io.File;
    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount {

      // The map output <key, value> is <word, 1>, i.e. <Text, IntWritable>.
      public static class TokenizerMapper
           extends Mapper<Object, Text, Text, IntWritable> {
        // The constant 1 wrapped as a Hadoop IntWritable.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken()); // each whitespace-separated token becomes a key
            context.write(word, one);
          }
        }
      }

      // The reduce input <key, value> is <word, list of 1s>; summing the list gives
      // the count, so the reduce output is again <Text, IntWritable>.
      public static class IntSumReducer
           extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }
        // Delete the output directory if it already exists (works on the local filesystem only).
        judgeFileExist(otherArgs[1]);
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // The reducer can double as a combiner because summation is associative and commutative.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

      // Delete the output directory if it exists.
      public static void judgeFileExist(String path) {
        File file = new File(path);
        if (file.exists()) {
          deleteFileDir(file);
        }
      }

      // Recursively delete a directory and the files under it.
      public static void deleteFileDir(File path) {
        if (path.isDirectory()) {
          String[] files = path.list();
          for (int i = 0; i < files.length; i++) {
            deleteFileDir(new File(path, files[i]));
          }
        }
        path.delete();
      }
    }
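    A side note on the driver: on Hadoop 2.x and later, new Job(conf, ...) is deprecated in favor of Job.getInstance(conf, ...). A minimal sketch of the driver skeleton, assuming a Hadoop 2.x classpath:

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Job.getInstance(...) replaces the deprecated new Job(conf, name) constructor.
      Job job = Job.getInstance(conf, "word count");
      job.setJarByClass(WordCount.class);
      // The mapper, combiner, reducer, key/value types and input/output paths
      // are configured exactly as in the listing above.
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    The packaged jar is then run as usual, e.g. hadoop jar <your-jar> org.apache.hadoop.examples.WordCount <in> <out>, where the jar name and paths are placeholders.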

    2. Data Deduplication

    2.1 Description

    Given a sequence of records, remove the duplicates and output each distinct record.

    2.2 Sample

    3-1 a
    3-2 b
    3-3 c
    3-4 d
    3-5 a
    3-6 b
    3-7 c
    3-3 c
    3-1 b
    3-2 a
    3-3 b
    3-4 d
    3-5 a
    3-6 c
    3-7 d
    3-3 c

    2.3 Output

    3-1 a
    3-1 b
    3-2 a
    3-2 b
    3-3 b
    3-3 c
    3-4 d
    3-5 a
    3-6 b
    3-6 c
    3-7 c
    3-7 d

    2.4 Solution

    /**
     *  Licensed under the Apache License, Version 2.0 (the "License");
     *  you may not use this file except in compliance with the License.
     *  You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     *  Unless required by applicable law or agreed to in writing, software
     *  distributed under the License is distributed on an "AS IS" BASIS,
     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *  See the License for the specific language governing permissions and
     *  limitations under the License.
     */

    package org.apache.hadoop.examples;

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount {

      // The map output is <Text, Text>: only whether a key exists matters for
      // deduplication, so the value can be anything (an empty Text here).
      public static class Map extends Mapper<Object, Text, Text, Text> {
        public static Text lineWords = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
          lineWords = value;                       // the whole input line becomes the key
          context.write(lineWords, new Text(""));  // emit <record, empty value>
        }
      }

      // After the shuffle, each distinct record reaches reduce exactly once,
      // so writing the key once per call produces the deduplicated output.
      public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
          context.write(key, new Text(""));
        }
      }

      public static void main(String[] args)
              throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: Data Deduplication <in> <out>");
          System.exit(2);
        }

        // Delete the output directory if it already exists (local filesystem only).
        judgeFileExist(otherArgs[1]);
        Job job = new Job(conf, "Data Dup");
        job.setJarByClass(WordCount.class);
        // Set the mapper, combiner, and reducer classes.
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        // Set the output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the input and output paths.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

      // Delete the output directory if it exists.
      public static void judgeFileExist(String path) {
        File file = new File(path);
        if (file.exists()) {
          deleteFileDir(file);
        }
      }

      // Recursively delete a directory and the files under it.
      public static void deleteFileDir(File path) {
        if (path.isDirectory()) {
          String[] files = path.list();
          for (int i = 0; i < files.length; i++) {
            deleteFileDir(new File(path, files[i]));
          }
        }
        path.delete();
      }
    }
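    A possible refinement, not used in the listing above: because the value carries no information, Hadoop's NullWritable can stand in for the empty Text, which avoids serializing an empty string for every record. A minimal sketch, with the class name DedupNullValue chosen only for illustration:

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class DedupNullValue {
      // Emit the whole input line as the key; the value is the shared NullWritable singleton.
      public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
          context.write(value, NullWritable.get());
        }
      }

      // Each distinct key reaches reduce exactly once after the shuffle,
      // so writing the key once per call yields the deduplicated output.
      public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
          context.write(key, NullWritable.get());
        }
      }
    }

    The driver would then pass NullWritable.class to setOutputValueClass; everything else stays as in the listing above.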

    3. Data Sorting

    3.1 Description

    Sort the data from multiple input files; in each file, every number occupies its own line.

    3.2 Sample

    1
    12
    31
    123
    21
    31
    12

    3.3 Output

    1       1
    2       12
    3       12
    4       21
    5       31
    6       31
    7       123

    3.4 Solution

    /**
     *  Licensed under the Apache License, Version 2.0 (the "License");
     *  you may not use this file except in compliance with the License.
     *  You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     *  Unless required by applicable law or agreed to in writing, software
     *  distributed under the License is distributed on an "AS IS" BASIS,
     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *  See the License for the specific language governing permissions and
     *  limitations under the License.
     */

    package org.apache.hadoop.example;

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class dataSort {

      // map: parse each line as an int and emit <number, 1>. The framework sorts
      // map output keys during the shuffle, which is what delivers the numbers
      // to the reducer in ascending order.
      public static class map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
          String lineWords = value.toString();
          data.set(Integer.parseInt(lineWords.trim()));
          context.write(data, new IntWritable(1));
        }
      }

      // reduce: keys arrive in ascending order; for every occurrence of a key,
      // write <rank, number> and advance the rank counter.
      public static class reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable lineNum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
          for (IntWritable val : values) {
            context.write(lineNum, key);
            lineNum = new IntWritable(lineNum.get() + 1);
          }
        }
      }

      public static void main(String[] args)
              throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: Data Sort <in> <out>");
          System.exit(2);
        }

        // Delete the output directory if it already exists (local filesystem only).
        judgeFileExist(otherArgs[1]);
        Job job = new Job(conf, "Data Sort");
        job.setJarByClass(dataSort.class);
        // Set the mapper and reducer classes. Unlike WordCount, the reducer must NOT
        // be reused as a combiner here: run map-side it would emit <rank, number>
        // pairs, swapping the roles of key and value before the real reduce.
        job.setMapperClass(map.class);
        job.setReducerClass(reduce.class);
        // Set the output key/value types.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output paths.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

      // Delete the output directory if it exists.
      public static void judgeFileExist(String path) {
        File file = new File(path);
        if (file.exists()) {
          deleteFileDir(file);
        }
      }

      // Recursively delete a directory and the files under it.
      public static void deleteFileDir(File path) {
        if (path.isDirectory()) {
          String[] files = path.list();
          for (int i = 0; i < files.length; i++) {
            deleteFileDir(new File(path, files[i]));
          }
        }
        path.delete();
      }
    }
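    One caveat on ordering, assuming the job runs on a real cluster rather than the local runner: the output is globally sorted only because every key reaches the same reduce task. With several reducers, each part file would be sorted internally but not across files, so the driver should pin the reducer count (or switch to a TotalOrderPartitioner). A one-line sketch of that driver addition:

    // Force a single reduce task so the ranks and the global order hold across the whole output.
    job.setNumReduceTasks(1);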
  • Original post: https://www.cnblogs.com/xxx0624/p/4446931.html