zoukankan      html  css  js  c++  java
  • Scala,Java,Python 3种语言编写Spark WordCount示例

    首先,我先定义一个文件,hello.txt,里面的内容如下:

    hello spark
    hello hadoop
    hello flink
    hello storm

    Scala方式

    scala版本是2.11.8。

    配置maven文件,三个依赖:

    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.7.0</version>
    </dependency>
    <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
    </dependency>
    package com.darrenchan.spark
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkCoreApp2 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp")
        val sc = new SparkContext(sparkConf)
    
        //业务逻辑
        val counts = sc.textFile("D:\hello.txt").
          flatMap(_.split(" ")).
          map((_, 1)).
          reduceByKey(_+_)
    
        println(counts.collect().mkString("
    "))
    
        sc.stop()
      }
    }

    运行结果:

    Java方式

    Java8,用lamda表达式。

    package com.darrenchan.spark.javaapi;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    
    import java.util.Arrays;
    
    public class WordCountApp2 {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCountApp");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
    
            //业务逻辑
            JavaPairRDD<String, Integer> counts =
                    sc.textFile("D:\hello.txt").
                            flatMap(line -> Arrays.asList(line.split(" ")).iterator()).
                            mapToPair(word -> new Tuple2<>(word, 1)).
                            reduceByKey((a, b) -> a + b);
    
            System.out.println(counts.collect());
    
            sc.stop();
        }
    }

    运行结果:

    Python方式

    Python 3.6.5。

    from pyspark import SparkConf, SparkContext
    
    def main():
        # 创建SparkConf,设置Spark相关的参数信息
        conf = SparkConf().setMaster("local[2]").setAppName("spark_app")
        # 创建SparkContext
        sc = SparkContext(conf=conf)
    
        # 业务逻辑开发
        counts = sc.textFile("D:\hello.txt").
            flatMap(lambda line: line.split(" ")).
            map(lambda word: (word, 1)).
            reduceByKey(lambda a, b: a + b)
    
        print(counts.collect())
    
        sc.stop()
    
    
    if __name__ == '__main__':
        main()

    运行结果:

    使用Python在Windows下运行Spark有很多坑,详见如下链接:

    http://note.youdao.com/noteshare?id=aad06f5810f9463a94a2d42144279ea4

  • 相关阅读:
    关于委托的一篇不错的文章(C# 中的委托和事件)
    李建忠老师的《.net框架程序设计(修订版)》电子书下载地址,超级推荐
    普通无线路由变成纯AP模式
    CLR到底是什么?是怎么工作的?
    HTTP协议的三个问题
    桌面战争——揭秘中国互联网的里程碑之战
    B2C这点事儿
    不用baidu,不用google,你有bing啊
    让.net程序脱离.NET Framework在Linux下运行
    哥乃一介光棍
  • 原文地址:https://www.cnblogs.com/DarrenChan/p/11052534.html
Copyright © 2011-2022 走看看