  • Examples of commonly used Spark Transformation operators ===> Java version

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.*;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class TransformationCases {
        public static void main(String[] args) {

            //Prepare the test data.
            List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
            List<String> text = Arrays.asList("cat,dog,rabbit","apple,pear,peach","eyes,nose,mouth");
            List<Tuple2<String,Integer>> scores = Arrays.asList(
                    new Tuple2<String, Integer>("class1",88),
                    new Tuple2<String, Integer>("class2",90),
                    new Tuple2<String, Integer>("class2",85),
                    new Tuple2<String, Integer>("class1",95),
                    new Tuple2<String, Integer>("class2",89)
            );
            List<Tuple2<Integer,String>> students = Arrays.asList(
                    new Tuple2<Integer, String>(1,"s1"),
                    new Tuple2<Integer, String>(2,"s2"),
                    new Tuple2<Integer, String>(3,"s3"),
                    new Tuple2<Integer, String>(4,"s4")
            );
            List<Tuple2<Integer,Integer>> stuScores = Arrays.asList(
                    new Tuple2<Integer, Integer>(1,100),
                    new Tuple2<Integer, Integer>(2,98),
                    new Tuple2<Integer, Integer>(3,98),
                    new Tuple2<Integer, Integer>(3,99),
                    new Tuple2<Integer, Integer>(2,99)
            );

            //Get the SparkContext object.
            JavaSparkContext sc = getContext();
            //Test the Transformation methods (uncomment one at a time):
            // mapDemo(sc,numbers);
            // filterDemo(sc,numbers);
            // flatMapDemo(sc,text);
            // groupByKeyDemo(sc,scores);
            // reduceByKeyDemo(sc,scores);
            // sortByKeyDemo(sc,scores);
            // joinDemo(sc,students,stuScores);
            cogroupDemo(sc,students,stuScores);
            closeContext(sc);
        }

        //Create the SparkConf and JavaSparkContext objects.
        public static JavaSparkContext getContext(){
            SparkConf conf = new SparkConf()
                    .setAppName("TransformationCases")
                    .setMaster("local");

            JavaSparkContext sc = new JavaSparkContext(conf);
            return sc;
        }
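        //Note: setMaster("local") runs Spark in-process with a single worker
        //thread; "local[*]" would use one thread per core. Hard-coding the
        //master is only for local testing -- on a cluster it is normally
        //supplied via spark-submit.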

        //Close the SparkContext object.
        public static void closeContext(JavaSparkContext sc){
            if (sc != null){
                sc.close();
            }
        }

        //Use the map operator to multiply each element of the collection by 2.
        public static void mapDemo(JavaSparkContext sc, List<Integer> numbers){
            JavaRDD<Integer> rdd = sc.parallelize(numbers,1);
            JavaRDD<Integer> doubledNumbers = rdd.map(new Function<Integer,Integer>() {
                public Integer call(Integer v1) throws Exception {
                    return v1 * 2;
                }
            });

            doubledNumbers.foreach(new VoidFunction<Integer>() {
                public void call(Integer number) throws Exception {
                    System.out.println(number);
                }
            });
        }
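        //On Java 8+ the anonymous classes above can be replaced with lambdas,
        //since Spark's Function interfaces are functional interfaces. A
        //minimal sketch of the same demo (mapDemoLambda is a hypothetical
        //helper, not part of the original example):
        public static void mapDemoLambda(JavaSparkContext sc, List<Integer> numbers){
            JavaRDD<Integer> rdd = sc.parallelize(numbers, 1);
            //double each element, then print every result
            rdd.map(v -> v * 2).foreach(n -> System.out.println(n));
        }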

        //Use the filter operator to return all the even numbers in the collection.
        public static void filterDemo(JavaSparkContext sc,List<Integer> numbers){
            JavaRDD<Integer> rdd = sc.parallelize(numbers,1);
            JavaRDD<Integer> evenNumbers = rdd.filter(new Function<Integer, Boolean>() {
                public Boolean call(Integer v1) throws Exception {
                    return v1 % 2 == 0;
                }
            });

            evenNumbers.foreach(new VoidFunction<Integer>() {
                public void call(Integer number) throws Exception {
                    System.out.println(number);
                }
            });
        }

        //Use the flatMap operator to split each string into individual words.
        public static void flatMapDemo(JavaSparkContext sc,List<String> text){
            JavaRDD<String> rdd = sc.parallelize(text);
            JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(",")).iterator();
                }
            });

            words.foreach(new VoidFunction<String>() {
                public void call(String word) throws Exception {
                    System.out.println(word);
                }
            });
        }
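        //Note: this call signature targets Spark 2.x, where
        //FlatMapFunction.call returns an Iterator. On Spark 1.x it returned
        //an Iterable, so there you would return the List itself rather than
        //calling iterator() on it.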

        //Use the groupByKey operator to group the scores by class.
        public static void groupByKeyDemo(JavaSparkContext sc, List<Tuple2<String,Integer>> scores){
            JavaPairRDD<String, Integer> lists = sc.parallelizePairs(scores);
            JavaPairRDD<String,Iterable<Integer>> groupedScores = lists.groupByKey();

            groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                public void call(Tuple2<String, Iterable<Integer>> classScores) throws Exception {
                    System.out.println(classScores._1);
                    Iterator<Integer> iterator = classScores._2.iterator();
                    while (iterator.hasNext()){
                        System.out.println(iterator.next());
                    }
                    System.out.println("========================================");
                }
            });
        }
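        //Note: groupByKey ships every individual value through the shuffle
        //and materializes each key's values in memory. When the end goal is a
        //per-key aggregate (a sum, a max, ...), reduceByKey below is usually
        //preferable because it combines values on the map side first.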

        //Use the reduceByKey operator to compute the total score of each class.
        public static void reduceByKeyDemo(JavaSparkContext sc,List<Tuple2<String,Integer>> scores){
            JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(scores);
            JavaPairRDD<String,Integer> reducedScores = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            reducedScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                public void call(Tuple2<String, Integer> classTotal) throws Exception {
                    System.out.println(classTotal._1 + " : " + classTotal._2);
                }
            });
        }
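        //A related aggregation, sketched with Java 8 lambdas: the average
        //score per class (averageByKeyDemo is a hypothetical helper, not part
        //of the original example):
        public static void averageByKeyDemo(JavaSparkContext sc, List<Tuple2<String,Integer>> scores){
            JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(scores);
            //pair each score with a count of 1, then sum scores and counts per key
            JavaPairRDD<String, Tuple2<Integer, Integer>> sumCount = rdd
                    .mapValues(v -> new Tuple2<>(v, 1))
                    .reduceByKey((a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2));
            //divide each summed score by its count to get the average
            sumCount.foreach(p -> System.out.println(p._1 + " : " + (double) p._2._1 / p._2._2));
        }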

        //Use the sortByKey operator to sort by score in ascending order.
        public static void sortByKeyDemo(JavaSparkContext sc,List<Tuple2<String,Integer>> scores){
            JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(scores);
            //We want to sort by score, but the key of the raw data is the
            //class, so temporarily swap the key and the value.
            JavaPairRDD<Integer,String> swappedRdd = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                public Tuple2<Integer, String> call(Tuple2<String, Integer> pair) throws Exception {
                    return new Tuple2<Integer, String>(pair._2,pair._1);
                }
            });
            //Sort in ascending order on the current key (the score).
            JavaPairRDD<Integer,String> sortedRdd = swappedRdd.sortByKey();
            //After sorting, swap the key and the value back so the result
            //keeps the original key/value layout.
            JavaPairRDD<String,Integer> result = sortedRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                public Tuple2<String, Integer> call(Tuple2<Integer, String> pair) throws Exception {
                    return new Tuple2<String, Integer>(pair._2,pair._1);
                }
            });

            result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                public void call(Tuple2<String, Integer> pairs) throws Exception {
                    System.out.println(pairs._1 + " : " + pairs._2);
                }
            });
        }
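        //Note: sortByKey() defaults to ascending order; sortByKey(false)
        //sorts descending. The swap/sort/swap pattern above is the usual way
        //to sort a pair RDD by value.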

        //Use the join operator to join the elements of two RDDs by key.
        public static void joinDemo(JavaSparkContext sc,List<Tuple2<Integer,String>> students,List<Tuple2<Integer,Integer>> stuScores){
            JavaPairRDD<Integer,String> stuRdd = sc.parallelizePairs(students);
            JavaPairRDD<Integer,Integer> scoreRdd = sc.parallelizePairs(stuScores);

            JavaPairRDD<Integer,Tuple2<String,Integer>> lists = stuRdd.join(scoreRdd);

            lists.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
                public void call(Tuple2<Integer, Tuple2<String, Integer>> pairs) throws Exception {
                    System.out.println(pairs._1 + " : " + pairs._2._1 + " : " + pairs._2._2);
                }
            });
        }
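        //Note: join is an inner join: student 4, who has no score, is
        //dropped, and a key with several matches (students 2 and 3 here)
        //produces one output row per matching pair. leftOuterJoin /
        //rightOuterJoin / fullOuterJoin keep unmatched keys instead, wrapping
        //the possibly missing side in an Optional.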

        //Use the cogroup operator to connect the elements of two RDDs by key.
        //It implements the same kind of connection as join, but the two
        //operators have different return types.
        public static void cogroupDemo(JavaSparkContext sc,List<Tuple2<Integer,String>> students,List<Tuple2<Integer,Integer>> stuScores){
            JavaPairRDD<Integer,String> stuRdd = sc.parallelizePairs(students);
            JavaPairRDD<Integer,Integer> scoreRdd = sc.parallelizePairs(stuScores);
            JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupedRdd = stuRdd.cogroup(scoreRdd);

            cogroupedRdd.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
                public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> pairs) throws Exception {
                    System.out.println(pairs._1 + " : " + pairs._2._1 + " : " + pairs._2._2);
                }
            });
        }
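        //Note: unlike join, cogroup emits each key exactly once, pairing the
        //full group of values from each side. With this data, key 3 appears
        //twice in the join output (once per score) but once here, with s3 on
        //one side and both 98 and 99 on the other; student 4, absent from the
        //scores, still appears, with an empty group on the score side.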
    }