  • Simple Spark examples (Scala and Java)

    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
     * Created by spark on 15-1-19.
     * sortByKey: sorts a K-V RDD by key and returns a new, sorted RDD.
     */
    object SortByKey {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        import org.apache.spark.SparkContext._
        val a = sc.parallelize(List("dog","cat","owl","gnu","ant"))
        val b = sc.parallelize(1 to a.count().toInt)
        val c = a.zip(b)
    
        //asc
        c.sortByKey(true).collect().foreach(print)
        //desc
        c.sortByKey(false).collect().foreach(print)
      }
    }
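    /**
     * A small follow-up sketch (assuming Spark 1.0+'s RDD.sortBy, which the
     * original post does not show): the same pairs sorted by value instead of
     * by key.
     */
    object SortByValue {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)

        val a = sc.parallelize(List("dog","cat","owl","gnu","ant"))
        val c = a.zip(sc.parallelize(1 to a.count().toInt))

        //descending by the Int value: (ant,5)(gnu,4)(owl,3)(cat,2)(dog,1)
        c.sortBy(_._2, false).collect().foreach(print)
      }
    }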
    /**
     * Created by spark on 15-1-19.
     * RDD1.subtract(RDD2): returns a new RDD containing the elements that
     * exist in RDD1 but not in RDD2.
     */
    object Subtract {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 10)
        val b = sc.parallelize(1 to 3)
        //45678910
        //a.subtract(b).collect().foreach(print)
    
        val c = sc.parallelize(1 to 10)
        val d = sc.parallelize(List(1,2,3,11))
        //45678910
        c.subtract(d).collect().foreach(print)
      }
    }
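    /**
     * A companion sketch (assuming Spark 1.1+'s RDD.intersection): the
     * complement of subtract -- the elements present in both RDDs.
     */
    object Intersection {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)

        val c = sc.parallelize(1 to 10)
        val d = sc.parallelize(List(1,2,3,11))
        //1 2 3 (order may vary after the shuffle)
        c.intersection(d).collect().foreach(print)
      }
    }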
    /**
     * Created by spark on 15-1-19.
     * RDD1.subtractByKey(RDD2): returns a new RDD containing the pairs from
     * RDD1 whose keys do not appear in RDD2.
     */
    object SubtractByKey {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        import org.apache.spark.SparkContext._
        val a = sc.parallelize(List("dog","he","word","hello"))
        val b = a.keyBy(_.length)
    
        val c = sc.parallelize(List("cat","first","everyone"))
        val d = c.keyBy(_.length)
        //(2,he)(4,word)
        b.subtractByKey(d).collect().foreach(print)
      }
    }
    /**
     * Created by spark on 15-1-19.
     * sumApprox did not produce the result I expected.
     */
    object SumAndSumApprox {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 1000000)
        val b = a.sum()
        val c = a.sumApprox(0L,0.9).getFinalValue()
    
        println(b + " *** " + c)
      }
    }
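    /**
     * A possible explanation for the surprise above: sumApprox(0L, 0.9) gives
     * the approximation a time budget of 0 ms, so the initial estimate is based
     * on whatever tasks happen to have finished. A sketch with a nonzero
     * timeout (assuming the sumApprox(timeout, confidence) overload and
     * PartialResult's initialValue/getFinalValue):
     */
    object SumApproxWithBudget {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)

        import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 1000000)
        val partial = a.sumApprox(1000L, 0.95)
        println(partial.initialValue)    //estimate available when the timeout fires
        println(partial.getFinalValue()) //blocks until the exact sum is ready
      }
    }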
    /**
     * Created by spark on 15-1-19.
     * take(n): returns the first n elements of the RDD as an array.
     */
    object Take {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        //import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 1000000)
    
        //12345678910
        a.take(10).foreach(print)
      }
    }
    /**
     * Created by spark on 15-1-19.
     * takeOrdered(n): sorts the RDD's elements in ascending order and returns
     * the first n as an array.
     */
    object TakeOrdered {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        //import org.apache.spark.SparkContext._
        val a = sc.parallelize(List("ff","aa","dd","cc"))
    
        //aacc
        a.takeOrdered(2).foreach(print)
      }
    }
    /**
     * Created by spark on 15-1-19.
     * takeSample: draws a random sample of elements from the RDD.
     */
    object TakeSample {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        //import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 10000)
    
        /**
         * sample output (seed = 1):
         * 9048 5358 5216 7301 6303 6179 6151 5304 8115 3869
         */
        a.takeSample(true, 10, 1).foreach(println)
      }
    }
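    /**
     * A companion sketch: sample() is the transformation counterpart of the
     * takeSample() action -- it returns a new RDD holding a fractional sample
     * rather than a fixed-size local array.
     */
    object Sample {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)

        val a = sc.parallelize(1 to 10000)
        //roughly 10 elements; the exact count varies with the seed
        a.sample(false, 0.001, 1).collect().foreach(println)
      }
    }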
    /**
     * Created by spark on 15-1-19.
     * toDebugString: shows a detailed description of the RDD lineage for
     * debugging.
     */
    object ToDebugString {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        //import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 9)
        val b = sc.parallelize(1 to 3)
        val c = a.subtract(b)
    
        //toDebugString returns a String; print it so the lineage is visible
        println(c.toDebugString)
      }
    }
    /**
     * Created by spark on 15-1-19.
     * top(n): returns the n largest elements.
     */
    object Top {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        //import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 1000)
        val c = a.top(10)
    
        /**
         * output: 1000 999 998 997 996 995 994 993 992 991
         */
        c.foreach(println) 
      }
    }
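    /**
     * A small sketch: top(n) uses the implicit descending ordering, so passing
     * a reversed Ordering returns the smallest elements instead, matching
     * takeOrdered(n).
     */
    object TopReversed {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)

        val a = sc.parallelize(1 to 1000)
        //1 2 3 4 5 6 7 8 9 10
        a.top(10)(Ordering[Int].reverse).foreach(println)
      }
    }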
    
    /**
     * union (alias ++): merges two RDDs into a single new RDD.
     */
    object Union {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)
    
        //import org.apache.spark.SparkContext._
        val a = sc.parallelize(1 to 3)
        val b = sc.parallelize(3 to 5)
        val c = a.union(b)
        val d = a ++ b
    
        /**
         *123345
         */
        c.collect().foreach(print)
    
        /**
         *123345
         */
        d.collect().foreach(print)
      }
    }
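    /**
     * A follow-up sketch: union keeps duplicates (3 appears twice above), so
     * chain distinct() to drop them.
     */
    object UnionDistinct {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
        val sc = new SparkContext(conf)

        val a = sc.parallelize(1 to 3)
        val b = sc.parallelize(3 to 5)
        //1 2 3 4 5 (order may vary after the shuffle)
        (a ++ b).distinct().collect().foreach(print)
      }
    }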

     --Java

    package com.demo.sparkWordCount;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    /*
     * Ming Z M LI
     * */
    public class FunctionDemo {
        /*
         * create Context
         */
        public static JavaSparkContext createContext() {
    
            SparkConf sparkConf = new SparkConf().setAppName("FunctionDemo").setMaster("local[*]");
    
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    
            return ctx;
    
        }
    
        public static void main(String[] args) {
    
            demo5(); // switch to demo2()-demo4() to run the other examples
        }
    
        /*
         * RDD1.subtract(RDD2): returns a new RDD with the elements that exist
         * in RDD1 but not in RDD2.
         */
        public static void demo2() {
    
            JavaSparkContext ctx = createContext();
            List<String> list1 = new ArrayList<String>();
            list1.add("hello1");
            list1.add("hello2");
            list1.add("hello3");
            list1.add("hello4");
    
            List<String> list2 = new ArrayList<String>();
            list2.add("hello3");
            list2.add("hello4");
            list2.add("world5");
            list2.add("world6");
    
            JavaRDD<String> a = ctx.parallelize(list1);
            JavaRDD<String> b = ctx.parallelize(list2);
    
            a.subtract(b).foreach(new VoidFunction<String>() {
                public void call(String t) throws Exception {
                    System.out.println(t.toString());
                }
            });
        }
    
        /**
         * Created by spark on 15-1-19.
         * RDD1.subtractByKey(RDD2): returns a new RDD with the pairs from RDD1
         * whose keys do not appear in RDD2. foreach prints the pairs with their
         * keys; from (3, cat) (5, hello) (4, bird) (4, bird) the output is:
         * (4, bird) (4, bird)
         */
    
        public static void demo3() {
            JavaSparkContext ctx = createContext();
            JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "bird", "bird")));
            JavaRDD<String> b = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "testing")));
    
            JavaPairRDD<Integer, String> c = a.keyBy(new Function<String, Integer>() {
    
                public Integer call(String v1) throws Exception {
    
                    return v1.length();
                }
    
            });
    
            // Uncomment to inspect the keyed pairs:
            // c.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            //     public void call(Tuple2<Integer, String> t) throws Exception {
            //         System.out.println("(" + t._1 + ", " + t._2 + ")");
            //     }
            // });
    
            JavaPairRDD<Integer, String> d = b.keyBy(new Function<String, Integer>() {
    
                public Integer call(String v1) throws Exception {
    
                    return v1.length();
                }
    
            });
    
            c.subtractByKey(d).foreach(new VoidFunction<Tuple2<Integer, String>>() {
                public void call(Tuple2<Integer, String> t) throws Exception {
                    System.out.println("(" + t._1 + ", " + t._2 + ")");
                }
            });
        }
    
        /**
         * take(n): returns the first n elements of the RDD as an array.
         */
        public static void demo4() {
            JavaSparkContext ctx = createContext();
            JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "4", "2", "3")));
    
            List<String> b = a.take(3);
    
            for (String c : b) {
                System.out.println(c);
            }
    
        }
    
        /**
         * top(n): returns the n largest elements; with the natural String
         * ordering here the output is: hello 3
         */
        public static void demo5() {
            JavaSparkContext ctx = createContext();
            JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "hello", "2", "3")));
            List<String> b = a.top(2);
            for (String c : b) {
                System.out.println(c);
            }
        }
    
        
    }
  • Original post: https://www.cnblogs.com/MarchThree/p/5059649.html