zoukankan      html  css  js  c++  java
  • MlLib--逻辑回归笔记

    批量梯度下降的逻辑回归可以参考这篇文章:http://blog.csdn.net/pakko/article/details/37878837

    看了一些Scala语法后,打算看看MlLib的机器学习算法的并行化,那就是逻辑回归,找到package org.apache.spark.mllib.classification下的LogisticRegressionWithSGD这个类,直接搜train()函数。

      def train(
          input: RDD[LabeledPoint],
          numIterations: Int,
          stepSize: Double,
          miniBatchFraction: Double,
          initialWeights: Vector): LogisticRegressionModel = {
        new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
          .run(input, initialWeights)
      }
    

    发现它调用了GeneralizedLinearAlgorithm下的一个run函数,这个类GeneralizedLinearAlgorithm是个抽象类,并且在GeneralizedLinearAlgorithm.scala文件下,并且类LogisticRegressionWithSGD是继承了GeneralizedLinearAlgorithm

      def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
    
        if (numFeatures < 0) {
          numFeatures = input.map(_.features.size).first()
        }
    
        if (input.getStorageLevel == StorageLevel.NONE) {
          logWarning("The input data is not directly cached, which may hurt performance if its"
            + " parent RDDs are also uncached.")
        }
    
        // Check the data properties before running the optimizer
        if (validateData && !validators.forall(func => func(input))) {
          throw new SparkException("Input validation failed.")
        }
    
        /**
         * Scaling columns to unit variance as a heuristic to reduce the condition number:
         *
         * During the optimization process, the convergence (rate) depends on the condition number of
         * the training dataset. Scaling the variables often reduces this condition number
         * heuristically, thus improving the convergence rate. Without reducing the condition number,
         * some training datasets mixing the columns with different scales may not be able to converge.
         *
         * GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return
         * the weights in the original scale.
         * See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
         *
         * Here, if useFeatureScaling is enabled, we will standardize the training features by dividing
         * the variance of each column (without subtracting the mean), and train the model in the
         * scaled space. Then we transform the coefficients from the scaled space to the original scale
         * as GLMNET and LIBSVM do.
         *
         * Currently, it's only enabled in LogisticRegressionWithLBFGS
         */
        val scaler = if (useFeatureScaling) {
          new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
        } else {
          null
        }
    
        // Prepend an extra variable consisting of all 1.0's for the intercept.
        // TODO: Apply feature scaling to the weight vector instead of input data.
        val data =
          if (addIntercept) {
            if (useFeatureScaling) {
              input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
            } else {
              input.map(lp => (lp.label, appendBias(lp.features))).cache()
            }
          } else {
            if (useFeatureScaling) {
              input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
            } else {
              input.map(lp => (lp.label, lp.features))
            }
          }
    
        /**
         * TODO: For better convergence, in logistic regression, the intercepts should be computed
         * from the prior probability distribution of the outcomes; for linear regression,
         * the intercept should be set as the average of response.
         */
        val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
          appendBias(initialWeights)
        } else {
          /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
          initialWeights
        }
    
        val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept) //这里进入优化
    
        val intercept = if (addIntercept && numOfLinearPredictor == 1) {
          weightsWithIntercept(weightsWithIntercept.size - 1)
        } else {
          0.0
        }
    
        var weights = if (addIntercept && numOfLinearPredictor == 1) {
          Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
        } else {
          weightsWithIntercept
        }
    
        /**
         * The weights and intercept are trained in the scaled space; we're converting them back to
         * the original scale.
         *
         * Math shows that if we only perform standardization without subtracting means, the intercept
         * will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i
         * is the coefficient in the original space, and v_i is the variance of the column i.
         */
        if (useFeatureScaling) {
          if (numOfLinearPredictor == 1) {
            weights = scaler.transform(weights)
          } else {
            /**
             * For `numOfLinearPredictor > 1`, we have to transform the weights back to the original
             * scale for each set of linear predictor. Note that the intercepts have to be explicitly
             * excluded when `addIntercept == true` since the intercepts are part of weights now.
             */
            var i = 0
            val n = weights.size / numOfLinearPredictor
            val weightsArray = weights.toArray
            while (i < numOfLinearPredictor) {
              val start = i * n
              val end = (i + 1) * n - { if (addIntercept) 1 else 0 }
    
              val partialWeightsArray = scaler.transform(
                Vectors.dense(weightsArray.slice(start, end))).toArray
    
              System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size)
              i += 1
            }
            weights = Vectors.dense(weightsArray)
          }
        }
    
        // Warn at the end of the run as well, for increased visibility.
        if (input.getStorageLevel == StorageLevel.NONE) {
          logWarning("The input data was not directly cached, which may hurt performance if its"
            + " parent RDDs are also uncached.")
        }
    
        // Unpersist cached data
        if (data.getStorageLevel != StorageLevel.NONE) {
          data.unpersist(false)
        }
    
        createModel(weights, intercept)
      }
    

     在上面代码中的optimizer.optimize,传入了数据data和初始化的theta,然后optimizer在LogisticRegressionWithSGD中被初始化为:

    class LogisticRegressionWithSGD private[mllib] (
        private var stepSize: Double,
        private var numIterations: Int,
        private var regParam: Double,
        private var miniBatchFraction: Double)
      extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
    
      private val gradient = new LogisticGradient()
      private val updater = new SquaredL2Updater()
      @Since("0.8.0")
      override val optimizer = new GradientDescent(gradient, updater)
        .setStepSize(stepSize)
        .setNumIterations(numIterations)
        .setRegParam(regParam)
        .setMiniBatchFraction(miniBatchFraction)
      override protected val validators = List(DataValidators.binaryLabelValidator)
    
      /**
       * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
       * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}.
       */
      @Since("0.8.0")
      def this() = this(1.0, 100, 0.01, 1.0)
    
      override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
        new LogisticRegressionModel(weights, intercept)
      }
    }
    

     optimizer被赋值为GradientDescent(gradient, updater),然后又看GradientDescent这个类:

    class GradientDescent private[spark] (private var gradient: Gradient, private var updater: Updater)
      extends Optimizer with Logging {
    
      private var stepSize: Double = 1.0
      private var numIterations: Int = 100
      private var regParam: Double = 0.0
      private var miniBatchFraction: Double = 1.0
      private var convergenceTol: Double = 0.001
    
      ...
      @DeveloperApi
      def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
        val (weights, _) = GradientDescent.runMiniBatchSGD(
          data,
          gradient,
          updater,
          stepSize,
          numIterations,
          regParam,
          miniBatchFraction,
          initialWeights,
          convergenceTol)
        weights
      }
    }
    

     发现调用的是随机梯度下降的miniBatch方法,runMiniBatchSGD:

      def runMiniBatchSGD(
          data: RDD[(Double, Vector)],
          gradient: Gradient,
          updater: Updater,
          stepSize: Double,
          numIterations: Int,
          regParam: Double,
          miniBatchFraction: Double,
          initialWeights: Vector,
          convergenceTol: Double): (Vector, Array[Double]) = {
    
        // convergenceTol should be set with non minibatch settings
        if (miniBatchFraction < 1.0 && convergenceTol > 0.0) {
          logWarning("Testing against a convergenceTol when using miniBatchFraction " +
            "< 1.0 can be unstable because of the stochasticity in sampling.")
        }
    
        val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
        // Record previous weight and current one to calculate solution vector difference
    
        var previousWeights: Option[Vector] = None
        var currentWeights: Option[Vector] = None
    
        val numExamples = data.count()
    
        // if no data, return initial weights to avoid NaNs
        if (numExamples == 0) {
          logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
          return (initialWeights, stochasticLossHistory.toArray)
        }
    
        if (numExamples * miniBatchFraction < 1) {
          logWarning("The miniBatchFraction is too small")
        }
    
        // Initialize weights as a column vector
        var weights = Vectors.dense(initialWeights.toArray)
        val n = weights.size
    
        /**
         * For the first iteration, the regVal will be initialized as sum of weight squares
         * if it's L2 updater; for L1 updater, the same logic is followed.
         */
        var regVal = updater.compute(
          weights, Vectors.zeros(weights.size), 0, 1, regParam)._2 //计算正则化的值
    
        var converged = false // indicates whether converged based on convergenceTol
        var i = 1
        while (!converged && i <= numIterations) { //迭代开始,在小于最大迭代数的时候不断运行
          val bcWeights = data.context.broadcast(weights)
          // Sample a subset (fraction miniBatchFraction) of the total data
          // compute and sum up the subgradients on this subset (this is one map-reduce)
          val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
            .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
              seqOp = (c, v) => {
                // c: (grad, loss, count), v: (label, features)
                val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) //计算一个batch中每条数据的梯度
                (c._1, c._2 + l, c._3 + 1)
              },
              combOp = (c1, c2) => {
                // c: (grad, loss, count)
                (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) //将batch中所有数据的梯度相加,损失函数值相加,记录batch的size
              })
    
          if (miniBatchSize > 0) {
            /**
             * lossSum is computed using the weights from the previous iteration
             * and regVal is the regularization value computed in the previous iteration as well.
             */
            stochasticLossHistory.append(lossSum / miniBatchSize + regVal) //原来损失函数是这样计算batch的总损失值除以batchSize再加上正则化值
            val update = updater.compute(
              weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), //更新权重和下次的正则化值
              stepSize, i, regParam)
            weights = update._1
            regVal = update._2
    
            previousWeights = currentWeights
            currentWeights = Some(weights)
            if (previousWeights != None && currentWeights != None) {
              converged = isConverged(previousWeights.get,
                currentWeights.get, convergenceTol)
            }
          } else {
            logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
          }
          i += 1
        }
    
        logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
          stochasticLossHistory.takeRight(10).mkString(", ")))
    
        (weights, stochasticLossHistory.toArray)
    
      }
    

     发现要对Batch中每一条数据计算梯度,调用的是gradient.compute函数,对于二值分类:

      override def compute(
          data: Vector,
          label: Double,
          weights: Vector,
          cumGradient: Vector): Double = {
        val dataSize = data.size
    
        // (weights.size / dataSize + 1) is number of classes
        require(weights.size % dataSize == 0 && numClasses == weights.size / dataSize + 1)
        numClasses match {
          case 2 =>
            /**
             * For Binary Logistic Regression.
             *
             * Although the loss and gradient calculation for multinomial one is more generalized,
             * and multinomial one can also be used in binary case, we still implement a specialized
             * binary version for performance reason.
             */
            val margin = -1.0 * dot(data, weights)
            val multiplier = (1.0 / (1.0 + math.exp(margin))) - label
            axpy(multiplier, data, cumGradient) //梯度的计算就是multiplier * data即,(h(x) - y)*x
            if (label > 0) {
              // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
              MLUtils.log1pExp(margin) //返回损失函数值
            } else {
              MLUtils.log1pExp(margin) - margin
            }
            ... //下面有多分类,还没看
    }
    

     利用treeAggregate并行化batch所有数据后,得到gradientSum要除以miniBatchSize,然后进入updater.compute进行权重theta和正则化值的更新,为了下一次迭代:

    @DeveloperApi
    class SquaredL2Updater extends Updater {
      override def compute(
          weightsOld: Vector,
          gradient: Vector,
          stepSize: Double,
          iter: Int,
          regParam: Double): (Vector, Double) = {
        // add up both updates from the gradient of the loss (= step) as well as
        // the gradient of the regularizer (= regParam * weightsOld)
        // w' = w - thisIterStepSize * (gradient + regParam * w)
        // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient //这个就是权重更新的迭代式子,这个是L2正则化后的更新,神奇的是(1 - thisIterStepSize * regParam)
        val thisIterStepSize = stepSize / math.sqrt(iter)                           //记得更新式子不是w‘ = w - alpha*gradient alpha就是学习率也就是thisIterStepSize
        val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector              //你会发现alpha = thisIterStepSize = 1/sqrt(iter)也就是随着迭代次数越多学习率越低,迈出的步伐越小
        brzWeights :*= (1.0 - thisIterStepSize * regParam)
        brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
        val norm = brzNorm(brzWeights, 2.0)
    
        (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)              //正则化值就是w'的二范数的平方乘以正则化参数regParam乘以0.5
      }
    }
    
  • 相关阅读:
    Linux篇---ftp服务器的搭建
    【Spark篇】---SparkStreaming+Kafka的两种模式receiver模式和Direct模式
    【Spark篇】---Spark故障解决(troubleshooting)
    【Spark篇】---Spark解决数据倾斜问题
    【Spark篇】---Spark调优之代码调优,数据本地化调优,内存调优,SparkShuffle调优,Executor的堆外内存调优
    【Redis篇】Redis持久化方式AOF和RDB
    【Redis篇】Redis集群安装与初始
    【Redis篇】初始Redis与Redis安装
    Git提示“warning: LF will be replaced by CRLF”
    Git 忽略特殊文件
  • 原文地址:https://www.cnblogs.com/Key-Ky/p/5246093.html
Copyright © 2011-2022 走看看