import org.apache.hadoop.conf.{Configuration, Configured}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, LongWritable, NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.{Tool, ToolRunner}

import scala.collection.JavaConverters._

import flowsum.FlowBean

// NOTE(review): the three imports below appear unused in this file — candidates for removal.
import javax.swing.text.html.CSS.Value
import java.util.function.ToLongFunction
import org.supercsv.cellprocessor.ParseLong

/**
 * Two-stage MapReduce driver:
 *   1. "removal"    — de-duplicates identical input lines (whole line used as the key).
 *   2. "statistics" — aggregates per-user read/article counts into a [[FlowBean]].
 *
 * The second job consumes the output directory of the first, so they must run
 * sequentially; the driver waits for job 1 before submitting job 2.
 */
object scala_test extends Configured with Tool {

  /** Emits each whole input line as the key so identical lines collapse in the reducer. */
  class RemovalMap extends Mapper[Object, Text, Text, NullWritable] {
    override def map(key: Object, rowLine: Text,
                     context: Mapper[Object, Text, Text, NullWritable]#Context): Unit = {
      context.write(rowLine, NullWritable.get)
    }
  }

  /** Writes each distinct key exactly once, discarding duplicate values. */
  class RemovalReduce extends Reducer[Text, NullWritable, Text, NullWritable] {
    // FIX: the values parameter must be java.lang.Iterable. A bare `Iterable`
    // resolves to scala.collection.Iterable, so `override` matches nothing in
    // Hadoop's Reducer and the class fails to compile.
    override def reduce(key: Text, values: java.lang.Iterable[NullWritable],
                        context: Reducer[Text, NullWritable, Text, NullWritable]#Context): Unit = {
      context.write(key, NullWritable.get)
    }
  }

  /**
   * Parses space-separated rows, expected shape: "<ignored> <username> <numberRead>".
   * Emits (username, FlowBean(username, numberRead, 1)) — the trailing 1 counts one article.
   */
  class StatisticsMap extends Mapper[LongWritable, Text, Text, FlowBean] {
    override def map(key: LongWritable, rowLine: Text,
                     context: Mapper[LongWritable, Text, Text, FlowBean]#Context): Unit = {
      val line = rowLine.toString
      // Guard instead of `return` (nonlocal return inside a method is an anti-pattern),
      // and skip short/malformed rows that would otherwise throw and kill the task.
      if (line.nonEmpty) {
        val tokens = line.split(" ")
        if (tokens.length > 2) {
          val username = tokens(1)
          context.write(new Text(username), new FlowBean(username, tokens(2).toLong, 1))
        }
      }
    }
  }

  /** Sums the per-user read and article counters across all beans for a key. */
  class StatisticsReduce extends Reducer[Text, FlowBean, Text, FlowBean] {
    // FIX: java.lang.Iterable — same override-resolution issue as RemovalReduce.
    override def reduce(key: Text, values: java.lang.Iterable[FlowBean],
                        context: Reducer[Text, FlowBean, Text, FlowBean]#Context): Unit = {
      var numberReadCount: Long = 0L
      var numberArticleCount: Long = 0L
      // Hadoop reuses the bean instance, so accumulate inside the loop.
      for (bean <- values.asScala) {
        numberReadCount += bean.getNumberRead
        numberArticleCount += bean.getNumberArticle
      }
      context.write(key, new FlowBean(key.toString, numberReadCount, numberArticleCount))
    }
  }

  /**
   * Configures and runs both jobs in sequence.
   *
   * Paths default to the original hard-coded locations but may be overridden
   * on the command line as: <input> <dedupDir> <outputDir> (backward compatible:
   * with no args the behavior is unchanged).
   *
   * @return 0 when both jobs succeed, 1 when either fails.
   */
  override def run(args: Array[String]): Int = {
    val conf = getConf
    val input    = if (args.length > 0) args(0) else "/Users/lihu/Desktop/crawle/tap.txt"
    val dedupDir = if (args.length > 1) args(1) else "/Users/lihu/Desktop/crawle/quchong"
    val output   = if (args.length > 2) args(2) else "/Users/lihu/Desktop/crawle/logss"

    // Job 1: duplicate removal. Job.getInstance replaces the deprecated `new Job(...)`.
    val job = Job.getInstance(conf, "removal")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[RemovalMap])
    job.setReducerClass(classOf[RemovalReduce])
    job.setCombinerClass(classOf[RemovalReduce])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[NullWritable])
    FileInputFormat.addInputPath(job, new Path(input))
    FileOutputFormat.setOutputPath(job, new Path(dedupDir))

    // Job 2: per-user statistics over the de-duplicated output. Now named.
    val job1 = Job.getInstance(conf, "statistics")
    job1.setJarByClass(this.getClass)
    job1.setMapperClass(classOf[StatisticsMap])
    job1.setReducerClass(classOf[StatisticsReduce])
    job1.setCombinerClass(classOf[StatisticsReduce])
    job1.setOutputKeyClass(classOf[Text])
    job1.setOutputValueClass(classOf[FlowBean])
    FileInputFormat.addInputPath(job1, new Path(dedupDir))
    FileOutputFormat.setOutputPath(job1, new Path(output))

    // Submit job 1 and, only on success, job 2.
    if (job.waitForCompletion(true)) {
      val status = job1.waitForCompletion(true)
      println(status)
      if (status) 0 else 1
    } else {
      // FIX: the original returned 0 (success) when the first job failed;
      // a Tool must signal failure with a non-zero exit code.
      println(false)
      1
    }
  }

  /** Standard ToolRunner entry point; exit code propagates from [[run]]. */
  def main(args: Array[String]): Unit = {
    val conf: Configuration = new Configuration()
    System.exit(ToolRunner.run(conf, this, args))
  }
}