import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        // Run in local mode (not submitted to a cluster); the application name is "my app"
        SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Set the input path to /ok/test
        String inputFile = "/ok/test";
        JavaRDD<String> input = sc.textFile(inputFile);
        // Split each line into words on the space character
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });
        // Map every occurrence of a word to a (word, 1) pair
        JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        // For identical words, add their counts together
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Collect all (word, count) pairs into a list on the driver
        List<Tuple2<String, Integer>> output = counts.collect();
        // Iterate over the list and print each pair
        for (Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
        // Shut down the Spark context
        sc.stop();
    }
}
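The anonymous-class style above targets the Spark 1.x Java API, where FlatMapFunction.call returns an Iterable. On Spark 2.x and later, flatMap expects an Iterator instead, and with Java 8 lambdas the same pipeline can be written far more compactly. A minimal sketch of the equivalent transformations, assuming Spark 2.x and reusing the input RDD from the program above:

// Word count with Java 8 lambdas (Spark 2.x API: flatMap takes an Iterator)
JavaRDD<String> words = input.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> counts = words
        .mapToPair(s -> new Tuple2<>(s, 1))   // each word becomes a (word, 1) pair
        .reduceByKey((a, b) -> a + b);        // sum the counts per word
counts.collect().forEach(t -> System.out.println(t._1 + ": " + t._2));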
Output: