zoukankan      html  css  js  c++  java
  • spark 笔记4 sparkRDD基本操作_连接_创建RDD_转换_行动

    spark RDD

    目录

    关于sparkRDD基本概念

    • RDD:弹性分布式数据集,是spark对数据的核心抽象,也是spark数据处理的基本单位
      spark处理数据之前会首先把数据转换成RDD然后在RDD上对数据进行操作
    • spark对RDD的数据操作,其本身有两种对于RDD的算子:转换(transform)和行动(action),这两个分类下分别由各自对应的数个api函数
      对于数据,spark的操作过程是:创建RDD、对RDD进行转化操作(transform)、用行动操作来求值(action)
    • 该数据操作流程的便捷性:spark底层节点的协商、容错、通信的细节,这样在上层对于数据的操作就变得更容易

    学习对于RDD的基本操作

    主从节点的启动

    首先就像第一次学在笔记1里面记录的一样,启动spark主节点的服务,然后在localhost:8080查看spark主节点的参数并且记录下来

    然后就可以使用这个主节点的参数,启动这个主节点的从节点

    spark的初始化

    在开发程序时,spark的初始化操作首先就是创建一个SparkConf对象,这个对象包含应用的一些信息,然后创建SparkContext,SparkContext可以让 Spark 知道如何访问集群
    那么,代码是这个样子的
    就是在指定app名和主节点所在的spark集群之后,使用这个conf对象指定给一个sparkcontext方法来创建一个sparkcontext

    val conf = new SparkConf().setAppName("Shiyanlou").setMaster("spark://7576cf9c687e:7077")
    
    new SparkContext(conf)
    // 在每个JVM中,只有一个SparkContext能够被激活。若需要创建新的SparkContext,你必须调用sc.stop()来关闭当前已激活的那个
    

    在spark-shell里做spark的初始化并不需要新建这两个对象,因为 Spark Shell 相当于一个 Spark 应用,启动时已经用过spark-shell --master spark://7576cf9c687e:7077来指定集群信息,所以 Spark Shell 启动后已经具备了一个 SparkContext 对象sc

    RDD创建

    Spark 上开发的应用程序都是由一个driver programe构成,这个所谓的驱动程序在 Spark 集群通过跑main函数来执行各种并行操作。集群上的所有节点进行并行计算需要共同访问一个分区元素的集合,这就是 RDD(RDD:resilient distributed dataset)弹性分布式数据集。RDD 可以存储在内存或磁盘中,具有一定的容错性,可以在节点宕机重启后恢复。
    RDD 可以从 HDFS 中的文件创建,也可以从 Scala 或 Python 集合中创建。
    创建 RDD 有两种方式:一种是调用 SparkContext 的 parallelize() 方法将数据并行化生成 RDD,另外一种方法是从外部存储中的数据集生成 RDD(如 Linux 文件系统,HDFS,HBase 等)

    调用parallelize()方法并行化生成RDD

    如果要对已有的集合进行并行化,我们可以先创建一个列表,然后调用上面提到的sc的parallelize方法将该集合并行化。集合中的元素会被复制到一个 RDD 中。并行集合创建后可以进行 RDD 的分布式操作,一个很重要的参数是切片数(slices),表示数据集被切分的份数,Spark 会为每个切片运行一个任务并能够根据集群状况动态调整切片数量。使用parallelize方法的参数可以手动设置切片数。
    这种并行集合生成RDD的办法会把所有的数据都放在内存里,所以除了开发原型和测试以外,一般不采用这种方式

    就这样,把新建的数据列表传给parallelize这个函数,这个函数就会在这个数据集合的基础上为我们创建RDD,并且RDD的切片数同样可以通过parallelize函数来指定

    使用外部存储中的数据集生成RDD

    在实际开发中最常用的是从外部存储系统中读取数据创建 RDD。Spark 可以从任何 Hadoop 支持的存储上创建 RDD,比如本地的文件系统、HDFS、Cassandra、HBase 等。同时 Spark 还支持文本文件、SequenceFiles 等

    注意事项

    • 使用不同的 SparkContext 的函数接口可以在不同的外部存储场景下创建RDD。然后使用 testfile() 方法会返回一个 RDD 对象,然后就可以调用 RDD 中定义的方法
    • 如果使用本地存储上的文本文件,这个文件必须可以被所有节点 worker 访问
    • 支持目录,压缩文件及通配符
    • 同上一节的并行集合一样,textFile 函数还有另外一个接口控制切片数目
    // 从 protocols 文件中创建 RDD
    val distFile = sc.textFile("/etc/protocols")
    
    // RDD 操作:计算所有行的长度之和,最后结果为 2868
    distFile.map(s => s.length).reduce((a,b) => a + b)
    

    RDD的这个操作也是做的一个mapreduce,用map来把每一行映射成每一行的长度,reduce做的是把数据集合里面的元素相加

    正式的、RDD的基础操作

    对于RDD的基础操作有两种:transformations和actions

    • 转换(transformations):将已存在的数据集 RDD 转换成新的数据集 RDD,例如 map。转换是惰性的,不会立刻计算结果,仅仅记录转换操作应用的目标数据集,当动作需要一个结果时才计算。
    • 动作(actions) :在数据集 RDD 上进行计算后返回一个结果值给驱动程序,例如 reduce。
    // 从 protocols 文件创建RDD
    val distFile = sc.textFile("/etc/protocols")
    
    // Map 操作,每行的长度
    val lineLengths = distFile.map(s => s.length)
    
    // Reduce 操作,获得所有行长度的和,即文件总长度,这里才会执行 map 运算
    val totalLength = lineLengths.reduce((a, b) => a + b)
    
    // 可以将转换后的 RDD 保存到集群内存中
    lineLengths.persist()
    

    上面这个例子里面,map操作敲进去的时候,并没有被执行,在敲完reduce求和的时候,map运算才被执行的,也就是说想要的到最后map出来的结果要执行完reduce才行
    persist方法是把map完的那个RDD保留到内存里

    WordCount的例子

    val textFile = sc.textFile("/etc/protocols")
    val counts = textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _)
    counts.saveAsTextFile("/home/hadoop/counts")
    

    首先,使用 SparkContext 的 textFile() 函数从本地读取 /etc/protocols 文件将其转化为记录着每一行内容的 RDD。
    其次,使用 .flatMap(line => line.split(" ")) 把每一行的内容按照空格分隔开,将其转化为记录着每一个单词的 RDD。
    然后,使用 .map(word => (word,1)) 把记录着每一个单词的 RDD 转化为 (word,1) 这种表示每一个单词出现一次的键值对形式的 RDD。
    接着,使用 .reduceByKey(_ + _) 把具有相同键的 RDD 的次数进行相加,求出每个单词的词频。
    最后,使用 saveAsTextFile() 函数把结果存入 /home/hadoop/counts 文件夹中。
    在文件系统中查看文件

    查看文件的内容

    RDD转化操作transformation

    转化的 RDD 都是惰性求值的,只有执行行动操作才会被真正的计算。Spark 只是在内部记录下需要转化的操作,真正有必要时才会执行这些操作,所以应该把 RDD 当做记录如何计算数据的指令列表。比如上面的例子中使用 textFile() 函数读取文件的内容,程序并没有真正的读取这个文件,否则加载大量的数据会占用极大的内存,在遇到执行操作 first() 需要获取第一行的数据时去执行读取文件的第一行并返回数据。从上面的例子中可以看出这样会极大的优化程序执行的效率。
    一般转化操作分为两类,一类是对所有 RDD 的每一个元素进行转化,另一类是只对具有相同键的所有 RDD 进行转化。

    • map()
      参数是函数,函数应用于 RDD 每一个元素,返回值为新的 RDD。
    • flatMap()
      参数是函数,把每个输入元素生成多个输出元素,函数应用于 RDD 每一个元素,将元素数据进行拆分变为迭代器,返回值为新的 RDD。通常用来切分单词
      例子如下

      而且通过上图的比对,加不加collect函数的区别体现在输出结果上,用了collect能够直观地看到结果
    • filter()
      参数是函数,使用函数过滤掉不符合条件的元素,返回新的 RDD
    • distinct()
      用来把RDD的元素进行去重
    • union()
      参数是RDD,生成包含两个RDD的所有数据的新RDD
    • intersection()
      参数是 RDD,生成包含两个 RDD 共同元素的新 RDD。
    • subtract()
      参数是 RDD,将原 RDD 里和参数 RDD 里相同的元素去掉生成新的 RDD。

      这个比较有意思,实际上是nums2调用的方法,所以去掉的相同元素其实是,nums2里面拥有的和nums3一样的元素,剩下的nums2里面的元素

    RDD行动操作actions

    • collect()
      这个刚才转化的例子里面有很多都用到了,作用是返回RDD里面的所有元素
    • count()
      计算RDD中的元素的个数
    • countByValue()
      计算各个元素在RDD中出现的次数
    • take(num)
      从 RDD 中返回 num 个元素。
    • top(num)
      从 RDD 中返回最前面的 num 个元素。
      比对一下:
    • reduce(func)
      并行整合 RDD 中所有数据。reduce 将 RDD 中元素两两传递给输入函数求得一个新值,再把新值与 RDD 中的下一个值一起传递给输入函数直到最后一个值为止。
    • fold(zero)(func)
      和 reduce() 一样,但是需要提供初始值。fold有一个“zero”值,数据存在多少个分区中就有多少个“zero”值。该函数现计算每一个分区中的数据,再计算分区之间中的数据。所以,有多少个分区就会有多少个“zero”值被包含进来。看这个分区,需要调用partition

      一共是4个分区,但是最后在8上加上的是Zero*5而不是乘4,目前还不太懂
    • foreach(func)
      对 RDD 中每个元素使用给定的元素
    • 以文件格式存储
      有 3 个方法可以做,分别是:saveAsTextFile(path)、saveAsSequenceFile(path)、saveAsObjectFile(path)
      将 RDD 以不同的文件格式(文本文件、Sequence 格式文件、对象文件)存储在本地文件系统或 Hadoop 文件系统中

    总结

    基本编程步骤总结

    所以,课程里面是这样总结的:

    RDD 基本编程步骤可以总结为:
    1.读取内、外部数据集创建 RDD。
    2.对于 RDD 进行转化生成新的 RDD ,比如 map()、filter() 等。
    3.对需要重用的数据执行 persist()/cache() 进行缓存。
    4.执行行动操作获得最终结果,比如 count() 和 first()等。

    没有做的实践操作

    导入并使用jar包

    还没有一个具体的应用场景,让我指定某个具体的jar包

    集成编译环境下的配置操作

  • 相关阅读:
    为什么 PCB 生产时推荐出 Gerber 给工厂?
    Fedora Redhat Centos 有什么区别和关系?
    【KiCad】 如何给元件给元件的管脚加上划线?
    MCU ADC 进入 PD 模式后出现错误的值?
    FastAdmin 生产环境升级注意
    EMC EMI 自行评估记录
    如何让你的 KiCad 在缩放时不眩晕?
    KiCad 5.1.0 正式版终于发布
    一次单片机 SFR 页引发的“事故”
    java基础之集合
  • 原文地址:https://www.cnblogs.com/ltl0501/p/12116835.html
Copyright © 2011-2022 走看看