In this article, we will use explicit rating data, without additional user or item metadata or other information related to the user-item interactions. Hence, the features that we need as inputs are simply the user IDs, movie IDs, and the ratings assigned to each user and movie pair
Prepare data
Upload data to HDFS
It’s easy for the programmers who are familiar to hadoop , not repeat them here, HDFS path data herein is located in
hdfs://master:9000/user/root/input/ml/
Dataset
We’re now ready to train our model! The other inputs required for our model are as follows:
rank: This refers to the number of factors in our ALS model, that is,the number of hidden features in our low-rank approximation matrices.
Generally, the greater the number of factors, the better, but this has a direct impact on memory usage, both for computation and to store models for serving, particularly for large number of users or items. Hence, this is often a trade-off in real-world use cases. A rank in the range of 10 to 200 is usually reasonable.iterations: This refers to the number of iterations to run. While each iteration in ALS is guaranteed to decrease the reconstruction error of the ratings matrix, ALS models will converge to a reasonably good solution after relatively few iterations. So, we don’t need to run for too many iterations in most cases (around 10 is often a good default).
lambda: This parameter controls the regularization of our model.
Thus, lambda controls over fitting. The higher the value of lambda,
the more is the regularization applied. What constitutes a sensible value is very dependent on the size, nature, and sparsity of the underlying data, and as with almost all machine learning models, the regularization parameter is something that should be tuned using out-of-sample test data and cross-validation approaches.
Training the recommendation model
Once we have extracted these simple features from our raw data, we are ready to proceed with model training; MLlib takes care of this for us.
All we have to do is provide the correctly-parsed input RDD we just created as well as our chosen model parameters.
Training a model on the MovieLens 100k
SourceCode:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.jblas.DoubleMatrix
import scala.math.Ordering
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.evaluation.RankingMetrics
object ALSTest {
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double =
{
vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}
/* compute the average precision at K*/
def avgPrecsionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double =
{
val predK = predicted.take(k)
var score = 0.0
var numHits = 0.0
for ((p, i) <- predK.zipWithIndex) {
if (actual.contains(p)) {
numHits += 1.0
score += numHits / (i.toDouble + 1.0)
}
}
if (actual.isEmpty) {
1.0
} else {
score / math.min(actual.size, k).toDouble
}
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ALS Application")
val sc = new SparkContext(conf)
val rawData = sc.textFile("hdfs://master:9000/user/root/input/ml/u.data")
print(rawData.first())
val rawRating = rawData.map(_.split(" ").take(3))
val ratings = rawRating.map {
case Array(user, movie, rating) => Rating(
user.toInt, movie.toInt, rating.toFloat)
}
print(ratings.first())
/*We'll use rank of 50, 10 iterations, and a lambda parameter of 0.01 to illustrate how
to train our model:*/
val model = ALS.train(ratings, 50, 10, 0.01)
print(model.userFeatures.count)
val predictedRating = model.predict(789, 123)
print(predictedRating)
val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("
"))
val movies = sc.textFile("hdfs://master:9000/user/root/input/ml/u.item")
val titles = movies.map(line => line.split("\|").take(2)).map(array => (array(0).toInt,
array(1))).collectAsMap()
println(titles(123))
val moviesForUser = ratings.keyBy(_.user).lookup(789)
println(moviesForUser.size)
moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.
product), rating.rating)).foreach(println)
topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
val itemId = 567
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
println(cosineSimilarity(itemVector, itemVector))
val sims = model.productFeatures.map {
case (id, factor) =>
val factorVector = new DoubleMatrix(factor)
val sim = cosineSimilarity(factorVector, itemVector)
(id, sim)
}
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] {
case (id, similarity) => similarity
})
println(sortedSims.take(10).mkString("
"))
/* check our item-to-item similarity we will take the numbers 1 to 11 in the list:*/
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] {
case (id, similarity) => similarity
})
println(sortedSims2.slice(1, 11).map {
case (id, sim) => (titles(id), sim)
}.mkString("
"))
/*Mean Squared Error*/
val actualRating = moviesForUser.take(1)(0)
val predictRating = model.predict(789, actualRating.product)
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
println("actualRating is " + actualRating + "
" + "predictRating is " + predictedRating + "
" +
"squaredError is " + squaredError)
val usersProducts = ratings.map {
case Rating(user, product, rating) => (user, product)
}
val predictions = model.predict(usersProducts).map {
case Rating(user, product, rating) => ((user, product), rating)
}
val ratingsAndPredictions = ratings.map {
case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)
val MSE = ratingsAndPredictions.map {
case ((user, product), (actual, predicted)) => math.pow((actual -
predicted), 2)
}.reduce(_ + _) / ratingsAndPredictions.count
println("MSE= " + MSE)
val RMSE = math.sqrt(MSE)
println("Root MSE= " + RMSE)
/* compute the APK metric*/
val actualMovies = moviesForUser.map(_.product)
val predictedMovies = topKRecs.map(_.product)
val apk10 = avgPrecsionK(actualMovies, predictedMovies, 10)
println(apk10)
/*collect the item factors and form a DoubleMatrix object from them:*/
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
/* distribute the item matrix as a broadcast variable so that it is available on each worker node*/
val imBroadcast = sc.broadcast(itemMatrix)
/* sort*/
val allRecs = model.userFeatures.map {
case (userId, array) =>
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
(userId, recommendedIds)
}
val userMovies = ratings.map {
case Rating(user, product, rating) =>
(user, product)
}.groupBy(_._1)
val MAPK = allRecs.join(userMovies).map {
case (userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2).toSeq
avgPrecsionK(actual, predicted, K)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision at K= " + MAPK)
/*Compute RMSE and MSE*/
val predictedAndTrue = ratingsAndPredictions.map {
case ((uesr, product), (predicted, actual)) => (predicted, actual)
}
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.
rootMeanSquaredError)
/* Mean Average Precision */
val predictedAndTrueForRanking = allRecs.join(userMovies).map {
case (userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
val MAPK2000 = allRecs.join(userMovies).map {
case (userId, (predicted,
actualWithIds)) =>
val actual = actualWithIds.map(_._2).toSeq
avgPrecsionK(actual, predicted, 2000)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision = " + MAPK2000)
}
}