zoukankan      html  css  js  c++  java
  • Spark中的各种action算子操作(java版)

    在我看来,Spark编程中的action算子的作用就像一个触发器,用来触发之前的transformation算子。transformation操作具有懒加载的特性,你定义完操作之后并不会立即加载,只有当某个action的算子执行之后,前面所有的transformation算子才会全部执行。常用的action算子如下代码所列:(java版) 
    package cn.spark.study.core;

    import java.util.Arrays; 
    import java.util.List; 
    import java.util.Map;

    import org.apache.spark.SparkConf; 
    import org.apache.spark.api.java.JavaPairRDD; 
    import org.apache.spark.api.java.JavaRDD; 
    import org.apache.spark.api.java.JavaSparkContext; 
    import org.apache.spark.api.java.function.Function; 
    import org.apache.spark.api.java.function.Function2;

    import scala.Tuple2;

    /** 
    * action操作实战 
    * @author dd 

    */ 
    public class ActionOperation { 
    public static void main(String[] args) { 
    //reduceTest(); 
    //collectTest(); 
    //countTest(); 
    //takeTest(); 
    countByKeyTest(); 
    }

      1 /**
      2  * reduce算子
      3  * 案例:求累加和
      4  */
      5 private static void reduceTest(){
      6     SparkConf conf = new SparkConf()
      7                     .setAppName("reduce")
      8                     .setMaster("local");
      9     JavaSparkContext sc = new JavaSparkContext(conf);
     10 
     11     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
     12 
     13     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
     14 
     15     //使用reduce操作对集合中的数字进行累加
     16     int sum = numbersRDD.reduce(new Function2<Integer, Integer, Integer>() {
     17 
     18         @Override
     19         public Integer call(Integer arg0, Integer arg1) throws Exception {
     20             return arg0+arg1;
     21         }
     22     });
     23 
     24     System.out.println(sum);
     25 
     26     sc.close();
     27 }
     28 
     29 /**
     30  * collect算子
     31  * 可以将集群上的数据拉取到本地进行遍历(不推荐使用)
     32  */
     33 private static void collectTest(){
     34     SparkConf conf = new SparkConf()
     35     .setAppName("collect")
     36     .setMaster("local");
     37     JavaSparkContext sc = new JavaSparkContext(conf);
     38 
     39     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
     40 
     41     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
     42 
     43     JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {
     44 
     45         @Override
     46         public Integer call(Integer arg0) throws Exception {
     47             // TODO Auto-generated method stub
     48             return arg0*2;
     49         }
     50     });
     51 
     52     //foreach的action操作是在远程集群上遍历rdd中的元素,而collect操作是将在分布式集群上的rdd
     53     //数据拉取到本地,这种方式一般不建议使用,因为如果rdd中的数据量较大的话,比如超过一万条,那么性能会
     54     //比较差,因为要从远程走大量的网络传输,将数据获取到本地,有时还可能发生oom异常,内存溢出
     55     //所以还是推荐使用foreach操作来对最终的rdd进行处理
     56     List<Integer> doubleNumList = doubleNumbers.collect();
     57     for(Integer num : doubleNumList){
     58         System.out.println(num);
     59     }
     60     sc.close();
     61 }
     62 
     63 /**
     64  * count算子
     65  * 可以统计rdd中的元素个数
     66  */
     67 private static void countTest(){
     68     SparkConf conf = new SparkConf()
     69     .setAppName("count")
     70     .setMaster("local");
     71     JavaSparkContext sc = new JavaSparkContext(conf);
     72 
     73     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
     74 
     75     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
     76 
     77     //对rdd使用count操作统计rdd中元素的个数
     78     long count = numbersRDD.count();
     79     System.out.println(count);
     80 
     81     sc.close();
     82 }
     83 
     84 /**
     85  * take算子
     86  * 将远程rdd的前n个数据拉取到本地
     87  */
     88 private static void takeTest(){
     89     SparkConf conf = new SparkConf()
     90     .setAppName("take")
     91     .setMaster("local");
     92     JavaSparkContext sc = new JavaSparkContext(conf);
     93 
     94     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
     95 
     96     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
     97 
     98     //take操作与collect操作类似,也是从远程集群上获取rdd数据,但是,collect操作获取的是rdd的
     99     //所有数据,take获取的只是前n个数据
    100     List<Integer> top3number = numbersRDD.take(3);
    101     for(Integer num : top3number){
    102         System.out.println(num);
    103     }
    104     sc.close();
    105 }
    106 
    107 /**
    108  * saveAsTextFile算子
    109  * 
    110  */
    111 private static void saveAsTExtFileTest(){
    112     SparkConf conf = new SparkConf()
    113     .setAppName("saveAsTextFile");
    114 
    115     JavaSparkContext sc = new JavaSparkContext(conf);
    116 
    117     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    118 
    119     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
    120 
    121     JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {
    122 
    123         @Override
    124         public Integer call(Integer arg0) throws Exception {
    125             // TODO Auto-generated method stub
    126             return arg0*2;
    127         }
    128     });
    129 
    130     //saveAsTextFile算子可以直接将rdd中的数据保存在hdfs中
    131     //但是我们在这里只能指定保存的文件夹也就是目录,那么实际上,会保存为目录中的
    132     //  /double_number.txt/part-00000文件
    133     doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");
    134 
    135     sc.close();
    136 }
    137 
    138 /**
    139  * countByKey算子
    140  */
    141 
    142 private static void countByKeyTest(){
    143     SparkConf conf = new SparkConf()
    144     .setAppName("take")
    145     .setMaster("local");
    146     JavaSparkContext sc = new JavaSparkContext(conf);
    147 
    148     List<Tuple2<String, String>> studentsList = Arrays.asList(
    149             new Tuple2<String, String>("class1","leo"),
    150             new Tuple2<String, String>("class2","jack"),
    151             new Tuple2<String, String>("class1","marry"),
    152             new Tuple2<String, String>("class2","tom"),
    153             new Tuple2<String, String>("class2","david"));
    154 
    155     JavaPairRDD<String, String> studentsRDD = sc.parallelizePairs(studentsList);
    156 
    157     //countByKey算子可以统计每个key对应元素的个数
    158     //countByKey返回的类型直接就是Map<String,Object>
    159 
    160     Map<String, Object> studentsCounts = studentsRDD.countByKey();
    161 
    162     for(Map.Entry<String, Object> studentsCount : studentsCounts.entrySet()){
    163         System.out.println(studentsCount.getKey()+" : "+studentsCount.getValue());
    164     }
    165     sc.close();
    166 }

    原文引自:http://blog.csdn.net/kongshuchen/article/details/51344124

  • 相关阅读:
    ls命令具有一个r选项,可以递归的列出子目录中的内容。请编写一个具有同样功能的程序。
    BizTalk Server 2006 正式发布了,有120天试用版可以下载 无为而为
    IT人看《国富论》系列:第一篇之第四章:论货币的起源及其效用。UML是软件行业的货币 无为而为
    使用XPathNavigator和XPathExpression求出XPath的值,[源代码] 无为而为
    为XPath自定义函数(因为XPath1.0的函数非常有限)[附源代码下载] 无为而为
    IT人看《国富论》系列:第一篇之第二章:论分工的原由。分工其实是人类利己倾向的结果 无为而为
    非常令人沮丧的是,SQL 2005 发布的 Web EndPoint不支持匿名访问 无为而为
    Team Foundation Server Workgroup Edition 5 用户限制到底是如何限制的呢? 无为而为
    微软发布新的MSN ToolBar V2.5,包含桌面搜索,Outlook搜索,支持IE的选项卡浏览模式 无为而为
    IT人看《国富论》系列:第一篇之第三章:论分工受市场范围的限制。外国人都觊觎的中国市场到底大还是不大? 无为而为
  • 原文地址:https://www.cnblogs.com/jinggangshan/p/8125873.html
Copyright © 2011-2022 走看看