zoukankan      html  css  js  c++  java
  • Java & Scala 版 Spark word count 程序

    学习高级编程语言的时候,作为入门程序,要先学会写 “Hello World !”。

    在大数据的世界,作为入门程序,要先学会写 Word Count。

    这里记录一下如何分别使用 java 和 scala语言调用 spark 的算子来完成 word count 程序。

    一、Java 版本:

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    public class WordCountLocal {
    public static void main(String[] args) {
    //第一步:创建conf对象。
    SparkConf conf = new SparkConf()
    .setAppName("wordcount")
    .setMaster("local");
    //第二步:创建context对象。
    JavaSparkContext sc = new JavaSparkContext(conf);

    //第三步:创建RDD,调用RDD算子。
    JavaRDD<String> lines = sc.textFile("data/wc.txt");
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

    private static final long serialVersionUID = 1L;
    public Iterator<String> call(String line) throws Exception {
    return Arrays.asList(line.split(" ")).iterator();
    }
    });

    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

    private static final long serialVersionUID = 1L;
    public Tuple2<String, Integer> call(String word) throws Exception {
    return new Tuple2<String, Integer>(word,1);
    }
    });

    JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

    private static final long serialVersionUID = 1L;
    public Integer call(Integer v1, Integer v2) throws Exception {
    return v1 + v2;
    }
    });

    wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {

    private static final long serialVersionUID = 1L;
    public void call(Tuple2<String, Integer> wordCount) throws Exception {
    System.out.println(wordCount._1 + "--->" + wordCount._2);
    }
    });
    //别忘了关闭sparkContext
    sc.close();

    }
    }

    二、 Scala 版本
    object wordCount {
    def main(args: Array[String]): Unit = {
    val path = "data/wc.txt"
    val savePath = s"result/wc/${System.currentTimeMillis()}"
    // val path = "hdfs:bd27-server.ibeifeng.com:8020/user/beifeng/wc.txt"
    // val savePath = "hdfs:bd27-server.ibeifeng.com:8020/user/beifeng/sparkwc/"
    val conf = new SparkConf()
    .setMaster("local")
    .setAppName("sparkWC")
    val sc = new SparkContext(conf)

    val rdd = sc.textFile(path)
    val words = rdd.flatMap(line => line.split(" "))
    val wordPair = words.map(word => (word,1))
    val result: RDD[(String, Int)] = wordPair.reduceByKey((a, b) => a + b)

    result.foreachPartition(f => f.foreach(println))
    result.saveAsTextFile(savePath)

    //Thread.sleep(100000) 这一行是为了到4040页面看一下DAG图和Excutor执行情况。让页面停留一会儿。
    }
    }

    这个程序写的有些冗余了。scala有着非常强大的链式编程的特性,可以从第一个RDD开始一路 .XXX(函数名称) 到最后。就像这样:
    val rdd = sc.textFile(path)
    .flatMap(line => line.split(" "))
    .map(word => (word,1))
    .reduceByKey((a, b) => a + b)

    除了reduceByKey,spark还有一个高性能算子叫做aggregateByKey。它们内部做了相当于MapReduce的combiner的操作,效率会比较高。
    val rdd = sc.textFile(path)
    .flatMap(_.split(" "))
    .map(word => (word,1))
    .aggregateByKey(0)(
    _+_,
    _+_
    )

    在程序的最后一定要调用action类型算子才能触发job的执行,比如 collect,take,foreach等。
    
    
  • 相关阅读:
    PAT 甲级 1072 Gas Station (30 分)(dijstra)
    python flask框架学习(二)——第一个flask程序
    python flask框架学习(一)——准备工作和环境配置与安装
    【Beats】Filebeat工作原理(十七)
    【Beats】Filebeat介绍及使用(十六)
    Dapper简介
    Sequelize ORM
    StackExchange / Dapper
    TypeORM 简介
    Egg.js 是什么?
  • 原文地址:https://www.cnblogs.com/rabbit624/p/10650448.html
Copyright © 2011-2022 走看看