zoukankan      html  css  js  c++  java
  • 使用java开发spark的wordcount程序(多种实现)

    package spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * Created by kkxwz on 2018/5/24
     */
    public class WordCountApp {
    
        public static void main(String[] args) {
    
    //        //spark 2.0版本之前
    //        SparkConf sparkConf = new SparkConf().setAppName("WordCountApp").setMaster("local[2]");
    //        JavaSparkContext spark = new JavaSparkContext(sparkConf);
    //        JavaRDD<String> lines= spark.textFile("/Users/zl/data/sparksqldata/hello.txt");
    
    //        spark 2.0版本之后(建议)
            SparkSession spark = SparkSession.builder()
                    .master("local[2]")
                    .appName("WordCountApp")
                    .getOrCreate();
    
            JavaRDD<String> lines= spark.read().textFile("/Users/zl/data/sparksqldata/hello.txt").javaRDD();
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("	")).iterator());
            JavaPairRDD<String, Integer> counts = words
                    .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                    .reduceByKey((x, y)-> x+y);
    
            //第一种输出方式:
            counts.foreach(count -> System.out.println(count._1() + ":" + count._2()));
    
            //第二种输出方式:
    //        List<Tuple2<String, Integer>> output = counts.collect();
    //
    //        for(Tuple2<String, Integer> tuple : output){
    //            System.out.println(tuple._1() + ":" + tuple._2());
    //        }
    
            spark.stop();
        }
    
    }
    
    // PS:
    //   1、jdk版本至少为1.8
    //   2、最好关联源码,查看返回类型学习!!!
    

      

  • 相关阅读:
    CODEVS——T 2618 核电站问题
    Spring使用AspectJ注解和XML配置实现AOP
    oracle存储过程
    oracle什么时候需要commit
    短信发送接口被恶意访问
    JAVA内存模型
    构造函数,静态代码块,构造代码块
    mybatis缓存
    volatile和synchronized
    利用反射创建对象必须要显式的声明构造方法吗?
  • 原文地址:https://www.cnblogs.com/kkxwz/p/9083796.html
Copyright © 2011-2022 走看看