  • intersection & union & zip

    For key-value pair RDDs, if the key is a custom type (for example, Person), you must override its hashCode and equals methods so that equal keys are sent to the same partition and actually compare as equal there.
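
    As a minimal sketch (the Person class and its fields below are purely illustrative, not part of the demos that follow), such a key type could look like this:

    public class Person implements java.io.Serializable {
        private final String name;
        private final int age;

        public Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        // Equal Persons must also produce equal hash codes; otherwise key-based
        // operations (intersection, subtract, groupByKey, ...) can send "equal"
        // keys to different partitions and never match them up.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Person)) return false;
            Person p = (Person) o;
            return age == p.age && name.equals(p.name);
        }

        @Override
        public int hashCode() {
            return 31 * name.hashCode() + age;
        }
    }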

    1、 intersection

    Under the hood, intersection relies on a key-based grouping (groupByKey), while subtract is built on subtractByKey. A hand-rolled sketch of the intersection idea follows the demo below.

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    
    public class IntersectionDemo {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.setAppName("WordCounter");
        	conf.set("spark.testing.memory", "2147480000");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
    
        	List<String> lines1 = new ArrayList<String>();
        	lines1.add("Hello");
        	lines1.add("How");
        	lines1.add("Moon");
        	
    //    	JavaRDD<String> rd1=ctx.parallelize(lines1);
        	
        	JavaPairRDD<String, Integer> rdd1 = ctx.parallelize(lines1, 2).
        			mapToPair(new PairFunction<String, String, Integer>() {  
        	            @Override  
        	            public Tuple2<String, Integer> call(String s) throws Exception {  
        	                return new Tuple2<String, Integer>(s, 1);  
        	            }  
        	        }).partitionBy(new HashPartitioner(3));
       	
        	System.out.println("rdd1:" + rdd1.partitioner());
    //    	rdd1.foreach(x -> {
    //    		int index = x.hashCode() % 2;
    //    		System.out.println("current element: " + x + ", its hash index: " + index);
    //    	});
    //    	System.out.println(rdd1.glom().collect());
        	
        	
        	List<String> lines2 = new ArrayList<String>();
        	lines2.add("Hello");
        	lines2.add("How");
        	lines2.add("Good");
    
        	JavaRDD<String> rd2=ctx.parallelize(lines2);
        	
        	JavaPairRDD<String, Integer> rdd2 = ctx.parallelize(lines2, 2).
        			mapToPair(new PairFunction<String, String, Integer>() {  
        	            @Override  
        	            public Tuple2<String, Integer> call(String s) throws Exception {  
        	                return new Tuple2<String, Integer>(s, 1);  
        	            }  
        	        }).partitionBy(new HashPartitioner(2));
        	System.out.println("rdd2:" + rdd2.partitioner());
        	
    	// Implemented on top of groupByKey, combined with a HashMap and a HashSet internally for code reuse
    //    	JavaPairRDD<String, Integer> rdd3 = rdd1.intersection(rdd2);
        	
        	JavaPairRDD<String, Integer> rdd3 = rdd1.subtract(rdd2);
    //    	JavaPairRDD<String, Integer> rdd3 = rdd1.union(rdd2);
        	System.out.println("rdd3:" + rdd3.partitioner());
        	System.out.println("rdd3:" + rdd3.getNumPartitions());
        	rdd3.foreach(x->System.out.println(x));
        }
    }
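
    To make the key-based idea above concrete, here is a rough sketch only (not Spark's actual source) of an intersection written by hand with cogroup and a filter, reusing the rdd1 and rdd2 pair RDDs from the demo; it also shows why both sides must agree on each key's hashCode:

    // Keep a key only if it appears (with some value) on both sides.
    JavaPairRDD<String, Integer> manualIntersection = rdd1.cogroup(rdd2)
            .filter(t -> t._2()._1().iterator().hasNext()
                    && t._2()._2().iterator().hasNext())
            .mapToPair(t -> new Tuple2<String, Integer>(t._1(), t._2()._1().iterator().next()));
    manualIntersection.foreach(x -> System.out.println(x));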
    

    2、 union

    This operation shows how the parent RDDs' partitioning affects the resulting child RDD.

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    
    public class UnionDemo {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.set("spark.testing.memory", "2147480000");
        	conf.setAppName("WordCounter");
    
        	JavaSparkContext ctx = new JavaSparkContext(conf);
    	// Two ways to create an RDD: 1) by reading external storage (typical in a cluster) 2) from an in-memory collection
    
        	List<Tuple2<String,Integer>> urls = new ArrayList<Tuple2<String,Integer>>();
        	urls.add(new Tuple2<String,Integer>("http://www.baidu.com/about.html", 3));
        	urls.add(new Tuple2<String,Integer>("http://www.ali.com/index.html", 2));
        	urls.add(new Tuple2<String,Integer>("http://www.sina.com/first.html", 4));
        	urls.add(new Tuple2<String,Integer>("http://www.sohu.com/index.html", 3));
        	urls.add(new Tuple2<String,Integer>("http://www.baidu.com/index.jsp",7));
        	urls.add(new Tuple2<String,Integer>("http://www.sina.com/help.html",1));
        	
        	JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls,2);
    //    	JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls).
    //    			                         partitionBy(new HashPartitioner(2));
        	
        	
        	System.out.println("urlRdd1:" + urlRdd1.partitioner());
        	System.out.println("urlRdd1:" + urlRdd1.glom().collect());
    
        	List<Tuple2<String,Integer>> anotherUrls = new ArrayList<Tuple2<String,Integer>>();
        	anotherUrls.add(new Tuple2<String,Integer>("http://www.163.com/about.html", 3));
        	anotherUrls.add(new Tuple2<String,Integer>("http://www.taobao.com/index.html", 2));
        	anotherUrls.add(new Tuple2<String,Integer>("http://www.sina.com/first.html", 4));
        	anotherUrls.add(new Tuple2<String,Integer>("http://www.csdn.com/index.html", 3));
        	anotherUrls.add(new Tuple2<String,Integer>("http://www.facebook.com/index.jsp",7));
        	anotherUrls.add(new Tuple2<String,Integer>("http://www.sina.com/help.html",1));
        	
        	JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls,2);
    //    	JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls).
    //    			                      partitionBy(new HashPartitioner(3));
        	System.out.println("urlRdd2:" + urlRdd2.partitioner());
        	System.out.println("urlRdd2:" + urlRdd2.glom().collect());
        	
    	// If both inputs use the same partitioner and the same number of partitions, the union keeps that partitioning.
    	// If no partitioner is set, the union's partition count is the sum of the two, even when their counts match.
    	// A small sketch of the first case follows this class.
        	
        	JavaPairRDD<String, Integer> rdd3 = urlRdd1.union(urlRdd2);
        	System.out.println("rdd3:" + rdd3.partitioner());
        	System.out.println("rdd3:" + rdd3.getNumPartitions());
        	System.out.println("urlRdd3:" + rdd3.glom().collect());
        	
        	rdd3.foreach(x->System.out.println(x));
        	
        }
    }
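
    A hedged sketch of the first rule above: when both inputs are partitioned by the same HashPartitioner with the same partition count, the union keeps that partitioner and partition count instead of adding them up (the partitioned1/partitioned2 names are illustrative; the data is the urls/anotherUrls lists from the demo):

    JavaPairRDD<String, Integer> partitioned1 =
            ctx.parallelizePairs(urls).partitionBy(new HashPartitioner(2));
    JavaPairRDD<String, Integer> partitioned2 =
            ctx.parallelizePairs(anotherUrls).partitionBy(new HashPartitioner(2));

    JavaPairRDD<String, Integer> sameUnion = partitioned1.union(partitioned2);
    // Expect a defined HashPartitioner and 2 partitions here, rather than
    // no partitioner and 2 + 2 = 4 partitions as in the un-partitioned case above.
    System.out.println("sameUnion: " + sameUnion.partitioner());
    System.out.println("sameUnion: " + sameUnion.getNumPartitions());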

    3、 zip and zipPartitions

    (zip is implemented on top of zipPartitions; a hand-written equivalent follows the demo below)

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    import scala.collection.Iterator;
    
    
    public class ZipDemo {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.set("spark.testing.memory", "2147480000");
        	conf.setAppName("WordCounter");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
    
        	List<String> lines1 = new ArrayList<String>();
        	lines1.add("Hello");
        	lines1.add("How");
        	lines1.add("Moon");
    //    	lines1.add("Hope");
    //    	lines1.add("Dog");
    //    	lines1.add("House");
        	JavaRDD<String> rdd1 = ctx.parallelize(lines1, 2);
        	System.out.println(rdd1.glom().collect());
    
        	List<String> lines2 = new ArrayList<String>();
        	lines2.add("1");
        	lines2.add("2");
        	lines2.add("3");
        	JavaRDD<String> rdd2 = ctx.parallelize(lines2, 2);
        	System.out.println(rdd2.glom().collect());
        	
        	
    	// zip requires the two RDDs to have the same number of partitions and the same number of elements per partition, otherwise it throws an error
        	
    //    	JavaPairRDD<String, String> rdd3 = rdd1.zip(rdd2);
    //    	rdd3.foreach(x->System.out.println(x));
    //    	(Hello,1)
    //    	(How,2)
    //    	(Moon,3)
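    	// The zipPartitions call below only prints the elements of each pair of
    	// partitions and returns an empty iterator, so rdd3 itself ends up empty;
    	// it is just meant to show which elements line up in the same partition pair.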
        	
        	JavaRDD<HashMap<String, String>> rdd3 = rdd1.zipPartitions(rdd2,
        			(x, y)-> {
        		         System.out.println("*****************");
        				 List<HashMap<String, String>> lines = new ArrayList<HashMap<String, String>>();
    //    		         List<String> list1 = new ArrayList<String>(); 
    		         while(x.hasNext()){
    //    		        	 list1.add(x.next());
        		        	 
        		        	 System.out.println(x.next());
        		         }
    //    		         List<String> list2 = new ArrayList<String>(); 
        		         while(y.hasNext()){
    //    		        	 list2.add(y.next());
        		        	 System.out.println(y.next());
        		         }
        				 
        		         return lines.iterator();
        		    });
        	rdd3.foreach(x->System.out.println(x));
        	
    //    	*****************
    //    	Hello
    //    	1
    //    	*****************
    //    	How
    //    	Moon
    //    	2
    //    	3
        	
    //    	JavaRDD<String> rdd3 = rdd1.zipPartitions(rdd2,
    //    			new FlatMapFunction2<
    //    			                Iterator<String>,
    //    			                Iterator<String>,
    //    			                Iterator<String>>(){
    //
    //					@Override
    //					public java.util.Iterator<Iterator<String>> call(
    //							Iterator<String> x, Iterator<String> y)
    //							throws Exception {
    //						return null;
    //					}
    //    		
    //    	});
    //    	System.out.println(rdd3.collect());
    //    	rdd3.foreach(x->System.out.println(x));
        }
    }
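
    Following the remark that zip sits on top of zipPartitions, here is a hedged sketch that rebuilds the pairing by hand, walking the two partition iterators in lockstep (it assumes, as zip itself does, that rdd1 and rdd2 line up element for element within each partition):

    JavaRDD<Tuple2<String, String>> manualZip = rdd1.zipPartitions(rdd2, (x, y) -> {
        List<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>();
        // Pair up the two partitions' elements in order, just like zip does.
        while (x.hasNext() && y.hasNext()) {
            pairs.add(new Tuple2<String, String>(x.next(), y.next()));
        }
        return pairs.iterator();
    });
    manualZip.foreach(t -> System.out.println(t));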
    
  • Original post: https://www.cnblogs.com/apppointint/p/8885283.html