Several common Spark Scala examples

SparkWordCount class source code (standalone mode)
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object SparkWordCount {
      val FILE_NAME: String = "word_count_results_"

      def main(args: Array[String]) {
        if (args.length < 1) {
          println("Usage: SparkWordCount FileName")
          System.exit(1)
        }
        val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program")
        val sc = new SparkContext(conf)
        val textFile = sc.textFile(args(0))
        // Split each line into words, emit a (word, 1) pair per word, then sum the counts per word
        val wordCounts = textFile.flatMap(line => line.split(" "))
                                 .map(word => (word, 1))
                                 .reduceByKey((a, b) => a + b)

        wordCounts.saveAsTextFile(FILE_NAME + System.currentTimeMillis())
        println("Word Count program running results are successfully saved.")
      }
    }
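
    The pipeline is flatMap → map → reduceByKey: each line is split into words, each word becomes a (word, 1) pair, and the counts are summed per key. A minimal sketch of the same pipeline on a tiny in-memory dataset (the sample lines are made up for illustration):

    // Given the lines "a b a" and "b c", the result is (a,2), (b,2), (c,1)
    val demo = sc.parallelize(Seq("a b a", "b c"))
    val demoCounts = demo.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    demoCounts.collect().foreach(println)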
    
    
    --------
    ./spark-submit \
      --class com.ibm.spark.exercise.basic.SparkWordCount \
      --master spark://hadoop036166:7077 \
      --num-executors 3 \
      --driver-memory 6g \
      --executor-memory 2g \
      --executor-cores 2 \
      /home/fams/sparkexercise.jar \
      hdfs://hadoop036166:9000/user/fams/*.txt
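
    The results land in a directory named word_count_results_<timestamp>, one part-file per partition, with lines of the form (word,count). A hedged way to sanity-check them from a Spark shell (the glob path is illustrative):

    // Read the saved part-files back and print a few (word, count) lines
    sc.textFile("word_count_results_*").take(10).foreach(println)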
    
    
Calculating the average age
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object AvgAgeCalculator {
      def main(args: Array[String]) {
        if (args.length < 1) {
          println("Usage: AvgAgeCalculator datafile")
          System.exit(1)
        }
        val conf = new SparkConf().setAppName("Spark Exercise: Average Age Calculator")
        val sc = new SparkContext(conf)
        val dataFile = sc.textFile(args(0), 5)
        val count = dataFile.count()
        // Each line is expected to look like "<id> <age>"; take the second field as the age
        val ageData = dataFile.map(line => line.split(" ")(1))
        val totalAge = ageData.map(age => age.toInt).collect().reduce((a, b) => a + b)
        println("Total Age: " + totalAge + "; Number of People: " + count)
        val avgAge: Double = totalAge.toDouble / count.toDouble
        println("Average Age is " + avgAge)
      }
    }
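
    Note that collect() pulls every age back to the driver before the reduce runs, so this only scales while the data fits in driver memory. A minimal sketch of a fully distributed alternative (same result, the reduction runs on the executors):

    // Distributed sum: no collect(), the reduce happens on the RDD itself
    val totalAge = ageData.map(age => age.toInt).reduce((a, b) => a + b)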
    
    
    --------------------------
    ./spark-submit \
      --class com.ibm.spark.exercise.basic.AvgAgeCalculator \
      --master spark://hadoop036166:7077 \
      --num-executors 3 \
      --driver-memory 6g \
      --executor-memory 2g \
      --executor-cores 2 \
      /home/fams/sparkexercise.jar \
      hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt
     
     
     
 Finding the highest and lowest male/female heights
     -----------------------
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object PeopleInfoCalculator {
      def main(args: Array[String]) {
        if (args.length < 1) {
          println("Usage: PeopleInfoCalculator datafile")
          System.exit(1)
        }
        val conf = new SparkConf().setAppName("Spark Exercise: People Info (Gender & Height) Calculator")
        val sc = new SparkContext(conf)
        val dataFile = sc.textFile(args(0), 5)
        // Each line is expected to look like "<id> <gender> <height>"; keep "<gender> <height>"
        val maleData = dataFile.filter(line => line.contains("M"))
                               .map(line => (line.split(" ")(1) + " " + line.split(" ")(2)))
        val femaleData = dataFile.filter(line => line.contains("F"))
                                 .map(line => (line.split(" ")(1) + " " + line.split(" ")(2)))

        val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
        val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)

        // Ascending sort puts the smallest height first, descending sort the largest
        val lowestMale = maleHeightData.sortBy(x => x, true).first()
        val lowestFemale = femaleHeightData.sortBy(x => x, true).first()

        val highestMale = maleHeightData.sortBy(x => x, false).first()
        val highestFemale = femaleHeightData.sortBy(x => x, false).first()
        println("Number of Male People: " + maleData.count())
        println("Number of Female People: " + femaleData.count())
        println("Lowest Male: " + lowestMale)
        println("Lowest Female: " + lowestFemale)
        println("Highest Male: " + highestMale)
        println("Highest Female: " + highestFemale)
      }
    }
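
    Each sortBy + first() pair above triggers a full sort of the RDD just to extract one element. A minimal sketch of a cheaper alternative using the min()/max() RDD actions (same results, one pass each, no sort):

    // min()/max() avoid sorting the whole RDD to pick a single value
    val lowestMale = maleHeightData.min()
    val highestMale = maleHeightData.max()
    val lowestFemale = femaleHeightData.min()
    val highestFemale = femaleHeightData.max()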
    
    
    
    
    
    ./spark-submit \
      --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
      --master spark://hadoop036166:7077 \
      --num-executors 3 \
      --driver-memory 6g \
      --executor-memory 3g \
      --executor-cores 2 \
      /home/fams/sparkexercise.jar \
      hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt
     
     
 Finding the lines that occur most often (Top K search keywords)
     
     =============
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object TopKSearchKeyWords {
      def main(args: Array[String]) {
        if (args.length < 2) {
          println("Usage: TopKSearchKeyWords KeyWordsFile K")
          System.exit(1)
        }
        val conf = new SparkConf().setAppName("Spark Exercise: Top K Searching Key Words")
        val sc = new SparkContext(conf)
        val srcData = sc.textFile(args(0))
        // Count how many times each (lower-cased) line occurs
        val countedData = srcData.map(line => (line.toLowerCase(), 1)).reduceByKey((a, b) => a + b)

        // Swap to (count, keyword), sort by count descending, take the first K, then swap back
        val sortedData = countedData.map { case (k, v) => (v, k) }.sortByKey(false)
        val topKData = sortedData.take(args(1).toInt).map { case (v, k) => (k, v) }
        topKData.foreach(println)
      }
    }
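
    The swap-sort-swap sequence can also be expressed with the top action, which returns the K largest elements under a given ordering without fully sorting the RDD. A minimal sketch (assumes the same countedData as above):

    // top(k) keeps the K pairs with the largest counts, merging per-partition winners
    val topKData = countedData.top(args(1).toInt)(Ordering.by { case (_, count) => count })
    topKData.foreach(println)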
    
    ./spark-submit \
      --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
      --master spark://hadoop036166:7077 \
      --num-executors 3 \
      --driver-memory 6g \
      --executor-memory 2g \
      --executor-cores 2 \
      /home/fams/sparkexercise.jar \
      hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt \
      10
     

Original article: https://www.cnblogs.com/TendToBigData/p/10501363.html