package first_mapreduce_scala

import org.apache.hadoop.mapreduce.Job
import java.util._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._
import org.apache.hadoop.util._
import flowsum.FlowBean
import org.apache.hadoop.io.NullWritable

/**
 * Mapper (old `mapred` API) that turns one space-separated text line into a
 * per-user [[FlowBean]].
 *
 * Field 1 of the line is used as the user name and field 2 is parsed as a
 * `Long` read count; field 0 is ignored. The emitted bean is
 * `FlowBean(userName, readCount, 1)`, i.e. each line contributes one article.
 * (NOTE(review): the field layout is inferred from the indices used here —
 * confirm against the actual input file.)
 *
 * Empty lines are skipped. Lines that have fewer than three fields, or whose
 * third field is not a valid integer, are also skipped instead of failing the
 * task; each such line increments the `MALFORMED_LINES` counter so the loss
 * is visible in the job counters.
 */
class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, FlowBean] {

  override def map(key: LongWritable, value: Text,
                   output: OutputCollector[Text, FlowBean], reporter: Reporter): Unit = {
    val line = value.toString
    if (!line.isEmpty) {
      val tokens: Array[String] = line.split(" ")
      // trim() tolerates trailing whitespace such as a Windows '\r' on the last field.
      val readCount: Option[Long] =
        if (tokens.length < 3) None
        else try Some(tokens(2).trim.toLong) catch { case _: NumberFormatException => None }

      readCount match {
        case Some(count) =>
          val username = tokens(1)
          output.collect(new Text(username), new FlowBean(username, count, 1))
        case None =>
          reporter.incrCounter("first_mapreduce_scala.Map", "MALFORMED_LINES", 1L)
      }
    }
  }
}

/**
 * Sums the read and article counts of all [[FlowBean]]s for one user key and
 * emits a single `FlowBean(key, totalRead, totalArticles)`.
 *
 * `WordCount.main` registers this class as both the combiner and the reducer.
 * Addition is associative and commutative and the input/output types match,
 * so partial sums from the combiner are summed again correctly here.
 */
class Reduce extends MapReduceBase with Reducer[Text, FlowBean, Text, FlowBean] {

  // `Iterator` here is java.util.Iterator (brought in by `import java.util._`),
  // as required by the old-API Reducer interface.
  override def reduce(key: Text, values: Iterator[FlowBean],
                      output: OutputCollector[Text, FlowBean], reporter: Reporter): Unit = {
    var numberReadCount: Long = 0L
    var numberArticleCount: Long = 0L
    while (values.hasNext) {
      // The framework may reuse the value object between next() calls, so the
      // fields are read immediately rather than keeping a reference to the bean.
      val bean = values.next()
      numberReadCount += bean.getNumberRead
      numberArticleCount += bean.getNumberArticle
    }
    output.collect(key, new FlowBean(key.toString, numberReadCount, numberArticleCount))
  }
}

/**
 * Mapper for a de-duplication pass: emits each input line as the key with a
 * `NullWritable` value. The input key is ignored.
 * (`WordCount.main` below does not wire this class into its job.)
 */
class RemovallMap extends MapReduceBase with Mapper[Object, Text, Text, NullWritable] {

  override def map(key: Object, value: Text,
                   output: OutputCollector[Text, NullWritable], reporter: Reporter): Unit =
    output.collect(value, NullWritable.get)
}

/**
 * Reducer for the de-duplication pass: emits each distinct key exactly once,
 * ignoring how many values were grouped under it.
 */
class RemovallReduce extends MapReduceBase with Reducer[Text, NullWritable, Text, NullWritable] {

  override def reduce(key: Text, values: Iterator[NullWritable],
                      output: OutputCollector[Text, NullWritable], reporter: Reporter): Unit =
    output.collect(key, NullWritable.get)
}

/**
 * Driver for the per-user [[FlowBean]] aggregation job.
 *
 * Usage: `WordCount [inputPath [outputPath]]`. When an argument is omitted,
 * the corresponding hard-coded default path below is used, so running with no
 * arguments behaves exactly as before. Note that Hadoop refuses to start if
 * the output directory already exists.
 */
object WordCount {
  private val DefaultInputPath = "/Users/lihu/Desktop/crawle/tap.txt"
  private val DefaultOutputPath = "/Users/lihu/Desktop/crawle/logsfajflkawa"

  def main(args: Array[String]): Unit = {
    val inputPath = if (args.length > 0) args(0) else DefaultInputPath
    val outputPath = if (args.length > 1) args(1) else DefaultOutputPath

    val conf = new JobConf(this.getClass)
    conf.setJobName("yiyi")

    conf.setOutputKeyClass(classOf[Text])
    conf.setOutputValueClass(classOf[FlowBean])

    conf.setMapperClass(classOf[Map])
    conf.setCombinerClass(classOf[Reduce])
    conf.setReducerClass(classOf[Reduce])

    conf.setInputFormat(classOf[TextInputFormat])
    conf.setOutputFormat(classOf[TextOutputFormat[Text, FlowBean]])

    FileInputFormat.setInputPaths(conf, new Path(inputPath))
    FileOutputFormat.setOutputPath(conf, new Path(outputPath))

    // Blocks until the job finishes; throws IOException if the job fails.
    JobClient.runJob(conf)
  }
}