zoukankan      html  css  js  c++  java
  • spark cogroup算子

    java

     1 /** 
     2  *cogroup与join算子不同的是如果rdd中的一个key,对应多个value,则返回<Iterable<key>,Iterable<value>>
     3  *@author Tele
     4  */
     5 public class CogroupDemo {
     6     private static SparkConf conf = new SparkConf().setMaster("local").setAppName("congroupdemo");
     7     private static JavaSparkContext jsc = new JavaSparkContext(conf);
     8     public static void main(String[] args) {
     9         //每个学生有多门成绩
    10         List<Tuple2<Integer,String>> studentList = Arrays.asList(
    11                                                     new Tuple2<Integer,String>(1,"tele"), 
    12                                                     new Tuple2<Integer,String>(1,"xx"), 
    13                                                     new Tuple2<Integer,String>(2,"yeye"), 
    14                                                     new Tuple2<Integer,String>(3,"wyc")
    15                                                    );
    16 
    17         List<Tuple2<Integer,Integer>> scoreList = Arrays.asList(
    18                                                   new Tuple2<Integer,Integer>(1,100),
    19                                                   new Tuple2<Integer,Integer>(1,110),
    20                                                   new Tuple2<Integer,Integer>(1,120),
    21                                                   new Tuple2<Integer,Integer>(2,90),
    22                                                   new Tuple2<Integer,Integer>(2,60),
    23                                                   new Tuple2<Integer,Integer>(2,50),
    24                                                   new Tuple2<Integer,Integer>(3,70),
    25                                                   new Tuple2<Integer,Integer>(3,70)
    26                                                   );
    27         
    28         JavaPairRDD<Integer, String> studentRDD = jsc.parallelizePairs(studentList);
    29         JavaPairRDD<Integer, Integer> scoreRDD = jsc.parallelizePairs(scoreList);
    30         
    31         JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> result = studentRDD.cogroup(scoreRDD);
    32         result.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
    33             
    34             private static final long serialVersionUID = 1L;
    35 
    36             @Override
    37             public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
    38                 System.out.println("学号:" + t._1);
    39                 System.out.println("姓名:" + t._2._1);
    40                 System.out.println("成绩:" + t._2._2);
    41                 
    42             /*    System.out.print("成绩:[");
    43                 t._2._2.forEach(i->System.out.print(i + ","));
    44                 System.out.println("]");
    45                 System.out.println("====================");*/
    46                 
    47             }
    48         });
    49         
    50         jsc.close();
    51     }
    52 }

    scala

     /**
      * Demonstrates the Spark `cogroup` operator on two pair RDDs.
      *
      * For every key, `cogroup` yields one record shaped as
      * `(key, (Iterable[leftValues], Iterable[rightValues]))`.
      */
     object CogroupDemo {

       /**
        * Cogroups a student RDD with a score RDD (both keyed by student id) and
        * prints the grouped names and scores for every id.
        *
        * @param args unused
        */
       def main(args: Array[String]): Unit = {
         val conf = new SparkConf().setMaster("local").setAppName("cogroupdemo")
         val sc = new SparkContext(conf)

         // Stop the context even when the job fails, so it is not leaked.
         try {
           val studentArr = Array((1, "tele"), (2, "yeye"), (3, "wyc"))
           val scoreArr = Array((1, 100), (1, 200), (2, 80), (2, 300), (3, 100))

           // Both RDDs are created with a single partition.
           val studentRDD = sc.parallelize(studentArr, 1)
           val scoreRDD = sc.parallelize(scoreArr, 1)

           studentRDD.cogroup(scoreRDD).foreach { case (id, (names, scores)) =>
             val nameText = names.mkString(" ")
             val scoreText = scores.mkString(",")
             println(s"学号:$id")
             println(s"姓名:$nameText")
             println(s"成绩:$scoreText")
             println("============")
           }
         } finally {
           sc.stop()
         }
       }
     }

  • 相关阅读:
    oracle impdp 导入
    oracle权限的分配
    Oracle CASE WHEN 用法介绍
    Oracle自动执行任务(存储过程)
    PL/SQL注册码
    ORACLE基本的sql语句
    ORACLE导出导入问题和表空间问题
    PLSQL笔记
    JSEL 表达式
    asp.net HTTP教程一(HTTP运行期与页面执行模型 )
  • 原文地址:https://www.cnblogs.com/tele-share/p/10268576.html
Copyright © 2011-2022 走看看