zoukankan      html  css  js  c++  java
  • Spark初步 从wordcount开始

    Spark初步-从wordcount开始

    spark中自带的example,有一个wordcount例子,我们逐步分析wordcount代码,开始我们的spark之旅。

    准备工作

    把README.md文件复制到当前的文件目录,启动jupyter,编写我们的代码。

    README.md文件在Spark的根目录下。

    from pyspark.sql import SparkSession
    from operator import add
    
    # 初始化spark实例,并把应用命名为wordcount
    spark = SparkSession.builder.appName("WordCount").getOrCreate()
    
    # 从文件读取内容
    # 此时data为dataframe格式,每一行为文件中的一行
    data = spark.read.text("README.md")
    
    # 查看第一行数据
    f = data.first()
    f
    
    Row(value='# Apache Spark')
    
    # 查看前5行数据
    data.take(5)
    
    [Row(value='# Apache Spark'),
     Row(value=''),
     Row(value='Spark is a fast and general cluster computing system for Big Data. It provides'),
     Row(value='high-level APIs in Scala, Java, Python, and R, and an optimized engine that'),
     Row(value='supports general computation graphs for data analysis. It also supports a')]
    
    # 把数据转换为rdd格式,并取出值
    data2 = data.rdd.map(lambda x: x[0])
    # 查看第一行数据,可以看到数据为string格式
    data2.first()
    
    '# Apache Spark'
    
    # 对于每行按照空格来分割,并把结果拉平
    data3 = data2.flatMap(lambda x: x.split(' '))
    # 查看前5个数据,可以看到已经分割为单个词了
    data3.take(5)
    
    ['#', 'Apache', 'Spark', '', 'Spark']
    
    # 为每个单词标记次数1
    data4 = data3.map(lambda x: (x,1))
    # 结果为turple类型,前面是key,后面的数字为单词的次数
    data4.take(5)
    
    [('#', 1), ('Apache', 1), ('Spark', 1), ('', 1), ('Spark', 1)]
    
    # 汇总统计每个单词出现的次数
    data5 = data4.reduceByKey(add)
    # 结果为turple类型,数字为单词的出现次数
    data5.take(10)
    
    [('#', 1),
     ('Apache', 1),
     ('Spark', 16),
     ('', 71),
     ('is', 6),
     ('a', 8),
     ('fast', 1),
     ('and', 9),
     ('general', 3),
     ('cluster', 2)]
    
    # 按照出现次数多少来排序
    res = data5.sortBy(lambda x: x[1], ascending=False).collect()
    res[:5]
    
    [('', 71), ('the', 24), ('to', 17), ('Spark', 16), ('for', 12)]
    

    完整代码

    from pyspark.sql import SparkSession
    from operator import add
    
    spark = SparkSession.builder.appName("WordCount").getOrCreate()
    data = spark.read.text("README.md")
    
    data1 = data.rdd.map(lambda x: x[0])
    
    data2 = data1.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    
    res = data2.sortBy(lambda x: x[1], ascending=False).collect()
    
    print(res[:10])
    
    
    [('', 71), ('the', 24), ('to', 17), ('Spark', 16), ('for', 12), ('and', 9), ('##', 9), ('a', 8), ('can', 7), ('on', 7)]
  • 相关阅读:
    堆排序实现
    Unable to convert MySQL date/time value to System.DateTime
    想想那些除了技术之外重要的事情
    js小功能
    01输入框回车触发操作
    javascript闭包
    BQ27510 电量计的校准 的 C语言实现
    嵌入式Linux开发教程:Linux常见命令(上篇)
    技术团队的情绪与效率
    如何有效使用Project(2)——进度计划的执行与监控
  • 原文地址:https://www.cnblogs.com/StitchSun/p/10535822.html
Copyright © 2011-2022 走看看