zoukankan      html  css  js  c++  java
  • 多种语言开发Spark-以WordCount为例

    Spark是目前最火爆的大数据计算框架,有赶超Hadoop MapReduce的趋势。因此,趁着现在还有大多数人不懂得Spark开发的,赶紧好好学习吧,为了使不同的开发人员能够很好的利用Spark,Spark官方提供了不同开发语言的API,本文以大数据经典入门案例WordCount为例,开发多个版本的Spark应用程序,以满足不同的开发人员需求。

    一、Scala:

      

        val conf: SparkConf = new SparkConf().setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        sc.textFile("test")
          .flatMap(line => {
            line.split("	")
          })
          .mapPartitions(iter => {
            val list: List[(String, Int)] = List[(String, Int)]()
            iter.foreach(word => {
              list.::((word,1))
            })
            list.iterator
          })
          .reduceByKey(_ + _)
          .saveAsTextFile("result")

    二、JDK1.7及以下版本:

      

    SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    sc.textFile("test")
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String t) throws Exception {
                    return Arrays.asList(t.split("	"));
                }
            }).mapToPair(new PairFunction<String, String, Integer>() {
    
                @Override
                public Tuple2<String, Integer> call(String t) throws Exception {
                    return new Tuple2<String, Integer>(t, 1);
                }
                
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
    
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
            }).saveAsTextFile("result");

    三、JDK1.8:

      由于JDK1.8加入了新特性——函数式编程,因此,可以利用JDK1.8的新特性简化Java开发Spark的语句。

    SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.textFile("test")
            .flatMap(line -> {
                return Arrays.asList(line.split("	"));
            }).mapToPair(word -> {
                return new Tuple2<String, Integer>(word, 1);
            }).reduceByKey((x, y) -> {
                return x + y;
            }).saveAsTextFile("result");

      是不是觉得比上述的Scala还简洁呢?其实是这样的,Scala中使用了mapPartitions是对map函数的优化,即对每一个RDD的分区进行map操作,这样就减少了对象的创建,从而加速了计算。而Java中,通过我的测试,不能使用mapPartitions方法进行上述优化,只能使用map方法(不知道为啥),这样也可以使用,但是在大数据集面前,其性能就逊色于mapPartitions了。

     四、Python:

    from pyspark import SparkContext
    from pyspark import SparkConf as conf
    conf.setAppName("WordCount").setMaster("local")
    sc = SparkContext(conf)
    
    text_file = sc.textFile("test")
        .flatMap(lambda line: line.split("	"))
        .map(lambda word: (word, 1))
        .reduceByKey(lambda x, y: x + y)
        .saveAsTextFile("test")
  • 相关阅读:
    简简单单制作鼠标静态动态 ani cur 小结 鼠标形状指针
    【VB6 学习文档管理系统源码】
    Delphi 中的全局快捷键+给指定窗体发送按键
    C# 委托实例实现的多种类型
    PyCharm 上传项目到码云托管平台
    vs rdlc 设置Tablix 在新页面重复表头
    .net C# Chart控件的简单使用
    发邮件,阿里云,未指定邮件服务器端口导致的报错
    使用Quartz Job 简单的做一个定时服务
    FromBase64String 输入的不是有效的 Base-64 字符串,因为它包含非 Base-64 字符、两个以上的填充字符,或者填充字符间包含非法字符
  • 原文地址:https://www.cnblogs.com/cstzhou/p/6553565.html
Copyright © 2011-2022 走看看