zoukankan      html  css  js  c++  java
  • MapReduce：用 Scala 分析百度百家上各作者发布的文章数量和总阅读量

    import org.apache.hadoop.conf.{Configuration, Configured};
    
    import org.apache.hadoop.util.{ToolRunner, Tool};
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.io.{LongWritable, Text, IntWritable};
    import org.apache.hadoop.mapreduce.{Reducer, Mapper, Job};
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.io.NullWritable
    import javax.swing.text.html.CSS.Value
    import flowsum.FlowBean
    import java.util.function.ToLongFunction
    import org.supercsv.cellprocessor.ParseLong
    
    
    /**
     * Two-stage MapReduce driver.
     *
     * Stage 1 ("removal") removes duplicate input lines. Stage 2 ("statistics") reads the
     * deduplicated lines and, per author, sums the read count and counts the articles.
     *
     * NOTE(review): stage 2 assumes each line is space-separated with the author name in
     * field 1 and the read count in field 2 (0-based), as implied by the token indices
     * used below. Confirm this against the crawler's output format.
     */
    object scala_test extends Configured with Tool
    {
      /** Paths used when the corresponding command-line argument is absent. */
      private val DefaultInput = "/Users/lihu/Desktop/crawle/tap.txt"
      private val DefaultDedupOutput = "/Users/lihu/Desktop/crawle/quchong"
      private val DefaultStatsOutput = "/Users/lihu/Desktop/crawle/logss"

      /** Stage 1 mapper: emits every input line as a key, so identical lines meet in one reduce call. */
      class RemovalMap extends Mapper[Object, Text, Text, NullWritable]
      {
        override def map(key: Object, rowLine: Text,
                         context: Mapper[Object, Text, Text, NullWritable]#Context): Unit =
          context.write(rowLine, NullWritable.get)
      }

      /**
       * Stage 1 reducer (also used as the combiner): writes each distinct line exactly once.
       *
       * `values` must be typed `java.lang.Iterable`. A bare `Iterable` means `scala.Iterable`,
       * which does not match Hadoop's `reduce` signature and so would not override it.
       */
      class RemovalReduce extends Reducer[Text, NullWritable, Text, NullWritable]
      {
        override def reduce(key: Text, values: java.lang.Iterable[NullWritable],
                            context: Reducer[Text, NullWritable, Text, NullWritable]#Context): Unit =
          context.write(key, NullWritable.get)
      }

      /**
       * Stage 2 mapper: for each well-formed line, emits `(author, FlowBean(author, reads, 1))`.
       *
       * Empty lines are ignored. Lines with fewer than three space-separated tokens, or with a
       * non-numeric read count, are skipped and counted under the
       * `StatisticsMap / MALFORMED_LINES` counter instead of failing the task.
       */
      class StatisticsMap extends Mapper[LongWritable, Text, Text, FlowBean]
      {
        override def map(key: LongWritable, rowLine: Text,
                         context: Mapper[LongWritable, Text, Text, FlowBean]#Context): Unit =
        {
          val line = rowLine.toString
          if (line.nonEmpty) {
            val tokens: Array[String] = line.split(" ")
            val readCount: Option[Long] =
              if (tokens.length >= 3) scala.util.Try(tokens(2).toLong).toOption else None

            readCount match {
              case Some(reads) =>
                val username = tokens(1)
                context.write(new Text(username), new FlowBean(username, reads, 1))
              case None =>
                context.getCounter("StatisticsMap", "MALFORMED_LINES").increment(1L)
            }
          }
        }
      }

      /**
       * Stage 2 reducer (also used as the combiner): sums read counts and article counts
       * for one author.
       *
       * The sums are associative, so running this as a combiner gives the same final result.
       */
      class StatisticsReduce extends Reducer[Text, FlowBean, Text, FlowBean]
      {
        override def reduce(key: Text, values: java.lang.Iterable[FlowBean],
                            context: Reducer[Text, FlowBean, Text, FlowBean]#Context): Unit =
        {
          var numberReadCount: Long = 0L
          var numberArticleCount: Long = 0L
          // Hadoop may reuse the same value object across iterations, so read each bean immediately.
          val it = values.iterator()
          while (it.hasNext) {
            val bean = it.next()
            numberReadCount += bean.getNumberRead
            numberArticleCount += bean.getNumberArticle
          }
          context.write(key, new FlowBean(key.toString, numberReadCount, numberArticleCount))
        }
      }

      /** Returns `args(index)` if present, otherwise `default`. */
      private def argOrDefault(args: Array[String], index: Int, default: String): String =
        if (args.length > index) args(index) else default

      /**
       * Runs the dedup job and, only if it succeeds, the statistics job.
       *
       * @param args optional `[input] [dedupOutput] [statsOutput]` paths (after ToolRunner has
       *             removed generic Hadoop options). Any missing argument falls back to the
       *             corresponding default path.
       * @return 0 if both jobs succeed, 1 if either job fails
       */
      def run(args: Array[String]): Int =
      {
        val conf = getConf
        val inputPath = new Path(argOrDefault(args, 0, DefaultInput))
        val dedupPath = new Path(argOrDefault(args, 1, DefaultDedupOutput))
        val statsPath = new Path(argOrDefault(args, 2, DefaultStatsOutput))

        val dedupJob = Job.getInstance(conf, "removal")
        dedupJob.setJarByClass(this.getClass)
        dedupJob.setMapperClass(classOf[RemovalMap])
        dedupJob.setReducerClass(classOf[RemovalReduce])
        dedupJob.setCombinerClass(classOf[RemovalReduce])
        dedupJob.setOutputKeyClass(classOf[Text])
        dedupJob.setOutputValueClass(classOf[NullWritable])
        FileInputFormat.addInputPath(dedupJob, inputPath)
        FileOutputFormat.setOutputPath(dedupJob, dedupPath)

        val statsJob = Job.getInstance(conf, "statistics")
        statsJob.setJarByClass(this.getClass)
        statsJob.setMapperClass(classOf[StatisticsMap])
        statsJob.setReducerClass(classOf[StatisticsReduce])
        statsJob.setCombinerClass(classOf[StatisticsReduce])
        statsJob.setOutputKeyClass(classOf[Text])
        statsJob.setOutputValueClass(classOf[FlowBean])
        // Stage 2 consumes stage 1's output directory.
        FileInputFormat.addInputPath(statsJob, dedupPath)
        FileOutputFormat.setOutputPath(statsJob, statsPath)

        // Submit the jobs in order; stage 2 runs only if stage 1 succeeded (&& short-circuits).
        // A failure in either stage is reported as a non-zero exit code.
        if (dedupJob.waitForCompletion(true) && statsJob.waitForCompletion(true)) 0 else 1
      }

      /** Entry point: delegates to [[run]] through ToolRunner and exits with its status code. */
      def main(args: Array[String]): Unit =
        System.exit(ToolRunner.run(new Configuration(), this, args))
    }
  • 相关阅读:
    gearman管理
    php运行方式
    gearman mysql持久化
    gearman安装及初次使用
    消息队列各种比较
    IOC
    post提交/文件上传服务器修改
    protobuf php
    thrift 安装介绍
    qt中使用opencv处理图片 QImage 和 IplImage 相互之间转换问题
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6364580.html
Copyright © 2011-2022 走看看