zoukankan      html  css  js  c++  java
  • Spark MLlib Data Type

    MLlib 支持存放在单机上的本地向量和矩阵,也支持通过多个RDD实现的分布式矩阵。因此MLlib的数据类型主要分为两大类:一个是本地单机向量;另一个是分布式矩阵。下面分别介绍一下这两大类都有哪些类型:

    1、Local vector(本地向量)

    (1)Vector

      最基本的类型是Vector,该类型索引是从0开始的整型类型,值类型是double类型。并提供了两个实现:DenseVector and SparseVector。但是一把情况下都是推荐使用工厂方法来创建Vector。如下所示:

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    
    // Create a dense vector (1.0, 0.0, 3.0).
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
    

    (2)LabeledPoint

      LabeledPoint类型一般用于有监督的学习算法当中,因为该类型会标记对应的标签。并且第一个参数就是标签,第二个参数是一个vector类型的数据。

    val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
    

    (3)Local matrix

      Local matrix是有一个int类型的行索引和列索引,和double类型的值。并且存储在单机。Local matrix最基本的类型是Matrix ,也提供了两个实现类型:DenseMatrix, and SparseMatrix。但是依伴推荐使用工厂方法:Matrices 。 如下所示:

        val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
        val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
    

    2、Distributed matrix(分布式矩阵)

    (1)RowMatrix

      RowMatrix矩阵是一个基于行的,且没有索引的一个分布式矩阵,它的所有行组成一个RDD,它的每一行是一个local Vector。由于它的行类型是Local Vector,所以它的列应该是有限的。因为它必须能保证能够存储在一台机器内。如下所示:

        val rows = sc.textFile("/user/liujiyu/spark/mldata1.txt")
          .map(_.split(' ') //     转换为RDD[Array[String]]类型
            .map(_.toDouble)) //            转换为RDD[Array[Double]]类型
          .map(line => Vectors.dense(line)) //转换为RDD[Vector]类型
    
        // Create a RowMatrix from an RDD[Vector].
        val mat: RowMatrix = new RowMatrix(rows)
    

    (2)IndexedRowMatrix

      IndexedRowMatrix类型与RowMatrix类型相似,但是IndexedRowMatrix拥有强大的行索引。IndexedRowMatrix能够由RDD[IndexedRow]创建,而IndexedRow是由(Long,Vector)封装。

    val rows1 = sc.textFile("/user/liujiyu/spark/mldata1.txt")
          .map(_.split(' ') //     转换为RDD[Array[String]]类型
            .map(_.toDouble)) //            转换为RDD[Array[Double]]类型
          .map(line => Vectors.dense(line)) //转换为RDD[Vector]类型
          .map((vc) => new IndexedRow(vc.size, vc)) //IndexedRow 带有行索引的矩阵,初始化的参数,列数和每一行的vector
    
    val irm = new IndexedRowMatrix(rows1)
    

    (3)CoordinateMatrix(坐标矩阵)

      CoordinateMatrix是一个分布式矩阵,它是由Entry组成的一个RDD,每一个Entry是由(i:Long,j:Long,value:Double)封装。这里的i表示的是行索引,j表示的是列索引,value表示的对应的值。CoordinateMatrix能够通过RDD[MatrixEntry]来创建。如果矩阵是非常大的而且稀疏,坐标矩阵一定是最好的选择。坐标矩阵则是通过RDD[MatrixEntry]实例创建,MatrixEntry是(long,long.Double)封装形式。如下所示:

    对应的矩阵文件mldata1.txt:
                      1 1 4
                      2 6 2
                      1 3 4
                      2 3 4
                      2 8 1
                      3 2 4
                      5 1 3
    读取该文件,并初始化为CoordinateMatrix:

    val rows2 = sc.textFile("/user/liujiyu/spark/mldata1.txt")
          .map(_.split(' ') //     转换为RDD[Array[String]]类型
      //      .map(_.toDouble)) //            转换为RDD[Array[Double]]类型
          .map(m => (m(0).toLong, m(1).toLong, m(2).toDouble))
          .map((vc) => new MatrixEntry(vc._1, vc._2, vc._3)) //IndexedRow 带有行索引的矩阵,初始化的参数,列数和每一行的vector
    
        val cm = new CoordinateMatrix(rows2)
    

    (4)BlockMatrix

      BlockMatrix是一个分布式矩阵,它是由MatrixBlocks组成的一个RDD 。这里的MatrixBlocks是由字典类型((Int,Int),Matrix)组成。这里(Int,Int)是block的索引,Matrix是这个给定的尺寸rowsPerBlock x colsPerBlock的子矩阵。

      BlockMatrix能够容易通过IndexedRowMatrix or CoordinateMatrixtoBlockMatrix方法来创建。toBlockMatrix方法默认创建的blocks的大小是1024*1024。用户可以通过传递参数的方式来改变这个blocks的大小,如:toBlockMatrix(rowsPerBlock, colsPerBlock)

        //A BlockMatrix can be most easily created from an IndexedRowMatrix or CoordinateMatrix by calling toBlockMatrix.
        
        val matA: BlockMatrix = cm.toBlockMatrix().cache()
    
        // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
        // Nothing happens if it is valid.
        matA.validate()
    
        // Calculate A^T A.
        val ata = matA.transpose.multiply(matA)
    

         

  • 相关阅读:
    控件
    ASP.NET简介与Repeater重复器
    WinForm简介
    ADO.net测试题
    6.08练习
    高级查询几种方法
    数据库查询的几种方式
    MySQL更新(修改、查询)
    create(创建) table(表格) student<表名>
    候选键,主键,外键
  • 原文地址:https://www.cnblogs.com/ljy2013/p/5101355.html
Copyright © 2011-2022 走看看