注意:Spark 读取文本文件时按 UTF-8 编码解析,其他编码的文件会出现乱码,因此输入文件必须保存为 UTF-8 编码。
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the WordCount exercises: Spark modules plus Scala/Java compilation. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates. -->
    <groupId>gao</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Versions shared by every Spark dependency below; scala.version is used as the artifact suffix. -->
    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <!-- Spark modules, all resolved as <module>_${scala.version}:${spark.version}. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiles Scala main and test sources. -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Java sources are compiled at language level 1.8. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Surefire is configured with skip=true, so tests are not run during the build. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom 同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系 DataBase 课程共有多少人选修;
(6)各门课程的平均分是多少;
(7)使用累加器计算共有多少人选了 DataBase 这门课。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
 * Exercise 1: statistics over a department's grade file.
 *
 * Every non-blank input line is expected to look like `student,course,score`.
 * The program prints, in order:
 *   1. the number of distinct students,
 *   2. the number of distinct courses,
 *   3. Tom's average score,
 *   4. the number of courses taken by each student,
 *   5. the number of enrolments in the DataBase course,
 *   6. the average score of each course,
 *   7. the DataBase enrolment count again, this time computed with an accumulator.
 *
 * Averages are computed as `Double` so the fractional part is not lost to
 * integer division.
 */
object one {

  /** Course inspected by questions 5 and 7. */
  private val TargetCourse = "DataBase"

  /** One parsed input line. */
  private final case class Enrollment(student: String, course: String, score: Int)

  /**
   * Averages the scores of each key.
   *
   * @param pairs (key, score) pairs
   * @return (key, arithmetic mean of that key's scores)
   */
  private def averageByKey(pairs: RDD[(String, Int)]): RDD[(String, Double)] =
    pairs
      .mapValues(score => (score.toLong, 1L))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      // toDouble before dividing: Int / Int would truncate the average.
      .mapValues { case (total, count) => total.toDouble / count }

  /**
   * Runs all seven queries against the grade file and prints the answers.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("text1")
    val sc = new SparkContext(conf)

    try {
      // Backslashes are escaped: "\U", "\A" and "\D" are not valid escape
      // sequences, so the unescaped literal does not compile.
      val rdd = sc.textFile("C:\\Users\\Administrator\\Desktop\\Data01.txt")

      // Parse every line exactly once and cache the result, because all seven
      // questions reuse it. Blank lines are skipped and fields are trimmed, in
      // line with the other exercises in this file.
      val records = rdd
        .filter(_.trim.nonEmpty)
        .map { row =>
          val fields = row.split(",")
          Enrollment(fields(0).trim, fields(1).trim, fields(2).trim.toInt)
        }
        .cache()

      // (1) How many students are there in the department?
      val studentCount = records.map(_.student).distinct().count()
      println("学生总人数:" + studentCount)

      // (2) How many courses does the department offer?
      val courseCount = records.map(_.course).distinct().count()
      println("课程数:" + courseCount)

      // (3) What is Tom's average score? Nothing is printed if Tom has no rows.
      val tomScores = records
        .filter(_.student == "Tom")
        .map(r => (r.student, r.score))
      averageByKey(tomScores)
        .collect()
        .foreach { case (_, avg) => println("Tom的平均成绩:" + avg) }

      // (4) How many courses has each student taken?
      records
        .map(r => (r.student, 1))
        .reduceByKey(_ + _)
        .collect()
        .foreach(println)

      // (5) How many enrolments does the DataBase course have?
      val dataBaseRows = records.filter(_.course == TargetCourse)
      println("DataBase的选修人数:" + dataBaseRows.count())

      // (6) What is the average score of each course?
      averageByKey(records.map(r => (r.course, r.score)))
        .collect()
        .foreach(println)

      // (7) Count the DataBase enrolments with an accumulator.
      val accum = sc.longAccumulator("My Accumulator")
      dataBaseRows.foreach(_ => accum.add(1L))
      println("选了 DataBase 这门课的人数:" + accum.value)
    } finally {
      // Release the local Spark runtime even when a query fails.
      sc.stop()
    }
  }
}
2.对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件 C。
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

/**
 * Exercise 2: merge the input files and drop duplicate lines.
 *
 * Reads every file under the input directory, trims each line, discards blank
 * lines, removes duplicates, sorts the remaining lines and writes them to the
 * `result` output directory as a single partition.
 */
object two {

  /**
   * Runs the merge/de-duplicate job.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("text2")
    val sc = new SparkContext(conf)

    try {
      // Backslashes are escaped: "\U", "\A", "\D" and "\d" are not valid escape
      // sequences, so the unescaped literal does not compile.
      val dataFile = "C:\\Users\\Administrator\\Desktop\\data"
      val data = sc.textFile(dataFile, 2)

      val res = data
        .map(_.trim)
        .filter(_.nonEmpty)
        // distinct(1) de-duplicates into one partition, so the sorted result
        // still ends up in a single output file; it replaces the previous
        // groupByKey over dummy values.
        .distinct(1)
        .sortBy(line => line)

      res.saveAsTextFile("result")
    } finally {
      // Release the local Spark runtime even when the job fails.
      sc.stop()
    }
  }
}
3.每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

/**
 * Exercise 3: average score per student across several subject files.
 *
 * Each non-blank input line holds a student name and an integer score
 * separated by whitespace. The job writes `(name, average)` pairs, with the
 * average rounded to two decimals, to the `result1` output directory as a
 * single partition.
 */
object three {

  /**
   * Rounds to two decimal places.
   *
   * Formatting uses `Locale.ROOT` so the decimal separator is always '.';
   * formatting with the JVM default locale can produce "85,50", which
   * `toDouble` rejects.
   *
   * @param value the number to round
   * @return `value` rounded to two decimals
   */
  private def roundToTwoDecimals(value: Double): Double =
    "%1.2f".formatLocal(java.util.Locale.ROOT, value).toDouble

  /**
   * Runs the per-student averaging job.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("text3")
    val sc = new SparkContext(conf)

    try {
      // Backslashes are escaped: "\U", "\A", "\D" and "\d" are not valid escape
      // sequences, so the unescaped literal does not compile.
      val dataFile = "C:\\Users\\Administrator\\Desktop\\data1"
      val data = sc.textFile(dataFile, 3)

      val res = data
        .map(_.trim)
        .filter(_.nonEmpty)
        .map { line =>
          // Split on any run of whitespace so repeated spaces or tabs between
          // the two fields do not yield an empty score field.
          val fields = line.split("\\s+")
          (fields(0), fields(1).toInt)
        }
        .mapValues(score => (score.toLong, 1L))
        // Combine (sum, count) pairs instead of grouping all scores per key;
        // the explicit partition count of 1 keeps a single output file.
        .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2), 1)
        .mapValues { case (total, count) => roundToTwoDecimals(total.toDouble / count) }

      res.saveAsTextFile("result1")
    } finally {
      // Release the local Spark runtime even when the job fails.
      sc.stop()
    }
  }
}