zoukankan      html  css  js  c++  java
  • PySpark理解wordcount.py

    在本文中, 我们借由深入剖析wordcount.py, 来揭开Spark内部各种概念的面纱。我们再次回顾wordcount.py代码来回答如下问题

    1. 对于大多数语言的Hello Word示例,都有main()函数, wordcount.py的main函数,或者说调用Spark的main() 在哪里

    2. 数据的读入,各个RDD数据如何转换

    3. map与flatMap的工作机制,以及区别

    4. reduceByKey的作用

    WordCount.py 的代码如下:

     1 from __future__ import print_function
     2 
     3 import sys
     4 from operator import add
     5 
     6 # SparkSession:是一个对Spark的编程入口,取代了原本的SQLContext与HiveContext,方便调用Dataset和DataFrame API
     7 # SparkSession可用于创建DataFrame,将DataFrame注册为表,在表上执行SQL,缓存表和读取parquet文件。
     8 from pyspark.sql import SparkSession
     9 
    10 
    11 if __name__ == "__main__":
    12 
    13     # Python 常用的简单参数传入
    14     if len(sys.argv) != 2:
    15         print("Usage: wordcount <file>", file=sys.stderr)
    16         exit(-1)
    17         
    18     # appName 为 Spark 应用设定一个应用名,改名会显示在 Spark Web UI 上
    19     # 假如SparkSession 已经存在就取得已存在的SparkSession,否则创建一个新的。
    20     spark = SparkSession
    21         .builder
    22         .appName("PythonWordCount")
    23         .getOrCreate()
    24         
    25     # 读取传入的文件内容,并写入一个新的RDD实例lines中,此条语句所做工作有些多,不适合初学者,可以截成两条语句以便理解。
    26     # map是一种转换函数,将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。原始RDD中的数据项与新RDD中的数据项是一一对应的关系。
    27     lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    28    
    29     # flatMap与map类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出 
    30     counts = lines.flatMap(lambda x: x.split(' ')) 
    31                   .map(lambda x: (x, 1)) 
    32                   .reduceByKey(add)
    33                 
    34     # collect() 在驱动程序中将数据集的所有元素作为数组返回。 这在返回足够小的数据子集的过滤器或其他操作之后通常是有用的。由于collect 是将整个RDD汇聚到一台机子上,所以通常需要预估返回数据集的大小以免溢出。             
    35     output = counts.collect()
    36     
    37     for (word, count) in output:
    38         print("%s: %i" % (word, count))
    39 
    40     spark.stop()

    Spark 入口 SparkSession

    Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,这边不妨对照Http Session, 在此Spark就在充当Web service的角色,程序调用Spark功能的时候需要先建立一个Session。因此看到getOrCreate()就很容易理解了, 表明可以视情况新建session或利用已有的session。

    1     spark = SparkSession
    2         .builder
    3         .appName("PythonWordCount")
    4         .getOrCreate()

    既然将Spark 想象成一个Web server, 也就意味着可能用多个访问在进行,为了便于监控管理, 对应用命名一个恰当的名称是个好办法。Web UI并不是本文的重点,有兴趣的同学可以参考  Spark Application’s Web Console

    加载数据

    在建立SparkSession之后, 就是读入数据并写入到Dateset中。

    1  lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

    为了更好的分解执行过程,是时候借助PySpark了, PySpark是python调用Spark的 API,它可以启动一个交互式Python Shell。为了方便脚本调试,暂时切换到Linux执行

     1 # pyspark
     2 Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
     3 [GCC 4.8.2] on linux2
     4 Type "help", "copyright", "credits" or "license" for more information.
     5 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
     6 Setting default log level to "WARN".
     7 To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
     8 17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
     9 17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    10 17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
    11 17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    12 Welcome to
    13       ____              __
    14      / __/__  ___ _____/ /__
    15     _ / _ / _ `/ __/  '_/
    16    /__ / .__/\_,_/_/ /_/\_   version 2.1.0
    17       /_/
    18 
    19 Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
    20 SparkSession available as 'spark'.
    21 >>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')
    22 >>> type(ds)
    23 <class 'pyspark.sql.dataframe.DataFrame'>
    24 >>> print ds
    25 DataFrame[value: string]
    26 >>> lines = ds.rdd

    交互式Shell的好处是可以方便的查看变量内容和类型。此刻文件a.txt已经加载到lines中,它是RDD(Resilient Distributed Datasets)弹性分布式数据集的实例。

    RDD操作

    RDD在内存中的结构可以参考论文, 理解RDD有两点比较重要:

    一是RDD一种只读、只能由已存在的RDD变换而来的共享内存,然后将所有数据都加载到内存中,方便进行多次重用。

    二是RDD的数据默认情况下存放在集群中不同节点的内存中,本身提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。

    为了探究RDD内部的数据内容,可以利用collect()函数, 它能够以数组的形式,返回RDD数据集的所有元素。

    1 >>> lines = ds.rdd
    2 >>> for i in lines.collect():
    3 ...     print i
    4 ... 
    5 Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')

    lines存储的是Row object类型,而我们希望的是对String类型进行处理,所以需要利用map api进一步转换RDD

    1 >>> lines_map = lines.map(lambda x: x[0])
    2 >>> for i in lines_map.collect():
    3 ...     print i
    4 ... 
    5 These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.

    为了统计每个单词的出现频率,需要对每个单词分别统计,那么第一步需要将上面的字符串以空格作为分隔符将单词提取出来,并为每个词设置一个计数器。比如 These出现次数是1, 我们期望的数据结构是['There', 1]。但是如何将包含字符串的RDD转换成元素为类似 ['There', 1] 的RDD呢?

     1 >>> flat_map = lines_map.flatMap(lambda x: x.split(' '))
     2 >>> rdd_map = flat_map.map(lambda x: [x, 1])
     3 >>> for i in rdd_map.collect():
     4 ...     print i
     5 ... 
     6 [u'These', 1]
     7 [u'examples', 1]
     8 [u'give', 1]
     9 [u'a', 1]
    10 [u'quick', 1]

    下图简要的讲述了flatMap 和 map的转换过程。

    不难看出,map api只是为所有出现的单词初始化了计数器为1,并没有统计相同词,接下来这个任务由reduceByKey()来完成。在rdd_map 中,所有的词被视为一个key,而key相同的value则执行reduceByKey内的算子操作,因为统计相同key是累加操作,所以可以直接add操作。
     1 >>> from operator import add
     2 >>> add_map = rdd_map.reduceByKey(add)
     3 >>> for i in add_map.collect():
     4 ...     print i
     5 ... 
     6 (u'a', 1)
     7 (u'on', 1)
     8 (u'of', 2)
     9 (u'arbitrary', 1)
    10 (u'quick', 1)
    11 (u'the', 2)
    12 (u'or', 1)
    13 
    14 >>> print rdd_map.count()
    15 26
    16 >>> print add_map.count()
    17 23

    根据a.txt 的内容,可知只有 of 和 the 两个单词出现了两次,符合预期。

    总结

    以上的分解步骤,可以帮我们理解RDD的操作,需要提示的是,RDD将操作分为两类:transformation与action。无论执行了多少次transformation操作,RDD都不会真正执行运算,只有当action操作被执行时,运算才会触发。也就是说,上面所有的RDD都是通过collect()触发的, 那么如果将上述的transformation放入一条简练语句中, 则展现为原始wordcount.py的书写形式。

    1 counts = lines.flatMap(lambda x: x.split(' ')) 
    2                   .map(lambda x: (x, 1)) 
    3                   .reduceByKey(add)

    而真正的action 则是由collect()完成。

    1 output = counts.collect()

    至此,已经完成了对wordcount.py的深入剖析

    转自:https://www.jianshu.com/p/067907b23546?winzoom=1

  • 相关阅读:
    JHipster
    Integrating Jenkins and Apache Tomcat for Continuous Deployment
    What is the difference between apache tomcat deployer and core version?
    JEECG--去掉(增加)登陆页面验证码功能
    Protobuf一例
    进程间通信之POSIX信号量
    进程间通信之共享存储
    进程间通信之信号量
    进程间通信之消息队列
    进程间通信之XSI IPC
  • 原文地址:https://www.cnblogs.com/luozeng/p/9097270.html
Copyright © 2011-2022 走看看