  • Machine Learning with Spark study notes: Recommendation models

    In this article, we will use explicit rating data, without additional user or item metadata or other information related to the user-item interactions. Hence, the features that we need as inputs are simply the user IDs, movie IDs, and the ratings assigned to each user and movie pair.
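    To make this input format concrete, here is a minimal sketch of how one observation is represented with MLlib's Rating class, which the full program below also uses; the variable name obs and the particular IDs and rating value are illustrative only (user 789 and movie 123 also appear later in this article):

    import org.apache.spark.mllib.recommendation.Rating

    // one explicit-feedback observation: user 789 rated movie 123 with 3.0
    val obs = Rating(789, 123, 3.0)
    println(obs.user + " " + obs.product + " " + obs.rating)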

    Prepare data

    Data source download (MovieLens 100k)

    Upload data to HDFS

    Uploading the data to HDFS is straightforward for programmers who are familiar with Hadoop (for example, with hadoop fs -put), so the steps are not repeated here. The HDFS path used throughout this article is:
    hdfs://master:9000/user/root/input/ml/
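
    As a quick sanity check, a minimal sketch (assuming the files have been uploaded to the path above and that a SparkContext sc is available, as in the full program below) reads the ratings file back from HDFS; the MovieLens 100k ratings file contains 100,000 tab-separated lines:

    val rawData = sc.textFile("hdfs://master:9000/user/root/input/ml/u.data")
    println(rawData.count())  // expect 100000 for MovieLens 100k
    println(rawData.first())  // one line: user id<TAB>item id<TAB>rating<TAB>timestamp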



    Model parameters

    We’re now ready to train our model! The other inputs required for our model are as follows:

    • rank: This refers to the number of factors in our ALS model, that is, the number of hidden features in our low-rank approximation matrices.
      Generally, the greater the number of factors, the better, but this has a direct impact on memory usage, both for computation and for storing models for serving, particularly for large numbers of users or items. Hence, this is often a trade-off in real-world use cases. A rank in the range of 10 to 200 is usually reasonable.

    • iterations: This refers to the number of iterations to run. While each iteration in ALS is guaranteed to decrease the reconstruction error of the ratings matrix, ALS models will converge to a reasonably good solution after relatively few iterations. So, we don’t need to run for too many iterations in most cases (around 10 is often a good default).

    • lambda: This parameter controls the regularization of our model and
      thus controls overfitting. The higher the value of lambda,
      the more regularization is applied. What constitutes a sensible value is very dependent on the size, nature, and sparsity of the underlying data, and as with almost all machine learning models, the regularization parameter is something that should be tuned using out-of-sample test data and cross-validation approaches. A minimal training call combining rank, iterations, and lambda is sketched just after this list.
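
    Putting the three parameters together, a minimal training call looks like the following sketch; the values are the same ones used in the full program later in this article, and ratings is the RDD[Rating] built from the raw data:

    import org.apache.spark.mllib.recommendation.ALS

    val rank = 50        // number of latent factors
    val iterations = 10  // number of ALS iterations
    val lambda = 0.01    // regularization parameter
    val model = ALS.train(ratings, rank, iterations, lambda)

    ALS.train returns a MatrixFactorizationModel, whose userFeatures and productFeatures RDDs hold the learned user and item factor vectors.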

    Training the recommendation model

    Once we have extracted these simple features from our raw data, we are ready to proceed with model training; MLlib takes care of this for us.

    All we have to do is provide the correctly-parsed input RDD we just created as well as our chosen model parameters.

    Training a model on the MovieLens 100k dataset

    Source code:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.mllib.recommendation.Rating
    import org.jblas.DoubleMatrix
    import scala.math.Ordering
    import org.apache.spark.mllib.evaluation.RegressionMetrics
    import org.apache.spark.mllib.evaluation.RankingMetrics
    
    object ALSTest {
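      /* cosine similarity between two factor vectors:
         dot(vec1, vec2) / (||vec1|| * ||vec2||) */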
      def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double =
        {
          vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
        }
      /* compute the average precision at K*/
      def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double =
        {
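          // Average Precision at K (APK): each time a predicted item appears in
          // the actual set, add the precision at that rank; finally normalize
          // by min(|actual|, k)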
          val predK = predicted.take(k)
          var score = 0.0
          var numHits = 0.0
          for ((p, i) <- predK.zipWithIndex) {
            if (actual.contains(p)) {
              numHits += 1.0
              score += numHits / (i.toDouble + 1.0)
            }
          }
          if (actual.isEmpty) {
            1.0
          } else {
            score / math.min(actual.size, k).toDouble
          }
    
        }
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ALS Application")
        val sc = new SparkContext(conf)
        val rawData = sc.textFile("hdfs://master:9000/user/root/input/ml/u.data")
        print(rawData.first())
        // u.data fields are tab-separated: user id, item id, rating, timestamp
        val rawRating = rawData.map(_.split("\t").take(3))
        val ratings = rawRating.map {
          case Array(user, movie, rating) => Rating(
            user.toInt, movie.toInt, rating.toDouble)
        }
        print(ratings.first())
        /* We'll use a rank of 50, 10 iterations, and a lambda parameter of 0.01
           to illustrate how to train our model: */
        val model = ALS.train(ratings, 50, 10, 0.01)
        print(model.userFeatures.count)
        val predictedRating = model.predict(789, 123)
        print(predictedRating)
        val userId = 789
        val K = 10
        val topKRecs = model.recommendProducts(userId, K)
        println(topKRecs.mkString("\n"))
        val movies = sc.textFile("hdfs://master:9000/user/root/input/ml/u.item")
        val titles = movies.map(line => line.split("\\|").take(2)).map(array =>
          (array(0).toInt, array(1))).collectAsMap()
        println(titles(123))
        val moviesForUser = ratings.keyBy(_.user).lookup(789)
        println(moviesForUser.size)
        moviesForUser.sortBy(-_.rating).take(10).map(rating =>
          (titles(rating.product), rating.rating)).foreach(println)
        topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
        val itemId = 567
        val itemFactor = model.productFeatures.lookup(itemId).head
        val itemVector = new DoubleMatrix(itemFactor)
        println(cosineSimilarity(itemVector, itemVector))
        val sims = model.productFeatures.map {
          case (id, factor) =>
            val factorVector = new DoubleMatrix(factor)
            val sim = cosineSimilarity(factorVector, itemVector)
            (id, sim)
        }
        val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] {
          case (id, similarity) => similarity
        })
        println(sortedSims.take(10).mkString("\n"))
        /* to check our item-to-item similarity, take elements 1 to 11 of the
           list, because the first element is the queried item itself: */
        val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] {
          case (id, similarity) => similarity
        })
        println(sortedSims2.slice(1, 11).map {
          case (id, sim) => (titles(id), sim)
        }.mkString("\n"))
        /* squared error for a single prediction */
        val actualRating = moviesForUser.take(1)(0)
        val predictRating = model.predict(789, actualRating.product)
        val squaredError = math.pow(predictRating - actualRating.rating, 2.0)
        println("actualRating is " + actualRating + "\n" +
          "predictRating is " + predictRating + "\n" +
          "squaredError is " + squaredError)
        val usersProducts = ratings.map {
          case Rating(user, product, rating) => (user, product)
        }
        val predictions = model.predict(usersProducts).map {
          case Rating(user, product, rating) => ((user, product), rating)
        }
        val ratingsAndPredictions = ratings.map {
          case Rating(user, product, rating) => ((user, product), rating)
        }.join(predictions)
        val MSE = ratingsAndPredictions.map {
          case ((user, product), (actual, predicted)) => math.pow((actual -
            predicted), 2)
        }.reduce(_ + _) / ratingsAndPredictions.count
         println("MSE= " + MSE)
        val RMSE = math.sqrt(MSE)
        println("Root MSE= " + RMSE)
        /* compute the APK metric*/
        val actualMovies = moviesForUser.map(_.product)
        val predictedMovies = topKRecs.map(_.product)
        val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
        println(apk10)
        /*collect the item factors and form a DoubleMatrix object from them:*/
        val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
        val itemMatrix = new DoubleMatrix(itemFactors)
        println(itemMatrix.rows, itemMatrix.columns)
        /*  distribute the item matrix as a broadcast variable so that it is available on each worker node*/
        val imBroadcast = sc.broadcast(itemMatrix)
        /* for each user, score every item against the user's factor vector and
           sort to get a ranked list of recommended item IDs */
        val allRecs = model.userFeatures.map {
          case (userId, array) =>
            val userVector = new DoubleMatrix(array)
            val scores = imBroadcast.value.mmul(userVector)
            val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
            val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
            (userId, recommendedIds)
        }
        val userMovies = ratings.map {
          case Rating(user, product, rating) =>
            (user, product)
        }.groupBy(_._1)
        val MAPK = allRecs.join(userMovies).map {
          case (userId, (predicted, actualWithIds)) =>
            val actual = actualWithIds.map(_._2).toSeq
            avgPrecisionK(actual, predicted, K)
        }.reduce(_ + _) / allRecs.count
        println("Mean Average Precision at K = " + MAPK)
        /* compute MSE and RMSE with RegressionMetrics, which expects
           (prediction, observation) pairs */
        val predictedAndTrue = ratingsAndPredictions.map {
          case ((user, product), (actual, predicted)) => (predicted, actual)
        }
        val regressionMetrics = new RegressionMetrics(predictedAndTrue)
        println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
        println("Root Mean Squared Error = " +
          regressionMetrics.rootMeanSquaredError)
        /*  Mean Average Precision */
        val predictedAndTrueForRanking = allRecs.join(userMovies).map {
          case (userId, (predicted, actualWithIds)) =>
            val actual = actualWithIds.map(_._2)
            (predicted.toArray, actual.toArray)
        }
        val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
        println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
        val MAPK2000 = allRecs.join(userMovies).map {
          case (userId, (predicted, actualWithIds)) =>
            val actual = actualWithIds.map(_._2).toSeq
            avgPrecisionK(actual, predicted, 2000)
        }.reduce(_ + _) / allRecs.count
        println("Mean Average Precision = " + MAPK2000)
      }
    
    }

    Results

    (The program's console output appeared here in the original post.)
