zoukankan      html  css  js  c++  java
  • spark使用02

    1.rdd的初始化

      1.1 读取文件来初始化rdd(通过sparkContext的textFile方法)

        1.1.1 读取本地文件

         SparkConf conf = new SparkConf().setAppName("LocalWordCount").setMaster("local");// 指定运行在本地
            JavaSparkContext sparkContext = new JavaSparkContext(conf);
            // 返回每一行作为一个元素的rdd
            JavaRDD<String> lines = sparkContext
                    .textFile("C://Users//yanglin//Desktop//bb.txt", 5); // 返回为JavaRDD[String]

        1.1.2 读取hdfs文件

          //返回每一行作为一个元素的rdd
            JavaRDD<String> lines=sparkContext.textFile("hdfs://hadoop-senior.ibeifeng.com:8020/user/yanglin/spark/wc.input", 5);//返回为JavaRDD[String]

      1.2 并行化集合来初始化rdd(通过sparkContext.)

        JavaPairRDD<Integer, String> students = context.parallelizePairs(
                    Arrays.asList(new Tuple2<Integer, String>(1, "zhangsan"),
                            new Tuple2<Integer, String>(2, "lisi"),
                            new Tuple2<Integer, String>(3, "wangwu"),
                            new Tuple2<Integer, String>(4, "zhaoliu")),
                    1)

      2.rdd的基本操作(分为transformation和action)

      2.1 Spark支持两种RDD操作:transformation和action

        2.1.1 区别

          transformation操作会针对已有的RDD创建一个新的RDD;

          action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序。
        2.1.2 特性

          transformation的特点就是lazy特性:transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一            个action操作,那么所有的transformation才会执行。

          action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。

      2.2 常用transformation操作

        2.2.1 flatMap   将有嵌套类型的集合转换为没有嵌套的一个大集合

        // 返回每一个单词为一个元素的rdd,将每行数据按空格分割后合并为一个大的集合
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 2192898403909387476L;
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });

        2.2.2  map (在java中分为map和mapToPair,在scala中只有map),将一个rdd转换为另一个rdd

        // 返回每一个单词的映射
            JavaPairRDD<String, Integer> wordPairs = words
                    .mapToPair(new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = -4729349390159625475L;
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }
                    });

        2.2.3 reduceByKey (根据key分组和进行reduce操作)

        // 单词数的叠加
            JavaPairRDD<String, Integer> wordCountPairs = wordPairs
                    .reduceByKey(new Function2<Integer, Integer, Integer>() {
                        private static final long serialVersionUID = -8636811420038190538L;
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }
                    });

        2.2.4 filter (过滤符合要求的数据,生成一个新的rdd)

            context.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .filter(new Function<Integer, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        public Boolean call(Integer val) throws Exception {
                            return val % 2 == 0;//获取偶数
                        }
                    })

        2.2.5 reduce (从左到右依次执行reduce操作)

          Integer evenSum = context
                    .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
              //过滤获取偶数 .filter(
    new Function<Integer, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Integer val) throws Exception { return val % 2 == 0; }
              //计算所有偶数的和 2+4=6 6+6=12 12+8=20 20+10=30 }).reduce(
    new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } });

        2.2.6 gropuByKey(根据key进行分组,每个key对应一个Iterable<value>)

        2.2.7 sortByKey(false)(对每个key对应的value进行排序操作。)
            默认为true:表示升序;设置为false,可以按降序排列

        2.2.8 join (对两个包含<key,value>对的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理。返回的rdd的value为两个rdd的返回元组对)

          //students对应的数据为(id,name),scores对应的数据为(id,score),join后为(id,(name,score))
          /**
             *   join的结果为:
             *  id:4 name:zhaoliu scores:21 ===============================
             *  id:1 name:zhangsan scores:69 ===============================
             *  id:1 name:zhangsan scores:68 ===============================
             *  id:3 name:wangwu scores:48 ===============================
             *  id:3 name:wangwu scores:52 ===============================
             *  id:2 name:lisi scores:35 ===============================
             *  id:2 name:lisi scores:97 ===============================
             */
          JavaPairRDD<Integer, String> students = context.parallelizePairs(
                    Arrays.asList(new Tuple2<Integer, String>(1, "zhangsan"),
                            new Tuple2<Integer, String>(2, "lisi"),
                            new Tuple2<Integer, String>(3, "wangwu"),
                            new Tuple2<Integer, String>(4, "zhaoliu")),
                    1);
            
          JavaPairRDD<Integer, Integer> scores = context.parallelizePairs(Arrays.asList(
                    new Tuple2<Integer, Integer>(1, 69), new Tuple2<Integer, Integer>(1, 68),
                    new Tuple2<Integer, Integer>(2, 35), new Tuple2<Integer, Integer>(2, 97),
                    new Tuple2<Integer, Integer>(3, 48), new Tuple2<Integer, Integer>(3, 52),
                    new Tuple2<Integer, Integer>(4, 21)));

          JavaPairRDD<Integer, Tuple2<String, Integer>> studentScorse = students.join(scores);

        2.2.9 cogroup (同join,但是是每个key对应的Iterable<value>都会传入自定义函数进行处理)   

          //会对有相同列的元素进行合并到一个Iterable中
          /**
           * cogroup的结果:    
           * id:4 name:[zhaoliu] scores:[21]
               * id:1 name:[zhangsan] scores:[69, 68]
                * id:3 name:[wangwu] scores:[48, 52]
                * id:2 name:[lisi] scores:[35, 97]
             */
          JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScorse = students.cogroup(scores);

      2.3 action常用操作

        2.3.1 collect(将RDD中所有元素获取到本地客户端)

        2.3.2 count (获取RDD元素总数)

        2.3.3 take(n) (获取RDD中前n个元素)

        2.3.4 saveAsTextFile(path) (将RDD元素保存到文件中,对每个元素调用toString方法)

        2.3.5 countByKey (对每个key对应的值进行count计数)

        2.3.6 foreach (遍历RDD中的每个元素。)

  • 相关阅读:
    Building a Space Station POJ
    Networking POJ
    POJ 1251 Jungle Roads
    CodeForces
    CodeForces
    kuangbin专题 专题一 简单搜索 POJ 1426 Find The Multiple
    The Preliminary Contest for ICPC Asia Shenyang 2019 F. Honk's pool
    The Preliminary Contest for ICPC Asia Shenyang 2019 H. Texas hold'em Poker
    The Preliminary Contest for ICPC Asia Xuzhou 2019 E. XKC's basketball team
    robotparser (File Formats) – Python 中文开发手册
  • 原文地址:https://www.cnblogs.com/lifeone/p/6078723.html
Copyright © 2011-2022 走看看