zoukankan      html  css  js  c++  java
  • scala JobConf实现

    package first_mapreduce_scala
    
    import org.apache.hadoop.mapreduce.Job
    import java.util._
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf._
    import org.apache.hadoop.io._
    import org.apache.hadoop.mapred._
    import org.apache.hadoop.util._
    import flowsum.FlowBean
    import org.apache.hadoop.io.NullWritable
    
    /**
     * Mapper that turns one space-separated text line into a per-user record.
     *
     * The line is split on single spaces; field 1 is used as the user name and
     * field 2 is parsed as a `Long`. For every well-formed line the mapper emits
     * `(username, new FlowBean(username, field2, 1))`.
     * NOTE(review): the meaning of the two numeric FlowBean arguments is defined
     * in `flowsum.FlowBean`, which is not visible here — presumably a read count
     * and an article count of 1; confirm against that class.
     *
     * Empty lines are ignored. Lines with fewer than three fields, or whose
     * field 2 is not a valid `Long`, are skipped and counted under the
     * "Map" / "MALFORMED_RECORDS" counter instead of failing the task.
     */
    class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, FlowBean] {

      /**
       * @param key      input key supplied by the input format (unused)
       * @param value    one line of input text
       * @param output   collector receiving the `(username, FlowBean)` pair
       * @param reporter used only to count malformed records
       */
      override def map(key: LongWritable, value: Text, output: OutputCollector[Text, FlowBean], reporter: Reporter): Unit = {
        val line = value.toString
        val tokens: Array[String] = line.split(" ")

        // Both tokens(1) and tokens(2) are needed, so require at least three fields
        // before indexing; parsing is wrapped so a non-numeric field yields None.
        val readCount: Option[Long] =
          if (tokens.length >= 3) scala.util.Try(tokens(2).toLong).toOption else None

        readCount match {
          case Some(count) =>
            val username = tokens(1)
            output.collect(new Text(username), new FlowBean(username, count, 1))
          case None =>
            // Blank lines are silently ignored; anything else is a bad record worth counting.
            if (!line.isEmpty) reporter.incrCounter("Map", "MALFORMED_RECORDS", 1L)
        }
      }
    }
    
    /**
     * Reducer that merges all FlowBean values received for one key into a
     * single FlowBean.
     *
     * The results of `getNumberRead` and `getNumberArticle` are summed
     * separately across the values, and one record
     * `(key, new FlowBean(key.toString, readSum, articleSum))` is emitted.
     */
    class Reduce extends MapReduceBase with Reducer[Text, FlowBean, Text, FlowBean] {

      /**
       * @param key      the grouping key; its string form is passed to the output FlowBean
       * @param values   the FlowBean values for this key, consumed exactly once
       * @param output   collector receiving the single aggregated record
       * @param reporter unused
       */
      override def reduce(key: Text, values: Iterator[FlowBean], output: OutputCollector[Text, FlowBean], reporter: Reporter): Unit = {
        // Walk the iterator once, carrying both running sums; each value is read
        // immediately after next() and never retained.
        @scala.annotation.tailrec
        def accumulate(readSum: Long, articleSum: Long): (Long, Long) =
          if (!values.hasNext) (readSum, articleSum)
          else {
            val current = values.next()
            accumulate(readSum + current.getNumberRead, articleSum + current.getNumberArticle)
          }

        val (totalRead, totalArticle) = accumulate(0L, 0L)
        output.collect(key, new FlowBean(key.toString(), totalRead, totalArticle))
      }
    }
    
    /**
     * Mapper that emits every input value unchanged as the output key, paired
     * with `NullWritable`.
     *
     * Presumably the map side of a line-deduplication job: identical lines
     * become identical keys. This class is not wired into `WordCount.main`
     * in this file.
     */
    class RemovallMap extends MapReduceBase with Mapper[Object, Text, Text, NullWritable] {

      /**
       * @param key      input key (unused)
       * @param value    the input text, forwarded as the output key
       * @param output   collector receiving `(value, NullWritable)`
       * @param reporter unused
       */
      override def map(key: Object, value: Text, output: OutputCollector[Text, NullWritable], reporter: Reporter): Unit =
        output.collect(value, NullWritable.get)
    }
    
    /**
     * Reducer that writes each key it receives exactly once, paired with
     * `NullWritable`, without reading the values.
     */
    class RemovallReduce extends MapReduceBase with Reducer[Text, NullWritable, Text, NullWritable] {

      /**
       * @param key      the key to emit
       * @param values   values grouped under the key (never consumed)
       * @param output   collector receiving `(key, NullWritable)`
       * @param reporter unused
       */
      override def reduce(
          key: Text,
          values: Iterator[NullWritable],
          output: OutputCollector[Text, NullWritable],
          reporter: Reporter): Unit =
        output.collect(key, NullWritable.get)
    }
    
    /**
     * Driver that configures and runs the job named "yiyi" using the old
     * `org.apache.hadoop.mapred` API: `Map` as mapper, `Reduce` as both
     * combiner and reducer, text input, and `Text`/`FlowBean` text output.
     */
    object WordCount {
      // Paths used when none are supplied on the command line.
      private val DefaultInputPath = "/Users/lihu/Desktop/crawle/tap.txt"
      private val DefaultOutputPath = "/Users/lihu/Desktop/crawle/logsfajflkawa"

      /**
       * Runs the job and blocks until `JobClient.runJob` returns.
       *
       * @param args when exactly two arguments are given they are used as the
       *             input path and output path; with any other argument count
       *             the built-in default paths are used. Generic Hadoop options
       *             (e.g. `-D`) are not parsed here.
       */
      def main(args: Array[String]): Unit = {
        val (inputPath, outputPath) = args match {
          case Array(in, out) => (in, out)
          case _              => (DefaultInputPath, DefaultOutputPath)
        }

        val conf = new JobConf(this.getClass)
        conf.setJobName("yiyi")

        // Key/value types produced by the job.
        conf.setOutputKeyClass(classOf[Text])
        conf.setOutputValueClass(classOf[FlowBean])

        // Processing stages; Reduce is reused as the combiner.
        conf.setMapperClass(classOf[Map])
        conf.setCombinerClass(classOf[Reduce])
        conf.setReducerClass(classOf[Reduce])

        // Plain-text input and output.
        conf.setInputFormat(classOf[TextInputFormat])
        conf.setOutputFormat(classOf[TextOutputFormat[Text, FlowBean]])
        FileInputFormat.setInputPaths(conf, new Path(inputPath))
        FileOutputFormat.setOutputPath(conf, new Path(outputPath))

        JobClient.runJob(conf)
      }
    }
  • 相关阅读:
    HDU 3879 Base Station 最大权闭合图
    自己定义头像处理,轻巧有用,非常多强大的小技巧在里面哦,快来赞美我一下吧^_^
    【Spring实战】—— 8 自动装配
    【Spring实战】—— 7 复杂集合类型的注入
    【Spring实战】—— 6 内部Bean
    【Spring实战】—— 5 设值注入
    【Spring实战】—— 4 Spring中bean的init和destroy方法讲解
    【Spring实战】—— 3 使用factory-method创建单例Bean总结
    【Spring实战】—— 2 构造注入
    【Spring实战】—— 1 入门讲解
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6365034.html
Copyright © 2011-2022 走看看