zoukankan      html  css  js  c++  java
  • intersection & union & zip

    注意:对于键值对类型的RDD,如果键是自定义类型(比如:Person),则需要重写其hashCode 和equals方法。

    1、 intersection

    底层用的是groupByKey;subtract底层用的是subtractByKey;

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    
    public class IntersectionDemo {
        /**
         * Demonstrates set-style operations on pair RDDs (subtract; intersection
         * and union shown in the surrounding article). Per the note above,
         * intersection is implemented on top of groupByKey and subtract on top of
         * subtractByKey, so custom key types must override hashCode/equals.
         */
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.setAppName("WordCounter");
        	// Local-mode workaround for Spark's minimum-memory check.
        	conf.set("spark.testing.memory", "2147480000");
        	JavaSparkContext ctx = new JavaSparkContext(conf);

        	List<String> lines1 = new ArrayList<String>();
        	lines1.add("Hello");
        	lines1.add("How");
        	lines1.add("Moon");

        	// Pair every word with 1, then hash-partition by key into 3 partitions.
        	// The file already uses Java 8 lambdas, so the anonymous PairFunction
        	// classes are replaced with the equivalent lambda form.
        	JavaPairRDD<String, Integer> rdd1 = ctx.parallelize(lines1, 2)
        			.mapToPair(s -> new Tuple2<String, Integer>(s, 1))
        			.partitionBy(new HashPartitioner(3));

        	System.out.println("rdd1:" + rdd1.partitioner());

        	List<String> lines2 = new ArrayList<String>();
        	lines2.add("Hello");
        	lines2.add("How");
        	lines2.add("Good");

        	// Same pairing for the second data set, but with 2 partitions.
        	JavaPairRDD<String, Integer> rdd2 = ctx.parallelize(lines2, 2)
        			.mapToPair(s -> new Tuple2<String, Integer>(s, 1))
        			.partitionBy(new HashPartitioner(2));
        	System.out.println("rdd2:" + rdd2.partitioner());

        	// subtract keeps the (key, value) pairs of rdd1 that do not occur in
        	// rdd2; it is built on subtractByKey. (intersection/union behave
        	// analogously — see the article text.)
        	JavaPairRDD<String, Integer> rdd3 = rdd1.subtract(rdd2);
        	System.out.println("rdd3:" + rdd3.partitioner());
        	System.out.println("rdd3:" + rdd3.getNumPartitions());
        	rdd3.foreach(x->System.out.println(x));
        }
    }
    

    2、 union

    操作:父RDD分区对子RDD的影响

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    
    public class UnionDemo {
        /**
         * Shows how the parents' partitioning affects a union:
         * with identical partitioners and partition counts the result keeps that
         * partitioning; without a partitioner the result's partition count is the
         * sum of both inputs' counts (here 2 + 2 = 4).
         */
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	// Local-mode workaround for Spark's minimum-memory check.
        	conf.set("spark.testing.memory", "2147480000");
        	conf.setAppName("WordCounter");

        	JavaSparkContext ctx = new JavaSparkContext(conf);
        	// RDDs can come from external storage (cluster use) or, as here,
        	// from an in-memory collection.

        	List<Tuple2<String,Integer>> firstBatch = new ArrayList<Tuple2<String,Integer>>();
        	firstBatch.add(entry("http://www.baidu.com/about.html", 3));
        	firstBatch.add(entry("http://www.ali.com/index.html", 2));
        	firstBatch.add(entry("http://www.sina.com/first.html", 4));
        	firstBatch.add(entry("http://www.sohu.com/index.html", 3));
        	firstBatch.add(entry("http://www.baidu.com/index.jsp", 7));
        	firstBatch.add(entry("http://www.sina.com/help.html", 1));

        	// No explicit partitioner — just 2 partitions.
        	JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(firstBatch, 2);

        	System.out.println("urlRdd1:" + urlRdd1.partitioner());
        	System.out.println("urlRdd1:" + urlRdd1.glom().collect());

        	List<Tuple2<String,Integer>> secondBatch = new ArrayList<Tuple2<String,Integer>>();
        	secondBatch.add(entry("http://www.163.com/about.html", 3));
        	secondBatch.add(entry("http://www.taobao.com/index.html", 2));
        	secondBatch.add(entry("http://www.sina.com/first.html", 4));
        	secondBatch.add(entry("http://www.csdn.com/index.html", 3));
        	secondBatch.add(entry("http://www.facebook.com/index.jsp", 7));
        	secondBatch.add(entry("http://www.sina.com/help.html", 1));

        	JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(secondBatch, 2);
        	System.out.println("urlRdd2:" + urlRdd2.partitioner());
        	System.out.println("urlRdd2:" + urlRdd2.glom().collect());

        	// Neither parent has a partitioner, so the union has 2 + 2 = 4
        	// partitions; with matching partitioners it would keep them instead.
        	JavaPairRDD<String, Integer> rdd3 = urlRdd1.union(urlRdd2);
        	System.out.println("rdd3:" + rdd3.partitioner());
        	System.out.println("rdd3:" + rdd3.getNumPartitions());
        	System.out.println("urlRdd3:" + rdd3.glom().collect());

        	rdd3.foreach(x->System.out.println(x));

        }

        /** Small factory for (url, count) pairs to keep the list building terse. */
        private static Tuple2<String, Integer> entry(String url, int count) {
        	return new Tuple2<String, Integer>(url, count);
        }
    }

    3、 zip操作 与 zipPartitions操作

    (zip底层实现就是zipPartitions)

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    import scala.collection.Iterator;
    
    
    public class ZipDemo {
        /**
         * Demonstrates zipPartitions (zip is implemented on top of it).
         * zip requires both RDDs to have the same number of partitions and the
         * same number of elements per partition, otherwise it fails at runtime.
         */
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	// Local-mode workaround for Spark's minimum-memory check.
        	conf.set("spark.testing.memory", "2147480000");
        	conf.setAppName("WordCounter");
        	JavaSparkContext ctx = new JavaSparkContext(conf);

        	List<String> words = new ArrayList<String>();
        	words.add("Hello");
        	words.add("How");
        	words.add("Moon");
        	JavaRDD<String> rdd1 = ctx.parallelize(words, 2);
        	System.out.println(rdd1.glom().collect());

        	List<String> digits = new ArrayList<String>();
        	digits.add("1");
        	digits.add("2");
        	digits.add("3");
        	JavaRDD<String> rdd2 = ctx.parallelize(digits, 2);
        	System.out.println(rdd2.glom().collect());

        	// zipPartitions hands the function one iterator per input partition.
        	// Here each partition pair is printed (left side fully drained first,
        	// then the right side) and an empty result is emitted, so the final
        	// foreach prints nothing. Sample output for 2 partitions:
        	//   *****************
        	//   Hello
        	//   1
        	//   *****************
        	//   How
        	//   Moon
        	//   2
        	//   3
        	JavaRDD<HashMap<String, String>> rdd3 = rdd1.zipPartitions(rdd2,
        			(left, right) -> {
        				System.out.println("*****************");
        				List<HashMap<String, String>> out = new ArrayList<HashMap<String, String>>();
        				while (left.hasNext()) {
        					System.out.println(left.next());
        				}
        				while (right.hasNext()) {
        					System.out.println(right.next());
        				}
        				return out.iterator();
        			});
        	rdd3.foreach(x->System.out.println(x));
        }
    }
    
  • 相关阅读:
    【转】win8.1下安装ubuntu
    Codeforces 1025G Company Acquisitions (概率期望)
    Codeforces 997D Cycles in Product (点分治、DP计数)
    Codeforces 997E Good Subsegments (线段树)
    Codeforces 1188E Problem from Red Panda (计数)
    Codeforces 1284E New Year and Castle Building (计算几何)
    Codeforces 1322D Reality Show (DP)
    AtCoder AGC043C Giant Graph (图论、SG函数、FWT)
    Codeforces 1305F Kuroni and the Punishment (随机化)
    AtCoder AGC022E Median Replace (字符串、自动机、贪心、计数)
  • 原文地址:https://www.cnblogs.com/apppointint/p/8885283.html
Copyright © 2011-2022 走看看