zoukankan      html  css  js  c++  java
  • spark-wordcount-sample算子测试

    import org.apache.spark.{SparkConf, SparkContext}
    
    object radomSampleU {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("WordCount_groupBy")
          .setMaster("local")
          //  .set("spark.default.parallelism", "100") //  1. 调节并行度
          .set("spark.executor.memory ","4g")
          .set("spark.executor.cores","5")
          .set("spark.executor.nums","4")//1
          //静态内存机制
          .set("spark.memory.useLegacyMode","false")
          .set("spark.storage.memoryFraction", "0.3")// 5.cache占用的内存占比,default=0.6
          //统一内存机制
          .set("spark.memory.Fraction","0.3")//default=0.6
          .set("spark.storage.storageFraction","0.9")//default=0.5
          .set("spark.shuffle.consolidateFiles", "false")
          //过滤多余日志文件
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        val startTime=System.currentTimeMillis()
        val inpath= "F:\hml\dataset\1021\1021\####.txt"//
        val lines = sc.textFile(inpath)//.cache()   // 读取本地文件
        val words = lines.flatMap(_.split(" ")).filter(word => word != " ")  // 拆分单词,并过滤掉空格,当然还可以继续进行过滤,如去掉标点符号
        //sample采样测试
        words.partitions.size
        println("样本汇总结果***********************************")
        val wordsample=words.sample(false,0.0005)
        wordsample.map(word => (word, 1))
          .reduceByKey(_ + _)
          .collect()
          .foreach(println)
    
    println("总体数据汇总结果*************************")
        val pairs = words.map(word => (word, 1))  // 在单词拆分的基础上对每个单词实例计数为1, 也就是 word => (word, 1)
        val start1=System.currentTimeMillis()
        val wordscount = pairs.reduceByKey(_ + _)//.collect() .foreach(println) // 在每个单词实例计数为1的基础之上统计每个单词在文件中出现的总次数, 即key相同的value相加
        val end1=System.currentTimeMillis()
        wordscount.collect.foreach(println)  // 打印结果,使用collect会将集群中的数据收集到当前运行drive的机器上,需要保证单台机器能放得下所有数据
        val endTime=System.currentTimeMillis()
        println("应用总耗时"+(endTime-startTime))
        println("reduceByKey耗时"+ (end1-start1))
        Thread.sleep(1000000)
        sc.stop()   // 释放资源
      }
    }
  • 相关阅读:
    AI中台
    java的static关键字
    java多线程面试题整理及答案(2019年)
    一篇搞懂TCP的三次握手四次挥手
    异步注解Async
    常用的SQL语句大全
    用Intellij Idea导出可执行的jar包
    用Intellij Idea从Github上获取代码
    Thread线程源码解析,Java线程的状态,线程之间的通信
    dubbo配置启动时检查
  • 原文地址:https://www.cnblogs.com/moonlightml/p/10221081.html
Copyright © 2011-2022 走看看