数学公式[Mathematical formulation]:
很多标准的机器学习方法都可以归结伟凸优化问题。 例如,寻找凸函数f(w)极小值的任务(w[weights]为d维权值向量,它是函数f的自变量)。比较正式地,我们可以将之写作优化问题:min f(w), w∈Rd,其目标函数如下:
这里向量xi∈Rd(1<=i<=n)是训练样本,yi∈R是相应的标签,标签也是训练出模型之后需要预测的值。当用到的损失函数L(w; x, y)表示为wTx和y(wT是w的转置,转置之后才能和x做矩阵乘法),机器学习方法就可以认为是线性的。MLlib中的几个分类和回归算法就是这个线性类型,是我们这里要讨论的。
目标函数f分为两部分:正则化部分控制模型的复杂度,损失函数部分评估模型在训练数据上的错误率。损失函数L(w;.)一般是w的凸函数。固定的正则化参数(λ>=0)用于调节最小化损失函数和最小化模型复杂段这两个目标之间的平衡(例如,避免过拟合)。
损失函数[Loss functions]:
下表汇总了MLlib支持的损失函数和对应的梯度或者次梯度[1](次梯度可用于不可微函数,没有梯度的限制严格)。
正则化[Regularizers]:
正则化是为了约束训练过程,使其向生成简单模型的方向收敛,从而避免过拟合。MLlib目前支持下面的正则化方法:
注:提到正则化,必须要说一下数学中范数(norm)的概念。范数是具有“长度”概念的函数。最常用的范数是p-范数,其定义为:若x = [x1, x2, x3, …, xn]T,那么||x||p = (|x1| p+ |x2|p + |x3|p + … + |xn|p)1/p 。当p取值为1和2时,就是我们的L1和L2【2-范数的平方除以2】正则化:
1-范数:||x||1=|x1|+|x2|+…+|xn|
2-范数:||x||2=(|x1|2+|x2|2+…+|xn|2)½
由于平滑的特性,L2正则化问题比L1正则化更容易解决。但是,L1正则可以促进产生稀疏的权值,从而产生简单可解释的模型。并 且,L1由于产生稀疏的权值(即大部分取值为0),相当于做了特征选择。不建议做没有正则化的模型训练,特别是训练样本特别少 的时候。
优化[Optimization]:
线性模型底层使用凸函数优化方法来优化目标函数。MLlib中使用了两种方法,SGD和L-BFGS,章节 optimization section会有具体 介绍。当前,绝大多数MLlib的算法API支持随机梯度下降法(SGD),部分支持L-BFGS。
分类[Classification]:
分类旨在将多个条目分到不同的类别。最常见的分类类型是二分类,两个类型通常被比较为正和负。 如果多于两个类型的话,就是多分类。MLlib提供两种线性方法用于分类:线性支持向量机(SVM)和逻辑回归。SVM只支持二分类,而逻辑回归既支持二分类又支持多分类。这两种方法,都提供了L1和L2正则化。训练数据集用RDD[LabeledPoint]表示,其中label是分类类型的索引,从0开始,即0, 1, 2, …。注意在上文的数学公式部分,二分类标签y使用+1和-1标记,这个是为了方便公式化。实际再MLlib中,使用0代替了-1,从而更多分类保持一致。
线性支持向量机[SVMs]:
线性SVM是用于大规模分类任务的一种标准方法。它用到的线性方法上文的等式(1)中已经有说明,使用的损失函数为hinge loss:
默认情况下,线性SVM使用L2正则化做训练。也可以替换为L1正则化,这样就成了线性优化问题。
线性SVM算法输出的是SVM模型。给定一个新的数据点(用x表示),模型基于wTx的值做预测。默认情况下, 如果wTx>=0则结果为正例,否则为负例。
import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.*; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; public class SVMClassifier { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("SVM Classifier Example"); SparkContext sc = new SparkContext(conf); String path = "data/mllib/sample_libsvm_data.txt"; JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); // Split initial RDD into two... [60% training data, 40% testing data]. JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L); training.cache(); JavaRDD<LabeledPoint> test = data.subtract(training); // Run training algorithm to build the model. int numIterations = 100; final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations); // Clear the default threshold. model.clearThreshold(); // Compute raw scores on the test set. JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map( new Function<LabeledPoint, Tuple2<Object, Object>>() { public Tuple2<Object, Object> call(LabeledPoint p) { Double score = model.predict(p.features()); return new Tuple2<Object, Object>(score, p.label()); } } ); // Get evaluation metrics. BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels)); double auROC = metrics.areaUnderROC(); System.out.println("Area under ROC = " + auROC); // Save and load model model.save(sc, "myModelPath"); SVMModel sameModel = SVMModel.load(sc, "myModelPath"); }}
import org.apache.spark.mllib.optimization.L1Updater; SVMWithSGD svmAlg = new SVMWithSGD();svmAlg.optimizer() .setNumIterations(200) .setRegParam(0.1) .setUpdater(new L1Updater());final SVMModel modelL1 = svmAlg.run(training.rdd());
逻辑回归[Logistic regression]:
逻辑回归广泛应用于二分类问题。它是线性分类方法(参见公式(1)),损失函数是logistic loss:
其中exp是以自然常数e为底的指数函数。对二分类问题来说,这个算法输出一个二分逻辑回归模型。给定一个新的数据点(x),模型 应用下面的逻辑函数给出预测:
其中 z=wTx,f(z)也叫sigmoid函数,它将实数范围内的值映射到(0,1)区间。默认情况下, 如果f(wTx) > 0.5, 输出为正,否则 为负。不过跟线性SVM不同的是,逻辑回归模型的原始输出(f(z)),可以解释为概率(例如,x 是正例的概率)。
二分逻辑回归可以泛化为多项式逻辑回归 ,用于训练和预测多分类问题。例如,对于K个可能的输出,其中一个输出可被选作“中 心点”(pivot),另外的K-1个输出可以分别跟中心点输出进行回归。在MLlib中,第一个类0被选作“中心点”类。可以参考《The Elements of Statistical Learning》的章节4.4了解详情。
对于多分类问题,算法会输出一个多项式回归模型,它包含K-1个跟第一类配对的二元回归模型。给定一个新的数据点,K-1个模型都会被执行,概率最大的类会被选作预测的类型。
import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.LogisticRegressionModel; import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS; import org.apache.spark.mllib.evaluation.MulticlassMetrics; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; public class MultinomialLogisticRegressionExample { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("SVM Classifier Example"); SparkContext sc = new SparkContext(conf); String path = "data/mllib/sample_libsvm_data.txt"; JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD(); // Split initial RDD into two... [60% training data, 40% testing data]. JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L); JavaRDD<LabeledPoint> training = splits[0].cache(); JavaRDD<LabeledPoint> test = splits[1]; // Run training algorithm to build the model. final LogisticRegressionModel model = new LogisticRegressionWithLBFGS() .setNumClasses(10) .run(training.rdd()); // Compute raw scores on the test set. JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map( new Function<LabeledPoint, Tuple2<Object, Object>>() { public Tuple2<Object, Object> call(LabeledPoint p) { Double prediction = model.predict(p.features()); return new Tuple2<Object, Object>(prediction, p.label()); } } ); // Get evaluation metrics. MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd()); double precision = metrics.precision(); System.out.println("Precision = " + precision); // Save and load model model.save(sc, "myModelPath"); LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc, "myModelPath"); }}
回归[Regression]:
线性最小二乘法,Lasso,岭回归[Linear least squares, Lasso, and ridge regression]:
线性最小二乘法是回归问题中最常用的公式,如公式(1)所述它是一个线性方法,损失函数是squared loss:
通过使用不同的正则化方法,上面的回归公式派生出了各种各样相关的回归方法:普通最小二乘法 或 线性最小二乘法 没有使用正则化方法;岭回归 使用了L2正则化; Lasso 使用了 L1 正则化。对上述这些模型,平均的训练误差(见下面的表达式),叫做均方误差。
import scala.Tuple2; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.regression.LinearRegressionModel; import org.apache.spark.mllib.regression.LinearRegressionWithSGD; import org.apache.spark.SparkConf; public class LinearRegression { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Linear Regression Example"); JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse the data String path = "data/mllib/ridge-data/lpsa.data"; JavaRDD<String> data = sc.textFile(path); JavaRDD<LabeledPoint> parsedData = data.map( new Function<String, LabeledPoint>() { public LabeledPoint call(String line) { String[] parts = line.split(","); String[] features = parts[1].split(" "); double[] v = new double[features.length]; for (int i = 0; i < features.length - 1; i++) v[i] = Double.parseDouble(features[i]); return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); } } ); parsedData.cache(); // Building the model int numIterations = 100; final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); // Evaluate model on training examples and compute training error JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map( new Function<LabeledPoint, Tuple2<Double, Double>>() { public Tuple2<Double, Double> call(LabeledPoint point) { double prediction = model.predict(point.features()); return new Tuple2<Double, Double>(prediction, point.label()); } } ); double MSE = new JavaDoubleRDD(valuesAndPreds.map( new Function<Tuple2<Double, Double>, Object>() { public Object call(Tuple2<Double, Double> pair) { return Math.pow(pair._1() - pair._2(), 2.0); } } ).rdd()).mean(); System.out.println("training Mean Squared Error = " + MSE); // Save and load model model.save(sc.sc(), "myModelPath"); LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(), "myModelPath"); }}
流式线性回归[Streaming linear regression]:
当数据以流的方式到达时,就很有必要使回归模型适应在线环境,每当有新的数据到来时就得更新模型参数。MLlib当前支持普通最小二乘法的流式线性回归。拟合过程跟离线情况类似,但是要实时拟合每一批数据,使得模型能够及时持续更新从而能够预测流式数据。(目前只有Scala支持流式线性回归)
实现 [developer]:
幕后,MLlib实现了一个简单分布式版本的随机梯度下降算法,建立在底层的梯度下降优化原语上(参见 optimization )。所有算法都会接受一个正则化参数(regParam)和多个其他的梯度下降相关的参数(stepSize, numIterations, miniBatchFraction)。每种算法都支持三种可能的正则化(none, L1或者L2)。
对于逻辑回归,L-BFGS 版本在LogisticRegressionWithLBFGS中实现。这个版本支持二分逻辑回归和多项式逻辑回归,而SGD版本只能支持二分逻辑回归。但是,L-BFGS版本不支持L1正则化,SGD版本支持L1正则化。当L1不是必须的时候,强烈推荐用L-BFGS版本,因为相对于SGD来说,它通过拟牛顿近似逆Hessian矩阵收敛得更快更准。