zoukankan      html  css  js  c++  java
  • Spark开发wordcount程序

    1、java版本(spark-2.1.0)

    package chavin.king;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;

    public class WordCount {

        public static void main(String[] args) {
             // TODO Auto-generated method stub

            //初始化spark应用
             SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
             JavaSparkContext sc = new JavaSparkContext(conf);
            
             //读取文件
             JavaRDD<String> lines = sc.textFile("E://test//spark_wc.txt");

            //将每一行切割成单词
             JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                public Iterator<String> call(String line) throws Exception {
                     return Arrays.asList(line.split(" ")).iterator();
                 }

            });

            //将每个单词映射成(word,1)格式
             JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

                public Tuple2<String, Integer> call(String word) throws Exception {
                     return new Tuple2<String, Integer>(word, 1);
                 }

            });

            //计算每个单词出现次数
             JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

                public Integer call(Integer v1, Integer v2) throws Exception {
                     return v1 + v2;
                 }

            });

            //打印输出
             wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

                public void call(Tuple2<String, Integer> wordCount) throws Exception {
                     System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
                 }

            });

            //关闭SparkContext
             sc.close();

        }

    }

    2、scala版本

    package chavin.king

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object WordCountLocal {

      /**
       * Classic Spark word-count example (Scala API), run in local mode.
       *
       * Reads a text file, splits each line on single spaces, counts every
       * word, and prints "word appeared N times." per distinct word.
       *
       * The input path may be supplied as the first command-line argument;
       * when absent it falls back to the original hard-coded sample path,
       * so existing invocations keep working.
       */
      def main(args: Array[String]): Unit = {
        // Backward-compatible generalization of the original hard-coded path.
        val inputPath = if (args.nonEmpty) args(0) else "E://test//spark_wc.txt"

        val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        val sc = new SparkContext(conf)
        try {
          // minPartitions = 1, as in the original.
          val lines = sc.textFile(inputPath, 1)
          val words = lines.flatMap(_.split(" "))
          val pairs = words.map(word => (word, 1))
          val wordCounts = pairs.reduceByKey(_ + _)

          // NOTE: foreach runs on the executors; with master "local" the
          // output appears on this console. Message text unchanged.
          wordCounts.foreach { case (word, count) =>
            println(s"$word appeared $count times.")
          }
        } finally {
          // The original never stopped the SparkContext (the Java version
          // closes its context); always release it here.
          sc.stop()
        }
      }

    }

  • 相关阅读:
    GeneXus笔记本—城市级联下拉
    GeneXus笔记本—获取当月的最后一天
    GeneXus笔记本——创建一个知识库 哈哈哈哈!
    GeneXus笔记本——入门篇
    Android Studio 学习笔记1.1 创建自己的第一个安卓项目并且打包APK
    初入Android Studio的我
    随笔
    在Azuer创建自己的Linux_VM
    获得自己电脑的SSH公匙
    数据库内连接、外连接以及左右连接的区别
  • 原文地址:https://www.cnblogs.com/wcwen1990/p/10253386.html
Copyright © 2011-2022 走看看