zoukankan      html  css  js  c++  java
  • RDD编程

    RDD创建

    每个RDD被分为多个分区,这些分区运行在集群的不同节点上。

    用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里driver分发驱动器程序中的对象集合(list和set)。textFile()和parallelize().

    RDD支持两种类型的操作:转换操作和行动操作。惰性计算,只有行动操作才真正计算。

    RDD持久化,RDD.persist()让Spark把这个RDD缓存在内存中,在之后的行动操作中,可以重用这些数据。也可以把RDD缓存到磁盘中。

    两种读取数据的方式
    lines = sc.parallelize(["asda","asdsa"])//创建RDD的parallelize方法,python
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("",""));//Java的parallelize方法
    

    更常用的是从外部存储中读取数据创建RDD。

    lines = sc.textFile("")//python
    val lines = sc.textFile("")//scala
    JavaRDD<String> lines = sc.textFile("ssa");//Java
    

    RDD操作

    //python
    errorsRDD = inputRDD.filter(lambda x: "error" in x)
    warningsRDD = inputRDD.filter(lamdda x: "warning" in x)
    badLinesRDD = errorsRDD.union(warningsRDD)//两个RDD合并操作
    

    Spark会使用lineage graph(DAG)来表示RDD之间的依赖关系。

    用take()来收集RDD的元素,提取badLinesRDD的10个元素。

    for line in badLinesRDD.take(10):
        print line
    

    collect()函数获取整个RDD的数据

    把数据写入HDFS和Amazon S3,可以使用saveAsTextFile()、saveAsSequenceFile()

    转换操作传入函数

    在Python中,有三种方式把函数传递给Spark:

    1. lambda表达式,大多使用lambda
    2. 顶层函数
    3. 定义的局部函数
    word = rdd.filter(lambda x: "error" in x)//lambda
    
    def containsError(s):
        return "error" in s
    word = rdd.filter(containError)//局部函数
    

    在Scala中,可以把定义的内联函数、方法的引用或静态方法传递给Spark。传递的函数及其引用的数据需要是可序列化的。

    在Java中,传递org.apache.spark.api.java.function中的函数接口对象。

    转换函数

    filter() map() flatMap() groupByKey() reduceByKey()

    map(),接收一个函数,把这个函数用于RDD中的每个元素,把函数的返回结果作为RDD元素对应的值。返回值的类型和输入值的类型不需要一样。

    filter(),接收一个函数,把RDD中满足该函数的元素放入新的RDD中返回。

    flatMap(),返回一个包含各个迭代器可访问的所有元素的RDD,相当于执行map()然后flat拍扁。(map返回一行一行,flatMap返回一个词一个词)

    lines = sc.parallelize(["hello world", "hi"])
    words = lines.flatMap(lambda line: line.split(" "))
    words.first()//返回hello
    
    • groupByKey(),根据相同的键,分组,返回[key,iterable(value)]
    • reduceByKey(func),相同的键,把value加起来,返回[key,value],函数func,对v操作,相加或相乘等
    • RDD.distinct()生成一个只包含不同元素的新RDD,开销很大,需要shuffle。
    • union()两个RDD合并成一个;intersection()只返回两个RDD都有的元素。
    • subtract(other)返回只存在第一个RDD不存在第二个RDD中所有元素组成的RDD,也要shuffle。
    如果缓存数据太多,内存放不下,Spark会用LRU策略把最老的分区从内存中移除。写入磁盘。

    行动操作

    count() collect() first() take(n) reduce(func) foreach(func)等,遇到行动操作要开始计算。

    • count(),返回数据集中元素个数
    • reduce(func),接收一个函数作为参数,操作RDD的元素数据并返回相同类型的结果元素
    rdd=[1,2,3,4,5]
    sum = rdd.reduce(lambda x, y : x + y)
    //sum = 15
    
    • collect(),以数组的形式返回数据集的所哟u元素,即把RDD内容返回
    • first(),返回第一个元素
    • take(n)返回RDD的n个元素
    • foreach(func)对RDD中的每个元素进行函数func操作

    持久化

    避免多次计算同一个RDD,可以对数据进行持久化,计算出RDD的节点会分别保存它们分区数据,在scala和Java中,persist()默认把数据缓存在JVM堆空间。unpersist()持久化RDD从缓存中移除。

    • persist()不会马上持久化,遇到行动操作了,才会持久化。持久化后的RDD会被保留在计算节点的内存中,被后面的行动操作反复使用。
    • 策略为LRU。
    rdd.cache()//会调用persist(MEMORY_ONLY)
    
  • 相关阅读:
    Delphi中的接口和抽象类
    设计模式之六大原则
    C 标准库
    linux 管道和重定向
    linux c创建静态库(.a)
    一个C语言程序
    C#动态创建lambda表达式
    linq中order by 和group by (含lambda表达式实现)以及综合案例
    微信扫码登陆原理
    二维码扫码支付原理
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12402250.html
Copyright © 2011-2022 走看看