走进RDD
案例
- 给定一个网站的访问记录, 俗称 Access log 计算其中出现的独立 IP, 以及其访问的次数
/**
* @author noor9
* @date 2021-01-14-16:09
*/
class AccessLogAgg {
@Test
def ipAgg():Unit = {
//1. 创建sparkContext
val conf = new SparkConf().setMaster("local[2]").setAppName("ip_agg")
val sc = new SparkContext(conf)
//2. 读取文件
val sourceRDD = sc.textFile("dataset/access_log_sample.txt")
//3. 取出ip
val ipRDD = sourceRDD.map(item => (item.split(" ")(0),1))
//4. 简单清洗
// 4.1. 去掉空数据
// 4.2. 去掉非法数据
// 4.3. 根据业务再调整
val cleanRDD = ipRDD.filter(item => StringUtils.isNotEmpty(item._1))
//5. 聚合
val ipAggRDD = cleanRDD.reduceByKey( (curr,agg) => curr+agg)
//6. 排序
val sortedRDD = ipAggRDD.sortBy(item => item._2,ascending = false)//默认升序
//7. 取出结果
sortedRDD.take(10).foreach(item => println(item))
}
}
为什么会出现RDD
在 RDD 出现之前, MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?
可以看出多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享
然而这种方式明显比较低效
所以便引入了RDD,那RDD 如何解决迭代计算非常低效的问题呢?
在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是: Job3 = (Job1.map).filter
, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中
这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现
// 线性回归
val points = sc.textFile(...)
.map(...)
.persist(...)
val w = randomValue
for (i <- 1 to 10000) {
val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
.reduce(_ + _)
w -= gradient
}
在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 熟快熟慢一窥便知!
RDD特点
RDD 不仅是数据集, 也是编程模型
RDD 即是一种数据结构, 同时也提供了上层 API, 同时 RDD 的 API 和 Scala 中对集合运算的 API 非常类似, 同样也都是各种算子 RDD 的算子大致分为两类:
- Transformation 转换操作, 例如
map
flatMap
filter
等 - Action 动作操作, 例如
reduce
collect
show
等
执行 RDD 的时候, 在执行到转换操作的时候, 并不会立刻执行, 直到遇见了 Action 操作, 才会触发真正的执行, 这个特点叫做 惰性求值
RDD 可以分区
RDD 是一个分布式计算框架, 所以, 一定是要能够进行分区计算的, 只有分区了, 才能利用集群的并行计算能力同时, RDD 不需要始终被具体化, 也就是说: RDD 中可以没有数据, 只要有足够的信息知道自己是从谁计算得来的就可以, 这是一种非常高效的容错方式。
RDD 是只读的
RDD 是只读的, 不允许任何形式的修改. 虽说不能因为 RDD 和 HDFS 是只读的, 就认为分布式存储系统必须设计为只读的. 但是设计为只读的, 会显著降低问题的复杂度, 因为 RDD 需要可以容错, 可以惰性求值, 可以移动计算, 所以很难支持修改.
- RDD2 中可能没有数据, 只是保留了依赖关系和计算函数, 那修改啥?
- 如果因为支持修改, 而必须保存数据的话, 怎么容错?如果允许修改, 如何定位要修改的那一行?
- RDD 的转换是粗粒度的, 也就是说, RDD 并不感知具体每一行在哪.
RDD 是可以容错的
RDD 的容错有两种方式
- 保存 RDD 之间的依赖关系, 以及计算函数, 出现错误重新计算
- 直接将 RDD 的数据存放在外部存储系统, 出现错误直接读取, Checkpoint
什么叫做弹性分布式数据集
-
分布式
RDD 支持分区, 可以运行在集群中
-
弹性
- RDD 支持高效的容错
- RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中
-
数据集
- RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数
- RDD 也可以缓存起来, 相当于存储具体数据
RDD五大属性
Partition List
分片列表, 记录 RDD 的分片, 可以在创建 RDD 的时候指定分区数目, 也可以通过算子来生成新的 RDD 从而改变分区数目Compute Function
为了实现容错, 需要记录 RDD 之间转换所执行的计算函数RDD Dependencies
RDD 之间的依赖关系, 要在 RDD 中记录其上级 RDD 是谁, 从而实现容错和计算Partitioner
为了执行 Shuffled 操作, 必须要有一个函数用来计算数据应该发往哪个分区Preferred Location
优先位置, 为了实现数据本地性操作, 从而移动计算而不是移动存储, 需要记录每个 RDD 分区最好应该放置在什么位置