  • Developing a WordCount program in Spark

    1. Java version (spark-2.1.0)

    package chavin.king;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    public class WordCount {

        public static void main(String[] args) {

            // initialize the Spark application; "local" runs it in a single JVM for testing
            SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // read the input file as an RDD of lines
            JavaRDD<String> lines = sc.textFile("E://test//spark_wc.txt");

            // split each line into words
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }

            });

            // map each word to a (word, 1) pair
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }

            });

            // count occurrences of each word by summing the 1s per key
            JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }

            });

            // print each (word, count) pair on the driver
            wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

                public void call(Tuple2<String, Integer> wordCount) throws Exception {
                    System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
                }

            });

            // shut down the SparkContext and release its resources
            sc.close();
        }

    }
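
    Since spark-2.1.0 targets Java 8, the same pipeline can also be written with lambda expressions in place of the anonymous inner classes above (Spark's function interfaces each have a single abstract method, so the compiler can infer them). A minimal sketch of the equivalent transformation chain, reusing the sc and input path from the program above:

    // identical logic, expressed as one chained pipeline with Java 8 lambdas
    JavaPairRDD<String, Integer> wordCounts = sc.textFile("E://test//spark_wc.txt")
            .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // line -> words
            .mapToPair(word -> new Tuple2<>(word, 1))                   // word -> (word, 1)
            .reduceByKey((v1, v2) -> v1 + v2);                          // sum per word

    wordCounts.foreach(wc -> System.out.println(wc._1 + " appeared " + wc._2 + " times."));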

    2. Scala version

    package chavin.king

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object WordCountLocal {

      def main(args: Array[String]) {

        // initialize the Spark application; "local" runs it in a single JVM for testing
        val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        val sc = new SparkContext(conf)

        // read the file, split lines into words, pair each word with 1,
        // and sum the 1s per word
        val lines = sc.textFile("E://test//spark_wc.txt", 1)
        val words = lines.flatMap { line => line.split(" ") }
        val pairs = words.map { word => (word, 1) }
        val wordCounts = pairs.reduceByKey { _ + _ }

        // print each (word, count) pair on the driver
        wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))

        // shut down the SparkContext and release its resources
        sc.stop()
      }

    }
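
    Both versions hard-code a local master and a local Windows path, which is fine for development but not for a cluster. When submitting with spark-submit, the master URL is normally supplied by the submission environment rather than in code, and input and output usually live on HDFS. A sketch of that variant in Java (the HDFS paths below are assumptions for illustration, not paths from the original article):

    // no setMaster() here: the master URL comes from the submission environment
    SparkConf conf = new SparkConf().setAppName("wordcount");
    JavaSparkContext sc = new JavaSparkContext(conf);

    sc.textFile("hdfs:///user/chavin/spark_wc.txt")                    // assumed input path
      .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
      .mapToPair(word -> new Tuple2<>(word, 1))
      .reduceByKey((v1, v2) -> v1 + v2)
      .saveAsTextFile("hdfs:///user/chavin/spark_wc_output");          // assumed output directory

    sc.close();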

  • Original article: https://www.cnblogs.com/wcwen1990/p/10253386.html