zoukankan      html  css  js  c++  java
  • MapReduce编程:词频统计

    首先需要在项目的src目录中加入以下文件,其中log4j.properties的内容为:

    # Root logger: level INFO, attached to the "stdout" appender only.
    log4j.rootLogger=INFO, stdout
    
    # Console appender. The pattern prints: date, priority, [logger category], message, newline.
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    
    # File appender targeting target/spring.log, using the same pattern as the console appender.
    # NOTE(review): "logfile" is not listed on log4j.rootLogger above, so as written it
    # appears to be unused - confirm whether it should be added to the root logger.
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    代码如下:

     1 package org.apache.hadoop.examples;
     2      
     3     import java.io.IOException;
     4     import java.util.Iterator;
     5     import java.util.StringTokenizer;
     6     import org.apache.hadoop.conf.Configuration;
     7     import org.apache.hadoop.fs.Path;
     8     import org.apache.hadoop.io.IntWritable;
     9     import org.apache.hadoop.io.Text;
    10     import org.apache.hadoop.mapreduce.Job;
    11     import org.apache.hadoop.mapreduce.Mapper;
    12     import org.apache.hadoop.mapreduce.Reducer;
    13     import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    14     import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    15     import org.apache.hadoop.util.GenericOptionsParser;
    16      
    17     public class WordCount {
    18         public WordCount() {
    19         }
    20          
    21         //main函数,MapReduce程序运行的入口
    22         public static void main(String[] args) throws Exception {
    23             Configuration conf = new Configuration();   //指定HDFS相关的参数
    24             
    25             //String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
    26             String[] otherArgs = new String[]{"input","output"};
    27             if(otherArgs.length < 2) {
    28                 System.err.println("Usage: wordcount <in> [<in>...] <out>");
    29                 System.exit(2);
    30             }
    31          
    32             //通过Job类设置Hadoop程序运行时的环境变量
    33             Job job = Job.getInstance(conf, "word count");  //设置环境参数 
    34             job.setJarByClass(WordCount.class);  //设置整个程序的类名
    35             job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
    36             job.setCombinerClass(WordCount.IntSumReducer.class); 
    37             job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer类
    38             job.setOutputKeyClass(Text.class);  //设置输出类型,因为输出的形式是<单词,个数>,所以这里用Text,类似于Java的String,但还是有些区别
    39             job.setOutputValueClass(IntWritable.class);  //设置输出类型,类似于Java的Int
    40      
    41             for(int i = 0; i < otherArgs.length - 1; ++i) {  
    42                 FileInputFormat.addInputPath(job, new Path(otherArgs[i]));    //设置输入文件
    43             }
    44      
    45             FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));   //设置输出文件
    46             System.exit(job.waitForCompletion(true)?0:1);  //提交作业
    47         }
    48      
    49         //Reduce处理逻辑
    50         public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    51             private IntWritable result = new IntWritable();
    52      
    53             public IntSumReducer() {
    54             }
    55      
    56             public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    57                 int sum = 0;
    58      
    59                 IntWritable val;
    60                 for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
    61                     val = (IntWritable)i$.next();
    62                 }
    63      
    64                 this.result.set(sum);
    65                 context.write(key, this.result);
    66             }
    67         }
    68      
    69         
    70         //Map处理逻辑
    71         public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    72             private static final IntWritable one = new IntWritable(1);
    73             private Text word = new Text();
    74      
    75             public TokenizerMapper() {
    76             }
    77      
    78             public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    79                 StringTokenizer itr = new StringTokenizer(value.toString());   //分词器
    80      
    81                 while(itr.hasMoreTokens()) {
    82                     this.word.set(itr.nextToken());
    83                     context.write(this.word, one);  //输出键值对
    84                     //这里也可以直接写成context.write(new Text(word), new IntWritable(1));
    85                 }
    86      
    87             }
    88         }
    89     }    
  • 相关阅读:
    Heapsort 堆排序算法详解(Java实现)
    GIve Me A Welcome Hug!
    linux系统救援模式拯救mv libc.so.6文件后无法使用命令的悲剧
    RabbitMQ集群部署
    使用Xshell通过堡垒机登录服务器
    dubbo + zookeeper环境部署
    zookeeper集群部署
    zabbix-3.0.1 添加微信报警
    zabbix-3.0.1结合grafana绘图
    Centos7.2安装zabbix3.0.1简要
  • 原文地址:https://www.cnblogs.com/zyb993963526/p/10244721.html
Copyright © 2011-2022 走看看