1:spark的算子分类
- Transformation 称为转换,是一种延迟加载的算法,会记录元数据信息,任务触发action时开始执行
- Action 称为动作 出发就执行
sc.textFile().map map是transformation
.filter transformation
.collect 是action直接执行
2:创建rdd的两种方式
- 通过hdfs支持的文件系统,rdd里面没有真正要计算的数据,只记录元数据
- 通过scala集合或者数据以并行化的方式创建rdd
2:spark python高级算子
1.mapPartitions
// 传给mapPartitions的方法中 参数是partitions的迭代器对象,返回值也是一个迭代器对象
// python实现如下
def filterOutFromPartion(list):
//list是partitioins的迭代器集合
iterator = []
//elements是具体的partition中元素的迭代器
for elements in list:
iterator.append([x for x in elements if x !=2 ])
return iter(iterator)
data = [[1,2,3],[3,2,4],[5,2,7]]
conf = SparkConf().setAppName("study")
sc = SparkContext(conf=conf)
partitions = sc.parallelize(data, 2).mapPartitions(filterOutFromPartion).collect()
print(partitions)
//yield为简化版,因为yield本身就是返回一个迭代器
def filterOutFromPartion(list): # iterator = [] for elements in list: yield [x for x in elements if x !=2 ] # iterator.append([x for x in elements if x !=2 ]) # return iter(iterator)
2.mapPartitionsWithIndex
Similar to mapPartitions, but also provides a function with an int value to indicate the index position of the partition.
和mapPartitions类似,但是提供了一个带有整形参数用来表明分区位置的方法
parallel = sc.parallelize(range(1,10),4)
//下面这个方法是传进去的方法对象,
def show(index, iterator): yield 'index: '+str(index)+" values: "+ str(list(iterator))
parallel.mapPartitionsWithIndex(show).collect()
//一下为结果
['index: 0 values: [1, 2, 3]',
'index: 1 values: [4, 5, 6]',
'index: 2 values: [7, 8, 9]']
3.sample
sample(withReplacement,faction,seed):抽样,withReplacement为true表示有放回;faction表示采样的比例;seed为随机种子
parallel = sc.parallelize(range(1,10))
//表示取50%的数据 种子随机
parallel.sample(True,0.5).count()
4.union
union(ortherDataset):将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重
one = sc.parallelize(range(1,10))
two = sc.parallelize(range(10,21))
one.union(two).collect()
//output
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
5.intersection
intersection(otherDataset):返回两个RDD的交集
one = sc.parallelize(range(1,10))
two = sc.parallelize(range(5,15))
one.intersection(two).collect()
//output
[5, 6, 7, 8, 9]
6.distinct
distinct([numTasks]):对RDD中的元素进行去重
>>> parallel = sc.parallelize(range(1,9))
>>> par2 = sc.parallelize(range(5,15))
>>> parallel.union(par2).distinct().collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
7.groupByKey算子
// 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型
// 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable
8.reducebykey
reduceByKey(func, [numTasks]) //通过key来进行reduce过程,key相同的值的集合进行reduce操作
sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)
# Applying reduceByKey operation on x>>> y = x.reduceByKey(lambda accum, n: accum + n)
>>> y.collect()
[('b', 5), ('a', 3)]
# Define associative function separately >>>def sumFunc(accum, n):
... return accum + n
...
>>> y = x.reduceByKey(sumFunc)
>>> y.collect()
[('b', 5), ('a', 3)]
9.aggregatebykey
aggregateByKey
这个函数可用于完成对groupByKey,reduceByKey的相同的功能,用于对rdd中相同的key的值的聚合操作,主要用于返回一个指定的类型U的RDD的transform,在这个函数中,需要传入三个参数:
参数1:用于在每个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量,
参数2:用于在每个分区中,相同的key中V类型的值合并到参数1创建的U类型的变量中,
参数3:用于对重新分区后两个分区中传入的U类型数据的合并的函数.
//合并在不同partition中的值,a,b的数据类型为zeroValue的数据类型
def comb(a: String, b: String): String = {
println("comb: " + a + " " + b)
a + b
}
//合并在同一个partition中的值, a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
def seq(a: String, b: Int): String = {
println("seq: " + a + " " + b)
a + b
}
rdd.foreach(println)
//zeroValue 中立值,定义返回value的类型,并参与运算
//seqOp 用来在一个partition中合并值的
//comb 用来在不同partition中合并值的
val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq,comb)
10.sortByKey
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序