zoukankan      html  css  js  c++  java
  • 自定义Counter使用

    自定义计数器的使用(记录敏感单词)

      1 package counter;
      2 
      3 import java.net.URI;
      4 import org.apache.hadoop.conf.Configuration;
      5 import org.apache.hadoop.fs.FileSystem;
      6 import org.apache.hadoop.fs.Path;
      7 import org.apache.hadoop.io.LongWritable;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.Counter;
     10 import org.apache.hadoop.mapreduce.Job;
     11 import org.apache.hadoop.mapreduce.Mapper;
     12 import org.apache.hadoop.mapreduce.Reducer;
     13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     17 
     18 public class WordCountApp {
     19     static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
     20     static final String OUT_PATH = "hdfs://chaoren:9000/out";
     21 
     22     public static void main(String[] args) throws Exception {
     23         Configuration conf = new Configuration();
     24         FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
     25         Path outPath = new Path(OUT_PATH);
     26         if (fileSystem.exists(outPath)) {
     27             fileSystem.delete(outPath, true);
     28         }
     29 
     30         Job job = new Job(conf, WordCountApp.class.getSimpleName());
     31 
     32         // 1.1指定读取的文件位于哪里
     33         FileInputFormat.setInputPaths(job, INPUT_PATH);
     34         // 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对
     35         //job.setInputFormatClass(TextInputFormat.class);
     36 
     37         // 1.2指定自定义的map类
     38         job.setMapperClass(MyMapper.class);
     39         // map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
     40         //job.setOutputKeyClass(Text.class);
     41         //job.setOutputValueClass(LongWritable.class);
     42 
     43         // 1.3分区
     44         //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
     45         // 有一个reduce任务运行
     46         //job.setNumReduceTasks(1);
     47 
     48         // 1.4排序、分组
     49 
     50         // 1.5归约
     51 
     52         // 2.2指定自定义reduce类
     53         job.setReducerClass(MyReducer.class);
     54         // 指定reduce的输出类型
     55         job.setOutputKeyClass(Text.class);
     56         job.setOutputValueClass(LongWritable.class);
     57 
     58         // 2.3指定写出到哪里
     59         FileOutputFormat.setOutputPath(job, outPath);
     60         // 指定输出文件的格式化类
     61         //job.setOutputFormatClass(TextOutputFormat.class);
     62 
     63         // 把job提交给jobtracker运行
     64         job.waitForCompletion(true);
     65     }
     66 
     67     /**
     68      * 
     69      * KEYIN     即K1     表示行的偏移量 
     70      * VALUEIN     即V1     表示行文本内容 
     71      * KEYOUT     即K2     表示行中出现的单词 
     72      * VALUEOUT 即V2        表示行中出现的单词的次数,固定值1
     73      * 
     74      */
     75     static class MyMapper extends
     76             Mapper<LongWritable, Text, Text, LongWritable> {
     77         protected void map(LongWritable k1, Text v1, Context context)
     78                 throws java.io.IOException, InterruptedException {
     79             /**
     80              * 自定义计数器的使用
     81              */
     82             Counter counter = context.getCounter("Sensitive Words", "hello");//自定义计数器名称Sensitive Words
     83             String line = v1.toString();
     84             if(line.contains("hello")){
     85                 counter.increment(1L);//记录敏感词汇hello的出现次数
     86             }
     87             String[] splited = line.split("	");
     88             for (String word : splited) {
     89                 context.write(new Text(word), new LongWritable(1));
     90             }
     91         };
     92     }
     93 
     94     /**
     95      * KEYIN     即K2     表示行中出现的单词 
     96      * VALUEIN     即V2     表示出现的单词的次数 
     97      * KEYOUT     即K3     表示行中出现的不同单词
     98      * VALUEOUT 即V3     表示行中出现的不同单词的总次数
     99      */
    100     static class MyReducer extends
    101             Reducer<Text, LongWritable, Text, LongWritable> {
    102         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
    103                 Context ctx) throws java.io.IOException,
    104                 InterruptedException {
    105             long times = 0L;
    106             for (LongWritable count : v2s) {
    107                 times += count.get();
    108             }
    109             ctx.write(k2, new LongWritable(times));
    110         };
    111     }
    112 }

    在 Eclipse 中运行后,可以在控制台输出的作业计数器信息中查看到结果:其中会列出自定义计数器组 "Sensitive Words" 以及该组下计数器 "hello" 的值。

     

  • 相关阅读:
    use paramiko to connect remote server and execute command
    protect golang source code
    adjust jedi vim to python2 and python3
    install vim plugin local file offline
    add swap file if you only have 1G RAM
    datatables hyperlink in td
    django rest framework custom json format
    【JAVA基础】网络编程
    【JAVA基础】多线程
    【JAVA基础】String类的概述和使用
  • 原文地址:https://www.cnblogs.com/ahu-lichang/p/6656303.html
Copyright © 2011-2022 走看看