Spark里的计算都是操作RDD进行,那么学习RDD的第一个问题就是如何构建RDD,构建RDD从数据来源角度分为两类:
第一类是从内存里直接读取数据,
第二类就是从文件系统里读取,当然这里的文件系统种类很多常见的就是HDFS以及本地文件系统了
/* 使用makeRDD创建RDD */
/* List */val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))val r01 = rdd01.map { x => x * x }println(r01.collect().mkString(","))/* Array */val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))val r02 = rdd02.filter { x => x < 5}println(r02.collect().mkString(","))val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)val r03 = rdd03.map { x => x + 1 }println(r03.collect().mkString(","))/* Array */val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)val r04 = rdd04.filter { x => x > 3 }println(r04.collect().mkString(","))大家看到了RDD本质就是一个数组,因此构造数据时候使用的是List(链表)和Array(数组)类型。
第二类方式是通过文件系统构造RDD,代码如下所示:
val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
val r:RDD[String] = rdd.flatMap { x => x.split(",") }println(r.collect().mkString(","))这里例子使用的是本地文件系统,所以文件路径协议前缀是file://。
构造了RDD对象了,接下来就是如何操作RDD对象了,RDD的操作分为转化操作(transformation)和行动操作(action),RDD之所以将操作分成这两类这是和RDD惰性运算有关,当RDD执行转化操作时候,实际计算并没有被执行,只有当RDD执行行动操作时候才会促发计算任务提交,执行相应的计算操作。区别转化操作和行动操作也非常简单,转化操作就是从一个RDD产生一个新的RDD操作,而行动操作就是进行实际的计算。
下面是RDD的基础操作API介绍:
|
操作类型 |
函数名 |
作用 |
|
转化操作 |
map() |
参数是函数,函数应用于RDD每一个元素,返回值是新的RDD |
|
flatMap() |
参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD |
|
|
filter() |
参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD |
|
|
distinct() |
没有参数,将RDD里的元素进行去重操作 |
|
|
union() |
参数是RDD,生成包含两个RDD所有元素的新RDD |
|
|
intersection() |
参数是RDD,求出两个RDD的共同元素 |
|
|
subtract() |
参数是RDD,将原RDD里和参数RDD里相同的元素去掉 |
|
|
cartesian() |
参数是RDD,求两个RDD的笛卡儿积 |
|
|
行动操作 |
collect() |
返回RDD所有元素 |
|
count() |
RDD里元素个数 |
|
|
countByValue() |
各元素在RDD中出现次数 |
|
|
reduce() |
并行整合所有RDD数据,例如求和操作 |
|
|
fold(0)(func) |
和reduce功能一样,不过fold带有初始值 |
|
|
aggregate(0)(seqOp,combop) |
和reduce功能一样,但是返回的RDD数据类型和原RDD不一样 |
|
|
foreach(func) |
对RDD每个元素都是使用特定函数 |
/*
* 测试文件数据: * x01,1,4 x02,11,1x01,3,9x01,2,6 x02,18,12 x03,7,9 * * */val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) }val rFile:RDD[String] = rddFile.keysprintln("=========createPairMap File=========")println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03println("=========createPairMap File=========")我们由此可以看到以读取文件方式构造RDD,我们需要使用map函数进行转化,让其变成map的形式。
下面是通过内存方式进行创建,代码如下:
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)println("=========createPairMap=========")println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)println("=========createPairMap=========")reduceByKey:合并具有相同键的值;
groupByKey:对具有相同键的值进行分组;keys:返回一个仅包含键值的RDD;values:返回一个仅包含值的RDD;sortByKey:返回一个根据键值排序的RDD;flatMapValues:针对Pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录;mapValues:对Pair RDD里每一个值应用一个函数,但是不会对键值进行操作;combineByKey:使用不同的返回类型合并具有相同键的值;subtractByKey:操作的RDD我们命名为RDD1,参数RDD命名为参数RDD,剔除掉RDD1里和参数RDD中键相同的元素;join:对两个RDD进行内连接;rightOuterJoin:对两个RDD进行连接操作,第一个RDD的键必须存在,第二个RDD的键不再第一个RDD里面有那么就会被剔除掉,相同键的值会被合并;leftOuterJoin:对两个RDD进行连接操作,第二个RDD的键必须存在,第一个RDD的键不再第二个RDD里面有那么就会被剔除掉,相同键的值会被合并;cogroup:将两个RDD里相同键的数据分组在一起countByKey:对每个键的元素进行分别计数;
collectAsMap:将结果变成一个map;lookup:在RDD里使用键值查找数据