zoukankan      html  css  js  c++  java
  • 浅谈Spark(2) - RDD

      Spark依赖于一个很特别的数据抽象,叫做弹性分布式数据集(resilient distributed datasets),也就是RDD,它是一个被集群分区(partitioned)的in-memory read-only对象。每一个RDD都是根据range(partitioning of consecutive records),或者是每条记录的key的hash值来进行分区。当然这两种不同的分区方法在特定的use case上有它自己的优点。例如利用hash值来分区,当不同的dataset共享一个key的时候,它能够通过这些record的局部性(locality),来提高join的效率。而采用range的方法,能够加快访问一小段经过过滤的数据。

      尽管RDD并通过物理空间来存储数据,但是RDD依旧能够做到fault-tolerant。在这个过程中,它们并不需要复制或者保留备份数据,它们拥有一个”血统“(lineage)的概念,能够记住一些列建造这些RDD的指令,从而当数据丢失的时候,可以通过这些指令重新建造一份。

      对于Spark来说,所有的工作只分为三种:建造新的RDD,转换(transformation)已存在的RDD和在RDD上运行各种指令。有一点需要声明的是,RDD采用的是lazily computed and ephemeral。所谓lazily computed,就是所有的transformation操作要到第一个action被用到的时候,它们才会开始执行。所谓Emphermal,就是当RDDs被某个应用程序用到的时候,可能会加载到内存里,并且在内存里进行计算,但随后它们就会从内存里被抹掉。

      

    图 1 RDD和DSM的比较

      RDD抽象是一种分布式共享收集系统(distributed shared collection system),它和传统的分布式共享内存系统 (Distributed Shared Memory systems, DSM)很像,所以我觉得可以把两者进行比较一番,可以看出个中的区别。

      下面我来讲一下RDD的生命周期。

    • Creating RDDs

      在Spark的程序里,一个RDD默认为一个Scala的对象,虽然它也可以是Java或者是Python的对象。它通过以下几种方式被建造:

    1. 从分布式系统(如HDFS)的文件里
    2. 通过并行化(parallelizing)一个集合(collection或者array)并分配到许多节点上
    3. 通过转换(transforming)一个已经存在的RDD(后面有详细说明)
    4. 通过改变一个已经存在的RDD的persistence level,它有以下两种action:
      • cache:它能够让RDD在第一次运行后依旧保存在内存里,以备将来再次调用
      • save:把数据写到分布式系统上,如HDFS。

      下面是以下create新的RDD的例子(本文采用Scala,它能够直接在Spark Scala Shell上运行):

      • 通过textFile()方法把加载一个文本文档”server.logs“到RDD上:
    >>> log_lines_RDD = sc.textFile("server.logs")
    
      • 并行化一个已经存在的RDD:
    >>> greeting_lines_RDD = sc.parallelize(["hello", "world"])
    
    • RDD上的指令

      RDD一旦被建立起来,它支持两种不同的指令:

    1. Transformations:从已存在的RDD里重新create一个新的RDD的指令
    2. Actions:在RDD上计算并且返回一个对象给driver。

      正如之前所说,Transformations是lazy的,他们并不会马上被计算,他们成批的存储起来,直到一个Action被执行的时候他们才会被执行(如图2)。一个action的执行使得所有lineage上的RDD被“物化”,存到内存里。当然,一旦计算结束,一个RDD只有在显式指令下才能够继续保持,不然就会从内存中被抹掉。

      下面是一些常见的transformation和actions:

      下面再举一些简单的例子,希望能让读者对RDD的指令有更为深刻的理解:

      Transformations:

    >>> log_lines_RDD = sc.textFile("server.logs")
    >>> xss_RDD =  log_lines_RDD.filter(lambda x: "%3C%73%63%72%69%70%74%3E" in x)
    >>> sqli_RDD =  log_lines_RDD.filter(lambda x: "bobby_tables" in x)
    >>> owasp_attacks_RDD = xss_RDD.union(sqli_RDD)
    

      上述代码是用来过滤log_lines_RDD,通过对某个关键字段的逐行搜索,找到web server被攻击的记录。

      filter()并不会修改原来的RDD,相反,上例中它建立了两个新的RDD,而原来的log_lines_RDD仍然可以在以后被使用上。union()也是一个transformation,把两个RDD合并起来并且生成一个新的owasp_attacks_RDD。

      Actions

    >>> log_lines_RDD = sc.textFile("server.logs")
    >>> xss_RDD =  log_lines_RDD.filter(lambda x: "%3C%73%63%72%69%70%74%3E" in x)
    >>> sqli_RDD =  log_lines_RDD.filter(lambda x: "bobby_tables" in x)
    >>> owasp_attacks_RDD = xss_RDD.union(sqli_RDD)
    

      上述代码是用来计算owasp_attacks_RDD中web server被攻击的总数。一个action会导致所有batched的RDD被materialization(存储到内存中)。

      由此可以看出,RDD非常适合批量指令(batch operations),尤其是当这些指令可以同时被应用到数据集的所有元素上。因为他们是免疫的(immutable),因而为每一个额外的输入计算一个新的RDD的开销是十分巨大的。所以,当处理real-time数据输入的时候,Spark经常在短时间内批量(batch)所有的变化,而并不马上执行它们。

      Persisting RDDs

      一般来说有四种方法可以维持RDDs:

      1. 作为反序列化后的对象存在于内存里;
      2. 作为序列化的数据存在于内存里;
      3. 硬盘存储;
      4. 离堆存储(off-heap storage)。
  • 相关阅读:
    /proc/kcore失效,调试其文件系统相关模块,使重新正常工作
    linux内核的preempt抢占调度,preempt_count抢占保护“锁”
    linux内核的tiny rcu, tree rcu
    futex-based pthread_cond 源代码分析
    linux 内核的futex
    phtread_mutex 组合
    linux 内核的rt_mutex 锁操作实现的临界区
    linux 内核的RCU本质
    Spring Data(一)概念和仓库的定义
    MongoDB之分片集群(Sharding)
  • 原文地址:https://www.cnblogs.com/kepuCS/p/5220734.html
Copyright © 2011-2022 走看看