  • Several common Spark Scala examples

    SparkWordCount class source (standalone mode)
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object SparkWordCount {
      def FILE_NAME: String = "word_count_results_"

      def main(args: Array[String]) {
        if (args.length < 1) {
          println("Usage: SparkWordCount FileName")
          System.exit(1)
        }
        val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program")
        val sc = new SparkContext(conf)
        val textFile = sc.textFile(args(0))
        // Split each line into words, map each word to (word, 1),
        // then sum the counts per word.
        val wordCounts = textFile
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey((a, b) => a + b)

        wordCounts.saveAsTextFile(FILE_NAME + System.currentTimeMillis())
        println("Word Count program running results are successfully saved.")
      }
    }
    
    
    --------
    ./spark-submit \
     --class com.ibm.spark.exercise.basic.SparkWordCount \
     --master spark://hadoop036166:7077 \
     --num-executors 3 \
     --driver-memory 6g \
     --executor-memory 2g \
     --executor-cores 2 \
     /home/fams/sparkexercise.jar \
     hdfs://hadoop036166:9000/user/fams/*.txt
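
    To try the same logic without a cluster, here is a minimal local-mode sketch. It assumes Spark is on the classpath and uses a hypothetical local file "input.txt"; it is not part of the original submit setup above.

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object LocalWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("Local Word Count")
          .setMaster("local[2]") // two local threads instead of a cluster
        val sc = new SparkContext(conf)
        sc.textFile("input.txt") // hypothetical small test file
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
          .collect() // test data is small, so it is safe to bring to the driver
          .foreach(println)
        sc.stop()
      }
    }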
    
    
    Computing the average age
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object AvgAgeCalculator {
      def main(args: Array[String]) {
        if (args.length < 1) {
          println("Usage: AvgAgeCalculator datafile")
          System.exit(1)
        }
        val conf = new SparkConf().setAppName("Spark Exercise: Average Age Calculator")
        val sc = new SparkContext(conf)
        val dataFile = sc.textFile(args(0), 5)
        val count = dataFile.count()
        // Each line is "<id> <age>"; take the second field as the age.
        val ageData = dataFile.map(line => line.split(" ")(1))
        // Sum the ages with a distributed reduce rather than
        // collecting all values to the driver first.
        val totalAge = ageData.map(age => age.toInt).reduce((a, b) => a + b)
        println("Total Age: " + totalAge + "; Number of People: " + count)
        val avgAge: Double = totalAge.toDouble / count.toDouble
        println("Average Age is " + avgAge)
      }
    }
    
    
    --------------------------
    ./spark-submit \
     --class com.ibm.spark.exercise.basic.AvgAgeCalculator \
     --master spark://hadoop036166:7077 \
     --num-executors 3 \
     --driver-memory 6g \
     --executor-memory 2g \
     --executor-cores 2 \
     /home/fams/sparkexercise.jar \
     hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt
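
     A sketch of a generator for the input file, assuming each line is "<id> <age>" to match the split(" ")(1) access above. The file name and record count are illustrative, not from the original post.

     import java.io.{File, PrintWriter}
     import scala.util.Random

     object AgeDataGenerator {
       def main(args: Array[String]): Unit = {
         val writer = new PrintWriter(new File("sample_age_data.txt"))
         for (id <- 1 to 10000) {
           val age = Random.nextInt(100) + 1 // ages 1..100
           writer.println(s"$id $age")
         }
         writer.close()
       }
     }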
     
     
     
     Finding the highest and lowest heights for males/females
     -----------------------
     import org.apache.spark.SparkConf
     import org.apache.spark.SparkContext

     object PeopleInfoCalculator {
       def main(args: Array[String]) {
         if (args.length < 1) {
           println("Usage: PeopleInfoCalculator datafile")
           System.exit(1)
         }
         val conf = new SparkConf().setAppName("Spark Exercise: People Info (Gender & Height) Calculator")
         val sc = new SparkContext(conf)
         val dataFile = sc.textFile(args(0), 5)
         // Each line is "<id> <gender> <height>"; keep "<gender> <height>" per record.
         val maleData = dataFile.filter(line => line.contains("M"))
           .map(line => (line.split(" ")(1) + " " + line.split(" ")(2)))
         val femaleData = dataFile.filter(line => line.contains("F"))
           .map(line => (line.split(" ")(1) + " " + line.split(" ")(2)))

         val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
         val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)

         // Sort ascending and take the first element for the lowest height,
         // descending for the highest.
         val lowestMale = maleHeightData.sortBy(x => x, true).first()
         val lowestFemale = femaleHeightData.sortBy(x => x, true).first()
         val highestMale = maleHeightData.sortBy(x => x, false).first()
         val highestFemale = femaleHeightData.sortBy(x => x, false).first()

         println("Number of Male People: " + maleData.count())
         println("Number of Female People: " + femaleData.count())
         println("Lowest Male: " + lowestMale)
         println("Lowest Female: " + lowestFemale)
         println("Highest Male: " + highestMale)
         println("Highest Female: " + highestFemale)
       }
     }
    
    
    
    
    
     ./spark-submit \
      --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
      --master spark://hadoop036166:7077 \
      --num-executors 3 \
      --driver-memory 6g \
      --executor-memory 3g \
      --executor-cores 2 \
      /home/fams/sparkexercise.jar \
      hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt
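
     A sketch of a generator for this example's input, assuming each line is "<id> <gender> <height>" to match the field accesses above. The file name and value ranges are illustrative.

     import java.io.{File, PrintWriter}
     import scala.util.Random

     object PeopleInfoFileGenerator {
       def main(args: Array[String]): Unit = {
         val writer = new PrintWriter(new File("sample_people_info.txt"))
         for (id <- 1 to 10000) {
           val gender = if (Random.nextBoolean()) "M" else "F"
           val height = Random.nextInt(121) + 110 // heights 110..230 cm
           writer.println(s"$id $gender $height")
         }
         writer.close()
       }
     }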
     
     
     Top K most frequently occurring lines (search keywords)

     =============
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object TopKSearchKeyWords {
      def main(args: Array[String]) {
        if (args.length < 2) {
          println("Usage: TopKSearchKeyWords KeyWordsFile K")
          System.exit(1)
        }
        val conf = new SparkConf().setAppName("Spark Exercise: Top K Searching Key Words")
        val sc = new SparkContext(conf)
        val srcData = sc.textFile(args(0))
        // Count occurrences of each (lower-cased) line.
        val countedData = srcData.map(line => (line.toLowerCase(), 1)).reduceByKey((a, b) => a + b)
        // Swap to (count, keyword), sort by count descending,
        // take the top K, then swap back to (keyword, count).
        val sortedData = countedData.map { case (k, v) => (v, k) }.sortByKey(false)
        val topKData = sortedData.take(args(1).toInt).map { case (v, k) => (k, v) }
        topKData.foreach(println)
      }
    }
    
    ./spark-submit \
     --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
     --master spark://hadoop036166:7077 \
     --num-executors 3 \
     --driver-memory 6g \
     --executor-memory 2g \
     --executor-cores 2 \
     /home/fams/sparkexercise.jar \
     hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt
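
     A sketch of a generator for the keywords file, assuming each input line is a single search keyword. The keyword list and file name here are made up for illustration.

     import java.io.{File, PrintWriter}
     import scala.util.Random

     object KeyWordsFileGenerator {
       def main(args: Array[String]): Unit = {
         val keyWords = Array("spark", "hadoop", "hdfs", "scala", "java", "hive")
         val writer = new PrintWriter(new File("search_key_words.txt"))
         for (_ <- 1 to 10000) {
           // Pick a random keyword per line; duplicates give the counts to rank.
           writer.println(keyWords(Random.nextInt(keyWords.length)))
         }
         writer.close()
       }
     }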
     
