  • combineByKey & groupByKey & sortByKey

    Grouping operations

    In a distributed program, communication is expensive, so laying data out to minimize network traffic can greatly improve overall performance. Just as a single-node program should pick the right data structure for a collection of records, a Spark program can control how its RDDs are partitioned to cut communication overhead.

    Partitioning does not help every application. If a given RDD is scanned only once, for instance, there is no point in partitioning it in advance; partitioning pays off only when a dataset is reused several times in key-oriented operations such as joins.

    For example, sortByKey() produces a range-partitioned RDD and groupByKey() a hash-partitioned one. Operations such as map(), on the other hand, cause the new RDD to forget the parent's partitioning, because such operations could in principle change the key of every record.
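    This is easy to observe: an RDD's partitioner can be inspected via the underlying rdd(). A minimal sketch (class and variable names are ours, not from the tests below): sortByKey() yields a range-partitioned result, groupByKey() a hash-partitioned one, and a plain map() reports no partitioner at all.

    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import scala.Tuple2;
    
    public class PartitionerCheck {
        public static void main(String[] args) {
            JavaSparkContext ctx = new JavaSparkContext(
                    new SparkConf().setMaster("local").setAppName("PartitionerCheck"));
            JavaPairRDD<String, Integer> pairs = ctx.parallelizePairs(Arrays.asList(
                    new Tuple2<>("Bread", 3), new Tuple2<>("Egg", 6)), 2);
    
            System.out.println(pairs.sortByKey().rdd().partitioner());  // Some(RangePartitioner...)
            System.out.println(pairs.groupByKey().rdd().partitioner()); // Some(HashPartitioner...)
            // map() could rewrite keys, so the partitioner is dropped:
            System.out.println(pairs.sortByKey().map(t -> t).rdd().partitioner()); // None
            ctx.stop();
        }
    }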

    Operations that benefit from partitioning

    Many of Spark's operations involve shuffling data by key across the network, and all of them benefit from partitioning. As of Spark 1.0, the operations that benefit are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().

    For binary operations such as cogroup() and join(), pre-partitioning means that at least one of the RDDs (the one with the known partitioner) will not be shuffled. If both RDDs have the same partitioner and are cached on the same machines (say, one was created from the other via mapValues(), so the two share keys and partitioning), or if one of them has not yet been computed, no cross-node shuffle happens at all; the sketch below shows the idiom.
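    A hedged sketch of that idiom under the same local setup used throughout this post (names like big and small are illustrative): hash-partition one side once with partitionBy(), persist it, and the join then shuffles only the other side.

    import java.util.Arrays;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;
    
    import scala.Tuple2;
    
    public class PrePartitionedJoin {
        public static void main(String[] args) {
            JavaSparkContext ctx = new JavaSparkContext(
                    new SparkConf().setMaster("local").setAppName("PrePartitionedJoin"));
            // Hash-partition the frequently joined side once and keep it cached;
            // the join below will not re-shuffle it.
            JavaPairRDD<String, Integer> big = ctx.parallelizePairs(Arrays.asList(
                    new Tuple2<>("Bread", 3), new Tuple2<>("Egg", 6)), 2)
                    .partitionBy(new HashPartitioner(2))
                    .persist(StorageLevel.MEMORY_ONLY());
            // `small` has no known partitioner, so only its records move across
            // the network, into `big`'s existing partitions.
            JavaPairRDD<String, Integer> small = ctx.parallelizePairs(
                    Arrays.asList(new Tuple2<>("Bread", 1)), 2);
            big.join(small).foreach(t -> System.out.println(t));
            ctx.stop();
        }
    }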

    Operations that affect partitioning

    The operations that set a partitioner on their result RDD are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), and, when the parent RDD has a partitioner, mapValues(), flatMapValues(), and filter(). Every other operation produces a result with no particular partitioner.

    Finally, for binary operations the partitioner of the output depends on the parents. By default the result is hash-partitioned, with as many partitions as the operation's level of parallelism. If one parent has a partitioner set, the result uses it; if both parents do, the result uses the first parent's. A small check follows.
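    A minimal sketch of that check (class name is ours): joining a hash-partitioned RDD with an unpartitioned one yields a result that adopts the first parent's partitioner and partition count.

    import java.util.Arrays;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import scala.Tuple2;
    
    public class ResultPartitionerCheck {
        public static void main(String[] args) {
            JavaSparkContext ctx = new JavaSparkContext(
                    new SparkConf().setMaster("local").setAppName("ResultPartitionerCheck"));
            JavaPairRDD<String, Integer> a = ctx.parallelizePairs(
                    Arrays.asList(new Tuple2<>("Bread", 3)), 2)
                    .partitionBy(new HashPartitioner(3)); // parent with a partitioner
            JavaPairRDD<String, Integer> b = ctx.parallelizePairs(
                    Arrays.asList(new Tuple2<>("Bread", 1)), 2); // no partitioner
            JavaPairRDD<String, Tuple2<Integer, Integer>> joined = a.join(b);
            // The result adopts the partitioner of the parent that has one:
            System.out.println(joined.rdd().partitioner()); // Some(HashPartitioner...) from `a`
            System.out.println(joined.getNumPartitions()); // 3
            ctx.stop();
        }
    }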

    1. combineByKey
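    combineByKey() builds a per-key accumulator out of three functions: createCombiner turns the first value seen for a key in a partition into an accumulator, mergeValue folds further values of that partition into it, and mergeCombiners merges accumulators across partitions. Before the longer tests below, a compact sketch of the classic per-key average (class name and data are ours):

    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import scala.Tuple2;
    
    public class AveragePerKey {
        public static void main(String[] args) {
            JavaSparkContext ctx = new JavaSparkContext(
                    new SparkConf().setMaster("local").setAppName("AveragePerKey"));
            JavaPairRDD<String, Integer> pairs = ctx.parallelizePairs(Arrays.asList(
                    new Tuple2<>("Bread", 3), new Tuple2<>("Bread", 2), new Tuple2<>("Egg", 6)), 2);
            // The accumulator is (sum, count); the average is sum / count.
            JavaPairRDD<String, Tuple2<Integer, Integer>> sumCount = pairs.combineByKey(
                    (Integer v) -> new Tuple2<>(v, 1),                           // createCombiner
                    (Tuple2<Integer, Integer> acc, Integer v) ->
                            new Tuple2<>(acc._1() + v, acc._2() + 1),            // mergeValue
                    (Tuple2<Integer, Integer> x, Tuple2<Integer, Integer> y) ->
                            new Tuple2<>(x._1() + y._1(), x._2() + y._2()));     // mergeCombiners
            sumCount.mapValues(sc -> (double) sc._1() / sc._2())
                    .foreach(t -> System.out.println(t)); // (Bread,2.5), (Egg,6.0)
            ctx.stop();
        }
    }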

    test1

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    
    import scala.Tuple2;
    
    public class CombineByKeyTest3 {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.setAppName("WordCounter");
        	conf.set("spark.testing.memory", "2147480000");
        	conf.set("spark.default.parallelism", "4");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
        	// Create the RDD: 1) by reading external storage (for cluster use) or 2) from an in-memory collection
        	List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        	data.add(new Tuple2<>("Cake", 2));
        	data.add(new Tuple2<>("Bread", 3));
        	data.add(new Tuple2<>("Cheese", 4));
        	data.add(new Tuple2<>("Milk", 1));
        	data.add(new Tuple2<>("Toast", 2));
        	data.add(new Tuple2<>("Bread", 2));
        	data.add(new Tuple2<>("Egg", 6));
    
        	JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
    
          JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {  
    	      @Override
    	      public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {  
    	          return t;
    	      }
          }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
          
          JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
    		@Override
    		public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
    			return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
    		}
    	  });
          mapRdd1.foreach(x->System.out.println(x));
          /*
           * The same grouping could also be produced with groupByKey(), which returns
           * Iterable values; here combineByKey() builds an explicit List per key.
           */
          
    //     JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> results = mapRdd1.groupByKey();
           JavaPairRDD<String, List<Tuple2<Integer, Integer>>> results = mapRdd1.combineByKey(
        		   new Function<Tuple2<Integer,Integer>, List<Tuple2<Integer, Integer>>>() {
    		           @Override
    						public List<Tuple2<Integer, Integer>> call(Tuple2<Integer, Integer> value) throws Exception {
    							 List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
    							 list.add(value);
    							 return list;
    						}
    	           }, 
    	           
    	           new Function2<List<Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, List<Tuple2<Integer, Integer>>>() {
    				@Override
    				public List<Tuple2<Integer, Integer>> call(
    									List<Tuple2<Integer, Integer>> it, 
    									Tuple2<Integer, Integer> value) throws Exception {
    //					List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
    //					it.forEach(list::add);
    //					list.add(value);
    					it.add(value); // the accumulator is already a List; mutate it in place
    					return it;
    				}
    	           }, 
    	           
    	           new Function2<List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>>() {
    					@Override
    					public List<Tuple2<Integer, Integer>> call(
    							List<Tuple2<Integer, Integer>> it1,
    							List<Tuple2<Integer, Integer>> it2) throws Exception {
    //						List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
    //						it1.forEach(list::add);
    //						it2.forEach(list::add);
    //						return list;
    						it1.addAll(it2); // merge the two partition-level lists
    						return it1;
    					}
    			   });
           results.foreach(x->System.out.println(x));
           // Note: distinct() is itself built on top of reduceByKey()
    //       mapRdd1.distinct();
           ctx.stop(); 
        }
    }

    test2

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    
    import scala.Tuple2;
    
    public class CombineByKeyTest2 {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.set("spark.testing.memory", "2147480000");
        	conf.setAppName("WordCounter");
        	conf.set("spark.default.parallelism", "4");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
        	// Create the RDD: 1) by reading external storage (for cluster use) or 2) from an in-memory collection
        	List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        	data.add(new Tuple2<>("Cake", 2));
        	data.add(new Tuple2<>("Bread", 3));
        	data.add(new Tuple2<>("Cheese", 4));
        	data.add(new Tuple2<>("Milk", 1));    	
        	data.add(new Tuple2<>("Toast", 2));
        	data.add(new Tuple2<>("Bread", 2));
        	data.add(new Tuple2<>("Egg", 6));
        	
        	// How a hash partitioner would assign the key "Code" to one of 4 partitions
        	// (kept for illustration only; the value is unused below):
        	int index = "Code".hashCode() % 4;
        	
        	JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
    
          JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {  
    	      @Override
    	      public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {  
    	          return t;
    	      }
          }).partitionBy(new HashPartitioner(4)).persist(StorageLevel.MEMORY_ONLY());
          
          
    
    //      JavaPairRDD<String, Tuple2<Integer, Integer>> results =  mapRdd.combineByKey(
    //    		                         (value) -> new Tuple2<Integer, Integer>(value,1), 
    //    		                         (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1), 
    //    		                         (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
    //    		                         new HashPartitioner(2),
    //    		                         false,
    //                                     null
    //    		                       );
    
    //      JavaPairRDD<String, Tuple2<Integer, Integer>> results =  mapRdd.aggregateByKey(
    //    		  new Tuple2<Integer, Integer>(0,0),
    //              (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1), 
    //              (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2())	  
    //    		  );
          
    //      JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
    //		@Override
    //		public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
    //			return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
    //		}
    //	  });
    //      mapRdd1.foreach(System.out::println);
          
    //       JavaPairRDD<String, Tuple2<Integer, Integer>> results =  mapRdd1.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    //			@Override
    //			public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
    //				return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
    //			}
    //	   });
           //results.foreach(System.out::println);
    
    //       results = mapRdd1.foldByKey(new Tuple2<Integer, Integer>(0, 0), new Function2<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
    //			@Override
    //			public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
    //				return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
    //			}
    //	   });
           //results.foreach(System.out::println);
    
           // Exercise: implement groupByKey in terms of combineByKey (done below)
    //        mapRdd.groupByKey().foreach(System.out::println);
    
            Function<Integer, List<Integer>> createCombiner=new Function<Integer, List<Integer>>() {
    
    			@Override
    			public List<Integer> call(Integer arg0) throws Exception {
    				List<Integer>list=new ArrayList<Integer>();
    				list.add(arg0);
    				return list;
    			}
    		};
          
    		Function2<List<Integer>, Integer, List<Integer>> mergeValue=new Function2<List<Integer>, Integer, List<Integer>>() {
    
    			@Override
    			public List<Integer> call(List<Integer> list, Integer value) throws Exception {
    				list.add(value);
    				return list;
    			}
    		};
    		
    		Function2< List<Integer>,List<Integer> ,List<Integer> > mergeCombiners=new Function2<List<Integer>, List<Integer>, List<Integer>>() {
    
    			@Override
    			public List<Integer> call(List<Integer> list1, List<Integer> list2) throws Exception {
    				List<Integer> list=new ArrayList<Integer>();
    //				list.addAll(list1);
    //				list.addAll(list2);
    				
    				list1.forEach(list::add);
    				list2.forEach(list::add);
    				
    				return list;
    			}
    		};
    				
    		JavaPairRDD<String, List<Integer>> results=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
    		
    		results.foreach(x->System.out.println(x));
    		
    		JavaPairRDD<String, Integer> re=mapRdd.partitionBy(new HashPartitioner(2));
    		System.out.println(re.glom().collect());
    		
    		// The 4th argument is the number of partitions; glom() exposes each partition's contents
    		JavaPairRDD<String, List<Integer>> results2=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, 2);
    		System.out.println(results2.glom().collect());
    		System.out.println(results2.getNumPartitions());
    		
    		// The 4th argument can instead be a custom partitioner
    		JavaPairRDD<String, List<Integer>> results3=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners,new HashPartitioner(3));
    		System.out.println(results3.glom().collect());
    		System.out.println(results3.getNumPartitions());
    		
    		// 4th argument: a custom partitioner; 5th: whether to combine on the map side before
    		// shuffling; 6th: the serializer, where null means the default
    		JavaPairRDD<String, List<Integer>> results4=mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(3), true, null);
    		System.out.println(results4.glom().collect());
    		System.out.println(results4.getNumPartitions());
    		
           
    //       mapRdd1.combineByKey(
    //    		   new Function<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
    //		           @Override
    //						public Tuple2<Integer,Integer> call(Tuple2<Integer, Integer> arg0) throws Exception {
    //							return arg0;
    //						}
    //	           }, 
    //	           
    //	           new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>[]>() {
    //				@Override
    //				public Tuple2<Integer, Integer>[] call(Tuple2<Integer, Integer> arg0, Integer arg1) throws Exception {
    //					return null;
    //				}
    //	           }, 
    //	           mergeCombiners);
    
           // Note: distinct() is itself built on top of reduceByKey()
           
    //        mapRdd1.distinct();
           
            ctx.stop(); 
        }
    }

    2. group & join

    groupByKey

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    public class CombineByKeyTest {
        @SuppressWarnings("serial")
    	public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.setAppName("WordCounter");
        	conf.set("spark.testing.memory", "5435657567560");
        	conf.set("spark.default.parallelism", "4");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
        	// Create the RDD: 1) by reading external storage (for cluster use) or 2) from an in-memory collection
        	List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        	data.add(new Tuple2<>("Cake", 2));
        	data.add(new Tuple2<>("Bread", 3));  //<"Bread", <3,1>>
        	data.add(new Tuple2<>("Cheese", 4));
        	data.add(new Tuple2<>("Milk", 1));    	
        	data.add(new Tuple2<>("Toast", 2));
        	data.add(new Tuple2<>("Bread", 2)); 
        	data.add(new Tuple2<>("Egg", 6));
        	
        	JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
    
          JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
        		  new PairFunction<Tuple2<String, Integer>, String, Integer>() {  
    	      @Override
    	      public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {  
    	          return t;
    	      }
          });
          
    //    	JavaPairRDD<String, Integer> mapRdd=ctx.parallelizePairs(data,2);
        	
          mapRdd.groupByKey().foreach(x->System.out.println(x));
          
    
    //      JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
    //        new Function<Integer, Tuple2<Integer, Integer>>() {
    //			@Override
    //			public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
    //				return new Tuple2<Integer, Integer>(v1 ,1);
    //			}
    //		}, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
    //			@Override
    //			public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Integer v2) throws Exception {
    //				return new Tuple2<Integer, Integer>(v1._1() + v2, v1._2() + 1);
    //			}
    //		}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    //			@Override
    //			public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
    //				return new Tuple2<Integer, Integer>(v1._1() + v2._1(), v1._2() + v2._2());
    //			}
    //		});
    
         
          JavaPairRDD<String, Tuple2<Integer, Integer>> result2s =  mapRdd.combineByKey(
        		                          (Integer value) -> new Tuple2<Integer, Integer>(value,1), 
        		                         (Tuple2<Integer, Integer> acc, Integer value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1), 
        		                         (Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
        		                         new HashPartitioner(3),
        		                         true,
        		                         null
        		                       );
          result2s.foreach(x->System.out.println(x));
          
          JavaPairRDD<String, Tuple2<Integer, Integer>> results3 =  mapRdd.aggregateByKey(
        		  new Tuple2<Integer, Integer>(0,0),
                  (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1), 
                  (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2())	  
        		  );
    
          results3.foreach(x->System.out.println(x));
          
         
          JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Tuple2<Integer, Integer>>() {
    		@Override
    		public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
    			return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(), new Tuple2<Integer, Integer>(t._2() , 1));
    		}
    	  });
          
           JavaPairRDD<String, Tuple2<Integer, Integer>> results =  mapRdd1.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    			@Override
    			public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
    				return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
    			}
    	   });
    //       results.foreach(System.out::println);
          
          results.foreach(x->System.out.println(x));
           ctx.stop();
        }
    }

    join

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    
    import scala.Tuple2;
    
    public class CogroupApiTest {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.set("spark.testing.memory", "2147480000");
        	conf.setAppName("WordCounter");
        	conf.set("spark.default.parallelism", "4");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
        	// Create the RDD: 1) by reading external storage (for cluster use) or 2) from an in-memory collection
        	List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
        	data1.add(new Tuple2<>("Cake", 2));
        	data1.add(new Tuple2<>("Bread", 3));
        	data1.add(new Tuple2<>("Cheese", 4));
        	data1.add(new Tuple2<>("Milk", 1));    	
        	data1.add(new Tuple2<>("Toast", 2));
        	data1.add(new Tuple2<>("Bread", 2));
        	data1.add(new Tuple2<>("Egg", 6));
        	
    //    	JavaPairRDD<String, Integer> mapRdd1=ctx.parallelizePairs(data1);
        	
        	JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data1, 2);
    
    	      JavaPairRDD<String, Integer> mapRdd1 = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {  
    		      @Override
    		      public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {  
    		          return t;
    		      }
    	      }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
    	      
    	  	List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
    	  	data2.add(new Tuple2<>("Cake", 2));
    	  	data2.add(new Tuple2<>("Bread", 3));
    	  	data2.add(new Tuple2<>("Cheese", 4));
    	  	data2.add(new Tuple2<>("Milk", 1));    	
    	  	data2.add(new Tuple2<>("Toast", 2));
    	  	JavaRDD<Tuple2<String, Integer>> rdd2 = ctx.parallelize(data2, 2);
    	
    	    JavaPairRDD<String, Integer> mapRdd2 = rdd2.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {  
    		      @Override
    		      public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {  
    		          return t;
    		      }
    	    }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
    
    	  // cogroup groups the values of both RDDs by key, e.g. (Bread,([3, 2],[3]))
    	    JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>  mapRdd3 = mapRdd1.cogroup(mapRdd2);
    	    mapRdd3.foreach(x->System.out.println(x));
    	    
    	  // join: inner join by key, e.g. (Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2))
    //	    JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd3 = mapRdd1.join(mapRdd2);
    //	    mapRdd3.foreach(x->System.out.println(x));
    	    
    	    // rightOuterJoin: the left side is wrapped in Optional and may be Optional.empty,
    	    // e.g. (Bread,(Optional[3],3)), (Bread,(Optional[3],2)), (Cake,(Optional[2],2))
    //	    JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>>  mapRdd3 = mapRdd2.rightOuterJoin(mapRdd1);
    //	    mapRdd3.foreach(x->System.out.println(x));
    
    	    // leftOuterJoin: the right side may be Optional.empty,
    	    // e.g. (Cheese,(4,Optional[4])), (Toast,(2,Optional[2])), (Egg,(6,Optional.empty))
    //	    JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>>  mapRdd4 = mapRdd1.leftOuterJoin(mapRdd2);
    //	    mapRdd4.foreach(x->System.out.println(x));    
    
    	    // fullOuterJoin: either side may be Optional.empty
    //	    JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> mapRdd5 = mapRdd1.fullOuterJoin(mapRdd2);
    //	    mapRdd5.foreach(x->System.out.println(x));
    	    
    	    // groupWith is an alias for cogroup and gives the same result: (Bread,([3, 2],[3]))
    //	    JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd6 = mapRdd1.groupWith(mapRdd2);
    //	    mapRdd6.foreach(x->System.out.println(x));
    	    
    	    // join again: inner join by key, e.g. (Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2))
    //	    JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd7=mapRdd1.join(mapRdd2);
    //	    mapRdd7.foreach(x->System.out.println(x));
    	    
    	    // union: concatenates the two RDDs; duplicate pairs are kept
    //	    JavaPairRDD<String,Integer>  mapRdd8=mapRdd1.union(mapRdd2);
    //	    mapRdd8.foreach(x->System.out.println(x));
    	    
    	    // subtractByKey: drops pairs whose key also appears in mapRdd2
    //	    JavaPairRDD<String, Integer> mapRdd9=mapRdd1.subtractByKey(mapRdd2);
    //	    mapRdd9.foreach(x->System.out.println(x));
    	    
    	    // intersection: keeps only tuples whose key and value match in both RDDs
    //	    JavaPairRDD<String, Integer> mapRdd10=mapRdd1.intersection(mapRdd2);
    //	    mapRdd10.foreach(x->System.out.println(x));
    	    ctx.stop();
        }
    }

    3. sortByKey

    test1

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    
    import scala.Tuple2;
    
    
    public class SortByKeyApi {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.setAppName("WordCounter");
        	conf.set("spark.testing.memory", "2147480000");
        	conf.set("spark.default.parallelism", "4");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
        	// Create the RDD: 1) by reading external storage (for cluster use) or 2) from an in-memory collection
        	List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        	data.add(new Tuple2<>("Cake", 2));
        	data.add(new Tuple2<>("Bread", 3));
        	data.add(new Tuple2<>("Cheese", 4));
        	data.add(new Tuple2<>("Milk", 1));    	
        	data.add(new Tuple2<>("Toast", 2));
        	data.add(new Tuple2<>("Bread", 2));
        	data.add(new Tuple2<>("Egg", 6));
    
        	JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
    
          JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {  
    	      @Override
    	      public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {  
    	          return t;
    	      }
          }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());
          
           //mapRdd.sortByKey().foreach(System.out::println);
           mapRdd.sortByKey(false).foreach(x->System.out.println(x));
           
    //       // sortByKey can take a Comparator over the key type (Comparator<String> here);
    //       // it must be serializable to ship to executors, see the sketch after this class:
    //       mapRdd.sortByKey(new Comparator<String>() {
    //           @Override
    //           public int compare(String o1, String o2) {
    //               return o2.compareTo(o1); // descending
    //           }
    //       });
           
    //       mapRdd.mapValues(x -> x + 1).foreach(x -> System.out.println(x));
    //       mapRdd.flatMapValues(x -> Arrays.asList(1, 1, 1));
    
           ctx.stop(); 
        }
    }
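    Because a comparator passed to sortByKey() travels to the executors with the task, it must be serializable; an anonymous Comparator like the one sketched in the comments above can fail with a "task not serializable" error once an action runs. A minimal sketch of a reusable key comparator (the class name is ours):

    import java.io.Serializable;
    import java.util.Comparator;
    
    // A Comparator that Spark can ship inside the task closure.
    class DescendingStringComparator implements Comparator<String>, Serializable {
        private static final long serialVersionUID = 1L;
    
        @Override
        public int compare(String a, String b) {
            return b.compareTo(a); // reverse of the natural String order
        }
    }

    Usage: mapRdd.sortByKey(new DescendingStringComparator()).foreach(x -> System.out.println(x));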
    

    test2

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import scala.Tuple2;
    
    public class SortByKeyApiTest {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.setAppName("WordCounter");
        	conf.set("spark.default.parallelism", "4");
        	conf.set("spark.testing.memory", "2147480000");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
        	// Create the RDD: 1) by reading external storage (for cluster use) or 2) from an in-memory collection
        	List<Person> data1 = 
        			new ArrayList<Person>();
        	data1.add(new Person("Cake",32));
        	data1.add(new Person("Bread",21));
        	data1.add(new Person("Smith",32));
        	data1.add(new Person("Hourse",21));
        	data1.add(new Person("Mary",32));
        	data1.add(new Person("Greey",21));
        	data1.add(new Person("Greey",21));
        	data1.add(new Person("Tom",32));
        	data1.add(new Person("Gao",21));    	
            System.out.println(ctx.parallelize(data1).distinct().count());
    //                     .sortBy(x->x, true, 2).foreach(x->System.out.println(x));
    
        	List<Tuple2<Person, Integer>> data = 
        			new ArrayList<Tuple2<Person, Integer>>();
        	data.add(new Tuple2<Person, Integer>(new Person("Cake",32), 2));
        	data.add(new Tuple2<Person, Integer>(new Person("Bread",21), 3));
        	data.add(new Tuple2<Person, Integer>(new Person("Smith",32), 2));
        	data.add(new Tuple2<Person, Integer>(new Person("Hourse",21), 3));
        	data.add(new Tuple2<Person, Integer>(new Person("Mary",32), 2));
        	data.add(new Tuple2<Person, Integer>(new Person("Greey",21), 3));
        	data.add(new Tuple2<Person, Integer>(new Person("Greey",11), 3));
        	data.add(new Tuple2<Person, Integer>(new Person("Tom",32), 2));
        	data.add(new Tuple2<Person, Integer>(new Person("Gao",21), 3));    	
    
        	JavaPairRDD<Person, Integer> dataRdd = ctx.parallelizePairs(data);
        	dataRdd.sortByKey().foreach(x->System.out.println(x));
            // Sorting with an explicit Comparator. Two pitfalls: this anonymous Comparator
            // is not Serializable, and no action is called on the result, so the sort below
            // never actually executes (transformations are lazy).
            dataRdd.sortByKey(new Comparator<Person>() {
    			@Override
    			public int compare(Person o1, Person o2) {
    				int res = o1.name.compareTo(o2.name);
    				if(res == 0){
    					res = o1.age - o2.age;
    				}
    				return res;
    			}
    		});
        	ctx.stop(); // close() delegates to stop(), so a single call suffices
        }
    }
    
    class Person implements Serializable, Comparable<Person> {
    	private static final long serialVersionUID = 1L;
    
    	String name;
    	int age;
    
    	public Person(String name, int age) {
    		this.name = name;
    		this.age = age;
    	}
    	
    	@Override
    	public int hashCode() {
    		final int prime = 31;
    		int result = 1;
    		result = prime * result + age;
    		result = prime * result + ((name == null) ? 0 : name.hashCode());
    		return result;
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		if (this == obj)
    			return true;
    		if (obj == null)
    			return false;
    		if (getClass() != obj.getClass())
    			return false;
    		Person other = (Person) obj;
    		if (age != other.age)
    			return false;
    		if (name == null) {
    			if (other.name != null)
    				return false;
    		} else if (!name.equals(other.name))
    			return false;
    		return true;
    	}
    
    	@Override
    	public int compareTo(Person p) {
    		int res = this.name.compareTo(p.name);
    		if(res == 0){
    			res = this.age - p.age;
    		}
    		return res;
    	}
    
    	@Override
    	public String toString() {
    		return "Person [name=" + name + ", age=" + age + "]";
    	}
    }
    