spark的combineByKey
combineByKey的特点
combineByKey的强大之处,在于提供了三个函数操作来操作一个函数。第一个函数,是对元数据处理,从而获得一个键值对。第二个函数,是对键值键值对进行一对一的操作,即一个键值对对应一个输出,且这里是根据key进行整合。第三个函数是对key相同的键值对进行操作,有点像reduceByKey,但真正实现又有着很大的不同。
在Spark入门(五)--Spark的reduce和reduceByKey中,我们用reduce进行求平均值。用combineByKey我们则可以求比平均值更为丰富的事情。现在有一个数据集,每一行数据包括一个a-z字母和一个整数,其中字母和整数之间以空格分隔。现在要求得每个字母的平均数。这个场景有点像多个学生,每个学生多门成绩,求得学生的平均分。但这里将问题简化,其中数据集放在grades中。数据集以及下面的代码都可以在github上下载。
combineByKey求多个平均值
scala实现
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkCombineByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(
value => (value,1),
(x:(Int,Int),y)=>(x._1+y,x._2+1),
(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)
}
}
scala运行结果
(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)
java实现:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.sources.In;
import scala.Tuple2;
public class SparkCombineByKeyJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava");
JavaSparkContext sc = new JavaSparkContext(conf);
combineByKeyJava(sc);
combineByKeyJava8(sc);
}
public static void combineByKeyJava(JavaSparkContext sc){
JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
String[] splits = s.split(" ");
return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
}
});
splitData.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<>(integer, 1);
}
}, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Integer integer) throws Exception {
return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1);
}
}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> integerIntegerTuple22) throws Exception {
return new Tuple2<>(integerIntegerTuple2._1+integerIntegerTuple22._1,integerIntegerTuple2._2+integerIntegerTuple22._2);
}
}).map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Tuple2<String,Double>>() {
@Override
public Tuple2<String,Double> call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
return new Tuple2<>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1*1.0/stringTuple2Tuple2._2._2);
}
}).foreach(new VoidFunction<Tuple2<String, Double>>() {
@Override
public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
}
});
}
public static void combineByKeyJava8(JavaSparkContext sc){
JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(line -> {
String[] splits = line.split(" ");
return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
});
splitData.combineByKey(
x->new Tuple2<>(x,1),
(x,y)->new Tuple2<>(x._1+y,x._2+1),
(x,y)->new Tuple2<>(x._1+y._1,x._2+y._2)
).map(x->new Tuple2(x._1,x._2._1*1.0/x._2._2)).foreach(x->System.out.println(x._1+" "+x._2));
}
}
java运行结果
d 338451.6
e 335306.7480769231
a 336184.95321637427
i 346279.497029703
b 333069.8589473684
h 334343.75
f 341380.94444444444
j 320145.7618069815
g 334042.37605042016
c 325022.4183673469
分析
在开始python之前,我们先观察java和scala两个程序。我们发现java7的代码非常冗余,而java8和scala则相比起来非常干净利落。当然,我们难说好坏,但是这也表现出当代语言开始从繁就简的一个转变。到了python这一特点就体现的更加淋漓尽致。
但我们不光说语言,我们分析这个求平均的实现方式,由于java中对数值做了一个处理,因此有保留小数,而scala则没有,但至少可以判断两者的结果是一致的。当然,这不是重点,重点是,这个combinByKey非常复杂,有三个函数。我们很难观察到每个过程做了什么。因此我们在这里,对scala程序进行进一步的输出,从而观察combineByKey到底做了什么。
scala修改
import org.apache.spark.{SparkConf, SparkContext}
object SparkCombineByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(
value => {
println("这是第一个函数")
println("将所有的值遍历,并放在元组中,标记1")
println(value)
(value,1)
},
(x:(Int,Int),y)=>{
println("这是第二个函数")
println("将x中的第一个值进行累加求和,第二个值加一,求得元素总个数")
println("x:"+x.toString())
println("y:"+y)
(x._1+y,x._2+1)
},
(x:(Int,Int),y:(Int,Int))=>{
(x._1+y._1,x._2+y._2)
}
).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)
}
}
得到结果
这是第一个函数
将所有的值遍历,并放在元组中,标记1
222783
这是第一个函数
将所有的值遍历,并放在元组中,标记1
48364
这是第一个函数
将所有的值遍历,并放在元组中,标记1
204950
这是第一个函数
将所有的值遍历,并放在元组中,标记1
261777
...
...
...
这是第二个函数
将x中的第一个值进行累加求和,第二个值加一,求得元素总个数
x:(554875,2)
y:357748
这是第二个函数
将x中的第一个值进行累加求和,第二个值加一,求得元素总个数
x:(912623,3)
y:202407
这是第一个函数
将所有的值遍历,并放在元组中,标记1
48608
这是第二个函数
将x中的第一个值进行累加求和,第二个值加一,求得元素总个数
x:(1115030,4)
y:69003
这是第一个函数
将所有的值遍历,并放在元组中,标记1
476893
...
...
...
(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)
这里我们发现了,函数的顺序并不先全部执行完第一个函数,再执行第二个函数。而是分区并行,即第一个分区执行完第一个函数,并不等待其他分区执行完第一个函数,而是紧接着执行第二个函数,最后在第三个函数进行处理。在本地单机下,该并行特点并不能充分发挥,但在集群环境中,各个分区在不同节点计算,然后处理完结果汇总处理。这样,当数据量十分庞大时,集群节点数越多,该优势就表现地越明显。
此外还有一个非常值得关注的特点,当我们把foreach(println)这句话去掉时
foreach(println)
我们运行程序,发现程序没有任何输出。这是由于spark的懒加载特点,spark只用在对数据执行具体操作时,如输出、保存等才会执行计算。这看起来有点不合理,但实际上这样做在很多场景下能大幅度提升效率,但如果没有处理好,可能会导致spark每次执行操作都会从头开始计算该过程。因此当一个操作结果需要被频繁或者多次调用的时候,我们应该将结果存下来。
python实现
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")
sc = SparkContext(conf=conf)
sc.textFile("./grades")
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)
得到结果
('b', 333069.8589473684)
('f', 341380.94444444444)
('j', 320145.7618069815)
('h', 334343.75)
('a', 336184.95321637427)
('g', 334042.37605042016)
('d', 338451.6)
('e', 335306.7480769231)
('c', 325022.4183673469)
spark的sortByKey
sortByKey进行排序
sortByKey非常简单,也非常常用。这里依然采用上述文本,将处理后的结果,进行排序,得到平均值最大的字母。在实际运用中我们这里可以看成求得按照成绩排序,或者按照姓名排序。
scala实现
import org.apache.spark.{SparkConf, SparkContext}
object SparkSortByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
val result = sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1/x._2._2))
//按照名字排序,顺序
result.sortByKey(true).foreach(println)
//按照名字排序,倒序
result.sortByKey(false).foreach(println)
val result1 = sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._2._1/x._2._2,x._1))
//按照成绩排序,顺序
result1.sortByKey(true).foreach(println)
//按照成绩排序,倒序
result1.sortByKey(false).foreach(println)
}
}
python实现
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")
sc = SparkContext(conf=conf)
result = sc.textFile("./grades")
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1]))
result.sortByKey(True).foreach(print)
result.sortByKey(False).foreach(print)
result1 = sc.textFile("./grades")
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[1][0]/x[1][1],x[0]))
result1.sortByKey(True).foreach(print)
result1.sortByKey(False).foreach(print)
得到结果
(a,336184)
(b,333069)
(c,325022)
(d,338451)
(e,335306)
(f,341380)
(g,334042)
(h,334343)
(i,346279)
(j,320145)
(j,320145)
(i,346279)
(h,334343)
(g,334042)
(f,341380)
(e,335306)
(d,338451)
(c,325022)
(b,333069)
(a,336184)
(320145,j)
(325022,c)
(333069,b)
(334042,g)
(334343,h)
(335306,e)
(336184,a)
(338451,d)
(341380,f)
(346279,i)
(346279,i)
(341380,f)
(338451,d)
(336184,a)
(335306,e)
(334343,h)
(334042,g)
(333069,b)
(325022,c)
(320145,j)
数据集以及代码都可以在github上下载。