zoukankan      html  css  js  c++  java
  • Scala 实现（基于 JobConf，即旧版 org.apache.hadoop.mapred API）

    package first_mapreduce_scala
    
    import org.apache.hadoop.mapreduce.Job
    import java.util._
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf._
    import org.apache.hadoop.io._
    import org.apache.hadoop.mapred._
    import org.apache.hadoop.util._
    import flowsum.FlowBean
    import org.apache.hadoop.io.NullWritable
    
    /**
     * Mapper for the per-user aggregation job.
     *
     * Each input line is split on single spaces. The token at index 1 is used as the username and
     * the token at index 2 is parsed as a Long read count. For every usable line it emits
     * `(username, new FlowBean(username, readCount, 1))`; the trailing `1` is the value the reducer
     * sums via `getNumberArticle`, so each line contributes one to that total.
     *
     * Empty lines are skipped. Lines with fewer than three tokens, or whose token at index 2 is not
     * a valid Long, are also skipped rather than throwing and failing the task. They are tallied in
     * the `Map` / `MALFORMED_LINES` counter so bad input remains visible in the job report.
     */
    class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, FlowBean] {
      override def map(key: LongWritable, value: Text, output: OutputCollector[Text, FlowBean],
                       reporter: Reporter): Unit = {
        val line = value.toString
        if (line.nonEmpty) {
          val tokens = line.split(" ")
          parseReadCount(tokens) match {
            case Some(readCount) =>
              val username = tokens(1)
              output.collect(new Text(username), new FlowBean(username, readCount, 1))
            case None =>
              // Previously this path crashed the task with an exception; count it instead.
              reporter.incrCounter("Map", "MALFORMED_LINES", 1)
          }
        }
      }

      /** Returns the token at index 2 as a Long, or None if the line is too short or it is not numeric. */
      private def parseReadCount(tokens: Array[String]): Option[Long] =
        if (tokens.length < 3) None
        else
          try Some(tokens(2).toLong)
          catch { case _: NumberFormatException => None }
    }
    
    /**
     * Sums the read and article counters of every FlowBean grouped under one key and emits a single
     * FlowBean carrying the key's string form and both totals.
     *
     * Input and output types are identical (Text, FlowBean), which is what allows the driver to
     * register this class as both the combiner and the reducer.
     */
    class Reduce extends MapReduceBase with Reducer[Text, FlowBean, Text, FlowBean] {
      override def reduce(key: Text, values: Iterator[FlowBean], output: OutputCollector[Text, FlowBean],
                          reporter: Reporter): Unit = {
        var totalRead: Long = 0L
        var totalArticles: Long = 0L
        while (values.hasNext) {
          val current = values.next()
          totalRead += current.getNumberRead
          totalArticles += current.getNumberArticle
        }
        val user = key.toString
        output.collect(key, new FlowBean(user, totalRead, totalArticles))
      }
    }
    
    /**
     * Deduplication mapper: emits each input line unchanged as the key, paired with NullWritable.
     *
     * The input key is ignored. Intended to be paired with RemovallReduce, which writes each key it
     * receives once, so that repeated lines collapse to a single output line.
     */
    class RemovallMap extends MapReduceBase with Mapper[Object, Text, Text, NullWritable] {
      override def map(key: Object, value: Text, output: OutputCollector[Text, NullWritable],
                       reporter: Reporter): Unit =
        output.collect(value, NullWritable.get)
    }
    
    /**
     * Deduplication reducer: writes the key once per reduce call with a NullWritable value,
     * without looking at the grouped values.
     */
    class RemovallReduce extends MapReduceBase with Reducer[Text, NullWritable, Text, NullWritable] {
      override def reduce(key: Text, values: Iterator[NullWritable], output: OutputCollector[Text, NullWritable],
                          reporter: Reporter): Unit = {
        val placeholder = NullWritable.get
        output.collect(key, placeholder)
      }
    }
    
    /**
     * Driver for the per-user aggregation job, built on the old JobConf / JobClient API.
     *
     * Usage: `WordCount [inputPath] [outputPath]`. Each argument is optional; when omitted, the
     * original hard-coded local path is used, so running with no arguments behaves exactly as before.
     */
    object WordCount {
      /** Input file used when no input path is given on the command line. */
      private val DefaultInputPath = "/Users/lihu/Desktop/crawle/tap.txt"

      /** Output directory used when no output path is given on the command line. */
      private val DefaultOutputPath = "/Users/lihu/Desktop/crawle/logsfajflkawa"

      def main(args: Array[String]): Unit = {
        val inputPath = args.headOption.getOrElse(DefaultInputPath)
        val outputPath = args.drop(1).headOption.getOrElse(DefaultOutputPath)

        val conf = new JobConf(this.getClass)
        conf.setJobName("yiyi")
        conf.setOutputKeyClass(classOf[Text])
        conf.setOutputValueClass(classOf[FlowBean])
        conf.setMapperClass(classOf[Map])
        // Reduce only sums counters and has matching input/output types, so it can also run as the combiner.
        conf.setCombinerClass(classOf[Reduce])
        conf.setReducerClass(classOf[Reduce])
        conf.setInputFormat(classOf[TextInputFormat])
        conf.setOutputFormat(classOf[TextOutputFormat[Text, FlowBean]])
        FileInputFormat.setInputPaths(conf, new Path(inputPath))
        FileOutputFormat.setOutputPath(conf, new Path(outputPath))
        JobClient.runJob(conf)
      }
    }
  • 相关阅读:
    VNC跨平台远程桌面的安装与使用
    Apache 的编译安装
    Xming配置
    工作杂记
    自动创建系统用户脚本
    关于linux网络基础记录
    Linux的setup命令启动服务名称和功能
    涉密计算机检查工具
    Nginx压力测试工具之WebBench
    关于系统性能检测的一些使用
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6365034.html
Copyright © 2011-2022 走看看