1. 算子
package com.test; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; public class Test { private static final int Function2 = 0; public static void main(String[] args) { SparkConf sparkConf = new SparkConf() .setAppName("Test") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD linesRdd = sc.textFile("/home/test/a.txt"); /** * FlatMapFunction 中的2个String 分别代表输入参数类型和输出参数类型 */ JavaRDD<String> wordsRDD = linesRdd.flatMap(new FlatMapFunction<String, String>(){ private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { /** * 参数 line 就代表 linesRDD中的每一条记录 */ List<String> list = Arrays.asList(line.split(" ")); return list; } }); /** * 要将每一个单词计数为1 * wordsRDD 是一个非 K V 格式的Rdd, * 在java api 中要返回一个K V 格式的rdd, 必须使用 mapToPair 方法 * return 结果就是一个 K V 格式 */ JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); /** * 使用reduceByKey 进行聚合操作 * 1. 进行 groupByKey 将相同的 key 分割到一个组里去, 然后通过传入的函数对主内的数据进行聚合 * call 方法将会自动将个数循环相加 */ JavaPairRDD<String, Integer> resultRdd = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); /** * 按照单词出现的次数进行排序, 应为排序需要对 v 值(出现个数)进行排序, 所以需要将 K V, 进行调换, 因为sortByKey只对key能进行排序 * 先使用 mapToPair 来调换位置 * sortByKey 进行排序 * 再使用 mapToPair 来调换位置 */ resultRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception { return new Tuple2<Integer, String>(tuple._2,tuple._1); } }).sortByKey().mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception { return new Tuple2<String, Integer>(tuple._2, tuple._1); } }).foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> tuple) throws Exception { System.out.println(tuple); } }); } }
jar包 链接:https://pan.baidu.com/s/1UDp81G8tY7IgwJatlT_1Vg 密码:yj06