使用Spark 对以下内容进行词频统计 (使用Java语言)
hello world
hello java
hello cnblogs
代码如下:
/** * Spark基于Java Api的词频统计 */ public class WordCountByJava { public static void main(String[] args) { // 初始化 SparkConf setAppName:设置应用名称 setMaster:设置运行模式 SparkConf conf = new SparkConf().setAppName("WORDCOUNT").setMaster("local"); // 初始化 SparkContext对象 JavaSparkContext jsc = new JavaSparkContext(conf); // 使用SparkContext对象读取文件,存为JavaRdd JavaRDD<String> dataRdd = jsc.textFile("G:\test\wc\a.txt"); // 使用flatMap函数对原始Rdd进行转换 按空格进行拆分,保存为集合 JavaRDD<String> flatMapRdd = dataRdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String s) throws Exception { // 拆分字符串 为一个数组 String[] word = s.split(" "); // 把数组转换成List集合 List<String> list = Arrays.asList(word); // 把list集合转换成Iterator集合 Iterator<String> it = list.iterator(); return it; } }); // 使用mapToPair进行map操作 形如: (word,1) JavaPairRDD<String, Integer> mapRdd = flatMapRdd.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); // 使用reduceByKey进行单词统计 返回 (word,CountSum) JavaPairRDD<String, Integer> res = mapRdd.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } }); // 把最后的 rdd输出 res.foreach(new VoidFunction<Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> tuple2) throws Exception { System.out.println(tuple2._1+" "+tuple2._2); } }); } }