import org.apache.spark.{SparkConf, SparkContext}

object radomSampleU {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount_groupBy")
      .setMaster("local")
      // .set("spark.default.parallelism", "100") // 1. tune the parallelism
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "5")
      .set("spark.executor.instances", "4")
      // legacy (static) memory management
      .set("spark.memory.useLegacyMode", "false")
      .set("spark.storage.memoryFraction", "0.3") // 5. fraction of heap used for cached data, default = 0.6
      // unified memory management
      .set("spark.memory.fraction", "0.3")        // default = 0.6
      .set("spark.memory.storageFraction", "0.9") // default = 0.5
      .set("spark.shuffle.consolidateFiles", "false")

    val sc = new SparkContext(conf)
    // filter out verbose log output
    sc.setLogLevel("ERROR")

    val startTime = System.currentTimeMillis()
    val inpath = "F:\\hml\\dataset\\1021\\1021\\####.txt"
    val lines = sc.textFile(inpath) //.cache()  // read the local file
    // split into words and drop empty tokens; further filtering (e.g. stripping punctuation) could be added here
    val words = lines.flatMap(_.split(" ")).filter(word => word.nonEmpty)

    // sample-based test
    println("number of partitions: " + words.partitions.size)
    println("sample aggregation result ***********************************")
    val wordsample = words.sample(false, 0.0005)
    wordsample.map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    println("full-data aggregation result *************************")
    val pairs = words.map(word => (word, 1)) // map each word to the pair (word, 1)
    val start1 = System.currentTimeMillis()
    // sum the values of identical keys, i.e. count how often each word occurs in the file;
    // note that reduceByKey is lazy, so (end1 - start1) only measures building the lineage, not the shuffle itself
    val wordscount = pairs.reduceByKey(_ + _)
    val end1 = System.currentTimeMillis()
    // collect() pulls all results to the driver, so a single machine must be able to hold the full output
    wordscount.collect.foreach(println)

    val endTime = System.currentTimeMillis()
    println("total application time: " + (endTime - startTime))
    println("reduceByKey time: " + (end1 - start1))
    Thread.sleep(1000000)
    sc.stop() // release resources
  }
}
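Because the sample is drawn with sample(false, 0.0005), the per-word counts computed on it cover only roughly 0.05% of the data. A minimal sketch of how those sample counts could be scaled back up to approximate the full counts is shown below; the object SampleEstimate and the method estimateFullCounts are hypothetical names introduced here for illustration, not part of the program above.

import org.apache.spark.rdd.RDD

// Sketch only: scales counts computed on a sample(false, fraction) back up by
// 1 / fraction to approximate the counts a full pass over the data would yield.
object SampleEstimate {
  def estimateFullCounts(sampleCounts: RDD[(String, Int)], fraction: Double): RDD[(String, Long)] =
    sampleCounts.mapValues(count => math.round(count / fraction))
}

// Example usage with the RDDs defined above (assuming the same 0.0005 sampling fraction):
// val sampleCounts = wordsample.map(word => (word, 1)).reduceByKey(_ + _)
// SampleEstimate.estimateFullCounts(sampleCounts, 0.0005).collect().foreach(println)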