MLlib: Machine Learning Library. The main topics are:
- Data types
- Statistical tools
  - summary statistics
  - correlations
  - stratified sampling
  - hypothesis testing
  - random data generation
- Classification and regression
  - linear models (SVM, logistic regression, linear regression)
  - naive Bayes
  - decision trees
  - ensembles of trees (random forests and Gradient-Boosted Trees)
  - isotonic regression
- Collaborative filtering
  - ALS (alternating least squares)
- Clustering
  - k-means
  - Gaussian mixture models
  - power iteration clustering (PIC)
  - LDA (latent Dirichlet allocation)
  - streaming k-means
- Dimensionality reduction
  - SVD
  - PCA
- Feature extraction and transformation
- Frequent pattern mining
  - FP-growth
- Optimization
  - stochastic gradient descent
  - limited-memory BFGS (L-BFGS)
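I. Data types
MLlib's main data types are local vectors and local matrices, which live on a single machine, plus distributed matrices backed by one or more RDDs. The underlying linear algebra operations are provided by Breeze and jblas.
1. Local vector
A local vector has integer-typed, 0-based indices and double-typed values. It comes in two forms: dense and sparse.
The base type of local vectors is Vector, with two implementations: DenseVector, backed by a double array of all entries, and SparseVector, backed by two parallel arrays holding the indices and the values of the nonzero entries.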
    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    // Create a dense vector (1.0, 0.0, 3.0).
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    // Create a sparse vector (1.0, 0.0, 3.0) by specifying the indices and values of its nonzero entries.
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries as (index, value) pairs.
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
Scala imports scala.collection.immutable.Vector by default, so you have to import org.apache.spark.mllib.linalg.Vector explicitly to use MLlib’s Vector.
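To make the sparse representation concrete, here is a minimal plain-Scala sketch (no Spark needed) that expands a (size, indices, values) triple into a dense array. The object SparseVectorSketch and its toDense helper are hypothetical names made up for illustration; this is not MLlib's implementation.

```scala
// Hand-rolled illustration only; MLlib's SparseVector has its own implementation.
object SparseVectorSketch {
  /** Expand a (size, indices, values) sparse representation into a dense array. */
  def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
    require(indices.length == values.length, "indices and values must have the same length")
    val dense = Array.fill(size)(0.0)
    for ((i, v) <- indices.zip(values)) {
      require(i >= 0 && i < size, s"index $i is out of range for size $size")
      dense(i) = v
    }
    dense
  }

  def main(args: Array[String]): Unit = {
    // Same data as sv1 above: size 3, nonzero entries at positions 0 and 2.
    val dense = toDense(3, Array(0, 2), Array(1.0, 3.0))
    println(dense.mkString("(", ", ", ")")) // (1.0, 0.0, 3.0)
  }
}
```

A sparse vector only pays for its nonzero entries, so it is the better choice when most entries are zero.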
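- In MLlib, a training example for supervised learning is called a "labeled point": a local vector (dense or sparse) paired with a double-typed label.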
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Create a labeled point with a positive label and a dense feature vector.
    val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
    // Create a labeled point with a negative label and a sparse feature vector.
    val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
- MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR.
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD

    // sc is an existing SparkContext.
    val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
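For reference, each line of a LIBSVM file has the form "label index1:value1 index2:value2 ...", with one-based indices in ascending order. The plain-Scala sketch below shows roughly what loading one such line involves, including the shift to zero-based indices. LibSvmLineSketch and parseLine are hypothetical names, and this is an illustration under those assumptions, not the code behind MLUtils.loadLibSVMFile.

```scala
// Hand-rolled illustration of the LIBSVM text format; not MLlib's parser.
object LibSvmLineSketch {
  /** Parse one LIBSVM line into (label, zero-based indices, values). */
  def parseLine(line: String): (Double, Array[Int], Array[Double]) = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head.toDouble
    val (indices, values) = tokens.tail.map { item =>
      val parts = item.split(":")
      require(parts.length == 2, s"malformed feature entry: $item")
      (parts(0).toInt - 1, parts(1).toDouble) // LIBSVM indices are one-based
    }.unzip
    (label, indices, values)
  }

  def main(args: Array[String]): Unit = {
    val (label, indices, values) = parseLine("1 1:1.0 3:3.0")
    println(s"label=$label indices=${indices.mkString(",")} values=${values.mkString(",")}")
    // label=1.0 indices=0,2 values=1.0,3.0
  }
}
```

The parsed triple has the same shape as the arguments used for the sparse labeled point neg above.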
2. Local matrix
A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order.
    import org.apache.spark.mllib.linalg.{Matrix, Matrices}

    // Create a dense 3 x 2 matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)).
    // The values are listed column by column.
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
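Column-major order means that entry (i, j) of a matrix with numRows rows sits at position i + j * numRows of the value array. The plain-Scala sketch below checks this against the array used for dm above; ColumnMajorSketch and entry are hypothetical names for illustration, not part of MLlib.

```scala
// Hand-rolled illustration of column-major indexing; not MLlib's DenseMatrix.
object ColumnMajorSketch {
  /** Look up entry (i, j) of a numRows x numCols matrix stored column by column. */
  def entry(numRows: Int, numCols: Int, values: Array[Double], i: Int, j: Int): Double = {
    require(values.length == numRows * numCols, "values must hold numRows * numCols entries")
    require(i >= 0 && i < numRows && j >= 0 && j < numCols, s"($i, $j) is out of bounds")
    values(i + j * numRows)
  }

  def main(args: Array[String]): Unit = {
    val numRows = 3
    val numCols = 2
    val values = Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)
    // Print the matrix row by row.
    for (i <- 0 until numRows) {
      println((0 until numCols).map(j => entry(numRows, numCols, values, i, j)).mkString(" "))
    }
    // 1.0 2.0
    // 3.0 4.0
    // 5.0 6.0
  }
}
```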
- A distributed matrix has long-typed row and column indices and double-typed values, and is stored across one or more RDDs. It is very important to choose the right format to store large and distributed matrices, because converting between formats may require a global shuffle, which is expensive.
  - A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge.
  - An IndexedRowMatrix is similar to a RowMatrix but with row indices, which can be used for identifying rows and executing joins.
  - A CoordinateMatrix is a distributed matrix stored in coordinate list (COO) format, backed by an RDD of its entries.
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, IndexedRowMatrix, MatrixEntry, RowMatrix}
    import org.apache.spark.rdd.RDD

    // RowMatrix
    val rows: RDD[Vector] = ... // an RDD of local vectors
    // Create a RowMatrix from an RDD[Vector].
    val mat: RowMatrix = new RowMatrix(rows)
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()

    // IndexedRowMatrix
    val indexedRows: RDD[IndexedRow] = ... // an RDD of indexed rows
    // Create an IndexedRowMatrix from an RDD[IndexedRow].
    val indexedMat: IndexedRowMatrix = new IndexedRowMatrix(indexedRows)
    // Drop its row indices.
    val rowMat: RowMatrix = indexedMat.toRowMatrix()

    // CoordinateMatrix
    val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    // Convert it to an IndexedRowMatrix whose rows are sparse vectors.
    val indexedRowMatrix = coordMat.toIndexedRowMatrix()
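The idea behind turning COO entries into indexed rows can be sketched without Spark: group the (i, j, value) entries by row index, and each group becomes one sparse row. In the sketch below, a local Seq stands in for the RDD and the case class Entry is a hypothetical stand-in for MatrixEntry; it only illustrates the grouping step and is not MLlib's implementation.

```scala
// Hand-rolled illustration with local collections; Entry stands in for MatrixEntry.
object CooSketch {
  final case class Entry(i: Long, j: Long, value: Double)

  /** Group COO entries by row; each row keeps its (column, value) pairs sorted by column. */
  def toIndexedRows(entries: Seq[Entry]): Map[Long, Seq[(Long, Double)]] =
    entries.groupBy(_.i).map { case (row, rowEntries) =>
      row -> rowEntries.map(e => (e.j, e.value)).sortBy(_._1)
    }

  def main(args: Array[String]): Unit = {
    val entries = Seq(Entry(0, 0, 1.0), Entry(2, 1, 6.0), Entry(0, 1, 2.0))
    // Row 1 has no entries, so it does not appear in the result at all.
    toIndexedRows(entries).toSeq.sortBy(_._1).foreach { case (row, cols) =>
      println(s"row $row: ${cols.mkString(", ")}")
    }
  }
}
```

On an RDD the same grouping needs a shuffle, which is one reason format conversions on large matrices are costly.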
- A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple of ((Int, Int), Matrix): the (Int, Int) is the index of the block, and the Matrix is the sub-matrix at that index. BlockMatrix supports methods such as add and multiply with another BlockMatrix. A BlockMatrix is most easily created from an IndexedRowMatrix or a CoordinateMatrix by calling toBlockMatrix.
    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
    import org.apache.spark.rdd.RDD

    val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    // Transform the CoordinateMatrix into a BlockMatrix.
    val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
    // Validate whether the BlockMatrix is set up properly.
    // Throws an exception when it is not valid; nothing happens if it is valid.
    matA.validate()
    // Calculate A^T A.
    val ata = matA.transpose.multiply(matA)
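To see how the (Int, Int) block index relates to an entry's global position, here is a plain-Scala sketch that maps a global (i, j) to its block index and its offset inside that block. BlockIndexSketch and locate are hypothetical names, and the 1024 x 1024 block size in the example is an assumption chosen for illustration; check the API docs for the block size your toBlockMatrix call actually uses.

```scala
// Hand-rolled illustration of block indexing; not MLlib's BlockMatrix code.
object BlockIndexSketch {
  /** Map a global entry (i, j) to ((blockRow, blockCol), (rowInBlock, colInBlock)). */
  def locate(i: Long, j: Long, rowsPerBlock: Int, colsPerBlock: Int): ((Int, Int), (Int, Int)) = {
    require(i >= 0 && j >= 0, "indices must be non-negative")
    require(rowsPerBlock > 0 && colsPerBlock > 0, "block sizes must be positive")
    val blockIndex = ((i / rowsPerBlock).toInt, (j / colsPerBlock).toInt)
    val offset = ((i % rowsPerBlock).toInt, (j % colsPerBlock).toInt)
    (blockIndex, offset)
  }

  def main(args: Array[String]): Unit = {
    // Assumed block size of 1024 x 1024, for illustration only.
    println(locate(2500L, 10L, 1024, 1024)) // ((2,0),(452,10))
  }
}
```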
For whichever model you need, read its introduction in the guide first, then look it up in the API docs: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package