zoukankan      html  css  js  c++  java
  • Spark入门(三)--Spark经典的单词统计

    spark经典之单词统计

    准备数据

    既然要统计单词我们就需要一个包含一定数量的文本,我们这里选择了英文原著《GoneWithTheWind》(《飘》)的文本来做一个数据统计,看看文章中各个单词出现频次如何。为了便于大家下载文本。可以到GitHub上下载文本以及对应的代码。我将文本放在项目的目录下。

    首先我们要读取该文件,就要用到SparkContext中的textFile的方法,我们尝试先读取第一行。

    scala实现

    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    
        val sc = new SparkContext(conf)
    
        println(sc.textFile("./GoneWithTheWind").first())
      }
    
    }
    

    java实现

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    
    public class WordCountJava {
    
        public static void main(String[] args){
    
            SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            System.out.println(sc.textFile("./GoneWithTheWind").first());
            
        }
    
    }
    

    python实现

    from pyspark import SparkConf,SparkContext
    
    
    conf = SparkConf().setMaster("local").setAppName("HelloWorld")
    
    sc = SparkContext(conf=conf)
    
    print(sc.textFile("./GoneWithTheWind").first())
    

    得到输出

    Chapter 1
    

    以scala为例,其余两种语言也差不多。第一步我们创建了一个SparkConf

    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    

    这里我们设置Master为local,该程序名称为WordCount,当然程序名称可以任意取,和类名不同也无妨。但是这个Master则不能乱写,当我们在集群上运行,用spark-submit的时候,则要注意。我们现在只讨论本地的写法,因此,这里只写local。

    接着一句我们创建了一个SparkContext,这是spark的核心,我们将conf配置传入初始化

     val sc = new SparkContext(conf)
    

    最后我们将文本路径告诉SparkContext,然后输出第一行内容

    println(sc.textFile("./GoneWithTheWind").first())
    

    开始统计

    接着我们就可以开始统计文本的单词数了,因为单词是以空格划分,所以我们可以把空格作为单词的标记。

    scala实现

    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    
        val sc = new SparkContext(conf)
    
        //设置数据路径
        val text = sc.textFile("./GoneWithTheWind")
    
        //将文本数据按行处理,每行按空格拆成一个数组
        // flatMap会将各个数组中元素合成一个大的集合
        val textSplit = text.flatMap(line =>line.split(" "))
    
        //处理合并后的集合中的元素,每个元素的值为1,返回一个元组(key,value)
        //其中key为单词,value这里是1,即该单词出现一次
        val textSplitFlag = textSplit.map(word => (word,1))
    
        //reduceByKey会将textSplitFlag中的key相同的放在一起处理
        //传入的(x,y)中,x是上一次统计后的value,y是本次单词中的value,即每一次是x+1
        val countWord = textSplitFlag.reduceByKey((x,y)=>x+y)
    
        //将计算后的结果存在项目目录下的result目录中
        countWord.saveAsTextFile("./result")
      }
    
    }
    

    java实现

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class WordCountJava {
    
        public static void main(String[] args){
    
            SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            //设置数据的路径
            JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
    
    
            //将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合
            //这里需要注意的是FlatMapFunction中<String, String>,第一个表示输入,第二个表示输出
            //与Hadoop中的map-reduce非常相似
            JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
    
            //处理合并后的集合中的元素,每个元素的值为1,返回一个Tuple2,Tuple2表示两个元素的元组
            //值得注意的是上面是JavaRDD,这里是JavaPairRDD,在返回的是元组时需要注意这个区别
            //PairFunction中<String, String, Integer>,第一个String是输入值类型
            //第二第三个,String, Integer是返回值类型
            //这里返回的是一个word和一个数值1,表示这个单词出现一次
            JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s,1);
                }
            });
    
    
            //reduceByKey会将splitFlagRDD中的key相同的放在一起处理
            //传入的(x,y)中,x是上一次统计后的value,y是本次单词中的value,即每一次是x+1
            JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer+integer2;
                }
            });
    
            //将计算后的结果存在项目目录下的result目录中
            countRDD.saveAsTextFile("./resultJava");
    
    
    
        }
    
    }
    

    python实现

    from pyspark import SparkConf,SparkContext
    
    
    conf = SparkConf().setMaster("local").setAppName("HelloWorld")
    
    sc = SparkContext(conf=conf)
    
    # 设置数据的路径
    textData = sc.textFile("./GoneWithTheWind")
    
    # 将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合
    splitData = textData.flatMap(lambda line:line.split(" "))
    
    # 处理合并后的集合中的元素,每个元素的值为1,返回一个元组(key,value)
    # 其中key为单词,value这里是1,即该单词出现一次
    flagData = splitData.map(lambda word:(word,1))
    
    # reduceByKey会将textSplitFlag中的key相同的放在一起处理
    # 传入的(x,y)中,x是上一次统计后的value,y是本次单词中的value,即每一次是x+1
    countData = flagData.reduceByKey(lambda x,y:x+y)
    
    #输出文件
    countData.saveAsTextFile("./result")
    

    运行后在住目录下得到一个名为result的目录,该目录如下图,SUCCESS表示生成文件成功,文件内容存储在part-00000中


    我们可以查看文件的部分内容:

    ('Chapter', 1)
    ('1', 1)
    ('SCARLETT', 1)
    ('O’HARA', 1)
    ('was', 74)
    ('not', 33)
    ('beautiful,', 1)
    ('but', 32)
    ('men', 4)
    ('seldom', 3)
    ('realized', 2)
    ('it', 37)
    ('when', 19)
    ('caught', 1)
    ('by', 20)
    ('her', 65)
    ('charmas', 1)
    ('the', 336)
    ('Tarleton', 7)
    ('twins', 16)
    ('were.', 1)
    ('In', 1)
    ('face', 6)
    ('were', 49)
    ...
    ...
    ...
    ...
    

    这样就完成了一个spark的真正HelloWorld程序--单词计数。对比三个语言版本的程序,发现一个事实那就是,用scala和python写的代码非常简洁而且易懂,而Java实现的则相对复杂,难懂。当然这个易懂和难懂是相对而言的。如果你只会Java无论如何你都应该从中能看懂java的程序,而简洁的scala和python对你来说根本看不懂。这也无妨,语言只是工具,重点看你怎么用。况且,我们使用java8的特性也可以写出简洁的代码。

    java8实现

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class WordCountJava {
    
        public static void main(String[] args){
    
            SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            countJava8(sc);
    
        }
    
    
        public static void countJava8(JavaSparkContext sc){
    
    
            sc.textFile("./GoneWithTheWind")
              .flatMap(s->Arrays.asList(s.split(" ")).iterator())
              .mapToPair(s->new Tuple2<>(s,1))
              .reduceByKey((x,y)->x+y)
              .saveAsTextFile("./resultJava8");
    
    
        }
    
    }
    

    spark的优越性在这个小小的程序中已经有所体现,计算一本书的每个单词出现的次数,spark在单机上运行(读取文件、生成临时文件、将结果写到硬盘),加载-运行-结束只花费了2秒时间。

    对程序进行优化

    程序是否还能再简单高效呢?当然是可以的,我们可以用countByValue这个函数,这个函数正是常用的计数的方法。

    scala实现

    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    
        val sc = new SparkContext(conf)
        
    
        //设置数据路径
        val text = sc.textFile("./GoneWithTheWind")
    
        //将文本数据按行处理,每行按空格拆成一个数组
        // flatMap会将各个数组中元素合成一个大的集合
        val textSplit = text.flatMap(line =>line.split(" "))
        
        println(textSplit.countByValue())
      }
    }
    

    运行得到结果

    Map(Heknew -> 1, &emsp;&emsp;“Ashley -> 1, “Let’s -> 1, anarresting -> 1, of. -> 1, pasture -> 1, war’s -> 1, wall. -> 1, looks -> 2, ain’t -> 7,.......
    

    java实现

    public class WordCountJava {
    
        public static void main(String[] args){
    
            SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
            countJava(sc);
    
        }
        
         public static void countJava(JavaSparkContext sc){
            //设置数据的路径
            JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
    
    
            //将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合
            //这里需要注意的是FlatMapFunction中<String, String>,第一个表示输入,第二个表示输出
            //与Hadoop中的map-reduce非常相似
            JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
    
            System.out.println(splitRDD.countByValue());
    
        }
    
    }
    

    运行得到结果

    {Heknew=1, &emsp;&emsp;“Ashley=1, “Let’s=1, anarresting=1, of.=1, pasture=1, war’s=1, wall.=1, looks=2, ain’t=7, Clayton=1, approval.=1, ideas=1,
    

    python实现

    from pyspark import SparkConf,SparkContext
    
    
    conf = SparkConf().setMaster("local").setAppName("HelloWorld")
    
    sc = SparkContext(conf=conf)
    
    # 设置数据的路径
    textData = sc.textFile("./GoneWithTheWind")
    
    # 将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合
    splitData = textData.flatMap(lambda line:line.split(" "))
    
    print(splitData.countByValue())
    

    运行得到结果:

    defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, 'O’HARA': 1, 'was': 74, 'not': 33, 'beautiful,': 1, 'but': 32, 'men': 4, 
    
    spark的优越性在这个小小的程序中已经有所体现,计算一本书的每个单词出现的次数,spark在单机上运行(读取文件、生成临时文件、将结果写到硬盘),加载-运行-结束只花费了2秒时间。如果想要获取源代码以及数据内容,可以前往我的github下载。



    转自:https://juejin.im/post/5c768f5b6fb9a049b348a811

  • 相关阅读:
    可分离卷积详解及计算量 Basic Introduction to Separable Convolutions
    全面解读Group Normalization,对比BN,LN,IN
    高斯混合模型(GMM)及MATLAB代码
    VLAD算法浅析, BOF、FV比较
    MATLAB 可以画的各种类型的图总结
    检测算法简介及其原理——fast R-CNN,faster R-CNN,YOLO,SSD,YOLOv2,YOLOv3
    深度学习图像配准 Image Registration: From SIFT to Deep Learning
    在IIS7里配置 ISAPI,运行dll程序,总提示下载dll
    iis7下配置php出现404.17错误的解决办法
    C# Acrobat打开pdf出错,提示Acrobat.AcroPDDocClass不能强制转换为Acrobat.CAcroPDDoc,使用com组件{9B4CD3E7-4981-101B-9CA8-9240CE2738AE},HRESULT: 0x80004002
  • 原文地址:https://www.cnblogs.com/tjp40922/p/12181603.html
Copyright © 2011-2022 走看看