Grouping Operations
In a distributed program, communication is expensive, so laying out data to minimize network traffic can greatly improve overall performance. Just as a single-node program chooses an appropriate data structure for a collection of records, a Spark program can reduce communication cost by controlling how its RDDs are partitioned.
Partitioning is not helpful for every application. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance; partitioning pays off only when a dataset is reused multiple times in key-oriented operations such as joins.
For example, sortByKey() produces a range-partitioned RDD and groupByKey() produces a hash-partitioned RDD. On the other hand, operations such as map() cause the new RDD to forget the parent's partitioning, because such an operation could, in principle, change the key of every record.
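As a minimal sketch of that last point (assuming a JavaSparkContext ctx and a List<Tuple2<String, Integer>> data like the ones built in the examples below), the partitioner can be inspected through the underlying rdd():
JavaPairRDD<String, Integer> pairs = ctx.parallelizePairs(data).partitionBy(new HashPartitioner(2));
System.out.println(pairs.rdd().partitioner());            // Some(HashPartitioner): set by partitionBy
JavaPairRDD<String, Integer> bumped = pairs.mapValues(v -> v + 1);
System.out.println(bumped.rdd().partitioner());           // still Some(...): mapValues cannot change keys
JavaPairRDD<String, Integer> remapped = pairs.mapToPair(t -> new Tuple2<>(t._1(), t._2()));
System.out.println(remapped.rdd().partitioner());         // None: a map-style operation may change keys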
Operations that benefit from partitioning
Many Spark operations involve shuffling data by key across the network, and all of them benefit from partitioning. As of Spark 1.0, the operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().
For binary operations such as cogroup() and join(), pre-partitioning the data means that at least one of the RDDs (the one with a known partitioner) will not be shuffled. If both RDDs have the same partitioner and are cached on the same machines (for example, one was created from the other with mapValues(), so the two share keys and partitioning), or if one of them has not yet been computed, then no cross-node shuffle happens at all.
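A minimal sketch of that pattern (hypothetical variable names; it assumes a JavaSparkContext ctx and two lists of key/value tuples, data1 and data2, like those in the join example later in this section): partition the larger RDD once, persist it, and joins against it will only shuffle the other side.
JavaPairRDD<String, Integer> bigRdd = ctx.parallelizePairs(data1)
        .partitionBy(new HashPartitioner(4))
        .persist(StorageLevel.MEMORY_ONLY());              // partition once and keep the result in memory
JavaPairRDD<String, Integer> smallRdd = ctx.parallelizePairs(data2);
// bigRdd already has a known partitioner, so only smallRdd gets shuffled here
JavaPairRDD<String, Tuple2<Integer, Integer>> joined = bigRdd.join(smallRdd);
joined.foreach(x -> System.out.println(x));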
Operations that affect partitioning
The operations that set a partitioner on the resulting RDD are: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner). All other operations produce a result with no particular partitioner.
Finally, for binary operations, the partitioner of the output depends on the parents' partitioners. By default the result is hash-partitioned, with as many partitions as the operation's level of parallelism. If one of the parents already has a partitioner, however, the result uses that partitioner; and if both parents have one, the result uses the partitioner of the first parent.
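This rule can be checked directly; a small sketch continuing the previous one (same assumed bigRdd and smallRdd, with spark.default.parallelism set to 4 as in the examples below):
JavaPairRDD<String, Tuple2<Integer, Integer>> joined2 = bigRdd.join(smallRdd);
System.out.println(joined2.rdd().partitioner());   // the existing HashPartitioner is reused
System.out.println(joined2.getNumPartitions());    // 4, inherited from bigRdd rather than a fresh default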
1. combineByKey
test1
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CombineByKeyTest3 {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.testing.memory", "2147480000");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create an RDD: 1) by reading external storage (used in cluster environments); 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3));
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
}
});
mapRdd1.foreach(x->System.out.println(x));
/*
* The grouped value type can be declared as either List or Iterable
* (groupByKey() itself returns Iterable values)
*/
// JavaPairRDD<String, List<Tuple2<Integer, Integer>>> results = mapRdd1.groupByKey();
// JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> results = mapRdd1.groupByKey();
JavaPairRDD<String, List<Tuple2<Integer, Integer>>> results = mapRdd1.combineByKey(
new Function<Tuple2<Integer,Integer>, List<Tuple2<Integer, Integer>>>() {
@Override
public List<Tuple2<Integer, Integer>> call(Tuple2<Integer, Integer> value) throws Exception {
List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
list.add(value);
return list;
}
},
new Function2<List<Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, List<Tuple2<Integer, Integer>>>() {
@Override
public List<Tuple2<Integer, Integer>> call(
List<Tuple2<Integer, Integer>> it,
Tuple2<Integer, Integer> value) throws Exception {
// List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
// it.forEach(list::add);
// list.add(value);
it.add(value);
return it;
}
},
new Function2<List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>>() {
@Override
public List<Tuple2<Integer, Integer>> call(
List<Tuple2<Integer, Integer>> it1,
List<Tuple2<Integer, Integer>> it2) throws Exception {
// List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
// it1.forEach(list::add);
// it2.forEach(list::add);
// return list;
it1.addAll(it2);
return it1;
}
});
results.foreach(x->System.out.println(x));
//In fact, distinct() is implemented on top of reduceByKey()
// mapRdd1.distinct();
ctx.stop();
}
}
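For comparison, the same grouping can be written with lambdas instead of anonymous classes; a minimal sketch, assuming the mapRdd1 built in the example above:
JavaPairRDD<String, List<Tuple2<Integer, Integer>>> grouped = mapRdd1.combineByKey(
        (Tuple2<Integer, Integer> v) -> {                  // createCombiner: first value seen for a key in a partition
            List<Tuple2<Integer, Integer>> list = new ArrayList<>();
            list.add(v);
            return list;
        },
        (list, v) -> { list.add(v); return list; },        // mergeValue: fold further values into the partition-local list
        (l1, l2) -> { l1.addAll(l2); return l1; }          // mergeCombiners: concatenate lists from different partitions
);
grouped.foreach(x -> System.out.println(x));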
test2
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CombineByKeyTest2 {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.set("spark.testing.memory", "2147480000");
conf.setAppName("WordCounter");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create an RDD: 1) by reading external storage (used in cluster environments); 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3));
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
int index="Code".hashCode() % 4; //unused here; illustrates how a hash partitioner maps the key "Code" to one of 4 partitions
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(4)).persist(StorageLevel.MEMORY_ONLY());
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
// (value) -> new Tuple2<Integer, Integer>(value,1),
// (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
// (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
// new HashPartitioner(2),
// false,
// null
// );
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.aggregateByKey(
// new Tuple2<Integer, Integer>(0,0),
// (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
// (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2())
// );
// JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
// return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
// }
// });
// mapRdd1.foreach(System.out::println);
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
// return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
// }
// });
//results.foreach(System.out::println);
// results = mapRdd1.foldByKey(new Tuple2<Integer, Integer>(0, 0), new Function2<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
// return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
// }
// });
//results.foreach(System.out::println);
//Exercise: how would you implement groupByKey using combineByKey?
// mapRdd.groupByKey().foreach(System.out::println);
Function<Integer, List<Integer>> createCombiner=new Function<Integer, List<Integer>>() {
@Override
public List<Integer> call(Integer arg0) throws Exception {
List<Integer>list=new ArrayList<Integer>();
list.add(arg0);
return list;
}
};
Function2<List<Integer>, Integer, List<Integer>> mergeValue=new Function2<List<Integer>, Integer, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list, Integer value) throws Exception {
list.add(value);
return list;
}
};
Function2< List<Integer>,List<Integer> ,List<Integer> > mergeCombiners=new Function2<List<Integer>, List<Integer>, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list1, List<Integer> list2) throws Exception {
List<Integer> list=new ArrayList<Integer>();
// list.addAll(list1);
// list.addAll(list2);
list1.forEach(list::add);
list2.forEach(list::add);
return list;
}
};
JavaPairRDD<String, List<Integer>> results=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
results.foreach(x->System.out.println(x));
JavaPairRDD<String, Integer> re=mapRdd.partitionBy(new HashPartitioner(2));
System.out.println(re.glom().collect());
//the fourth argument is the number of partitions; glom() shows the contents of each partition
JavaPairRDD<String, List<Integer>> results2=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, 2);
System.out.println(results2.glom().collect());
System.out.println(results2.getNumPartitions());
//the fourth argument can also be a custom partitioner
JavaPairRDD<String, List<Integer>> results3=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners,new HashPartitioner(3));
System.out.println(results3.glom().collect());
System.out.println(results3.getNumPartitions());
//fourth argument: a custom partitioner; fifth: a boolean controlling map-side combine; sixth: the serializer to use (null means the default serializer)
JavaPairRDD<String, List<Integer>> results4=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(3), true, null);
System.out.println(results4.glom().collect());
System.out.println(results4.getNumPartitions());
// mapRdd1.combineByKey(
// new Function<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
// @Override
// public Tuple2<Integer,Integer> call(Tuple2<Integer, Integer> arg0) throws Exception {
// return arg0;
// }
// },
//
// new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>[]>() {
// @Override
// public Tuple2<Integer, Integer>[] call(Tuple2<Integer, Integer> arg0, Integer arg1) throws Exception {
// return null;
// }
// },
// mergeCombiners);
//In fact, distinct() is implemented on top of reduceByKey()
// mapRdd1.distinct();
ctx.stop();
}
}
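As a small follow-up, and since mapValues() keeps the partitioner, per-key aggregates can be derived from the grouped lists without another shuffle; a sketch assuming the results RDD of type JavaPairRDD<String, List<Integer>> built above:
JavaPairRDD<String, Integer> sums = results.mapValues(list -> {
        int sum = 0;
        for (Integer v : list) {
            sum += v;                                      // add up every value recorded for this key
        }
        return sum;
});
sums.foreach(x -> System.out.println(x));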
2. group & join
groupByKey
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CombineByKeyTest {
@SuppressWarnings("serial")
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.testing.memory", "5435657567560");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create an RDD: 1) by reading external storage (used in cluster environments); 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3)); //<"Bread", <3,1>>
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
});
// JavaPairRDD<String, Integer> mapRdd=ctx.parallelizePairs(data,2);
mapRdd.groupByKey().foreach(x->System.out.println(x));
// JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
// new Function<Integer, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
// return new Tuple2<Integer, Integer>(v1 ,1);
// }
// }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Integer v2) throws Exception {
// return new Tuple2<Integer, Integer>(v1._1() + v2, v1._2() + 1);
// }
// }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
// @Override
// public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
// return new Tuple2<Integer, Integer>(v1._1() + v2._1(), v1._2() + v2._2());
// }
// });
JavaPairRDD<String, Tuple2<Integer, Integer>> result2s = mapRdd.combineByKey(
(Integer value) -> new Tuple2<Integer, Integer>(value,1),
(Tuple2<Integer, Integer> acc, Integer value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
new HashPartitioner(3),
true,
null
);
result2s.foreach(x->System.out.println(x));
JavaPairRDD<String, Tuple2<Integer, Integer>> results3 = mapRdd.aggregateByKey(
new Tuple2<Integer, Integer>(0,0),
(acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
(acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2())
);
results3.foreach(x->System.out.println(x));
JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
}
});
JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
}
});
// results.foreach(System.out::println);
results.foreach(x->System.out.println(x));
ctx.stop();
}
}
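The (sum, count) pairs produced above are usually an intermediate step; a minimal sketch (assuming the results RDD of type JavaPairRDD<String, Tuple2<Integer, Integer>> from this example) of turning them into per-key averages with mapValues():
JavaPairRDD<String, Double> averages = results.mapValues(t -> t._1() / (double) t._2());
averages.foreach(x -> System.out.println(x));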
join
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CogroupApiTest {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.set("spark.testing.memory", "2147480000");
conf.setAppName("WordCounter");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create an RDD: 1) by reading external storage (used in cluster environments); 2) from an in-memory collection
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2<>("Cake", 2));
data1.add(new Tuple2<>("Bread", 3));
data1.add(new Tuple2<>("Cheese", 4));
data1.add(new Tuple2<>("Milk", 1));
data1.add(new Tuple2<>("Toast", 2));
data1.add(new Tuple2<>("Bread", 2));
data1.add(new Tuple2<>("Egg", 6));
// JavaPairRDD<String, Integer> mapRdd1=ctx.parallelizePairs(data1);
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data1, 2);
JavaPairRDD<String, Integer> mapRdd1 = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
data2.add(new Tuple2<>("Cake", 2));
data2.add(new Tuple2<>("Bread", 3));
data2.add(new Tuple2<>("Cheese", 4));
data2.add(new Tuple2<>("Milk", 1));
data2.add(new Tuple2<>("Toast", 2));
JavaRDD<Tuple2<String, Integer>> rdd2 = ctx.parallelize(data2, 2);
JavaPairRDD<String, Integer> mapRdd2 = rdd2.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
//groupWith has the same effect as cogroup: (Bread,([3, 2],[3]))
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd3 = mapRdd1.cogroup(mapRdd2);
mapRdd3.foreach(x->System.out.println(x));
//(Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2)) -- join keeps only keys present in both RDDs
// JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd3 = mapRdd1.join(mapRdd2);
// mapRdd3.foreach(x->System.out.println(x));
//(Bread,(Optional[3],3)), (Bread,(Optional[3],2)), (Cake,(Optional[2],2)) -- rightOuterJoin: values from the calling RDD are wrapped in Optional and may be Optional.empty
// JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> mapRdd3 = mapRdd2.rightOuterJoin(mapRdd1);
// mapRdd3.foreach(x->System.out.println(x));
//(Cheese,(4,Optional[4])), (Toast,(2,Optional[2])), (Egg,(6,Optional.empty))
// JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> mapRdd4 = mapRdd1.leftOuterJoin(mapRdd2);
// mapRdd4.foreach(x->System.out.println(x));
//fullOuterJoin: values from either side may be Optional.empty
// JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> mapRdd5 = mapRdd1.fullOuterJoin(mapRdd2);
// mapRdd5.foreach(x->System.out.println(x));
//groupWith has the same effect as cogroup: (Bread,([3, 2],[3]))
// JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd6 = mapRdd1.groupWith(mapRdd2);
// mapRdd6.foreach(x->System.out.println(x));
//(Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2)) -- join keeps only keys present in both RDDs
// JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd7=mapRdd1.join(mapRdd2);
// mapRdd7.foreach(x->System.out.println(x));
//union of the two pair RDDs; duplicate elements are not removed
// JavaPairRDD<String,Integer> mapRdd8=mapRdd1.union(mapRdd2);
// mapRdd8.foreach(x->System.out.println(x));
//subtractByKey removes elements whose key also appears in the other RDD
// JavaPairRDD<String, Integer> mapRdd9=mapRdd1.subtractByKey(mapRdd2);
// mapRdd9.foreach(x->System.out.println(x));
//intersection returns only the tuples whose key and value both match
// JavaPairRDD<String, Integer> mapRdd10=mapRdd1.intersection(mapRdd2);
// mapRdd10.foreach(x->System.out.println(x));
ctx.stop();
}
}
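One practical detail of the outer joins above is unwrapping org.apache.spark.api.java.Optional; a hedged sketch (assuming mapRdd1 and mapRdd2 as built in this example) that substitutes 0 when the right side has no match:
JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> left = mapRdd1.leftOuterJoin(mapRdd2);
JavaPairRDD<String, Tuple2<Integer, Integer>> filled =
        left.mapValues(t -> new Tuple2<Integer, Integer>(t._1(), t._2().isPresent() ? t._2().get() : 0));
filled.foreach(x -> System.out.println(x));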
3. sortByKey
test1
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
public class SortByKeyApi {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.testing.memory", "2147480000");
conf.set("spark.default.parallelism", "4");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create an RDD: 1) by reading external storage (used in cluster environments); 2) from an in-memory collection
List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
data.add(new Tuple2<>("Cake", 2));
data.add(new Tuple2<>("Bread", 3));
data.add(new Tuple2<>("Cheese", 4));
data.add(new Tuple2<>("Milk", 1));
data.add(new Tuple2<>("Toast", 2));
data.add(new Tuple2<>("Bread", 2));
data.add(new Tuple2<>("Egg", 6));
JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
return t;
}
}).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
//mapRdd.sortByKey().foreach(System.out::println);
mapRdd.sortByKey(false).foreach(x->System.out.println(x));
// mapRdd.sortByKey(new Comparator<String>() {   //sortByKey takes a comparator over the keys (String here)
// @Override
// public int compare(String o1, String o2) {
// return o1.compareTo(o2);
// }
// });
// mapRdd.mapValues(x->x+1).foreach(x->System.out.println(x));
// mapRdd.flatMapValues(v->Arrays.asList(1,1,1)).foreach(x->System.out.println(x));
ctx.stop();
}
}
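sortByKey() has further overloads; a minimal sketch (assuming the mapRdd built above and an extra java.io.Serializable import): the ascending flag can be combined with an explicit number of output partitions, and a custom key comparator can be supplied as long as it is Serializable.
// descending order, collected into a single output partition
mapRdd.sortByKey(false, 1).foreach(x -> System.out.println(x));
// custom ordering of the String keys; the intersection cast keeps the lambda Serializable
mapRdd.sortByKey((Comparator<String> & Serializable) (a, b) -> a.length() - b.length())
        .foreach(x -> System.out.println(x));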
test2
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class SortByKeyApiTest {
public static void main(String[] xx){
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("WordCounter");
conf.set("spark.default.parallelism", "4");
conf.set("spark.testing.memory", "2147480000");
JavaSparkContext ctx = new JavaSparkContext(conf);
//Create an RDD: 1) by reading external storage (used in cluster environments); 2) from an in-memory collection
List<Person> data1 =
new ArrayList<Person>();
data1.add(new Person("Cake",32));
data1.add(new Person("Bread",21));
data1.add(new Person("Smith",32));
data1.add(new Person("Hourse",21));
data1.add(new Person("Mary",32));
data1.add(new Person("Greey",21));
data1.add(new Person("Greey",21));
data1.add(new Person("Tom",32));
data1.add(new Person("Gao",21));
System.out.println(ctx.parallelize(data1).distinct().count());
// .sortBy(x->x, true, 2).foreach(x->System.out.println(x));
List<Tuple2<Person, Integer>> data =
new ArrayList<Tuple2<Person, Integer>>();
data.add(new Tuple2<Person, Integer>(new Person("Cake",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Bread",21), 3));
data.add(new Tuple2<Person, Integer>(new Person("Smith",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Hourse",21), 3));
data.add(new Tuple2<Person, Integer>(new Person("Mary",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Greey",21), 3));
data.add(new Tuple2<Person, Integer>(new Person("Greey",11), 3));
data.add(new Tuple2<Person, Integer>(new Person("Tom",32), 2));
data.add(new Tuple2<Person, Integer>(new Person("Gao",21), 3));
JavaPairRDD<Person, Integer> dataRdd = ctx.parallelizePairs(data);
dataRdd.sortByKey().foreach(x->System.out.println(x));
//a custom key comparator must be Serializable; the intersection cast keeps the lambda serializable
dataRdd.sortByKey((Comparator<Person> & Serializable) (o1, o2) -> {
int res = o1.name.compareTo(o2.name);
if(res == 0){
res = o1.age - o2.age;
}
return res;
}).foreach(x->System.out.println(x));
ctx.stop();
}
}
class Person implements Serializable, Comparable<Person>{
private static final long serialVersionUID = 1L;
public Person(String name, int age) {
super();
this.name = name;
this.age = age;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + age;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Person other = (Person) obj;
if (age != other.age)
return false;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
String name;
int age;
@Override
public int compareTo(Person p) {
int res = this.name.compareTo(p.name);
if(res == 0){
res = this.age - p.age;
}
return res;
}
@Override
public String toString() {
return "Person [name=" + name + ", age=" + age + "]";
}
}
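Note why Person implements hashCode(), equals(), and compareTo() together: distinct() is built on reduceByKey(), so it relies on consistent equals()/hashCode() to recognize and merge duplicate keys, while sortByKey() without an explicit comparator relies on the key type being Comparable.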