zoukankan      html  css  js  c++  java
  • Scala开发Hadoop示例

    import org.apache.hadoop.conf.{Configuration, Configured};
    import org.apache.hadoop.util.{ToolRunner, Tool};
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.io.{LongWritable, Text, IntWritable};
    import org.apache.hadoop.mapreduce.{Reducer, Mapper, Job};
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    
    /**
     * Classic word-count MapReduce job written in Scala against the Hadoop
     * `org.apache.hadoop.mapreduce` API.
     *
     * Usage: `WordCount <input path> <output path>` (launched through ToolRunner,
     * so generic Hadoop options such as `-D key=value` are accepted before them).
     *
     * Originally written by riley, 8/26/13.
     */
    object WordCount extends Configured with Tool
    {
        /**
         * Emits `(word, 1)` for every whitespace-separated token of each input line.
         * Hadoop instantiates this class reflectively through its no-argument constructor.
         */
        class Map extends Mapper[LongWritable, Text, Text, IntWritable]
        {
            private val one = new IntWritable(1)
            // Reused for every emitted token instead of allocating a new Text per word;
            // context.write consumes the current contents before the next set().
            private val word = new Text()

            /**
             * Splits `rowLine` on runs of whitespace and writes `(token, 1)` for each
             * non-empty token. Blank or whitespace-only lines produce no output.
             *
             * @param key     unused (with the default TextInputFormat it is the line's byte offset)
             * @param rowLine one line of input text
             * @param context sink for the emitted `(word, 1)` pairs
             */
            override def map(key: LongWritable, rowLine: Text,
                             context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit =
            {
                // Split on "\\s+" rather than " ": a single-space split yields empty tokens
                // for repeated or leading spaces, which would be counted as the word "".
                for (token <- rowLine.toString.split("\\s+") if token.nonEmpty) {
                    word.set(token)
                    context.write(word, one)
                }
            }
        }

        /**
         * Sums the per-word counts. Also registered as the combiner, which is valid
         * because integer addition is associative and commutative.
         */
        class Reduce extends Reducer[Text, IntWritable, Text, IntWritable]
        {
            private val count = new IntWritable()

            /**
             * Writes `(key, sum of values)`.
             *
             * The parameter must be `java.lang.Iterable`. Scala's own `Iterable` would
             * not match `Reducer.reduce`, so `override` would fail to compile, and
             * without `override` Hadoop would silently run its identity reducer.
             *
             * @param key     the word
             * @param values  partial counts for that word
             * @param context sink for the `(word, total)` pair
             */
            override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                                context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit =
            {
                var sum = 0
                val it = values.iterator()
                while (it.hasNext) sum += it.next().get()
                count.set(sum)
                context.write(key, count)
            }
        }

        /**
         * Configures the job and blocks until it finishes.
         *
         * @param args `args(0)` is the input path, `args(1)` the output path
         *             (ToolRunner has already consumed any generic Hadoop options)
         * @return 0 on success, 1 if the job failed, 2 if the paths were not supplied
         */
        override def run(args: Array[String]): Int =
            if (args.length < 2) {
                System.err.println("Usage: WordCount <input path> <output path>")
                2
            } else {
                // NOTE(review): `new Job(...)` is deprecated on Hadoop 2+; prefer
                // Job.getInstance(conf, "WordCount") once older Hadoop support is dropped.
                val job = new Job(getConf(), "WordCount")

                job.setJarByClass(this.getClass)
                job.setOutputKeyClass(classOf[Text])
                job.setOutputValueClass(classOf[IntWritable])

                job.setMapperClass(classOf[Map])
                job.setCombinerClass(classOf[Reduce])
                job.setReducerClass(classOf[Reduce])

                FileInputFormat.addInputPath(job, new Path(args(0)))
                FileOutputFormat.setOutputPath(job, new Path(args(1)))

                if (job.waitForCompletion(true)) 0 else 1
            }

        /** Entry point: ToolRunner parses generic Hadoop options, then calls `run`. */
        def main(args: Array[String]): Unit =
            System.exit(ToolRunner.run(new Configuration(), this, args))
    }
  • 相关阅读:
    fork安全的gettid高效实现
    TCP_DEFER_ACCEPT的坑
    TCP Linger的坑
    Blade和其他构建工具有什么不同
    在Blade中结合gperftools检查内存泄露
    GraphViz web版
    用户场景分析
    java-二维数组——with 刘童格
    java-四则运算-五-网页版--with刘童格
    java-四则运算-四
  • 原文地址:https://www.cnblogs.com/rilley/p/5430935.html
Copyright © 2011-2022 走看看