Example code:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Driver
public class SparkWordCount1 {
    public static void main(String[] args) {
        // Step 1: create a SparkConf object and set the application configuration
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("wordcount");

        // Step 2: create a JavaSparkContext, the entry point to all Spark functionality
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        ctx.setLogLevel("WARN");

        // To read from a file-based input source instead, use textFile():
        // JavaRDD<String> linesRdd = ctx.textFile(args[0]);

        // Step 3: create the initial RDD by parallelizing an in-memory collection
        ArrayList<String> lines = new ArrayList<String>();
        lines.add("Hello Java Hi Ok");
        lines.add("Ok No House Hello");
        JavaRDD<String> linesRdd = ctx.parallelize(lines);

        // Split each line into words
        JavaRDD<String> words = linesRdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));

        // Sum the counts for each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((x, y) -> x + y);

        // collect() is an action: it triggers the job and returns the results to the driver
        List<Tuple2<String, Integer>> results = counts.collect();
        for (Tuple2<String, Integer> result : results) {
            System.out.println(result);
        }

        ctx.close();
    }
}
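Note that flatMap, mapToPair, and reduceByKey are lazy transformations: they only build up the RDD lineage, and no computation happens until an action such as collect() runs. The lambdas are shorthand for Spark's Java function interfaces (FlatMapFunction, PairFunction, Function2), which the original import list included. For reference, a minimal sketch of the mapToPair and reduceByKey steps written out with anonymous classes, assuming the Spark 2.x Java API and the imports org.apache.spark.api.java.function.PairFunction and org.apache.spark.api.java.function.Function2:

// Anonymous-class equivalent of the two lambda-based steps above
JavaPairRDD<String, Integer> ones = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // Emit a (word, 1) pair for each input word
                return new Tuple2<String, Integer>(word, 1);
            }
        });

JavaPairRDD<String, Integer> counts = ones.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) throws Exception {
                // Combine the counts for the same key
                return x + y;
            }
        });

The lambda form is preferred on Java 8 and later; the anonymous-class form is only needed on older Java versions, and both compile to the same Spark job.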
Example result:
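The counts are fully determined by the two input lines, so the job prints the following six pairs (the ordering is not guaranteed and may vary between runs):

(Hello,2)
(Ok,2)
(Java,1)
(Hi,1)
(No,1)
(House,1)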