zoukankan      html  css  js  c++  java
  • Spark 学习笔记 —— 常见API

    一、RDD 的创建

    1)通过 RDD 的集合数据结构,创建 RDD

    sc.parallelize(List(1,2,3),2) 其中第二个参数代表的是整个数据,分为 2 个 partition,默认情况会讲数据集进行平分,注意不是两个副本

    2)通过文件来读取

    sc.textFile("file.txt")

    sc.sequenceFile("file.txt") sequeceFile 是 HDFS 一些数据结构

    文件读取的位置,容易产生奇异,比如一下几种形式:

    1)、inputRdd = sc.textFile("/data/input")

    2)、inputRdd = sc.textFile("file:///data/input")

    3)、inputRdd = sc.textFile("hdfs:///data/input")

    4)、inputRdd = sc.textFile("hdfs://namenode:8020/data/input")

    第一种 /data/input 具体读取的是本地和 hdfs上的文件,要依赖于上下文环境,driver 的配置,driver 是 local 的模式就读的本地文件,driver 是 cluster 模式的且在conf里面配置了 hdfs 的 namenode 地址的,则是读取的远程的文件

    第二种 file:///data/input 是强制 executor 读取本地的数据,这样完全是为了本地测试用的,如果是在集群上运行作业,executor 具体运行的物理机器的相应目录未必存在

    二、List 集合 RDD 常见的 Transformation 操作

    1、map:1 对 1 进行映射

    2、filter:过滤

    3、flatMap:1 对 多进行映射

    举个例子

    listRdd = sc.parallelize(List(1,2,3),1)

    nums.flatMap(x=>1 to x) // {1,2,3,2,3,3}

    三、key-value 集合的RDD操作

    val listRdd = sc.parallelize(List((“cat”,1),("dog",1),("cat",2)))

    listRdd.reduceByKey(_+_) // => {(cat,3),(dog,1)}

    listRdd.groupByKey() // => {(cat,Seq(1,2),(dog,Seq(1))}

    reduceByKey 自动在map端进行本地的 combine 操作

    四、RDD 常见的 Action 操作

    Action 操作,分为,内存聚类操作,存储类操作

    内存聚集类操作是讲分布式的数据集 汇聚到 driver 运行端,或者汇聚完之后进行聚合运算

    1、collect() // 将 RDD 保存在本地集合收集到本地, 此“本地” 是只 driver 运行的机器,如何 RDD 很大,很可能会把 driver 端给撑爆了

    2、take()

    3、count()

    4、reduce(_+_)

    存储类操作是通过 driver 发起分别进行存储

    1、saveAsTextFile

    2、saveAsSequenceFile

    五、Spark RDD的 Join 操作

    Join 操作必须是 针对 2个或多个 key-value 的 List 集合

    join 和 cogroup 的区别

    如何控制 reduceByKey、groupByKey、join 的并行度

    通过参数来修改

    1、reduceByKey(_+_,5)

    2、groupByKey(5)

    通过修改默认的参数来配置

    spark.default.parallelism

    可以这样来理解问 reduce 的数量的控制,原理我猜是通过 hash 讲不同的key进行分桶

    hadoop 的 reduce 默认是启动一个 task,spark 默认的 reduce 端的聚合操作默认和前一个阶段的并发度是一样的

    六、spark 的 accumulator 和 广播变量(HttpBroadCast和TorrentBroadcast)

    非常类似于 hadoop 里面的 counter 和 分布式缓存,只是分布式缓存是通过文件的方式

    七、RDD 的 Cache

    分析下以下2段代码的区别:

    // 有 cache 函数

    val data = sc.textFile("hdfs://nn:8020/input")
    data.cache()
    data.filter(_.startWith("error")).count()
    data.filter(_.startWith("hadoop")).count()
    data.filter(_.startWith("hbase")).count()

    // 无 cache 函数
    val data = sc.textFile("hdfs://nn:8020/input")
    data.filter(_.startWith("error")).count()
    data.filter(_.startWith("hadoop")).count()
    data.filter(_.startWith("hbase")).count()

  • 相关阅读:
    常见面试之机器学习算法思想简单梳理
    机器学习其实比你想的更简单
    机器学习自学指南
    机器学习算法之旅
    我们需要解决的机器学习问题
    机器学习常见算法分类汇总
    机器学习算法基础概念学习总结
    github cheat sheet
    ubuntu kylin 14.10 安装deepin_music
    安装k-vim遇到的错误
  • 原文地址:https://www.cnblogs.com/oftenlin/p/8478341.html
Copyright © 2011-2022 走看看