  • persist & checkpoint & count API

    1、 persist mechanism

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Scanner;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.storage.StorageLevel;
    
    import scala.Tuple2;
    
    
    public class PersistDemo {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setMaster("local[3]").setAppName("wordcount").set("spark.testing.memory", "2147480000");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    //        ctx.setCheckpointDir("file:///d:/checkpoint");
            final JavaRDD<String> lines = ctx.textFile("words.txt").repartition(2);
    
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                	long th=Thread.currentThread().getId();
                	System.out.println("flatMap...  thread  id:"+th);
                    return Arrays.asList(s.split(" ")).iterator();
                }
            }).repartition(2)
            //.persist(StorageLevel.MEMORY_ONLY());
            .cache();
            
            // because words is cached, re-running the job in the loop below does not execute flatMap again
            
            Scanner sc = new Scanner(System.in);
            while(true){
            	String line = sc.next();
            	if(line.equals("END")){
            		break;
            	}
            	
    	        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {  
    	            @Override
    	            public Tuple2<String, Integer> call(String s) throws Exception {  
    	            	long th=Thread.currentThread().getId();
    	            	System.out.println("mapToPair...  thread  id:"+th);
    	                return new Tuple2<String, Integer>(s, 1);
    	            }
    	        }).repartition(2);
    	        
    	        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {  
    	            @Override  
    	            public Integer call(Integer integer, Integer integer2) throws Exception {  
    	            	long th=Thread.currentThread().getId();
    	            	System.out.println("reduceByKey...  thread  id:"+th);
    	                return integer + integer2;  
    	            }  
    	        }).repartition(2);
    	        //counts.saveAsTextFile(args[1]);
    	        counts.foreach(x->System.out.println(x));
    
    	        
    //	        lines.unpersist();
            }
            ctx.stop();
        }  
    }
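
    As a short follow-up, here is a minimal sketch (not part of the original demo; the input file words.txt and the MEMORY_AND_DISK level are assumptions for illustration) showing persist() with an explicit StorageLevel instead of cache(), and unpersist() to release the cached blocks afterwards.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;
    
    public class PersistLevelSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("persist-level").set("spark.testing.memory", "2147480000");
            JavaSparkContext ctx = new JavaSparkContext(conf);
    
            // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY());
            // MEMORY_AND_DISK spills partitions that do not fit in memory to local disk.
            JavaRDD<String> lines = ctx.textFile("words.txt")           // assumed input file
                    .persist(StorageLevel.MEMORY_AND_DISK());
    
            System.out.println(lines.count());   // first action computes the partitions and stores them
            System.out.println(lines.count());   // second action is served from the cached blocks
            lines.unpersist();                   // release the cached blocks when they are no longer needed
            ctx.stop();
        }
    }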
    

    2、 checkpoint mechanism

    import java.util.Arrays;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    
    public class CheckpointDemo {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("wordcount").set("spark.testing.memory", "2147480000");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            ctx.setCheckpointDir("file:///d:/checkpoint");
            final JavaRDD<String> lines = ctx.textFile("words.txt");
    
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                	System.out.println("flatMap...");
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
            
            JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {  
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {  
                	System.out.println("mapToPair...");
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
        ones.checkpoint();            // mark the RDD for checkpointing: after the first action its data is written to the checkpoint dir and the lineage is truncated
            
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {  
                @Override  
                public Integer call(Integer integer, Integer integer2) throws Exception {  
                	System.out.println("reduceByKey...");
                    return integer + integer2;  
                }
            });
            System.out.println(counts.toDebugString());
    //        counts.saveAsTextFile(args[1]);
            counts.foreach(x->System.out.println(x));
            System.out.println("after action:" + counts.toDebugString());
            ctx.stop();
        }  
    }
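
    A practical note, sketched below under the same assumptions as the demo (local master, the d:/checkpoint directory): checkpointing launches a separate job that recomputes the RDD before writing it out, so it is usually paired with cache() so the data is computed only once; isCheckpointed() and getCheckpointFile() can be used to verify the result after the first action.

    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    public class CheckpointWithCacheSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("checkpoint-cache").set("spark.testing.memory", "2147480000");
            JavaSparkContext ctx = new JavaSparkContext(conf);
            ctx.setCheckpointDir("file:///d:/checkpoint");    // assumed local checkpoint directory
    
            JavaRDD<Integer> nums = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
            nums.cache();        // cache first so the checkpoint job can reuse the computed partitions
            nums.checkpoint();   // data is written and the lineage truncated after the first action
    
            System.out.println(nums.count());             // first action also triggers the checkpoint job
            System.out.println(nums.isCheckpointed());    // true once the checkpoint has been written
            System.out.println(nums.getCheckpointFile()); // Optional containing the checkpoint path
            System.out.println(nums.toDebugString());     // lineage now starts from the checkpointed data
            ctx.stop();
        }
    }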
    

    3、 count API: approximate counting

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.partial.BoundedDouble;
    import org.apache.spark.partial.PartialResult;
    
    //JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
    public class CountApiTest {
        public static void main(String[] xx){
        	SparkConf conf = new SparkConf();
        	conf.setMaster("local");
        	conf.setAppName("Count API");
        	conf.set("spark.testing.memory", "2147480000");
    //    	conf.set("spark.default.parallelism", "4");
        	JavaSparkContext ctx = new JavaSparkContext(conf);
    	// Two ways to create an RDD: 1) read from external storage (typical in a cluster) 2) parallelize an in-memory collection
    
        	List<Integer> list = new ArrayList<Integer>();
            for(int i = 0; i < 10000; i++){
        	    list.add(i);
            }
        	JavaRDD<Integer> rdd1 = ctx.parallelize(list, 2);
        	JavaRDD<Integer> rdd2 = rdd1.union(rdd1).union(rdd1).union(rdd1);
        	
    	//System.out.println(rdd2.count());                  // exact total count after the unions (40000)
    	PartialResult<BoundedDouble> result = rdd2.countApprox(450);   // timeout in ms (1000 and 300 also tried): if the job finishes within the timeout the result is exact, otherwise the current estimate is returned anyway
        	System.out.println(result.initialValue().mean());
        	System.out.println(result.initialValue().low());
        	System.out.println(result.initialValue().high());
    	System.out.println(result.initialValue().confidence()); // confidence level
        	
    //    	40000.0             with timeout 2000 ms
    //    	40000.0
    //    	40000.0
    //    	1.0
        	
    //    	40000.6             with timeout 450 ms
    //    	39696.95761248283
    //    	40304.242387517166
    //    	0.95
        	
        	// countApproxDistinct(relativeSD): a smaller relativeSD (e.g. 0.01 vs 0.1) gives a more accurate estimate, and it runs faster than an exact distinct count
    //        System.out.println(rdd2.countApproxDistinct(0.01));   //9945  
        }
    }
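
    As a hedged follow-up (class name and parameter values below are illustrative): countApprox() also takes an optional confidence argument, and PartialResult.getFinalValue() blocks until the job completes, at which point the bounds collapse to the exact count; countApproxDistinct(relativeSD) is the analogous approximation for distinct counts.

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.partial.BoundedDouble;
    import org.apache.spark.partial.PartialResult;
    
    public class CountApproxFinalValueSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("countApprox-final").set("spark.testing.memory", "2147480000");
            JavaSparkContext ctx = new JavaSparkContext(conf);
    
            List<Integer> list = new ArrayList<Integer>();
            for (int i = 0; i < 10000; i++) {
                list.add(i);
            }
            JavaRDD<Integer> rdd = ctx.parallelize(list, 2);
    
            // Ask for an estimate within 100 ms at 90% confidence.
            PartialResult<BoundedDouble> partial = rdd.countApprox(100, 0.90);
            System.out.println("estimate: " + partial.initialValue().mean());
    
            // getFinalValue() blocks until the job finishes, so low/high collapse to the exact count.
            BoundedDouble finalValue = partial.getFinalValue();
            System.out.println("final: " + finalValue.mean() + " [" + finalValue.low() + ", " + finalValue.high() + "]");
    
            // Approximate distinct count: a smaller relativeSD is more accurate but uses more memory.
            System.out.println("approx distinct: " + rdd.countApproxDistinct(0.05));
            ctx.stop();
        }
    }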