  • Computing the statistics of a given RDD[Double] with StatCounter

    Requirement 1: given an RDD[Double], compute its statistics (count, mean, stdev, max, min).

    Code:

    import org.apache.spark.{SparkConf, SparkContext}

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getName)
      val sc = new SparkContext(conf)
      sc.setLogLevel("warn")

      // stats() is available on RDD[Double] via DoubleRDDFunctions and returns a StatCounter
      val arr = Array(1.2, 3.4, 4.4, 6.5)
      val rdd = sc.parallelize(arr)
      println(rdd.stats())

      sc.stop()
    }
    

    Result: stats() prints a single StatCounter summary for the RDD, containing its count, mean, stdev, max, and min.

    This approach (calling stats() on an RDD[Double]) is well suited to small data volumes on a single machine. In a distributed environment where the data sits on many machines its efficiency is comparatively poor, especially when statistics are needed for several columns: each column would need its own stats() call and therefore its own full pass over the data.
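
    The sketch below illustrates that naive per-column pattern (the object name is hypothetical, and a few inline sample rows stand in for a real distributed dataset); Requirement 2 below replaces it with a single pass over the data.

    import org.apache.spark.{SparkConf, SparkContext}

    // A sketch of the naive per-column approach: every stats() call is a
    // separate Spark job, i.e. one full scan of the data per column.
    object NaivePerColumnStats {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("NaivePerColumnStats")
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")

        // a few sample rows standing in for the real dataset
        val rows = sc.parallelize(Seq(
          Array(5.1, 3.5, 1.4, 0.2),
          Array(4.9, 3.0, 1.4, 0.2),
          Array(4.7, 3.2, 1.3, 0.2)
        )).cache()

        (0 until 4).foreach { i =>
          // project out column i and run stats() on it -- a full pass each time
          println(s"column $i: " + rows.map(_(i)).stats())
        }

        sc.stop()
      }
    }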

    Requirement 2: given dataset A, compute the statistics of every column in the dataset.

    Dataset:

    5.1,3.5,1.4,0.2,Iris-setosa
    4.9,3.0,1.4,0.2,Iris-setosa
    4.7,3.2,1.3,0.2,Iris-setosa
    4.6,3.1,1.5,0.2,Iris-setosa
    5.0,3.6,1.4,0.2,Iris-setosa
    5.4,3.9,1.7,0.4,Iris-setosa
    4.6,3.4,1.4,0.3,Iris-setosa
    5.0,3.4,1.5,0.2,Iris-setosa
    4.4,2.9,1.4,0.2,Iris-setosa
    4.9,3.1,1.5,0.1,Iris-setosa
    5.4,3.7,1.5,0.2,Iris-setosa
    4.8,3.4,1.6,0.2,Iris-setosa
    4.8,3.0,1.4,0.1,Iris-setosa
    4.3,3.0,1.1,0.1,Iris-setosa
    5.8,4.0,1.2,0.2,Iris-setosa
    5.7,4.4,1.5,0.4,Iris-setosa
    5.4,3.9,1.3,0.4,Iris-setosa
    5.1,3.5,1.4,0.3,Iris-setosa
    5.7,3.8,1.7,0.3,Iris-setosa
    5.1,3.8,1.5,0.3,Iris-setosa
    5.4,3.4,1.7,0.2,Iris-setosa
    5.1,3.7,1.5,0.4,Iris-setosa
    4.6,3.6,1.0,0.2,Iris-setosa
    5.1,3.3,1.7,0.5,Iris-setosa
    4.8,3.4,1.9,0.2,Iris-setosa
    5.0,3.0,1.6,0.2,Iris-setosa
    5.0,3.4,1.6,0.4,Iris-setosa
    5.2,3.5,1.5,0.2,Iris-setosa
    5.2,3.4,1.4,0.2,Iris-setosa
    4.7,3.2,1.6,0.2,Iris-setosa
    4.8,3.1,1.6,0.2,Iris-setosa
    5.4,3.4,1.5,0.4,Iris-setosa
    5.2,4.1,1.5,0.1,Iris-setosa
    5.5,4.2,1.4,0.2,Iris-setosa
    4.9,3.1,1.5,0.1,Iris-setosa
    5.0,3.2,1.2,0.2,Iris-setosa
    5.5,3.5,1.3,0.2,Iris-setosa
    4.9,3.1,1.5,0.1,Iris-setosa
    4.4,3.0,1.3,0.2,Iris-setosa
    5.1,3.4,1.5,0.2,Iris-setosa
    5.0,3.5,1.3,0.3,Iris-setosa
    4.5,2.3,1.3,0.3,Iris-setosa
    4.4,3.2,1.3,0.2,Iris-setosa
    5.0,3.5,1.6,0.6,Iris-setosa
    5.1,3.8,1.9,0.4,Iris-setosa
    4.8,3.0,1.4,0.3,Iris-setosa
    5.1,3.8,1.6,0.2,Iris-setosa
    5.3,3.7,1.5,0.2,Iris-setosa
    5.0,3.3,1.4,0.2,Iris-setosa
    7.0,3.2,4.7,1.4,Iris-versicolor
    6.4,3.2,4.5,1.5,Iris-versicolor
    6.9,3.1,4.9,1.5,Iris-versicolor
    6.5,2.8,4.6,1.5,Iris-versicolor
    5.7,2.8,4.5,1.3,Iris-versicolor
    6.3,3.3,4.7,1.6,Iris-versicolor
    4.9,2.4,3.3,1.0,Iris-versicolor
    6.6,2.9,4.6,1.3,Iris-versicolor
    5.2,2.7,3.9,1.4,Iris-versicolor
    5.0,2.0,3.5,1.0,Iris-versicolor
    5.9,3.0,4.2,1.5,Iris-versicolor
    6.0,2.2,4.0,1.0,Iris-versicolor
    6.1,2.9,4.7,1.4,Iris-versicolor
    5.6,2.9,3.6,1.3,Iris-versicolor
    6.7,3.1,4.4,1.4,Iris-versicolor
    5.6,3.0,4.5,1.5,Iris-versicolor
    5.8,2.7,4.1,1.0,Iris-versicolor
    6.2,2.2,4.5,1.5,Iris-versicolor
    5.6,2.5,3.9,1.1,Iris-versicolor
    5.9,3.2,4.8,1.8,Iris-versicolor
    6.1,2.8,4.0,1.3,Iris-versicolor
    6.3,2.5,4.9,1.5,Iris-versicolor
    6.1,2.8,4.7,1.2,Iris-versicolor
    6.4,2.9,4.3,1.3,Iris-versicolor
    6.6,3.0,4.4,1.4,Iris-versicolor
    6.8,2.8,4.8,1.4,Iris-versicolor
    6.7,3.0,5.0,1.7,Iris-versicolor
    6.0,2.9,4.5,1.5,Iris-versicolor
    5.7,2.6,3.5,1.0,Iris-versicolor
    5.5,2.4,3.8,1.1,Iris-versicolor
    5.5,2.4,3.7,1.0,Iris-versicolor
    5.8,2.7,3.9,1.2,Iris-versicolor
    6.0,2.7,5.1,1.6,Iris-versicolor
    5.4,3.0,4.5,1.5,Iris-versicolor
    6.7,3.1,4.7,1.5,Iris-versicolor
    6.3,2.3,4.4,1.3,Iris-versicolor
    5.6,3.0,4.1,1.3,Iris-versicolor
    5.5,2.5,4.0,1.3,Iris-versicolor
    5.5,2.6,4.4,1.2,Iris-versicolor
    6.1,3.0,4.6,1.4,Iris-versicolor
    5.8,2.6,4.0,1.2,Iris-versicolor
    5.0,2.3,3.3,1.0,Iris-versicolor
    5.6,2.7,4.2,1.3,Iris-versicolor
    5.7,3.0,4.2,1.2,Iris-versicolor
    5.7,2.9,4.2,1.3,Iris-versicolor
    6.2,2.9,4.3,1.3,Iris-versicolor
    5.1,2.5,3.0,1.1,Iris-versicolor
    5.7,2.8,4.1,1.3,Iris-versicolor
    6.3,3.3,6.0,2.5,Iris-virginica
    5.8,2.7,5.1,1.9,Iris-virginica
    7.1,3.0,5.9,2.1,Iris-virginica
    6.3,2.9,5.6,1.8,Iris-virginica
    6.5,3.0,5.8,2.2,Iris-virginica
    7.6,3.0,6.6,2.1,Iris-virginica
    4.9,2.5,4.5,1.7,Iris-virginica
    7.3,2.9,6.3,1.8,Iris-virginica
    6.7,2.5,5.8,1.8,Iris-virginica
    7.2,3.6,6.1,2.5,Iris-virginica
    6.5,3.2,5.1,2.0,Iris-virginica
    6.4,2.7,5.3,1.9,Iris-virginica
    6.8,3.0,5.5,2.1,Iris-virginica
    5.7,2.5,5.0,2.0,Iris-virginica
    5.8,2.8,5.1,2.4,Iris-virginica
    6.4,3.2,5.3,2.3,Iris-virginica
    6.5,3.0,5.5,1.8,Iris-virginica
    7.7,3.8,6.7,2.2,Iris-virginica
    7.7,2.6,6.9,2.3,Iris-virginica
    6.0,2.2,5.0,1.5,Iris-virginica
    6.9,3.2,5.7,2.3,Iris-virginica
    5.6,2.8,4.9,2.0,Iris-virginica
    7.7,2.8,6.7,2.0,Iris-virginica
    6.3,2.7,4.9,1.8,Iris-virginica
    6.7,3.3,5.7,2.1,Iris-virginica
    7.2,3.2,6.0,1.8,Iris-virginica
    6.2,2.8,4.8,1.8,Iris-virginica
    6.1,3.0,4.9,1.8,Iris-virginica
    6.4,2.8,5.6,2.1,Iris-virginica
    7.2,3.0,5.8,1.6,Iris-virginica
    7.4,2.8,6.1,1.9,Iris-virginica
    7.9,3.8,6.4,2.0,Iris-virginica
    6.4,2.8,5.6,2.2,Iris-virginica
    6.3,2.8,5.1,1.5,Iris-virginica
    6.1,2.6,5.6,1.4,Iris-virginica
    7.7,3.0,6.1,2.3,Iris-virginica
    6.3,3.4,5.6,2.4,Iris-virginica
    6.4,3.1,5.5,1.8,Iris-virginica
    6.0,3.0,4.8,1.8,Iris-virginica
    6.9,3.1,5.4,2.1,Iris-virginica
    6.7,3.1,5.6,2.4,Iris-virginica
    6.9,3.1,5.1,2.3,Iris-virginica
    5.8,2.7,5.1,1.9,Iris-virginica
    6.8,3.2,5.9,2.3,Iris-virginica
    6.7,3.3,5.7,2.5,Iris-virginica
    6.7,3.0,5.2,2.3,Iris-virginica
    6.3,2.5,5.0,1.9,Iris-virginica
    6.5,3.0,5.2,2.0,Iris-virginica
    6.2,3.4,5.4,2.3,Iris-virginica
    5.9,3.0,5.1,1.8,Iris-virginica  

    Assume this dataset resides on HDFS, spread across different machines in the cluster, and that we need to compute statistics for columns 1-4. In this situation the processing approach differs from the previous one, and the code is admittedly more complex, but it is far more efficient when the data is distributed across multiple machines.

    Code:

    Step 1:

    package _core.Test
    
    import org.apache.spark.util.StatCounter
    import java.lang.Double._
    
    /**
      * Author Mr. Guo
      * Create 2019/5/1 - 0:07
      */
    // Wraps Spark's StatCounter and additionally counts NaN (missing) values
    class NAStatCounter extends Serializable {
      val status: StatCounter = new StatCounter()
      var missing: Long = 0

      // Add a single value: NaN is counted as missing, real values go into the StatCounter
      def add(x: Double): NAStatCounter = {
        if (isNaN(x)) {
          missing += 1
        } else {
          status.merge(x)
        }
        this
      }

      // Combine with another counter: merge the statistics and sum the missing counts
      def merge(other: NAStatCounter): NAStatCounter = {
        status.merge(other.status)
        missing += other.missing
        this
      }

      override def toString: String = {
        "stats: " + status.toString() + " NaN:" + missing
      }
    }
    
    object NAStatCounter {
      def apply(x: Double) = new NAStatCounter().add(x)
    }
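
    A quick local check of the class (a sketch with made-up values, run e.g. from a REPL with NAStatCounter on the classpath) shows how add and merge behave, including the NaN bookkeeping:

    // exercise NAStatCounter without Spark
    val c1 = NAStatCounter(1.0).add(Double.NaN).add(3.0)  // two real values, one NaN
    val c2 = NAStatCounter(5.0).add(7.0)                  // two real values, no NaN
    println(c1.merge(c2))
    // prints something like: stats: (count: 4, mean: 4.0, ...) NaN:1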
    

    Step 2:

    import org.apache.spark.{SparkConf, SparkContext}

    object TestStatsFunction {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getName)
        val sc = new SparkContext(conf)
        sc.setLogLevel("warn")

        val rdd = sc.textFile("hdfs://client/TestFile/aa.data")
        val mid1 = rdd.map(_.split(","))
          .filter(_.length == 5)
          // drop the label column, keeping the four numeric columns
          .map(_.dropRight(1))
          // turn each row into one NAStatCounter per column
          .map(_.map(d => NAStatCounter(d.toDouble)))
          // merge the per-row counter arrays column by column
          .reduce { (x, y) =>
            x.zip(y).map { case (a, b) => a.merge(b) }
          }
        mid1.foreach(println)
        sc.stop()
      }
    }
    

    Run result: one NAStatCounter line per column, each showing that column's statistics followed by its NaN count.

    The NaN count at the end of each line is the number of NaN values seen in that column. (If merge merely did missing += 1, counting merges instead of adding other.missing, this figure would instead depend on how the data is partitioned.)

    Step 2 can be improved by extracting the aggregation into a method, which makes the code more generic and reusable:

    // Requires: import org.apache.spark.rdd.RDD
    def statusWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {
      val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
        if (iter.hasNext) {
          // seed one NAStatCounter per column from the first row of the partition
          val nas: Array[NAStatCounter] = iter.next().map(d => NAStatCounter(d))
          // fold the remaining rows of the partition into those counters
          iter.foreach(arr => {
            nas.zip(arr).foreach { case (a, b) => a.add(b) }
          })
          Iterator(nas)
        } else {
          // guard against empty partitions
          Iterator.empty
        }
      })
      // merge the per-partition results column by column
      nastats.reduce((n1, n2) => {
        n1.zip(n2).map { case (a, b) => a.merge(b) }
      })
    }
    

     With this method in place, we can compute the column statistics of any RDD[Array[Double]] simply by calling it, which makes the code far more reusable; it can be kept as a small utility.
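
    For instance, the parsing part of Step 2 could then be reduced to roughly the following (a sketch: it reuses the same HDFS path and assumes statusWithMissing is defined in the same object as main):

    // parse each line into an Array[Double] holding the four numeric columns
    val parsed = sc.textFile("hdfs://client/TestFile/aa.data")
      .map(_.split(","))
      .filter(_.length == 5)
      .map(_.dropRight(1).map(_.toDouble))

    // one call produces the per-column statistics, NaN counts included
    statusWithMissing(parsed).foreach(println)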

     

  • Original article: https://www.cnblogs.com/Gxiaobai/p/10803494.html