[root@demo1 ch04]# spark-shell --master yarn --jars /root/studio/jblas-1.2.3.jar
scala> val rawData = sc.textFile("hdfs://192.168.0.85:8020/user/root/studio/MachineLearningWithSpark/ch04/ml-100k/u.data")
rawData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at textFile at <console>:27
scala> rawData.first()
res2: String = 196 242 3 881250949
scala> val rawRatings = rawData.map(_.split("\t").take(3))
rawRatings: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:29
scala> rawRatings.first()
res4: Array[String] = Array(196, 242, 3)
scala> import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.ALS
scala> ALS.
asInstanceOf isInstanceOf toString train trainImplicit
scala> import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.Rating
scala> val ratings = rawRatings.map{case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble)}
ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[7] at map at <console>:33
scala> ratings.first()
res6: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)
scala>
scala> val model = ALS.train(ratings, 50, 10, 0.01)
model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@9a0afd
scala> model.userFeatures
res3: org.apache.spark.rdd.RDD[(Int, Array[Double])] = users MapPartitionsRDD[209] at mapValues at ALS.scala:255
scala> model.userFeatures.count
res4: Long = 943
scala>
scala> model.productFeatures.count
res5: Long = 1682
scala> val predictedRating = model.predict(789, 123)
16/01/31 10:16:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/01/31 10:16:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
predictedRating: Double = 3.138734782421803
scala>
scala> val userId = 789
userId: Int = 789
scala> val k = 10
k: Int = 10
scala> val topKRecs = model.recommendProducts(userId, k)
topKRecs: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(789,447,5.457686378317019), Rating(789,185,5.450265921125553), Rating(789,246,5.41465336756543), Rating(789,134,5.412768035881092), Rating(789,192,5.339175195120079), Rating(789,60,5.283787802367103), Rating(789,195,5.2768427354294625), Rating(789,135,5.26616884812581), Rating(789,42,5.245382247909927), Rating(789,45,5.134660805921923))
scala> println(topKRecs.mkString("\n"))
Rating(789,447,5.457686378317019)
Rating(789,185,5.450265921125553)
Rating(789,246,5.41465336756543)
Rating(789,134,5.412768035881092)
Rating(789,192,5.339175195120079)
Rating(789,60,5.283787802367103)
Rating(789,195,5.2768427354294625)
Rating(789,135,5.26616884812581)
Rating(789,42,5.245382247909927)
Rating(789,45,5.134660805921923)
scala>
scala> val movies = sc.textFile("hdfs://192.168.0.85:8020/user/root/studio/MachineLearningWithSpark/ch04/ml-100k/u.item")
movies: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[214] at textFile at <console>:30
scala> val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
titles: scala.collection.Map[Int,String] = Map(137 -> Big Night (1996), 891 -> Bent (1997), 550 -> Die Hard: With a Vengeance (1995), 1205 -> Secret Agent, The (1996), 146 -> Unhook the Stars (1996), 864 -> My Fellow Americans (1996), 559 -> Interview with the Vampire (1994), 218 -> Cape Fear (1991), 568 -> Speed (1994), 227 -> Star Trek VI: The Undiscovered Country (1991), 765 -> Boomerang (1992), 1115 -> Twelfth Night (1996), 774 -> Prophecy, The (1995), 433 -> Heathers (1989), 92 -> True Romance (1993), 1528 -> Nowhere (1997), 846 -> To Gillian on Her 37th Birthday (1996), 1187 -> Switchblade Sisters (1975), 1501 -> Prisoner of the Mountains (Kavkazsky Plennik) (1996), 442 -> Amityville Curse, The (1990), 1160 -> Love! Valour! Compassion! (1997), 101 -> Heavy Metal (1981), 1196 -> Sa...
scala> titles(123)
res7: String = Frighteners, The (1996)
scala>
scala> val moviesForUser = ratings.keyBy(_.user).lookup(789)
moviesForUser: Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(789,1012,4.0), Rating(789,127,5.0), Rating(789,475,5.0), Rating(789,93,4.0), Rating(789,1161,3.0), Rating(789,286,1.0), Rating(789,293,4.0), Rating(789,9,5.0), Rating(789,50,5.0), Rating(789,294,3.0), Rating(789,181,4.0), Rating(789,1,3.0), Rating(789,1008,4.0), Rating(789,508,4.0), Rating(789,284,3.0), Rating(789,1017,3.0), Rating(789,137,2.0), Rating(789,111,3.0), Rating(789,742,3.0), Rating(789,248,3.0), Rating(789,249,3.0), Rating(789,1007,4.0), Rating(789,591,3.0), Rating(789,150,5.0), Rating(789,276,5.0), Rating(789,151,2.0), Rating(789,129,5.0), Rating(789,100,5.0), Rating(789,741,5.0), Rating(789,288,3.0), Rating(789,762,3.0), Rating(789,628,3.0), Rating(789,124,4.0))
scala> println(moviesForUser.size)
33
scala>
scala> moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)
(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)
scala>
scala> topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
(Carrie (1976),5.457686378317019)
(Psycho (1960),5.450265921125553)
(Chasing Amy (1997),5.41465336756543)
(Citizen Kane (1941),5.412768035881092)
(Raging Bull (1980),5.339175195120079)
(Three Colors: Blue (1993),5.283787802367103)
(Terminator, The (1984),5.2768427354294625)
(2001: A Space Odyssey (1968),5.26616884812581)
(Clerks (1994),5.245382247909927)
(Eat Drink Man Woman (1994),5.134660805921923)
scala> import org.jblas.DoubleMatrix
import org.jblas.DoubleMatrix
scala> val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))
aMatrix: org.jblas.DoubleMatrix = [1.000000; 2.000000; 3.000000]
// Cosine similarity of two jblas vectors: their dot product divided by the product of
// their Euclidean (L2) norms. There is no guard for a zero vector: a zero norm makes the
// denominator 0.0, so the result is not a finite similarity in that case.
scala> def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
| val dotProduct = vec1.dot(vec2)
| val normProduct = vec1.norm2() * vec2.norm2()
| dotProduct / normProduct
| }
cosineSimilarity: (vec1: org.jblas.DoubleMatrix, vec2: org.jblas.DoubleMatrix)Double
scala> val itemId = 567
itemId: Int = 567
scala> val itemFactor = model.productFeatures.lookup(itemId).head
itemFactor: Array[Double] = Array(-0.10181606560945511, -0.17942631244659424, 0.49227064847946167, -0.5855011343955994, 0.13942785561084747, 0.20914733409881592, 0.6555922627449036, -0.31868791580200195, 0.10309237241744995, 0.11868427693843842, -0.6755932569503784, -0.4224179983139038, 0.6295595169067383, -0.46667173504829407, 0.3661192059516907, 0.09347377717494965, -0.06300525367259979, -1.2393603324890137, 0.6922296285629272, -0.1859930455684662, 1.9753334522247314, -0.9490454196929932, -0.9428562521934509, -1.242025375366211, -0.6144394278526306, 0.11396315693855286, 0.5499435067176819, -0.39970043301582336, 0.6376444697380066, -0.5110273957252502, -0.03175868093967438, 0.07247018814086914, -0.8671998381614685, 0.07232850044965744, -0.296882688999176, 0.16593052446842194, -0.544847...
scala> val itemVector = new DoubleMatrix(itemFactor)
itemVector: org.jblas.DoubleMatrix = [-0.101816; -0.179426; 0.492271; -0.585501; 0.139428; 0.209147; 0.655592; -0.318688; 0.103092; 0.118684; -0.675593; -0.422418; 0.629560; -0.466672; 0.366119; 0.093474; -0.063005; -1.239360; 0.692230; -0.185993; 1.975333; -0.949045; -0.942856; -1.242025; -0.614439; 0.113963; 0.549944; -0.399700; 0.637644; -0.511027; -0.031759; 0.072470; -0.867200; 0.072329; -0.296883; 0.165931; -0.544848; 0.031844; -0.028931; -0.155556; -0.579473; -0.759191; -0.493584; 0.482710; -0.213130; 0.084202; -0.195430; 0.626008; 0.223824; 0.342239]
scala> cosineSimilarity(itemVector, itemVector)
res11: Double = 1.0000000000000002
scala> val sim2 = model.productFeatures.map{ case (id, factor) =>
| val factorVector = new DoubleMatrix(factor)
| val sim = cosineSimilarity(factorVector, itemVector)
| (id, sim)
| }
sim2: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[220] at map at <console>:46
scala> val sims = sim2
sims: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[220] at map at <console>:46
scala> val sortedSims = sims.top(k)(Ordering.by[(Int, Double), Double]{ case (id, similarity) => similarity })
sortedSims: Array[(Int, Double)] = Array((567,1.0000000000000002), (563,0.7664604379073522), (288,0.7565452613547423), (685,0.7328709208683085), (895,0.7256132495520126), (413,0.719023340103824), (1376,0.714902115325024), (940,0.7146608851135356), (556,0.7137073949065319), (201,0.7122042387255716))
scala> println(sortedSims.take(10).mkString("\n"))
(567,1.0000000000000002)
(563,0.7664604379073522)
(288,0.7565452613547423)
(685,0.7328709208683085)
(895,0.7256132495520126)
(413,0.719023340103824)
(1376,0.714902115325024)
(940,0.7146608851135356)
(556,0.7137073949065319)
(201,0.7122042387255716)
scala> println(titles(itemId))
Wes Craven's New Nightmare (1994)
scala> val sortedSims2 = sims.top(k + 1)(Ordering.by[(Int, Double), Double]{ case (id, similarity) => similarity })
sortedSims2: Array[(Int, Double)] = Array((567,1.0000000000000002), (563,0.7664604379073522), (288,0.7565452613547423), (685,0.7328709208683085), (895,0.7256132495520126), (413,0.719023340103824), (1376,0.714902115325024), (940,0.7146608851135356), (556,0.7137073949065319), (201,0.7122042387255716), (1206,0.7115277702784656))
scala> sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim)}.mkString("\n")
res14: String =
(Stephen King's The Langoliers (1995),0.7664604379073522)
(Scream (1996),0.7565452613547423)
(Executive Decision (1996),0.7328709208683085)
(Scream 2 (1997),0.7256132495520126)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.719023340103824)
(Meet Wally Sparks (1997),0.714902115325024)
(Airheads (1994),0.7146608851135356)
(Wild Bill (1995),0.7137073949065319)
(Evil Dead II (1987),0.7122042387255716)
(Amos & Andrew (1993),0.7115277702784656)
scala>
//4.5
// Average precision at k (APK) for one user's ranked recommendations.
//   actual    - item ids the user really interacted with (order is irrelevant)
//   predicted - recommended item ids, best first; only the first k are scored
//   k         - cutoff rank
// Returns 1.0 when `actual` is empty, 0.0 when k <= 0, and otherwise the sum of
// (hits so far / rank) over every hit in the top k, divided by min(actual.size, k).
scala> def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
| if (actual.isEmpty) {
| 1.0
| } else if (k <= 0) {
| // No rank is scored; dividing by min(actual.size, k) here used to give NaN (k = 0) or -0.0.
| 0.0
| } else {
| // Set lookup instead of Seq.contains, which rescanned `actual` for every prediction.
| val relevant = actual.toSet
| // The fold carries (hits so far, accumulated score) in rank order, same arithmetic as a loop.
| val (_, score) = predicted.take(k).zipWithIndex.foldLeft((0.0, 0.0)) {
| case ((hits, acc), (p, i)) =>
| if (relevant.contains(p)) (hits + 1.0, acc + (hits + 1.0) / (i.toDouble + 1.0))
| else (hits, acc)
| }
| score / scala.math.min(actual.size, k).toDouble
| }
| }
avgPrecisionK: (actual: Seq[Int], predicted: Seq[Int], k: Int)Double
scala> val actualMovies = moviesForUser.map(_.product)
actualMovies: Seq[Int] = ArrayBuffer(1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150, 276, 151, 129, 100, 741, 288, 762, 628, 124)
scala> val predictedMovies = topKRecs.map(_.product)
predictedMovies: Array[Int] = Array(447, 185, 246, 134, 192, 60, 195, 135, 42, 45)
scala> val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
apk10: Double = 0.0
scala> val itemFactors = model.productFeatures.map{ case (id, factor) => factor}.collect()
itemFactors: Array[Array[Double]] = Array(Array(-0.046477749943733215, 0.18675905466079712, 0.430140882730484, -0.5521891117095947, -0.2257368415594101, 0.4078529477119446, 0.600894033908844, 0.05511023849248886, -0.5756528377532959, -0.546343982219696, -0.6446431279182434, 0.39646831154823303, 1.1618980169296265, -0.4992135763168335, 0.2643599212169647, 0.09223347902297974, -0.43453606963157654, -0.8171247839927673, 0.5304603576660156, 0.16867069900035858, 1.199129581451416, -1.1530932188034058, 0.2695677876472473, -1.2567310333251953, -0.3311900496482849, -0.9677346348762512, 0.10665648430585861, -0.4674195647239685, 1.44817316532135, -1.1533961296081543, 0.6151375770568848, 0.24237817525863647, -1.085084319114685, 0.7758651971817017, -0.07997342944145203, 0.12670616805553436, 0.99509...
scala> val itemMatrix = new DoubleMatrix(itemFactors)
itemMatrix: org.jblas.DoubleMatrix = [-0.046478, 0.186759, 0.430141, -0.552189, -0.225737, 0.407853, 0.600894, 0.055110, -0.575653, -0.546344, -0.644643, 0.396468, 1.161898, -0.499214, 0.264360, 0.092233, -0.434536, -0.817125, 0.530460, 0.168671, 1.199130, -1.153093, 0.269568, -1.256731, -0.331190, -0.967735, 0.106656, -0.467420, 1.448173, -1.153396, 0.615138, 0.242378, -1.085084, 0.775865, -0.079973, 0.126706, 0.995097, 0.893790, 0.562197, -0.266731, -0.215226, -0.435322, -0.325578, 0.640598, -0.176773, -0.083776, -0.727552, 0.252224, -0.656758, -0.465791; -0.216228, 0.772439, 1.059774, -0.106539, -0.724421, 0.648145, 1.078649, -0.113092, 0.574351, -0.781359, -1.225590, 0.773134, 0.083251, -0.581324, -0.496888, -0.200736, -1.635370, -1.282534, -0.492300, 0.250896, 0.852908, -1.064428, ...
scala> println(itemMatrix.rows + ", " + itemMatrix.columns)
1682, 50
scala> val imBroadcast = sc.broadcast(itemMatrix)
imBroadcast: org.apache.spark.broadcast.Broadcast[org.jblas.DoubleMatrix] = Broadcast(56)
scala>
// Builds, for every user, the complete list of item ids ranked by predicted score.
// Each user's factor array is wrapped in a DoubleMatrix and multiplied by the broadcast
// item matrix (imBroadcast); the resulting scores are sorted in descending order and each
// row index + 1 is emitted as the recommended item id.
// NOTE(review): row index + 1 equals the item id only if itemFactors was collected in
// ascending id order with ids 1..N. The collect() that built itemFactors does no sort, so
// confirm the ordering; otherwise rank using the ids carried in model.productFeatures.
scala> val allRecs = model.userFeatures.map{ case (userId, array) =>
| val userVector = new DoubleMatrix(array)
| val scores = imBroadcast.value.mmul(userVector)
| val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
| val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
| (userId, recommendedIds)
| }
allRecs: org.apache.spark.rdd.RDD[(Int, Seq[Int])] = MapPartitionsRDD[239] at map at <console>:46
scala>
scala> val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product)}.groupBy(_._1)
userMovies: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, Int)])] = ShuffledRDD[242] at groupBy at <console>:36
scala>
scala> val k = 10
k: Int = 10
scala> val MAPK = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
| val actual = actualWithIds.map(_._2).toSeq
| avgPrecisionK(actual, predicted, k)
| }.reduce(_+_) / allRecs.count
MAPK: Double = 0.022845696779949153
scala> println("Mean Average Precision at K = " + MAPK)
Mean Average Precision at K = 0.022845696779949153
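//4.5.3
// NOTE: ratingsAndPredictions (used below) is not defined anywhere in this log; it was presumably created in an earlier step of the session that was not captured.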
scala> import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.evaluation.RegressionMetrics
scala>
scala> val predictedAndTrue = ratingsAndPredictions.map{ case ((user, product), (predicted, actual)) => (predicted, actual) }
predictedAndTrue: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[247] at map at <console>:45
scala> val regressionMetrics = new RegressionMetrics(predictedAndTrue)
regressionMetrics: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@10cd08af
scala> println("Mean Squared Error: " + regressionMetrics.meanSquaredError)
Mean Squared Error: 0.08514250654661593
scala> println("Root Mean Squared Error: " + regressionMetrics.rootMeanSquaredError)
Root Mean Squared Error: 0.2917918891035457
scala> import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.mllib.evaluation.RankingMetrics
scala> val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
| val actual = actualWithIds.map(_._2)
| (predicted.toArray, actual.toArray)
| }
predictedAndTrueForRanking: org.apache.spark.rdd.RDD[(Array[Int], Array[Int])] = MapPartitionsRDD[252] at map at <console>:52
scala>
scala> val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
rankingMetrics: org.apache.spark.mllib.evaluation.RankingMetrics[Int] = org.apache.spark.mllib.evaluation.RankingMetrics@41142fe
scala> println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
Mean Average Precision = 0.06632563552768554
scala> val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
rankingMetrics: org.apache.spark.mllib.evaluation.RankingMetrics[Int] = org.apache.spark.mllib.evaluation.RankingMetrics@41142fe
scala> println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
Mean Average Precision = 0.06632563552768554
scala> val MAPK2000 = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
| val actual = actualWithIds.map(_._2).toSeq
| avgPrecisionK(actual, predicted, 2000)
| }.reduce(_+_) / allRecs.count
MAPK2000: Double = 0.06632563552768551
scala> println("Mean Average Precision = " + MAPK2000)
Mean Average Precision = 0.06632563552768551