zoukankan      html  css  js  c++  java
  • Spark 学习笔记 (三): Spark MLlib库的数据类型

    介绍

    MLlib支持存储在单机上的local vectors和metrices,也支持分布式的matrics(背后通过一或多个RDD实现)。
    local vectors和local matrices都是简单数据类型,作为公共接口使用。
    底层的线性算法操作则由Breeze和jblas来实现。MLlib中,监督学习的一个训练样本,被称为“labeled point”。

    1.Local vector

    存储在单机上的local vector,由一个整数类型的从0开始的索引(indice),double类型的值(value)组成。MLlib支持两种类型的local vectors: dense和sparse。dense vector 背后通过一个double array来表示它的条目值,而sparse vector则由两个并列数组实现:索引(indices)和值(values)。例如,一个vector(1.0, 0.0, 3.0),可以表示成dense格式:[1.0, 0.0, 3.0],也可以表示成sparse格式:(3, [0, 2], [1.0, 3.0]),其中,3就是vector的size。

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    
    // Create a dense vector (1.0, 0.0, 3.0).
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
    

    2.Local matrix

    local matrix由整数型的行索引、列索引(indices),以及浮点型的值(values)组成,存储在单机上。MLlib支持dense matrices,它的条目值存储在单个double array上,以列为主(column-major)的顺序。而sparse matrices,它是非零条目值以压缩稀疏列(CSC: Compressed Sparse Column)的格式存储,以列为主(column-major)的顺序。

    local matrices的基类是Matrix,它提供了两种实现:DenseMatrix和SparseMatrix. 我们推荐使用Matrices的工厂方法来创建local matrices。记住,MLlib的local matrices被存成以列为主的顺序。

    import org.apache.spark.mllib.linalg.{Matrix, Matrices}
    
    // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    
    // https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.mllib.linalg.Matrices$
    // Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
    // colPtrs: Array[Int], rowIndices: Array[Int]
    val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
    

    3.Distributed matrix

    distributed matrix由long型的行索引和列索引,以及double型的值组成,以一或多个RDD的方式分布式存储。选择合适的格式来存储分布式大矩阵相当重要。将一个分布式矩阵转换成一个不同的格式,可能需要一个全局的shuffle,计算代价高昂!至今支持三种类型的分布式矩阵。

    基本类型被称为RowMatrix。

    一个RowMatrix是以行为主的分布式矩阵,它没有行索引,只是一个特征向量的集合。背后由一个包含这些行的RDD实现,其中,每个行(row)都是一个local vector。我们假设,对于一个RowMatrix,列的数目并不大,因而,单个local vector可以合理地与driver进行通信,也可以使用单个节点被存储/操作。

    一个IndexedRowMatrix与一个RowMatrix相似,但有行索引,它可以被用来标识出行(rows)以及正在执行的join操作(executing joins)。

    一个CoordinateMatrix是一个分布式矩阵,以coordinate list(COO)的格式进行存储,后端由一个包含这些条目的RDD实现。

    注意:

    一个分布式矩阵的底层RDD实现必须是确定的(deterministic),因为我们会对matrix size进行cache。总之,使用非确定的RDD会导致errors。

    (1) RowMatrix

    rowMatrix是面向row的分布式矩阵,没有行索引,背后由一个包含这些行的RDD实现,基中,每个行是一个local vector。因为每个row都被表示成一个local vector,列的数目被限制在一个整数范围内,实际使用时会更小。

    RowMatrix可以通过一个RDD[Vector]实例被创建。接着,我们可以计算它的列归纳统计(column summary statistics),以及分解(decompositions)。QR deceompsition的格式:A=QR,其中Q是一个正交矩阵(orthogonal matrix),R是一个上三角矩阵(upper triangular matrix)。对于SVD和PCA,请参考降维这一节。

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    
    val rows: RDD[Vector] = ... // an RDD of local vectors
    // Create a RowMatrix from an RDD[Vector].
    val mat: RowMatrix = new RowMatrix(rows)
    
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
    
    // QR decomposition 
    val qrResult = mat.tallSkinnyQR(true)
    

    (2)IndexedRowMatrix

    IndexedRowMatrix与RowMatrix相似,但有行索引。它背后由一个带索引的行的RDD实现,因此,每个行可以被表示成long型索引和local vector。

    一个IndexedRowMatrix由RDD[IndexedRow]实例实现,其中,IndexedRow是一个在(Long,Vector)上的封装wrapper。IndexedRowMatrix可以被转换成一个RowMatrix,通过drop掉它的行索引来完成。

    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
    
    val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
    // Create an IndexedRowMatrix from an RDD[IndexedRow].
    val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
    
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
    
    // Drop its row indices.
    val rowMat: RowMatrix = mat.toRowMatrix()
    

    (3)CoordinateMatrix

    CoordinateMatrix是一个分布式矩阵,背后由一个包含这些条目(entries)的RDD实现。每个条目(entry)是一个三元组(tuple):(i: Long, j: Long, value: Double), 其中: i是行索引,j是列索引,value是entry value。当矩阵的维度很大,并且很稀疏时,推荐使用CoordinateMatrix。

    CoordinateMatrix可以通过一个RDD[MatrixEntry]实例来创建,其中MatrixEntry是一个(Long,Long,Double)的Wrapper。通过调用toIndexedRowMatrix,一个CoordinateMatrix可以被转化成一个带有稀疏行的IndexedRowMatrix。CoordinateMatrix的其它计算目前不支持。

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    
    val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val mat: CoordinateMatrix = new CoordinateMatrix(entries)
    
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
    
    // Convert it to an IndexRowMatrix whose rows are sparse vectors.
    val indexedRowMatrix = mat.toIndexedRowMatrix()
    

    (4)BlockMatrix

    BlockMatrix是一个分布式矩阵,背后由一个MatrixBlocks的RDD实现,其中MatrixBlock是一个tuple: ((Int,Int),Matrix),其中(Int,Int)是block的索引,Matrix是由rowsPerBlock x colsPerBlock的sub-matrix。BlockMatrix支持方法:add和multiply。BlockMatrix也有一个helper function:validate,它可以被用于确认BlockMatrix的设置是否正确。

    BlockMatrix 可以由一个IndexedRowMatrix或CoordinateMatrix,通过调用toBlockMatrix很容易地创建。toBlockMatrix缺省会创建1024x1024的blocks。可以通过toBlockMatrix(rowsPerBlock, colsPerBlock)进行修改。

    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
    
    val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    // Transform the CoordinateMatrix to a BlockMatrix
    val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
    
    // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
    // Nothing happens if it is valid.
    matA.validate()
    
    // Calculate A^T A.
    val ata = matA.transpose.multiply(matA)
    
  • 相关阅读:
    WorkerMan源码分析(resetStd方法,PHP中STDIN, STDOUT, STDERR的重定向)
    linux:nohup 不生成 nohup.out的方法
    PHP系统编程--PHP进程信号处理(转)
    shell脚本实例总结
    saltstack 迭代项目到客户端并结合jenkins自动发布多台服务器
    自动化运维工具 SaltStack 搭建
    coding利用Webhook实现Push代码后的jenkins自动构建
    基于jquery地图特效全国网点查看代码
    基于jquery判断浏览器版本过低代码
    EntityFramework Model有外键时,Json提示循环引用 解决方法
  • 原文地址:https://www.cnblogs.com/shawshawwan/p/10857577.html
Copyright © 2011-2022 走看看