zoukankan      html  css  js  c++  java
  • 实验 4 RDD 编程初级实践

    注意:Spark 默认以 UTF-8 编码读取文本文件,其他编码的文件会出现乱码,因此输入文件要保存为 UTF-8 编码。

    pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>gao</groupId>
        <artifactId>WordCount</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <!-- Spark release shared by every Spark dependency below. -->
            <spark.version>2.1.0</spark.version>
            <!-- Scala *binary* version, appended to the Spark artifact IDs (e.g. spark-core_2.11).
                 Keep it in sync with the Scala version used to compile this project. -->
            <scala.version>2.11</scala.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
    
                <!-- Compiles the Scala sources of both main and test (compile / testCompile goals). -->
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- Java sources (if any) are compiled for Java 1.8. -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <!-- skip=true: unit tests are not executed during the build. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.19</version>
                    <configuration>
                        <skip>true</skip>
                    </configuration>
                </plugin>
    
            </plugins>
        </build>
    
    </project>
    View Code

    1.给定一份成绩数据,每行格式为“姓名,课程,成绩”,编写 Spark 独立应用程序计算以下内容:

    (1)该系总共有多少学生;

    (2)该系共开设了多少门课程;

    (3)Tom 同学的总成绩平均分是多少;

    (4)求每名同学选修的课程门数;

    (5)该系 DataBase 课程共有多少人选修;

    (6)各门课程的平均分是多少;

    (7)使用累加器计算共有多少人选了 DataBase 这门课。

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    /**
     * Lab 4, task 1: basic RDD statistics over a comma-separated score file.
     *
     * Each non-blank input line is parsed as `name,course,score`, with an integer
     * score. The file should be UTF-8 encoded.
     *
     * Usage: one [inputFile]. When no argument is given, the original hard-coded path is used.
     */
    object one {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("text1")
        val sc = new SparkContext(conf)
        try {
          // Forward slashes: a Windows path written with backslashes is not a legal
          // Scala string literal (\U is an invalid escape), so the original did not compile.
          val input = if (args.length > 0) args(0) else "C:/Users/Administrator/Desktop/Data01.txt"

          // Parse every line exactly once into (name, course, score).
          // Blank lines are skipped instead of crashing on a missing field.
          // The RDD is cached because all queries below reuse it.
          val rows = sc.textFile(input)
            .filter(_.trim.nonEmpty)
            .map { line =>
              val fields = line.split(",")
              (fields(0).trim, fields(1).trim, fields(2).trim.toInt)
            }
            .cache()

          // (1) Total number of distinct students.
          println("学生总人数:" + rows.map(_._1).distinct().count())

          // (2) Total number of distinct courses.
          println("课程数:" + rows.map(_._2).distinct().count())

          // (3) Tom's average score. Summing as Double avoids the truncation of integer division.
          rows.filter(_._1 == "Tom")
            .map(r => (r._1, (r._3.toDouble, 1)))
            .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
            .mapValues { case (sum, n) => sum / n }
            .collect()
            .foreach(x => println("Tom的平均成绩:" + x._2))

          // (4) Number of courses taken by each student (one row per enrolment).
          // Results are collected so they print on the driver.
          rows.map(r => (r._1, 1))
            .reduceByKey(_ + _)
            .collect()
            .foreach(println)

          // (5) Number of enrolments in DataBase.
          val databaseRows = rows.filter(_._2 == "DataBase")
          println("DataBase的选修人数:" + databaseRows.count())

          // (6) Average score of every course, again using floating-point division.
          rows.map(r => (r._2, (r._3.toDouble, 1)))
            .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
            .mapValues { case (sum, n) => sum / n }
            .collect()
            .foreach(println)

          // (7) Same count as (5), computed with an accumulator.
          // The update runs inside an action (foreach), so each row is counted once.
          val accum = sc.longAccumulator("My Accumulator")
          databaseRows.foreach(_ => accum.add(1L))
          println("选了 DataBase 这门课的人数:" + accum.value)
        } finally {
          sc.stop()
        }
      }
    }
    View Code

    2.对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件 C。

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    
    /**
     * Lab 4, task 2: merges the input files, removes duplicate lines, and writes the
     * result (file C) as a single sorted output partition.
     *
     * Lines are trimmed before comparison, and blank lines are dropped.
     *
     * Usage: two [inputDir] [outputDir]. The defaults are the original hard-coded paths.
     */
    object two
    {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("text2")
        val sc = new SparkContext(conf)
        try {
          // Forward slashes: a backslash Windows path is not a legal Scala string literal.
          // The input directory is expected to contain files A and B.
          val dataFile = if (args.length > 0) args(0) else "C:/Users/Administrator/Desktop/data"
          val output = if (args.length > 1) args(1) else "result"
          val res = sc.textFile(dataFile, 2)
            .map(_.trim)
            .filter(_.nonEmpty)
            .distinct()
            // One partition => one sorted output file, as the original partitionBy(1) intended.
            .sortBy(line => line, ascending = true, numPartitions = 1)
          res.saveAsTextFile(output)
        } finally {
          sc.stop()
        }
      }
    }
    View Code

    3.每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    
    /**
     * Lab 4, task 3: computes every student's average score across all subject files.
     *
     * Each non-blank input line is `name<TAB>score`. The program writes `(name,average)`
     * pairs, with the average rounded half-up to two decimals, into a single output partition.
     *
     * Usage: three [inputDir] [outputDir]. The defaults are the original hard-coded paths.
     */
    object three {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
          .setAppName("text3")
        val sc = new SparkContext(conf)
        try {
          // Forward slashes: a backslash Windows path is not a legal Scala string literal.
          val dataFile = if (args.length > 0) args(0) else "C:/Users/Administrator/Desktop/data1"
          val output = if (args.length > 1) args(1) else "result1"
          val res = sc.textFile(dataFile, 3)
            .filter(_.trim.nonEmpty)
            .map { line =>
              val fields = line.split("\t")
              (fields(0).trim, (fields(1).trim.toDouble, 1))
            }
            // Sum and count per student in a single pass.
            // HashPartitioner(1) keeps the whole result in one output file.
            .reduceByKey(new HashPartitioner(1), (a, b) => (a._1 + b._1, a._2 + b._2))
            .mapValues { case (sum, n) =>
              // Locale-independent rounding. Formatting with f"%1.2f" and parsing the text
              // back throws NumberFormatException in locales that use a decimal comma.
              BigDecimal(sum / n).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
            }
          res.saveAsTextFile(output)
        } finally {
          sc.stop()
        }
      }
    }
    View Code
    https://necydcy.me/
  • 相关阅读:
    MySQL服务器SSD性能问题分析与测试
    MySQL 5.7基于GTID复制的常见问题和修复步骤(一)
    用pt-stalk定位MySQL短暂的性能问题
    服务器IO瓶颈对MySQL性能的影响
    MySQL主从检验一致性工具pt-table-checksum报错的案例分析
    MySQL DROP DB或TABLE场景下借助SQL Thread快速应用binlog恢复方案
    MySQL服务器发生OOM的案例分析
    NUMA导致的MySQL服务器SWAP问题分析与解决方案
    python学习之-- 生成唯一ID
    python练习之-计算器
  • 原文地址:https://www.cnblogs.com/miria-486/p/10519630.html
Copyright © 2011-2022 走看看