zoukankan      html  css  js  c++  java
  • Spark开发wordcount程序

    1、java版本(spark-2.1.0)

    package chavin.king;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;

    public class WordCount {

        public static void main(String[] args) {
             // TODO Auto-generated method stub

            //初始化spark应用
             SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
             JavaSparkContext sc = new JavaSparkContext(conf);
            
             //读取文件
             JavaRDD<String> lines = sc.textFile("E://test//spark_wc.txt");

            //将每一行切割成单词
             JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                public Iterator<String> call(String line) throws Exception {
                     return Arrays.asList(line.split(" ")).iterator();
                 }

            });

            //将每个单词映射成(word,1)格式
             JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

                public Tuple2<String, Integer> call(String word) throws Exception {
                     return new Tuple2<String, Integer>(word, 1);
                 }

            });

            //计算每个单词出现次数
             JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

                public Integer call(Integer v1, Integer v2) throws Exception {
                     return v1 + v2;
                 }

            });

            //打印输出
             wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

                public void call(Tuple2<String, Integer> wordCount) throws Exception {
                     System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
                 }

            });

            //关闭SparkContext
             sc.close();

        }

    }

    2、scala版本

    package chavin.king

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object WordCountLocal {

      /**
       * Local Spark word-count job: reads a fixed input file, counts how often
       * each space-separated word occurs, and prints one line per word.
       */
      def main(args: Array[String]) {

        // Build a local Spark context for this job.
        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
        val context = new SparkContext(sparkConf)

        // Read the file into a single-partition RDD of lines, then:
        // line -> words -> (word, 1) -> per-word summed counts.
        val inputLines = context.textFile("E://test//spark_wc.txt", 1)
        val counts = inputLines
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)

        // Print each word together with how many times it appeared.
        counts.foreach { case (word, n) => println(word + " appeared " + n + " times.") }

      }

    }

  • 相关阅读:
    Java并发编程:synchronized
    对一致性Hash算法,Java代码实现的深入研究
    在Xcode中使用Git进行源码版本控制
    这些 Git 技能够你用一年了
    简明 Git 命令速查表
    Git 分支管理是一门艺术
    2016年Web前端面试题目汇总
    iOS 面试基础题目
    Json数据交互格式介绍和比较
    SpringBoot端口和访问路径
  • 原文地址:https://www.cnblogs.com/wcwen1990/p/10253386.html
Copyright © 2011-2022 走看看