Implementing Spark's common operator `join` in Java
  • java实现spark常用算子之join



    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.List;

    /**
     * Demonstrates Spark's {@code join(otherDataset, [numTasks])} operator.
     *
     * <p>{@code join} groups two pair-RDDs by key and, for each key present in
     * both RDDs, emits the Cartesian product of that key's values — i.e. every
     * combination of a left value with a right value for the same key.
     */
    public class JoinOperator {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("join");

            // try-with-resources: JavaSparkContext implements Closeable, so the
            // context is stopped even if the job throws (the original leaked it).
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // Left dataset: keys w1/w2/w3, with duplicate keys w1 and w2.
                List<Tuple2<String, String>> stus = Arrays.asList(
                        new Tuple2<>("w1", "1"),
                        new Tuple2<>("w2", "2"),
                        new Tuple2<>("w3", "3"),
                        new Tuple2<>("w2", "22"),
                        new Tuple2<>("w1", "11")
                );
                // Right dataset: same key space; duplicates again on w1 and w2,
                // so joined keys produce a per-key Cartesian product.
                List<Tuple2<String, String>> scores = Arrays.asList(
                        new Tuple2<>("w1", "a1"),
                        new Tuple2<>("w2", "a2"),
                        new Tuple2<>("w2", "a22"),
                        new Tuple2<>("w1", "a11"),
                        new Tuple2<>("w3", "a3")
                );

                JavaPairRDD<String, String> stusRdd = sc.parallelizePairs(stus);
                JavaPairRDD<String, String> scoresRdd = sc.parallelizePairs(scores);

                // Result element: (key, (leftValue, rightValue)).
                JavaPairRDD<String, Tuple2<String, String>> result = stusRdd.join(scoresRdd);

                // Print each joined pair on the executor (local mode, so visible
                // in this process). Output stream and format kept as in original.
                result.foreach(new VoidFunction<Tuple2<String, Tuple2<String, String>>>() {
                    @Override
                    public void call(Tuple2<String, Tuple2<String, String>> tuple) throws Exception {
                        System.err.println(tuple._1 + ":" + tuple._2);
                    }
                });
            }
        }
    }

    微信扫描下图二维码加入博主知识星球,获取更多大数据、人工智能、算法等免费学习资料哦!

  • 相关阅读:
    Java SE 5.0(JDK 1.5)新特性
    第22章—开启HTTPS
    第21章—websocket
    IE8Get请求中文不兼容:encodeURI的使用
    JavaScript自定义函数
    disable的错误使用
    20190401-记录一次bug ConstraintViolationException
    new Date()的浏览器兼容性问题
    单例模式(转)
    SQL Server使用一个语句块批量插入多条记录的三种方法和union和union all区别
  • 原文地址:https://www.cnblogs.com/guokai870510826/p/11598869.html
Copyright © 2011-2022 走看看