    Spark MLlib Deep Learning: Deep Belief Network 2.1

    http://blog.csdn.net/sunbow0

    The Spark MLlib Deep Learning toolbox implements, on Spark MLlib, the algorithms from the UFLDL deep learning tutorial. The toolbox is organized as follows:

    Chapter 1: Neural Net (NN)

    1. Source code

    2. Source code walkthrough

    3. Examples

    Chapter 2: Deep Belief Nets (DBNs)

    1. Source code

    2. Source code walkthrough

    3. Examples

    Chapter 3: Convolutional Neural Network (CNN)

    Chapter 4: Stacked Auto-Encoders (SAE)

    Chapter 5: CAE

    Chapter 2: Deep Belief Network

    1 Source Code

    The source code of the Spark MLlib Deep Learning toolbox is currently hosted on GitHub at:

    https://github.com/sunbow1/SparkMLlibDeepLearn

    1.1 DBN Code

    package DBN
    
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.Logging
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    
    import breeze.linalg.{
      Matrix => BM,
      CSCMatrix => BSM,
      DenseMatrix => BDM,
      Vector => BV,
      DenseVector => BDV,
      SparseVector => BSV,
      axpy => brzAxpy,
      svd => brzSvd
    }
    import breeze.numerics.{
      exp => Bexp,
      tanh => Btanh
    }
    
    import scala.collection.mutable.ArrayBuffer
    import java.util.Random
    import scala.math._
    
    /**
     * W: weight matrix, vW: its momentum
     * b: visible-unit bias, vb: its momentum
     * c: hidden-unit bias, vc: its momentum
     */
    case class DBNweight(
      W: BDM[Double],
      vW: BDM[Double],
      b: BDM[Double],
      vb: BDM[Double],
      c: BDM[Double],
      vc: BDM[Double]) extends Serializable
    
    /**
     * Configuration parameters
     */
    case class DBNConfig(
      size: Array[Int],
      layer: Int,
      momentum: Double,
      alpha: Double) extends Serializable
    
    /**
     * DBN(Deep Belief Network)
     */
    
    class DBN(
      private var size: Array[Int],
      private var layer: Int,
      private var momentum: Double,
      private var alpha: Double) extends Serializable with Logging {
      //                  var size=Array(5, 10, 10)
      //                  var layer=3
      //                  var momentum=0.0
      //                  var alpha=1.0
      /**
       * size = architecture;         network architecture (units per layer)
       * layer = numel(nn.size);      number of layers
       * momentum = 0.0;              momentum coefficient
       * alpha = 1.0;                 learning rate
       */
      def this() = this(DBN.Architecture, 3, 0.0, 1.0)
    
      /** Set the network architecture. Default: [10, 5, 1]. */
      def setSize(size: Array[Int]): this.type = {
        this.size = size
        this
      }
    
      /** Set the number of layers. Default: 3. */
      def setLayer(layer: Int): this.type = {
        this.layer = layer
        this
      }
    
      /** Set the momentum. Default: 0.0. */
      def setMomentum(momentum: Double): this.type = {
        this.momentum = momentum
        this
      }
    
      /** Set the learning rate alpha. Default: 1.0. */
      def setAlpha(alpha: Double): this.type = {
        this.alpha = alpha
        this
      }
    
      /**
       * Deep Belief Network (DBN) training entry point.
       * Trains the stack of RBMs greedily, layer by layer.
       */
      def DBNtrain(train_d: RDD[(BDM[Double], BDM[Double])], opts: Array[Double]): DBNModel = {
        // Build the parameter configuration
        val sc = train_d.sparkContext
        val dbnconfig = DBNConfig(size, layer, momentum, alpha)
        // Initialize weights and bias vectors
        var dbn_W = DBN.InitialW(size)
        var dbn_vW = DBN.InitialvW(size)
        var dbn_b = DBN.Initialb(size)
        var dbn_vb = DBN.Initialvb(size)
        var dbn_c = DBN.Initialc(size)
        var dbn_vc = DBN.Initialvc(size)
        // Train layer 1
        printf("Training Level: %d.\n", 1)
        val weight0 = new DBNweight(dbn_W(0), dbn_vW(0), dbn_b(0), dbn_vb(0), dbn_c(0), dbn_vc(0))
        val weight1 = RBMtrain(train_d, opts, dbnconfig, weight0)
        dbn_W(0) = weight1.W
        dbn_vW(0) = weight1.vW
        dbn_b(0) = weight1.b
        dbn_vb(0) = weight1.vb
        dbn_c(0) = weight1.c
        dbn_vc(0) = weight1.vc
        // Print the learned weights
        printf("dbn_W%d.\n", 1)
        val tmpw0 = dbn_W(0)
        for (i <- 0 to tmpw0.rows - 1) {
          for (j <- 0 to tmpw0.cols - 1) {
            print(tmpw0(i, j) + "\t")
          }
          println()
        }
    
        // Train layers 2 through n
        for (i <- 2 to dbnconfig.layer - 1) {
          // Forward computation of x through the previous RBM:
          //  x = sigm(repmat(rbm.c', size(x, 1), 1) + x * rbm.W');
          printf("Training Level: %d.\n", i)
          val tmp_bc_w = sc.broadcast(dbn_W(i - 2))
          val tmp_bc_c = sc.broadcast(dbn_c(i - 2))
          val train_d2 = train_d.map { f =>
            val label = f._1
            val x = f._2
            val x2 = DBN.sigm(x * tmp_bc_w.value.t + tmp_bc_c.value.t)
            (label, x2)
          }
          // Train layer i
          val weighti = new DBNweight(dbn_W(i - 1), dbn_vW(i - 1), dbn_b(i - 1), dbn_vb(i - 1), dbn_c(i - 1), dbn_vc(i - 1))
          val weight2 = RBMtrain(train_d2, opts, dbnconfig, weighti)
          dbn_W(i - 1) = weight2.W
          dbn_vW(i - 1) = weight2.vW
          dbn_b(i - 1) = weight2.b
          dbn_vb(i - 1) = weight2.vb
          dbn_c(i - 1) = weight2.c
          dbn_vc(i - 1) = weight2.vc
          // Print the learned weights
          printf("dbn_W%d.\n", i)
          val tmpw1 = dbn_W(i - 1)
          for (r <- 0 to tmpw1.rows - 1) {
            for (j <- 0 to tmpw1.cols - 1) {
              print(tmpw1(r, j) + "\t")
            }
            println()
          }
        }
        new DBNModel(dbnconfig, dbn_W, dbn_b, dbn_c)
      }
    
      /**
       * Deep Belief Network (DBN):
       * trains a single RBM layer (rbmtrain) with one-step contrastive divergence (CD-1).
       */
      def RBMtrain(train_t: RDD[(BDM[Double], BDM[Double])],
        opts: Array[Double],
        dbnconfig: DBNConfig,
        weight: DBNweight): DBNweight = {
        val sc = train_t.sparkContext
        var StartTime = System.currentTimeMillis()
        var EndTime = System.currentTimeMillis()
        // Weight and bias parameter variables
        var rbm_W = weight.W
        var rbm_vW = weight.vW
        var rbm_b = weight.b
        var rbm_vb = weight.vb
        var rbm_c = weight.c
        var rbm_vc = weight.vc
        // Broadcast the configuration parameters
        val bc_config = sc.broadcast(dbnconfig)
        // Number of training samples
        val m = train_t.count
        // Compute the number of mini-batches
        val batchsize = opts(0).toInt
        val numepochs = opts(1).toInt
        val numbatches = (m / batchsize).toInt
        // numepochs is the number of passes over the training data
        for (i <- 1 to numepochs) {
          StartTime = System.currentTimeMillis()
          val splitW2 = Array.fill(numbatches)(1.0 / numbatches)
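          // Equal split weights: each of the numbatches groups receives ~1/numbatches of the samples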
          var err = 0.0
          // Randomly split the samples into mini-batches according to the split weights
          for (l <- 1 to numbatches) {
            // 1. Broadcast the weight parameters
            val bc_rbm_W = sc.broadcast(rbm_W)
            val bc_rbm_vW = sc.broadcast(rbm_vW)
            val bc_rbm_b = sc.broadcast(rbm_b)
            val bc_rbm_vb = sc.broadcast(rbm_vb)
            val bc_rbm_c = sc.broadcast(rbm_c)
            val bc_rbm_vc = sc.broadcast(rbm_vc)
    
            //        // Debug: print the weights
            //        println(i + "\t" + l)
            //        val tmpw0 = bc_rbm_W.value
            //        for (i <- 0 to tmpw0.rows - 1) {
            //          for (j <- 0 to tmpw0.cols - 1) {
            //            print(tmpw0(i, j) + "\t")
            //          }
            //          println()
            //        }
    
            // 2. Sample split: select this iteration's mini-batch
            val train_split2 = train_t.randomSplit(splitW2, System.nanoTime())
            val batch_xy1 = train_split2(l - 1)
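            // Note: randomSplit re-partitions the full RDD on every iteration, so each
            // "batch" is an approximate 1/numbatches random sample of the data rather
            // than a fixed disjoint slice.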
            //        val train_split3 = train_t.filter { f => (f._1 >= batchsize * (l - 1) + 1) && (f._1 <= batchsize * (l)) }
            //        val batch_xy1 = train_split3.map(f => (f._2, f._3))
    
            // 3. Forward computation (CD-1 positive and negative phases)
            // v1 = batch;
            // h1 = sigmrnd(repmat(rbm.c', opts.batchsize, 1) + v1 * rbm.W');
            // v2 = sigmrnd(repmat(rbm.b', opts.batchsize, 1) + h1 * rbm.W);
            // h2 = sigm(repmat(rbm.c', opts.batchsize, 1) + v2 * rbm.W');
            // c1 = h1' * v1;
            // c2 = h2' * v2;
            val batch_vh1 = batch_xy1.map { f =>
              val label = f._1
              val v1 = f._2
              val h1 = DBN.sigmrnd((v1 * bc_rbm_W.value.t + bc_rbm_c.value.t))
              val v2 = DBN.sigmrnd((h1 * bc_rbm_W.value + bc_rbm_b.value.t))
              val h2 = DBN.sigm(v2 * bc_rbm_W.value.t + bc_rbm_c.value.t)
              val c1 = h1.t * v1
              val c2 = h2.t * v2
              (label, v1, h1, v2, h2, c1, c2)
            }
    
            // 4. Compute the parameter update directions
            // rbm.vW = rbm.momentum * rbm.vW + rbm.alpha * (c1 - c2)     / opts.batchsize;
            // rbm.vb = rbm.momentum * rbm.vb + rbm.alpha * sum(v1 - v2)' / opts.batchsize;
            // rbm.vc = rbm.momentum * rbm.vc + rbm.alpha * sum(h1 - h2)' / opts.batchsize;
            // Update direction for W
            val vw1 = batch_vh1.map {
              case (label, v1, h1, v2, h2, c1, c2) =>
                c1 - c2
            }
            val initw = BDM.zeros[Double](bc_rbm_W.value.rows, bc_rbm_W.value.cols)
            val (vw2, countw2) = vw1.treeAggregate((initw, 0L))(
              seqOp = (c, v) => {
                // c: (m, count), v: (m)
                val m1 = c._1
                val m2 = m1 + v
                (m2, c._2 + 1)
              },
              combOp = (c1, c2) => {
                // c: (m, count)
                val m1 = c1._1
                val m2 = c2._1
                val m3 = m1 + m2
                (m3, c1._2 + c2._2)
              })
            val vw3 = vw2 / countw2.toDouble
            rbm_vW = bc_config.value.momentum * bc_rbm_vW.value + bc_config.value.alpha * vw3
            // Update direction for b
            val vb1 = batch_vh1.map {
              case (label, v1, h1, v2, h2, c1, c2) =>
                (v1 - v2)
            }
            val initb = BDM.zeros[Double](bc_rbm_vb.value.cols, bc_rbm_vb.value.rows)
            val (vb2, countb2) = vb1.treeAggregate((initb, 0L))(
              seqOp = (c, v) => {
                // c: (m, count), v: (m)
                val m1 = c._1
                val m2 = m1 + v
                (m2, c._2 + 1)
              },
              combOp = (c1, c2) => {
                // c: (m, count)
                val m1 = c1._1
                val m2 = c2._1
                val m3 = m1 + m2
                (m3, c1._2 + c2._2)
              })
            val vb3 = vb2 / countb2.toDouble
            rbm_vb = bc_config.value.momentum * bc_rbm_vb.value + bc_config.value.alpha * vb3.t
            // Update direction for c
            val vc1 = batch_vh1.map {
              case (label, v1, h1, v2, h2, c1, c2) =>
                (h1 - h2)
            }
            val initc = BDM.zeros[Double](bc_rbm_vc.value.cols, bc_rbm_vc.value.rows)
            val (vc2, countc2) = vc1.treeAggregate((initc, 0L))(
              seqOp = (c, v) => {
                // c: (m, count), v: (m)
                val m1 = c._1
                val m2 = m1 + v
                (m2, c._2 + 1)
              },
              combOp = (c1, c2) => {
                // c: (m, count)
                val m1 = c1._1
                val m2 = c2._1
                val m3 = m1 + m2
                (m3, c1._2 + c2._2)
              })
            val vc3 = vc2 / countc2.toDouble
            rbm_vc = bc_config.value.momentum * bc_rbm_vc.value + bc_config.value.alpha * vc3.t
    
            // 5. Apply the weight updates
            // rbm.W = rbm.W + rbm.vW;
            // rbm.b = rbm.b + rbm.vb;
            // rbm.c = rbm.c + rbm.vc;
            rbm_W = bc_rbm_W.value + rbm_vW
            rbm_b = bc_rbm_b.value + rbm_vb
            rbm_c = bc_rbm_c.value + rbm_vc
    
            // 6. Compute the reconstruction error
            val dbne1 = batch_vh1.map {
              case (label, v1, h1, v2, h2, c1, c2) =>
                (v1 - v2)
            }
            val (dbne2, counte) = dbne1.treeAggregate((0.0, 0L))(
              seqOp = (c, v) => {
                // c: (e, count), v: (m)
                val e1 = c._1
                val e2 = (v :* v).sum
                val esum = e1 + e2
                (esum, c._2 + 1)
              },
              combOp = (c1, c2) => {
                // c: (e, count)
                val e1 = c1._1
                val e2 = c2._1
                val esum = e1 + e2
                (esum, c1._2 + c2._2)
              })
            val dbne = dbne2 / counte.toDouble
            err += dbne
          }
          EndTime = System.currentTimeMillis()
          // Print the error for this epoch
          printf("epoch: numepochs = %d , Took = %d seconds; Average reconstruction error is: %f.\n", i, scala.math.ceil((EndTime - StartTime).toDouble / 1000).toLong, err / numbatches.toDouble)
        }
        new DBNweight(rbm_W, rbm_vW, rbm_b, rbm_vb, rbm_c, rbm_vc)
      }
    
    }
    
    /**
     * DBN (Deep Belief Network) companion object
     */
    object DBN extends Serializable {
    
      // Default configuration constants
      val Activation_Function = "sigm"
      val Output = "linear"
      val Architecture = Array(10, 5, 1)
    
      /**
       * Initialize the weights W.
       * All entries start at 0.
       */
      def InitialW(size: Array[Int]): Array[BDM[Double]] = {
        // Initialize the weight parameters
        // weights and weight momentum
        // dbn.rbm{u}.W  = zeros(dbn.sizes(u + 1), dbn.sizes(u));
        val n = size.length
        val rbm_W = ArrayBuffer[BDM[Double]]()
        for (i <- 1 to n - 1) {
          val d1 = BDM.zeros[Double](size(i), size(i - 1))
          rbm_W += d1
        }
        rbm_W.toArray
      }
    
      /**
       * Initialize the weight momentum vW.
       * All entries start at 0.
       */
      def InitialvW(size: Array[Int]): Array[BDM[Double]] = {
        // Initialize the weight momentum parameters
        // weights and weight momentum
        // dbn.rbm{u}.vW = zeros(dbn.sizes(u + 1), dbn.sizes(u));
        val n = size.length
        val rbm_vW = ArrayBuffer[BDM[Double]]()
        for (i <- 1 to n - 1) {
          val d1 = BDM.zeros[Double](size(i), size(i - 1))
          rbm_vW += d1
        }
        rbm_vW.toArray
      }
    
      /**
       * Initialize the visible bias vector b.
       * All entries start at 0.
       */
      def Initialb(size: Array[Int]): Array[BDM[Double]] = {
        // Initialize the bias vector b
        // weights and weight momentum
        // dbn.rbm{u}.b  = zeros(dbn.sizes(u), 1);
        val n = size.length
        val rbm_b = ArrayBuffer[BDM[Double]]()
        for (i <- 1 to n - 1) {
          val d1 = BDM.zeros[Double](size(i - 1), 1)
          rbm_b += d1
        }
        rbm_b.toArray
      }
    
      /**
       * Initialize the visible bias momentum vb.
       * All entries start at 0.
       */
      def Initialvb(size: Array[Int]): Array[BDM[Double]] = {
        // Initialize the bias momentum vb
        // weights and weight momentum
        // dbn.rbm{u}.vb = zeros(dbn.sizes(u), 1);
        val n = size.length
        val rbm_vb = ArrayBuffer[BDM[Double]]()
        for (i <- 1 to n - 1) {
          val d1 = BDM.zeros[Double](size(i - 1), 1)
          rbm_vb += d1
        }
        rbm_vb.toArray
      }
    
      /**
       * Initialize the hidden bias vector c.
       * All entries start at 0.
       */
      def Initialc(size: Array[Int]): Array[BDM[Double]] = {
        // Initialize the bias vector c
        // weights and weight momentum
        // dbn.rbm{u}.c  = zeros(dbn.sizes(u + 1), 1);
        val n = size.length
        val rbm_c = ArrayBuffer[BDM[Double]]()
        for (i <- 1 to n - 1) {
          val d1 = BDM.zeros[Double](size(i), 1)
          rbm_c += d1
        }
        rbm_c.toArray
      }
    
      /**
       * Initialize the hidden bias momentum vc.
       * All entries start at 0.
       */
      def Initialvc(size: Array[Int]): Array[BDM[Double]] = {
        // Initialize the bias momentum vc
        // weights and weight momentum
        // dbn.rbm{u}.vc = zeros(dbn.sizes(u + 1), 1);
        val n = size.length
        val rbm_vc = ArrayBuffer[BDM[Double]]()
        for (i <- 1 to n - 1) {
          val d1 = BDM.zeros[Double](size(i), 1)
          rbm_vc += d1
        }
        rbm_vc.toArray
      }
    
      /**
       * Gibbs sampling: binarize sigmoid activations against uniform random noise.
       * X = double(1./(1+exp(-P)) > rand(size(P)));
       */
      def sigmrnd(P: BDM[Double]): BDM[Double] = {
        val s1 = 1.0 / (Bexp(P * (-1.0)) + 1.0)
        val r1 = BDM.rand[Double](s1.rows, s1.cols)
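        // Element-wise compare (:>) yields a Boolean matrix; map it to 0/1 doubles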
        val a1 = s1 :> r1
        val a2 = a1.data.map { f => if (f == true) 1.0 else 0.0 }
        val a3 = new BDM(s1.rows, s1.cols, a2)
        a3
      }
    
      /**
       * Gibbs sampling with additive noise.
       * X = double(1./(1+exp(-P))) + 1*randn(size(P));
       */
      def sigmrnd2(P: BDM[Double]): BDM[Double] = {
        val s1 = 1.0 / (Bexp(P * (-1.0)) + 1.0)
        // Note: BDM.rand draws uniform noise, not the Gaussian randn of the MATLAB reference
        val r1 = BDM.rand[Double](s1.rows, s1.cols)
        val a3 = s1 + (r1 * 1.0)
        a3
      }
    
      /**
       * Sigmoid activation function.
       * X = 1./(1+exp(-P));
       */
      def sigm(matrix: BDM[Double]): BDM[Double] = {
        val s1 = 1.0 / (Bexp(matrix * (-1.0)) + 1.0)
        s1
      }
    
      /**
       * Scaled tanh activation function.
       * f = 1.7159 * tanh(2/3 .* A);
       */
      def tanh_opt(matrix: BDM[Double]): BDM[Double] = {
        val s1 = Btanh(matrix * (2.0 / 3.0)) * 1.7159
        s1
      }
    
    }
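
    For orientation, here is a minimal usage sketch for the class above. It assumes a running SparkContext sc; the synthetic data, the Array(5, 10, 10) architecture, and the opts values are illustrative only (opts(0) is the batch size and opts(1) the epoch count, as read by RBMtrain).

    // Minimal usage sketch (assumes a running SparkContext `sc`; data is synthetic)
    import breeze.linalg.{DenseMatrix => BDM}
    import org.apache.spark.rdd.RDD
    
    // Each sample is a (label, features) pair of 1-row matrices
    val train_d: RDD[(BDM[Double], BDM[Double])] = sc.parallelize(0 until 100).map { i =>
      val label = new BDM(1, 1, Array((i % 2).toDouble))
      val features = BDM.rand[Double](1, 5) // 5 visible units
      (label, features)
    }
    
    // opts(0) = batch size, opts(1) = number of epochs (see RBMtrain)
    val opts = Array(10.0, 3.0)
    
    val dbn = new DBN()
      .setSize(Array(5, 10, 10)) // 5 visible units, two hidden layers of 10
      .setLayer(3)
      .setMomentum(0.0)
      .setAlpha(1.0)
    val model = dbn.DBNtrain(train_d, opts)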
    

     

    1.2 DBNModel Code

    package DBN
    
    import breeze.linalg.{
      Matrix => BM,
      CSCMatrix => BSM,
      DenseMatrix => BDM,
      Vector => BV,
      DenseVector => BDV,
      SparseVector => BSV
    }
    import org.apache.spark.rdd.RDD
    import scala.collection.mutable.ArrayBuffer
    
    class DBNModel(
      val config: DBNConfig,
      val dbn_W: Array[BDM[Double]],
      val dbn_b: Array[BDM[Double]],
      val dbn_c: Array[BDM[Double]]) extends Serializable {
    
      /**
       * Unfold the trained DBN into a feed-forward NN:
       * converts the stacked RBM weights into NN weight matrices.
       */
      def dbnunfoldtonn(outputsize: Int): (Array[Int], Int, Array[BDM[Double]]) = {
        // 1. Convert the size and layer parameters
        val size = if (outputsize > 0) {
          val size1 = config.size
          val size2 = ArrayBuffer[Int]()
          size2 ++= size1
          size2 += outputsize
          size2.toArray
        } else config.size
        val layer = if (outputsize > 0) config.layer + 1 else config.layer
        
        // 2. Convert the dbn_W parameters: prepend the hidden bias c as the first column
        var initW = ArrayBuffer[BDM[Double]]()
        for (i <- 0 to dbn_W.length - 1) {
          initW += BDM.horzcat(dbn_c(i), dbn_W(i))
        }
        (size, layer, initW.toArray)
      }
    
    }
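
    The tuple returned by dbnunfoldtonn seeds a feed-forward network (for example, the NN of Chapter 1) whose output layer has outputsize units. Below is a hedged sketch that reuses the model trained in the earlier example; the output size of 1 is illustrative:

    // Unfold the trained DBN into initial NN parameters (sketch)
    val (nnSize, nnLayer, nnW) = model.dbnunfoldtonn(1)
    println(s"NN architecture: ${nnSize.mkString(", ")} ($nnLayer layers)")
    // Each nnW(i) is [c | W]: the hidden bias prepended as the first column,
    // for a net whose weight matrices carry the bias in column 0
    nnW.zipWithIndex.foreach { case (w, i) =>
      println(s"layer ${i + 1} weights: ${w.rows} x ${w.cols}")
    }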

    Please credit the source when reposting:

    http://blog.csdn.net/sunbow0
