批量梯度下降的逻辑回归可以参考这篇文章:http://blog.csdn.net/pakko/article/details/37878837
看了一些Scala语法后,打算看看MlLib的机器学习算法的并行化,那就是逻辑回归,找到package org.apache.spark.mllib.classification下的LogisticRegressionWithSGD这个类,直接搜train()函数。
def train( input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeights: Vector): LogisticRegressionModel = { new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction) .run(input, initialWeights) }
发现它调用了GeneralizedLinearAlgorithm下的一个run函数,这个类GeneralizedLinearAlgorithm是个抽象类,并且在GeneralizedLinearAlgorithm.scala文件下,并且类LogisticRegressionWithSGD是继承了GeneralizedLinearAlgorithm
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = { if (numFeatures < 0) { numFeatures = input.map(_.features.size).first() } if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data is not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } // Check the data properties before running the optimizer if (validateData && !validators.forall(func => func(input))) { throw new SparkException("Input validation failed.") } /** * Scaling columns to unit variance as a heuristic to reduce the condition number: * * During the optimization process, the convergence (rate) depends on the condition number of * the training dataset. Scaling the variables often reduces this condition number * heuristically, thus improving the convergence rate. Without reducing the condition number, * some training datasets mixing the columns with different scales may not be able to converge. * * GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return * the weights in the original scale. * See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf * * Here, if useFeatureScaling is enabled, we will standardize the training features by dividing * the variance of each column (without subtracting the mean), and train the model in the * scaled space. Then we transform the coefficients from the scaled space to the original scale * as GLMNET and LIBSVM do. * * Currently, it's only enabled in LogisticRegressionWithLBFGS */ val scaler = if (useFeatureScaling) { new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features)) } else { null } // Prepend an extra variable consisting of all 1.0's for the intercept. // TODO: Apply feature scaling to the weight vector instead of input data. val data = if (addIntercept) { if (useFeatureScaling) { input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache() } else { input.map(lp => (lp.label, appendBias(lp.features))).cache() } } else { if (useFeatureScaling) { input.map(lp => (lp.label, scaler.transform(lp.features))).cache() } else { input.map(lp => (lp.label, lp.features)) } } /** * TODO: For better convergence, in logistic regression, the intercepts should be computed * from the prior probability distribution of the outcomes; for linear regression, * the intercept should be set as the average of response. */ val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) { appendBias(initialWeights) } else { /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */ initialWeights } val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept) //这里进入优化 val intercept = if (addIntercept && numOfLinearPredictor == 1) { weightsWithIntercept(weightsWithIntercept.size - 1) } else { 0.0 } var weights = if (addIntercept && numOfLinearPredictor == 1) { Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)) } else { weightsWithIntercept } /** * The weights and intercept are trained in the scaled space; we're converting them back to * the original scale. * * Math shows that if we only perform standardization without subtracting means, the intercept * will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i * is the coefficient in the original space, and v_i is the variance of the column i. */ if (useFeatureScaling) { if (numOfLinearPredictor == 1) { weights = scaler.transform(weights) } else { /** * For `numOfLinearPredictor > 1`, we have to transform the weights back to the original * scale for each set of linear predictor. Note that the intercepts have to be explicitly * excluded when `addIntercept == true` since the intercepts are part of weights now. */ var i = 0 val n = weights.size / numOfLinearPredictor val weightsArray = weights.toArray while (i < numOfLinearPredictor) { val start = i * n val end = (i + 1) * n - { if (addIntercept) 1 else 0 } val partialWeightsArray = scaler.transform( Vectors.dense(weightsArray.slice(start, end))).toArray System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size) i += 1 } weights = Vectors.dense(weightsArray) } } // Warn at the end of the run as well, for increased visibility. if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data was not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } // Unpersist cached data if (data.getStorageLevel != StorageLevel.NONE) { data.unpersist(false) } createModel(weights, intercept) }
在上面代码中的optimizer.optimize,传入了数据data和初始化的theta,然后optimizer在LogisticRegressionWithSGD中被初始化为:
class LogisticRegressionWithSGD private[mllib] ( private var stepSize: Double, private var numIterations: Int, private var regParam: Double, private var miniBatchFraction: Double) extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable { private val gradient = new LogisticGradient() private val updater = new SquaredL2Updater() @Since("0.8.0") override val optimizer = new GradientDescent(gradient, updater) .setStepSize(stepSize) .setNumIterations(numIterations) .setRegParam(regParam) .setMiniBatchFraction(miniBatchFraction) override protected val validators = List(DataValidators.binaryLabelValidator) /** * Construct a LogisticRegression object with default parameters: {stepSize: 1.0, * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}. */ @Since("0.8.0") def this() = this(1.0, 100, 0.01, 1.0) override protected[mllib] def createModel(weights: Vector, intercept: Double) = { new LogisticRegressionModel(weights, intercept) } }
optimizer被赋值为GradientDescent(gradient, updater),然后又看GradientDescent这个类:
class GradientDescent private[spark] (private var gradient: Gradient, private var updater: Updater) extends Optimizer with Logging { private var stepSize: Double = 1.0 private var numIterations: Int = 100 private var regParam: Double = 0.0 private var miniBatchFraction: Double = 1.0 private var convergenceTol: Double = 0.001 ... @DeveloperApi def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = { val (weights, _) = GradientDescent.runMiniBatchSGD( data, gradient, updater, stepSize, numIterations, regParam, miniBatchFraction, initialWeights, convergenceTol) weights } }
发现调用的是随机梯度下降的miniBatch方法,runMiniBatchSGD:
def runMiniBatchSGD( data: RDD[(Double, Vector)], gradient: Gradient, updater: Updater, stepSize: Double, numIterations: Int, regParam: Double, miniBatchFraction: Double, initialWeights: Vector, convergenceTol: Double): (Vector, Array[Double]) = { // convergenceTol should be set with non minibatch settings if (miniBatchFraction < 1.0 && convergenceTol > 0.0) { logWarning("Testing against a convergenceTol when using miniBatchFraction " + "< 1.0 can be unstable because of the stochasticity in sampling.") } val stochasticLossHistory = new ArrayBuffer[Double](numIterations) // Record previous weight and current one to calculate solution vector difference var previousWeights: Option[Vector] = None var currentWeights: Option[Vector] = None val numExamples = data.count() // if no data, return initial weights to avoid NaNs if (numExamples == 0) { logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found") return (initialWeights, stochasticLossHistory.toArray) } if (numExamples * miniBatchFraction < 1) { logWarning("The miniBatchFraction is too small") } // Initialize weights as a column vector var weights = Vectors.dense(initialWeights.toArray) val n = weights.size /** * For the first iteration, the regVal will be initialized as sum of weight squares * if it's L2 updater; for L1 updater, the same logic is followed. */ var regVal = updater.compute( weights, Vectors.zeros(weights.size), 0, 1, regParam)._2 //计算正则化的值 var converged = false // indicates whether converged based on convergenceTol var i = 1 while (!converged && i <= numIterations) { //迭代开始,在小于最大迭代数的时候不断运行 val bcWeights = data.context.broadcast(weights) // Sample a subset (fraction miniBatchFraction) of the total data // compute and sum up the subgradients on this subset (this is one map-reduce) val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))( seqOp = (c, v) => { // c: (grad, loss, count), v: (label, features) val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) //计算一个batch中每条数据的梯度 (c._1, c._2 + l, c._3 + 1) }, combOp = (c1, c2) => { // c: (grad, loss, count) (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) //将batch中所有数据的梯度相加,损失函数值相加,记录batch的size }) if (miniBatchSize > 0) { /** * lossSum is computed using the weights from the previous iteration * and regVal is the regularization value computed in the previous iteration as well. */ stochasticLossHistory.append(lossSum / miniBatchSize + regVal) //原来损失函数是这样计算batch的总损失值除以batchSize再加上正则化值 val update = updater.compute( weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), //更新权重和下次的正则化值 stepSize, i, regParam) weights = update._1 regVal = update._2 previousWeights = currentWeights currentWeights = Some(weights) if (previousWeights != None && currentWeights != None) { converged = isConverged(previousWeights.get, currentWeights.get, convergenceTol) } } else { logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero") } i += 1 } logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format( stochasticLossHistory.takeRight(10).mkString(", "))) (weights, stochasticLossHistory.toArray) }
发现要对Batch中每一条数据计算梯度,调用的是gradient.compute函数,对于二值分类:
override def compute( data: Vector, label: Double, weights: Vector, cumGradient: Vector): Double = { val dataSize = data.size // (weights.size / dataSize + 1) is number of classes require(weights.size % dataSize == 0 && numClasses == weights.size / dataSize + 1) numClasses match { case 2 => /** * For Binary Logistic Regression. * * Although the loss and gradient calculation for multinomial one is more generalized, * and multinomial one can also be used in binary case, we still implement a specialized * binary version for performance reason. */ val margin = -1.0 * dot(data, weights) val multiplier = (1.0 / (1.0 + math.exp(margin))) - label axpy(multiplier, data, cumGradient) //梯度的计算就是multiplier * data即,(h(x) - y)*x if (label > 0) { // The following is equivalent to log(1 + exp(margin)) but more numerically stable. MLUtils.log1pExp(margin) //返回损失函数值 } else { MLUtils.log1pExp(margin) - margin } ... //下面有多分类,还没看 }
利用treeAggregate并行化batch所有数据后,得到gradientSum要除以miniBatchSize,然后进入updater.compute进行权重theta和正则化值的更新,为了下一次迭代:
@DeveloperApi class SquaredL2Updater extends Updater { override def compute( weightsOld: Vector, gradient: Vector, stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = { // add up both updates from the gradient of the loss (= step) as well as // the gradient of the regularizer (= regParam * weightsOld) // w' = w - thisIterStepSize * (gradient + regParam * w) // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient //这个就是权重更新的迭代式子,这个是L2正则化后的更新,神奇的是(1 - thisIterStepSize * regParam) val thisIterStepSize = stepSize / math.sqrt(iter) //记得更新式子不是w‘ = w - alpha*gradient alpha就是学习率也就是thisIterStepSize val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector //你会发现alpha = thisIterStepSize = 1/sqrt(iter)也就是随着迭代次数越多学习率越低,迈出的步伐越小 brzWeights :*= (1.0 - thisIterStepSize * regParam) brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights) val norm = brzNorm(brzWeights, 2.0) (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm) //正则化值就是w'的二范数的平方乘以正则化参数regParam乘以0.5 } }