  A 10-Minute Introduction to Spark

    Spark is the hot technology that every major Silicon Valley company is using, and it is only getting hotter, so it is well worth learning. This is in fact the third post in my plain-language Hadoop series. I dropped "Hadoop" from the title because Spark does not have to depend on Hadoop: it can run on top of many persistence layers, such as HDFS, Amazon S3, Azure Storage, Cassandra, and Kafka. I dropped "plain-language" because Spark is simply too big to cover thoroughly here (Spark SQL, Spark Streaming, Spark GraphX, Spark MLlib); this post only aims to save you some time getting started, and I plan to write dedicated follow-ups on SQL, graph, streaming, and machine learning later.

    Spark History

    • 2009 project started at UC Berkeley's AMPLab
    • 2012 first release (0.5)
    • 2014 became top level apache project
    • 2014 1.0
    • 2015 1.5
    • 2016 2.0

    Spark Origins

    Problems with MapReduce and earlier systems

    • Cluster memory was not used effectively.
    • MapReduce makes redundant use of disk I/O: the output of one job is written to disk and then read back as the input of the next, I/O that could be avoided if the intermediate data stayed in memory. This is especially painful for ad hoc queries, which produce many intermediate files whose durability matters far less than completion time.
    • MapReduce jobs have to perform the same joins over and over, and the algorithms need constant tuning, which is painful.
    • There was no one-stop solution; you often needed several systems, e.g. MapReduce for batch processing, Storm for stream processing, and Elasticsearch for interactive exploration, which meant redundant copies of the data.
    • Demand for interactive queries and stream processing kept growing, e.g. for ad hoc analytics and fast decision-making.
    • Demand for machine learning kept growing.

    How Spark addresses these problems

    • The RDD abstraction with a rich API
    • Full use of cluster memory
    • A one-stop stack: Spark SQL, Spark Streaming, Spark GraphX, Spark MLlib

    Spark architecture

    Cluster manager: manages the resources used to execute tasks and keeps track of which resources are currently available. It can be Spark's standalone cluster manager, YARN (see my earlier MapReduce article), or Mesos.

    A Spark application consists of a driver process and a number of executor processes. The driver process is the core of the application: it runs the main() function, maintains the application's state, responds to user input, and analyzes, distributes, and schedules work across the executors. The executors run the tasks the driver assigns to them and report their computation state back to the driver.
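
    As a minimal illustration (assuming a local PySpark installation; the app name and master setting are just example values), the script below is itself the driver process, and "local[2]" asks Spark to run the executor work in two local threads:

    from pyspark.sql import SparkSession

    # This script is the driver process of the application.
    spark = (SparkSession.builder
             .master("local[2]")               # local mode: executor work runs in 2 threads
             .appName("driver-executor-demo")  # hypothetical application name
             .getOrCreate())

    # The driver splits this computation into tasks, schedules them on the executors,
    # and collects the results they report back.
    print(spark.sparkContext.parallelize(range(10)).sum())  # 45

    spark.stop()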

    Installing PySpark

    The simplest way is to download the tarball from:

    https://spark.apache.org/downloads.html

    Extract it and run from the command line:

    $ export PYSPARK_PYTHON=python3
    $ ./bin/pyspark 
    Python 3.6.4 (default, Jan  6 2018, 11:51:15) 
    [GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)] on darwin
    ...
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
         _\ \/ _ \/ _ `/ __/  '_/
        /__ / .__/\_,_/_/ /_/\_\   version 2.2.1
          /_/
    
    Using Python version 3.6.4 (default, Jan  6 2018 11:51:15)
    SparkSession available as 'spark'.

    To install PySpark as an importable Python package instead (run from the python/ directory of the extracted tarball):

    python setup.py install

    Import:

    from pyspark import SparkConf, SparkContext

    RDDs (Resilient Distributed Datasets)

    • Resilient: able to withstand failures
    • Distributed: spanning across multiple machines
    • read-only, partitioned collection of records

    RDDs were once the star of Spark, but in Spark 2.x they are gradually being de-emphasized: they are now considered a low-level API and you will rarely use them directly in production, yet they are still very helpful for understanding how Spark works. A new RDD type has to implement the following interface (a small PySpark sketch follows the list):

    • partitions()
    • iterator(p: Partition)
    • dependencies()
    • optional: partitioner for key-value RDD (E.g. RDD is hash-partitioned)
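
    PySpark does not implement that Scala interface directly, but the corresponding pieces of an existing RDD can be inspected. A small sketch (the method names below are the PySpark equivalents, not the interface methods listed above):

    rdd = spark.sparkContext.parallelize(range(8), 4)
    print(rdd.getNumPartitions())   # 4 -- the PySpark view of partitions()
    print(rdd.glom().collect())     # the records of each partition, one list per partition

    pairs = rdd.map(lambda x: (x % 2, x)).partitionBy(2)
    print(pairs.partitioner)        # key-value RDDs can carry a (hash) partitioner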

    Creating an RDD from a local collection

    >>> mycollection = "Spark Qing Ge Guide : Big Data Processing Made Simple".split(" ")
    >>> words = spark.sparkContext.parallelize(mycollection, 2)
    >>> words.setName("myWords")
    myWords ParallelCollectionRDD[35] at parallelize at PythonRDD.scala:489
    >>> words.name()
    'myWords'

    Transformations

    distinct:

    >>> words.distinct().count()
    [Stage 23:>                                                         (0 + 0) / 2]
    10    

    filter:

    >>> def startsWithS(word):
    ...     return word.startswith("S")
    ... 
    >>> words.filter(lambda word: startsWithS(word)).collect()
    ['Spark', 'Simple']

    map:

    >>> words2 = words.map(lambda word: (word, word[0], word.startswith("S")))
    >>> words2.filter(lambda record: record[2]).take(5)
    [('Spark', 'S', True), ('Simple', 'S', True)]

    flatMap:

    >>> words.flatMap(lambda word: list(word)).take(5)
    ['S', 'p', 'a', 'r', 'k']

    sort:

    >>> words.sortBy(lambda word: len(word) * -1).take(5)
    ['Processing', 'Simple', 'Spark', 'Guide', 'Qing']

    Actions

    reduce:

    >>> spark.sparkContext.parallelize(range(1,21)).reduce(lambda x, y : x+y)
    210

    count:

    >>> words.count()
    10

    first:

    >>> words.first()
    'Spark'

    max and min:

    >>> spark.sparkContext.parallelize(range(1,21)).max()
    20
    >>> spark.sparkContext.parallelize(range(1,21)).min()
    1

    take:

    >>> words.take(5)
    ['Spark', 'Qing', 'Ge', 'Guide', ':']

    Note: Spark uses lazy evaluation, so none of the transformations above actually run until an action is invoked.
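
    A quick way to see this laziness (the trace function here is just for illustration): the map below returns immediately and prints nothing, and the messages only appear once an action forces the computation.

    def trace(word):
        print("evaluating", word)   # side effect so we can see when work actually happens
        return word.upper()

    upper = words.map(trace)        # transformation: only records the lineage, prints nothing
    upper.count()                   # action: each word is evaluated here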

    Saving files

    >>> words.saveAsTextFile("file:/tmp/words")
    $ ls /tmp/words/
    _SUCCESS	part-00000	part-00001
    $ cat /tmp/words/part-00000
    Spark
    Qing
    Ge
    Guide
    :
    $ cat /tmp/words/part-00001
    Big
    Data
    Processing
    Made
    Simple

    Caching

    >>> words.cache()
    myWords ParallelCollectionRDD[35] at parallelize at PythonRDD.scala:489

    cache() stores the RDD with the memory-only storage level (the default); the other options are disk only, or a combination of memory and disk, chosen through persist() as sketched below.
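
    A small sketch of switching storage levels with persist() (an RDD has to be unpersisted before its storage level can be changed):

    from pyspark import StorageLevel

    words.unpersist()                               # drop the memory-only cache from above
    words.persist(StorageLevel.DISK_ONLY)           # keep the cached data on disk only
    # words.persist(StorageLevel.MEMORY_AND_DISK)   # or spill to disk when memory runs out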

    CoGroups

    >>> import random
    >>> distinctChars = words.flatMap(lambda word: word.lower()).distinct()
    >>> charRDD = distinctChars.map(lambda c: (c, random.random()))
    >>> charRDD2 = distinctChars.map(lambda c: (c, random.random()))
    >>> charRDD.cogroup(charRDD2).take(5)
    [('s', (<pyspark.resultiterable.ResultIterable object at 0x10ab49c88>, <pyspark.resultiterable.ResultIterable object at 0x10ab49080>)), ('p', (<pyspark.resultiterable.ResultIterable object at 0x10ab49438>, <pyspark.resultiterable.ResultIterable object at 0x10ab49048>)), ('r', (<pyspark.resultiterable.ResultIterable object at 0x10ab494a8>, <pyspark.resultiterable.ResultIterable object at 0x10ab495c0>)), ('i', (<pyspark.resultiterable.ResultIterable object at 0x10ab49588>, <pyspark.resultiterable.ResultIterable object at 0x10ab49ef0>)), ('g', (<pyspark.resultiterable.ResultIterable object at 0x10ab49ac8>, <pyspark.resultiterable.ResultIterable object at 0x10ab49da0>))]
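
    The ResultIterable objects above are lazy wrappers; for readability they can be materialized into plain lists (the values are random, so they will differ on every run):

    readable = charRDD.cogroup(charRDD2).mapValues(lambda groups: tuple(list(g) for g in groups))
    print(readable.take(2))   # e.g. [('s', ([0.42...], [0.17...])), ('p', ([0.91...], [0.33...]))]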

    Inner Join

    >>> keyedChars = distinctChars.map(lambda c: (c, random.random()))
    >>> outputPartitions = 10
    >>> chars = words.flatMap(lambda word: word.lower())
    >>> KVcharacters = chars.map(lambda letter: (letter, 1))
    >>> KVcharacters.join(keyedChars).count()
    44
    >>> KVcharacters.join(keyedChars, outputPartitions).count()
    44

    Aggregations

    >>> from functools import reduce
    >>> def addFunc(left, right):
    ...     return left + right
    ... 
    >>> KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1]))).collect()
    [('s', 4), ('p', 3), ('r', 2), ('i', 5), ('g', 5), ('d', 3), ('b', 1), ('c', 1), ('l', 1), ('a', 4), ('k', 1), ('q', 1), ('n', 2), ('e', 5), ('u', 1), (':', 1), ('t', 1), ('o', 1), ('m', 2)]
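
    The same counts can be computed more idiomatically with reduceByKey, which combines values inside each partition before the shuffle instead of materializing whole groups:

    KVcharacters.reduceByKey(addFunc).collect()   # same counts as above (ordering may differ)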

    Spark execution and scheduling

    Concretely, the flow is:

    1. invoke an action...
    2. ...spawns the job...
    3. ...that gets divided into the stages by the job scheduler...
    4. ...and tasks are created for every job stage

    The difference between a stage and a task: a stage is a group of RDD-level operations that serves as a unit of planning and is not executed by itself, while a task is the unit of work that actually runs on an executor, one task per partition of each stage.
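
    A tiny sketch of that flow: the reduceByKey below introduces a shuffle, so the single job triggered by collect() is divided into two stages, and each stage runs one task per partition.

    pairs = spark.sparkContext.parallelize(range(100), 4).map(lambda x: (x % 3, x))
    totals = pairs.reduceByKey(lambda a, b: a + b)   # transformations only: no job is spawned yet
    print(totals.collect())                          # the action spawns the job (2 stages here)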

    Broadcast variables

    • read-only shared variables with effective sharing mechanism
    • useful for sharing lookup dictionaries and models (a quick sketch follows)
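
    A minimal sketch: ship a small lookup table to every executor once and reuse it inside tasks (the dictionary and its weights are hypothetical, just for illustration):

    supplement = {"Spark": 1000, "Big": 300, "Simple": 100}   # hypothetical weights
    bcast = spark.sparkContext.broadcast(supplement)

    weighted = words.map(lambda w: (w, bcast.value.get(w, 0)))
    print(weighted.filter(lambda kv: kv[1] > 0).collect())
    # [('Spark', 1000), ('Big', 300), ('Simple', 100)]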

    That's all for today; I'll cover more in future posts.
