zoukankan      html  css  js  c++  java
  • Spark基础知识

    一、 关于Apache Spark
    Apache Spark是个开源和兼容Hadoop的集群计算平台。由加州大学伯克利分校的AMPLabs开发,作为Berkeley Data Analytics Stack(BDAS)的一部分,当下由大数据公司Databricks保驾护航,更是Apache旗下的顶级项目,下图显示了Apache Spark堆栈中的不同组件。
     
    Apache Spark的5大优势:
    1.更高的性能,因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。很多对Spark感兴趣的朋友可能也会听过这样一句话——在数据全部加载到内存的情况下,Spark可以比Hadoop快100倍,在内存不够存放所有数据的情况下快Hadoop 10倍。
     
    2.通过建立在Java、Scala、Python、SQL(应对交互式查询)的标准API以方便各行各业使用,同时还含有大量开箱即用的机器学习库。
    3.与现有Hadoop v1 (SIMR) 和2.x (YARN) 生态兼容,因此机构可以进行无缝迁移。
     
    4.方便下载和安装。方便的shell(REPL: Read-Eval-Print-Loop)可以对API进行交互式的学习。
    5.借助高等级的架构提高生产力,从而可以讲精力放到计算上。
     
     
     
     
    二、Apache Spark的工作模式
    Spark引擎提供了在集群中所有主机上进行分布式内存数据处理的能力,下图显示了一个典型Spark job的处理流程。
     
    下图显示了Apache Spark如何在集群中执行一个作业。
    一个Worker默认情况下分配一个Executor,配置时根据需要也可以配置多个Executor。一个节点,如果配置了多个Executor,那么就会涉及到性能调优。 
    Driver、Master、Worker、Executor,其实它就是JVM进程。 
    图上Worker区域用◇形状表示的是:Worker上有Executor的句柄,也就是说Worker能开对Executor进行控制,必要时可以Kill掉该进程。 
    从图中可以看到Driver和Executor之间画了一根线条,表明:程序运行时是直接与Executor进行交互的。
     
     
    在Driver中,RDD首先交给DAGSchedule进行Stage的划分。 
    然后底层的调度器TaskScheduler就与Executor进行交互。 
    Driver和上图中4个Worker节点的Executor发指令,让它们在各自的线程池中运行Job。 
    运行时Driver能获得Executor的具体运行资源,这样Driver与Executor之间进行通信,通过网络的方式,Driver把划分好的Task传送给Executor,Task就是我们的Spark程序的业务逻辑代码。 
    Executor接收任务,进行反序列化,得到数据的输入和输出,在分布式集群的相同数据分片上,数据的业务逻辑一样,只是数据不一样罢了,然后由Executor的线程池负责执行。
    编程的Spark程序,打包提交到Driver端,这样就构成了一个Driver, 
    Driver内部的调度流程,根据算子逻辑的依赖关系,DAGScheduler来划分成不同的Stage,每个Stage中的计算逻辑是一样的,只是数据分片不一样。TaskScheduler向Executor发送任务,Executor反序列化数据之后,也就得到数据的输入和输出,也就是任务的业务逻辑,Executor运行的就是我们的业务逻辑代码。
    我们整个Spark应用程序,可以分成:Driver和Executor两部分。 
    Driver由框架直接生成; 
    Executor执行的才是我们的业务逻辑代码。 
    执行的时候,框架控制我们代码的执行。Executor需要执行的结果汇报给框架也就是Driver。
     
     
    Master控制数据如何被分割,利用了数据本地性,并在Slaves上跟踪所有分布式计算。在某个Slave不可用时,其存储的数据会分配给其他可用的Slaves。虽然当下(1.0.1版本)Master还存在单点故障,但后期必然会被修复。
    三、弹性分布式数据集(Resilient Distributed Dataset,RDD)
    弹性分布式数据集(RDD,从Spark 1.3版本开始已被DataFrame替代)是Apache Spark的核心理念。它是由数据组成的不可变分布式集合,其主要进行两个操作:transformation和action。Transformation是类似在RDD上做 filter()、map()或union() 以生成另一个RDD的操作,而action则是count()、first()、take(n)、collect() 等促发一个计算并返回值到Master或者稳定存储系统的操作。Transformations一般都是lazy的,直到action执行后才会被执行。Spark Master/Driver会保存RDD上的Transformations。这样一来,如果某个RDD丢失(也就是salves宕掉),它可以快速和便捷地转换到集群中存活的主机上。这也就是RDD的弹性所在。
    下图展示了Transformation的lazy:
    RDD持久性Apache Spark中一个主要的能力就是在集群内存中持久化/缓存RDD。这将显著地提升交互速度。下表显示了Spark中各种选项。
    Storage Level Purpose
    MEMORY_ONLY (Default level) This option stores RDD in available cluster memory as deserialized Java objects. Some partitions may not be cached if there is not enough cluster memory. Those partitions will be recalculated on the fly as needed.
    MEMORY_AND_DISK This option stores RDD as deserialized Java objects. If RDD does not fit in cluster memory, then store those partitions on the disk and read them as needed.
    MEMORY_ONLY_SER This options stores RDD as serialized Java objects (One byte array per partition). This is more CPU intensive but saves memory as it is more space efficient. Some partitions may not be cached. Those will be recalculated on the fly as needed.
    MEMORY_ONLY_DISK_SER This option is same as above except that disk is used when memory is not sufficient.
    DISC_ONLY This option stores the RDD only on the disk
    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as other levels but partitions are replicated on 2 slave nodes
    上面的存储等级可以通过RDD. cache()操作上的 persist()操作访问,可以方便地指定MEMORY_ONLY选项。关于持久化等级的更多信息,可以访问这里http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。
    Spark使用Least Recently Used (LRU)算法来移除缓存中旧的、不常用的RDD,从而释放出更多可用内存。同样还提供了一个unpersist() 操作来强制移除缓存/持久化的RDD。
     
     
    spark中的partition和block的关系
     
    hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件。假设block设置为128M,你的文件是250M,那么这份文件占3个block(128+128+2)。这样的设计虽然会有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容。(p.s. 考虑到hdfs冗余设计,默认三份拷贝,实际上3*3=9个block的物理空间。)
    spark中的partition 是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的,这也是为什么叫“弹性分布式”数据集的原因之一。
    总结:
    block位于存储空间、partition 位于计算空间,
    block的大小是固定的、partition 大小是不固定的,
    block是有冗余的、不会轻易丢失,partition(RDD)没有冗余设计、丢失之后重新计算得到
     
     
    四、 数据的管理 
    在Spark应用具体执行过程中,会涉及到数据的读取和存储。 
    在Executor中关于数据的管理正是Spark的精髓之一。 
    默认情况下,数据要写到磁盘上。
    DAGScheduler划分了多个Stage,下一个Stage 会向Driver请求上一个Stage的运行结果。这就是Shuffle的过程,依赖于上一个Stage全部的结果。前面Stage的输出是后面Stage的输入。

    每个数据分片都要去全部节点上找属于它的数据分片的一部分。 

    比如:上一个Stage有1万个数据分片,现在这个Stage有100个数据分片,那么拉取数据的过程中,整个任务就需要寻找100万次数据。
    五、常用的Transformations
     
    Transformation & Purpose Example & Result
    filter(func) Purpose: new RDD by selecting those data elements on which func returns true
    scala> val rdd = sc.parallelize(List(“ABC”,”BCD”,”DEF”)) scala> val filtered = rdd.filter(_.contains(“C”)) scala> filtered.collect() Result: 
    Array[String] = Array(ABC, BCD)
    map(func) Purpose: return new RDD by applying func on each data element
    scala> val rdd=sc.parallelize(List(1,2,3,4,5)) scala> val times2 = rdd.map(_*2) scala> times2.collect() Result: 
    Array[Int] = Array(2, 4, 6, 8, 10)
    flatMap(func) Purpose: Similar to map but func returns a Seq instead of a value. For example, mapping a sentence into a Seq of words
    scala> val rdd=sc.parallelize(List(“Spark is awesome”,”It is fun”)) scala> val fm=rdd.flatMap(str=>str.split(“ “)) scala> fm.collect() Result: 
    Array[String] = Array(Spark, is, awesome, It, is, fun)
    reduceByKey(func,[numTasks]) Purpose: To aggregate values of a key using a function. “numTasks” is an optional parameter to specify number of reduce tasks
    scala> val word1=fm.map(word=>(word,1)) scala> val wrdCnt=word1.reduceByKey(_+_) scala> wrdCnt.collect() Result: 
    Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))
    groupByKey([numTasks]) Purpose: To convert (K,V) to (K,Iterable<V>)
    scala> val cntWrd = wrdCnt.map{case (word, count) => (count, word)} scala> cntWrd.groupByKey().collect() Result: 
    Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is)))
    distinct([numTasks]) Purpose: Eliminate duplicates from RDD
    scala> fm.distinct().collect() Result: 
    Array[String] = Array(is, It, awesome, Spark, fun)
     
    常用的集合操作:
    Transformation and Purpose Example and Result
    union() 
    Purpose: new RDD containing all elements from source RDD and argument.
    Scala> val rdd1=sc.parallelize(List(‘A’,’B’)) 
    scala> val rdd2=sc.parallelize(List(‘B’,’C’)) 
    scala> rdd1.union(rdd2).collect() 
    Result: 
    Array[Char] = Array(A, B, B, C)
    intersection() 
    Purpose: new RDD containing only common elements from source RDD and argument.
    Scala> rdd1.intersection(rdd2).collect() 
    Result: 
    Array[Char] = Array(B)
    cartesian() 
    Purpose: new RDD cross product of all elements from source RDD and argument
    Scala> rdd1.cartesian(rdd2).collect() 
    Result: 
    Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C))
    subtract() 
    Purpose: new RDD created by removing data elements in source RDD in common with argument
    scala> rdd1.subtract(rdd2).collect() Result: 
    Array[Char] = Array(A)
    join(RDD,[numTasks]) 
    Purpose: When invoked on (K,V) and (K,W), this operation creates a new RDD of (K, (V,W))
    scala> val personFruit = sc.parallelize(Seq((“Andy”, “Apple”), (“Bob”, “Banana”), (“Charlie”, “Cherry”), (“Andy”,”Apricot”))) 
    scala> val personSE = sc.parallelize(Seq((“Andy”, “Google”), (“Bob”, “Bing”), (“Charlie”, “Yahoo”), (“Bob”,”AltaVista”))) 
    scala> personFruit.join(personSE).collect() 
    Result: 
    Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))
    cogroup(RDD,[numTasks]) 
    Purpose: To convert (K,V) to (K,Iterable<V>)
    scala> personFruit.cogroup(personSe).collect() 
    Result: 
    Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy,(ArrayBuffer(Apple, Apricot),ArrayBuffer(google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))), (Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista))))
    更多transformations信息,请查看http://spark.apache.org/docs/latest/programming-guide.html#transformations
    常用的actions
    Action & Purpose Example & Result
    count() Purpose: get the number of data elements in the RDD
    scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.count() Result: 
    long = 3
    collect() Purpose: get all the data elements in an RDD as an array
    scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.collect() Result: 
    Array[char] = Array(A, B, c)
    reduce(func) Purpose: Aggregate the data elements in an RDD using this function which takes two arguments and returns one
    scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.reduce(_+_) Result: 
    Int = 10
    take (n) Purpose: : fetch first n data elements in an RDD. computed by driver program.
    Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.take(2) Result: 
    Array[Int] = Array(1, 2)
    foreach(func) Purpose: execute function for each data element in RDD. usually used to update an accumulator(discussed later) or interacting with external systems.
    Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.foreach(x=>println(“%s*10=%s”. format(x,x*10))) Result: 
    1*10=10 4*10=40 3*10=30 2*10=20
    first() Purpose: retrieves the first data element in RDD. Similar to take(1)
    scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.first() Result: 
    Int = 1
    saveAsTextFile(path) Purpose: Writes the content of RDD to a text file or a set of text files to local file system/ HDFS
    scala> val hamlet = sc.textFile(“/users/akuntamukkala/ temp/gutenburg.txt”) scala> hamlet.filter(_.contains(“Shakespeare”)). saveAsTextFile(“/users/akuntamukkala/temp/ filtered”) Result: 
    akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001
     
     

     
    六、变量共享
    Accumulators。Spark提供了一个非常便捷地途径来避免可变的计数器和计数器同步问题——Accumulators。Accumulators在一个Spark context中通过默认值初始化,这些计数器在Slaves节点上可用,但是Slaves节点不能对其进行读取。它们的作用就是来获取原子更新,并将其转发到Master。Master是唯一可以读取和计算所有更新合集的节点。
    Broadcast Variables。实际生产中,通过指定key在RDDs上对数据进行合并的场景非常常见。在这种情况下,很可能会出现给slave nodes发送大体积数据集的情况,让其负责托管需要做join的数据。因此,这里很可能存在巨大的性能瓶颈,因为网络IO比内存访问速度慢100倍。为了解决这个问题,Spark提供了Broadcast Variables,如其名称一样,它会向slave nodes进行广播。因此,节点上的RDD操作可以快速访问Broadcast Variables值。
     
    七、Spark SQL
    通过Spark Engine,Spark SQL提供了一个便捷的途径来进行交互式分析,使用一个被称为SchemaRDD类型的RDD。SchemaRDD可以通过已有RDDs建立,或者其他外部数据格式,比如Parquet files、JSON数据,或者在Hive上运行HQL。SchemaRDD非常类似于RDBMS中的表格。一旦数据被导入SchemaRDD,Spark引擎就可以对它进行批或流处理。Spark SQL提供了两种类型的Contexts——SQLContext和HiveContext,扩展了SparkContext的功能。
    SparkContext提供了到简单SQL parser的访问,而HiveContext则提供了到HiveQL parser的访问。HiveContext允许企业利用已有的Hive基础设施。
    这里看一个简单的SQLContext示例。
    下面文本中的用户数据通过“|”来分割。
     
     
    John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854
     
    定义Scala case class来表示每一行:
     
     
    case class Customer(name:String,age:Int,gender:String,address: String)
     
    下面的代码片段体现了如何使用SparkContext来建立SQLContext,读取输入文件,将每一行都转换成SparkContext中的一条记录,并通过简单的SQL语句来查询30岁以下的男性用户。
     
     
    val sparkConf = new SparkConf().setAppName(“Customers”)
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val r = sc.textFile(“/Users/akuntamukkala/temp/customers.txt”) val records = r.map(_.split(‘|’))
    val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable(“customers”)
     
    sqlContext.sql(“select * from customers where gender=’M’ and age <
    30”).collect().foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris,
    TX,75461]
     
    更多使用SQL和HiveQL的示例请访问下面链接https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html。
     
    八、Spark Streaming
    Spark Streaming提供了一个可扩展、容错、高效的途径来处理流数据,同时还利用了Spark的简易编程模型。从真正意义上讲,Spark Streaming会将流数据转换成micro batches,从而将Spark批处理编程模型应用到流用例中。这种统一的编程模型让Spark可以很好地整合批量处理和交互式流分析。下图显示了Spark Streaming可以从不同数据源中读取数据进行分析。
     
     
     
    Spark Streaming中的核心抽象是Discretized Stream(DStream)。DStream由一组RDD组成,每个RDD都包含了规定时间(可配置)流入的数据。图12很好地展示了Spark Streaming如何通过将流入数据转换成一系列的RDDs,再转换成DStream。每个RDD都包含两秒(设定的区间长度)的数据。在Spark Streaming中,最小长度可以设置为0.5秒,因此处理延时可以达到1秒以下。
    Spark Streaming同样提供了 window operators,它有助于更有效率在一组RDD( a rolling window of time)上进行计算。同时,DStream还提供了一个API,其操作符(transformations和output operators)可以帮助用户直接操作RDD。下面不妨看向包含在Spark Streaming下载中的一个简单示例。示例是在Twitter流中找出趋势hashtags,详见下面代码。
     
     
    spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
    val sparkConf = new SparkConf().setAppName(“TwitterPopularTags”)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)
     
    上述代码用于建立Spark Streaming Context。Spark Streaming将在DStream中建立一个RDD,包含了每2秒流入的tweets。
     
     
    val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))
     
    上述代码片段将Tweet转换成一组words,并过滤出所有以a#开头的。
     
     
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))
     
    上述代码展示了如何整合计算60秒内一个hashtag流入的总次数。
     
     
    topCounts60.foreachRDD(rdd => {
    val topList = rdd.take(10)
    println(“ Popular topics in last 60 seconds (%s
    total):”.format(rdd.count())) topList.foreach{case (count, tag) => println(“%s (%s
    tweets)”.format(tag, count))} })
     
    上面代码将找出top 10趋势tweets,然后将其打印。
     
     
    ssc.start()
     
    上述代码让Spark Streaming Context 开始检索tweets。一起聚焦一些常用操作,假设我们正在从一个socket中读入流文本。
     
    al lines = ssc.socketTextStream(“localhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)
  • 相关阅读:
    【poj1655】Balancing Act
    yargs.js用法
    8、typescript
    7、typescript
    6、typescript
    5、typescript
    4、typescript
    3、typescript
    2、typescript
    1、typescript
  • 原文地址:https://www.cnblogs.com/wzj4858/p/8204247.html
Copyright © 2011-2022 走看看