zoukankan      html  css  js  c++  java
  • Spark机器学习读书笔记-CH04

    [root@demo1 ch04]# spark-shell --master yarn --jars /root/studio/jblas-1.2.3.jar

    scala> val rawData = sc.textFile("hdfs://192.168.0.85:8020/user/root/studio/MachineLearningWithSpark/ch04/ml-100k/u.data")
    rawData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at textFile at <console>:27

    scala> rawData.first()
    res2: String = 196 242 3 881250949

    scala> val rawRatings = rawData.map(_.split("\t").take(3))
    rawRatings: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:29

    scala> rawRatings.first()
    res4: Array[String] = Array(196, 242, 3)

    scala> import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.mllib.recommendation.ALS

    scala> ALS.
    asInstanceOf isInstanceOf toString train trainImplicit

    scala> import org.apache.spark.mllib.recommendation.Rating
    import org.apache.spark.mllib.recommendation.Rating

    scala> val ratings = rawRatings.map{case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble)}
    ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[7] at map at <console>:33

    scala> ratings.first()
    res6: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)

    scala>

    scala> val model = ALS.train(ratings, 50, 10, 0.01)
    model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@9a0afd

    scala> model.userFeatures
    res3: org.apache.spark.rdd.RDD[(Int, Array[Double])] = users MapPartitionsRDD[209] at mapValues at ALS.scala:255

    scala> model.userFeatures.count
    res4: Long = 943

    scala>

    scala> model.productFeatures.count
    res5: Long = 1682

    scala> val predictedRating = model.predict(789, 123)
    16/01/31 10:16:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
    16/01/31 10:16:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
    predictedRating: Double = 3.138734782421803

    scala>

    scala> val userId = 789
    userId: Int = 789

    scala> val k = 10
    k: Int = 10

    scala> val topKRecs = model.recommendProducts(userId, k)
    topKRecs: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(789,447,5.457686378317019), Rating(789,185,5.450265921125553), Rating(789,246,5.41465336756543), Rating(789,134,5.412768035881092), Rating(789,192,5.339175195120079), Rating(789,60,5.283787802367103), Rating(789,195,5.2768427354294625), Rating(789,135,5.26616884812581), Rating(789,42,5.245382247909927), Rating(789,45,5.134660805921923))

    scala> println(topKRecs.mkString(" "))
    Rating(789,447,5.457686378317019)
    Rating(789,185,5.450265921125553)
    Rating(789,246,5.41465336756543)
    Rating(789,134,5.412768035881092)
    Rating(789,192,5.339175195120079)
    Rating(789,60,5.283787802367103)
    Rating(789,195,5.2768427354294625)
    Rating(789,135,5.26616884812581)
    Rating(789,42,5.245382247909927)
    Rating(789,45,5.134660805921923)

    scala>

    scala> val movies = sc.textFile("hdfs://192.168.0.85:8020/user/root/studio/MachineLearningWithSpark/ch04/ml-100k/u.item")
    movies: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[214] at textFile at <console>:30

    scala> val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
    titles: scala.collection.Map[Int,String] = Map(137 -> Big Night (1996), 891 -> Bent (1997), 550 -> Die Hard: With a Vengeance (1995), 1205 -> Secret Agent, The (1996), 146 -> Unhook the Stars (1996), 864 -> My Fellow Americans (1996), 559 -> Interview with the Vampire (1994), 218 -> Cape Fear (1991), 568 -> Speed (1994), 227 -> Star Trek VI: The Undiscovered Country (1991), 765 -> Boomerang (1992), 1115 -> Twelfth Night (1996), 774 -> Prophecy, The (1995), 433 -> Heathers (1989), 92 -> True Romance (1993), 1528 -> Nowhere (1997), 846 -> To Gillian on Her 37th Birthday (1996), 1187 -> Switchblade Sisters (1975), 1501 -> Prisoner of the Mountains (Kavkazsky Plennik) (1996), 442 -> Amityville Curse, The (1990), 1160 -> Love! Valour! Compassion! (1997), 101 -> Heavy Metal (1981), 1196 -> Sa...
    scala> titles(123)
    res7: String = Frighteners, The (1996)

    scala>

    scala> val moviesForUser = ratings.keyBy(_.user).lookup(789)
    moviesForUser: Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(789,1012,4.0), Rating(789,127,5.0), Rating(789,475,5.0), Rating(789,93,4.0), Rating(789,1161,3.0), Rating(789,286,1.0), Rating(789,293,4.0), Rating(789,9,5.0), Rating(789,50,5.0), Rating(789,294,3.0), Rating(789,181,4.0), Rating(789,1,3.0), Rating(789,1008,4.0), Rating(789,508,4.0), Rating(789,284,3.0), Rating(789,1017,3.0), Rating(789,137,2.0), Rating(789,111,3.0), Rating(789,742,3.0), Rating(789,248,3.0), Rating(789,249,3.0), Rating(789,1007,4.0), Rating(789,591,3.0), Rating(789,150,5.0), Rating(789,276,5.0), Rating(789,151,2.0), Rating(789,129,5.0), Rating(789,100,5.0), Rating(789,741,5.0), Rating(789,288,3.0), Rating(789,762,3.0), Rating(789,628,3.0), Rating(789,124,4.0))

    scala> println(moviesForUser.size)
    33

    scala>

    scala> moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)
    (Godfather, The (1972),5.0)
    (Trainspotting (1996),5.0)
    (Dead Man Walking (1995),5.0)

    (Star Wars (1977),5.0)
    (Swingers (1996),5.0)
    (Leaving Las Vegas (1995),5.0)
    (Bound (1996),5.0)
    (Fargo (1996),5.0)
    (Last Supper, The (1995),5.0)
    (Private Parts (1997),4.0)

    scala>

    scala> topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
    (Carrie (1976),5.457686378317019)
    (Psycho (1960),5.450265921125553)
    (Chasing Amy (1997),5.41465336756543)
    (Citizen Kane (1941),5.412768035881092)
    (Raging Bull (1980),5.339175195120079)
    (Three Colors: Blue (1993),5.283787802367103)
    (Terminator, The (1984),5.2768427354294625)
    (2001: A Space Odyssey (1968),5.26616884812581)
    (Clerks (1994),5.245382247909927)
    (Eat Drink Man Woman (1994),5.134660805921923)

    scala> val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))
    aMatrix: org.jblas.DoubleMatrix = [1.000000; 2.000000; 3.000000]

    scala> def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    | vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
    | }
    cosineSimilarity: (vec1: org.jblas.DoubleMatrix, vec2: org.jblas.DoubleMatrix)Double

    scala> val itemId = 567
    itemId: Int = 567

    scala> val itemFactor = model.productFeatures.lookup(itemId).head
    itemFactor: Array[Double] = Array(-0.10181606560945511, -0.17942631244659424, 0.49227064847946167, -0.5855011343955994, 0.13942785561084747, 0.20914733409881592, 0.6555922627449036, -0.31868791580200195, 0.10309237241744995, 0.11868427693843842, -0.6755932569503784, -0.4224179983139038, 0.6295595169067383, -0.46667173504829407, 0.3661192059516907, 0.09347377717494965, -0.06300525367259979, -1.2393603324890137, 0.6922296285629272, -0.1859930455684662, 1.9753334522247314, -0.9490454196929932, -0.9428562521934509, -1.242025375366211, -0.6144394278526306, 0.11396315693855286, 0.5499435067176819, -0.39970043301582336, 0.6376444697380066, -0.5110273957252502, -0.03175868093967438, 0.07247018814086914, -0.8671998381614685, 0.07232850044965744, -0.296882688999176, 0.16593052446842194, -0.544847...
    scala> val itemVector = new DoubleMatrix(itemFactor)
    itemVector: org.jblas.DoubleMatrix = [-0.101816; -0.179426; 0.492271; -0.585501; 0.139428; 0.209147; 0.655592; -0.318688; 0.103092; 0.118684; -0.675593; -0.422418; 0.629560; -0.466672; 0.366119; 0.093474; -0.063005; -1.239360; 0.692230; -0.185993; 1.975333; -0.949045; -0.942856; -1.242025; -0.614439; 0.113963; 0.549944; -0.399700; 0.637644; -0.511027; -0.031759; 0.072470; -0.867200; 0.072329; -0.296883; 0.165931; -0.544848; 0.031844; -0.028931; -0.155556; -0.579473; -0.759191; -0.493584; 0.482710; -0.213130; 0.084202; -0.195430; 0.626008; 0.223824; 0.342239]

    scala> cosineSimilarity(itemVector, itemVector)
    res11: Double = 1.0000000000000002

    scala> val sim2 = model.productFeatures.map{ case (id, factor) =>
    | val factorVector = new DoubleMatrix(factor)
    | val sim = cosineSimilarity(factorVector, itemVector)
    | (id, sim)
    | }
    sim2: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[220] at map at <console>:46

    scala> val sims = sim2
    sims: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[220] at map at <console>:46

    scala> val sortedSims = sims.top(k)(Ordering.by[(Int, Double), Double]{ case (id, similarity) => similarity })
    sortedSims: Array[(Int, Double)] = Array((567,1.0000000000000002), (563,0.7664604379073522), (288,0.7565452613547423), (685,0.7328709208683085), (895,0.7256132495520126), (413,0.719023340103824), (1376,0.714902115325024), (940,0.7146608851135356), (556,0.7137073949065319), (201,0.7122042387255716))

    scala> println(sortedSims.take(10).mkString(" "))
    (567,1.0000000000000002)
    (563,0.7664604379073522)
    (288,0.7565452613547423)
    (685,0.7328709208683085)
    (895,0.7256132495520126)
    (413,0.719023340103824)
    (1376,0.714902115325024)
    (940,0.7146608851135356)
    (556,0.7137073949065319)
    (201,0.7122042387255716)

    scala> println(titles(itemId))
    Wes Craven's New Nightmare (1994)

    scala> val sortedSims2 = sims.top(k + 1)(Ordering.by[(Int, Double), Double]{ case (id, similarity) => similarity })
    sortedSims2: Array[(Int, Double)] = Array((567,1.0000000000000002), (563,0.7664604379073522), (288,0.7565452613547423), (685,0.7328709208683085), (895,0.7256132495520126), (413,0.719023340103824), (1376,0.714902115325024), (940,0.7146608851135356), (556,0.7137073949065319), (201,0.7122042387255716), (1206,0.7115277702784656))

    scala> sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim)}.mkString(" ")
    res14: String =
    (Stephen King's The Langoliers (1995),0.7664604379073522)
    (Scream (1996),0.7565452613547423)
    (Executive Decision (1996),0.7328709208683085)
    (Scream 2 (1997),0.7256132495520126)
    (Tales from the Crypt Presents: Bordello of Blood (1996),0.719023340103824)
    (Meet Wally Sparks (1997),0.714902115325024)
    (Airheads (1994),0.7146608851135356)
    (Wild Bill (1995),0.7137073949065319)
    (Evil Dead II (1987),0.7122042387255716)
    (Amos & Andrew (1993),0.7115277702784656)

    scala>

    //4.5

    scala> def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
    | val predK = predicted.take(k)
    | var score = 0.0
    | var numHits = 0.0
    | for((p, i) <- predK.zipWithIndex) {
    | if (actual.contains(p)) {
    | numHits += 1.0
    | score += numHits / (i.toDouble + 1.0)
    | }
    | }
    | if(actual.isEmpty) {
    | 1.0
    | } else {
    | score / scala.math.min(actual.size, k).toDouble
    | }
    | }
    avgPrecisionK: (actual: Seq[Int], predicted: Seq[Int], k: Int)Double

    scala> val actualMovies = moviesForUser.map(_.product)
    actualMovies: Seq[Int] = ArrayBuffer(1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150, 276, 151, 129, 100, 741, 288, 762, 628, 124)

    scala> val predictedMovies = topKRecs.map(_.product)
    predictedMovies: Array[Int] = Array(447, 185, 246, 134, 192, 60, 195, 135, 42, 45)

    scala> val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
    apk10: Double = 0.0

    scala> val itemFactors = model.productFeatures.map{ case (id, factor) => factor}.collect()
    itemFactors: Array[Array[Double]] = Array(Array(-0.046477749943733215, 0.18675905466079712, 0.430140882730484, -0.5521891117095947, -0.2257368415594101, 0.4078529477119446, 0.600894033908844, 0.05511023849248886, -0.5756528377532959, -0.546343982219696, -0.6446431279182434, 0.39646831154823303, 1.1618980169296265, -0.4992135763168335, 0.2643599212169647, 0.09223347902297974, -0.43453606963157654, -0.8171247839927673, 0.5304603576660156, 0.16867069900035858, 1.199129581451416, -1.1530932188034058, 0.2695677876472473, -1.2567310333251953, -0.3311900496482849, -0.9677346348762512, 0.10665648430585861, -0.4674195647239685, 1.44817316532135, -1.1533961296081543, 0.6151375770568848, 0.24237817525863647, -1.085084319114685, 0.7758651971817017, -0.07997342944145203, 0.12670616805553436, 0.99509...
    scala> val itemMatrix = new DoubleMatrix(itemFactors)
    itemMatrix: org.jblas.DoubleMatrix = [-0.046478, 0.186759, 0.430141, -0.552189, -0.225737, 0.407853, 0.600894, 0.055110, -0.575653, -0.546344, -0.644643, 0.396468, 1.161898, -0.499214, 0.264360, 0.092233, -0.434536, -0.817125, 0.530460, 0.168671, 1.199130, -1.153093, 0.269568, -1.256731, -0.331190, -0.967735, 0.106656, -0.467420, 1.448173, -1.153396, 0.615138, 0.242378, -1.085084, 0.775865, -0.079973, 0.126706, 0.995097, 0.893790, 0.562197, -0.266731, -0.215226, -0.435322, -0.325578, 0.640598, -0.176773, -0.083776, -0.727552, 0.252224, -0.656758, -0.465791; -0.216228, 0.772439, 1.059774, -0.106539, -0.724421, 0.648145, 1.078649, -0.113092, 0.574351, -0.781359, -1.225590, 0.773134, 0.083251, -0.581324, -0.496888, -0.200736, -1.635370, -1.282534, -0.492300, 0.250896, 0.852908, -1.064428, ...

    scala> println(itemMatrix.rows + ", " + itemMatrix.columns)
    1682, 50

    scala> val imBroadcast = sc.broadcast(itemMatrix)
    imBroadcast: org.apache.spark.broadcast.Broadcast[org.jblas.DoubleMatrix] = Broadcast(56)

    scala>

    scala> val allRecs = model.userFeatures.map{ case (userId, array) =>
    | val userVector = new DoubleMatrix(array)
    | val scores = imBroadcast.value.mmul(userVector)
    | val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
    | val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
    | (userId, recommendedIds)
    | }
    allRecs: org.apache.spark.rdd.RDD[(Int, Seq[Int])] = MapPartitionsRDD[239] at map at <console>:46

    scala>

    scala> val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product)}.groupBy(_._1)
    userMovies: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, Int)])] = ShuffledRDD[242] at groupBy at <console>:36

    scala>

    scala> val k = 10
    k: Int = 10

    scala> val MAPK = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
    | val actual = actualWithIds.map(_._2).toSeq
    | avgPrecisionK(actual, predicted, k)
    | }.reduce(_+_) / allRecs.count
    MAPK: Double = 0.022845696779949153

    scala> println("Mean Average Precision at K = " + MAPK)
    Mean Average Precision at K = 0.022845696779949153

    //4.5.3

    scala> import org.apache.spark.mllib.evaluation.RegressionMetrics
    import org.apache.spark.mllib.evaluation.RegressionMetrics

    scala>

    scala> val predictedAndTrue = ratingsAndPredictions.map{ case ((user, product), (predicted, actual)) => (predicted, actual) }
    predictedAndTrue: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[247] at map at <console>:45

    scala> val regressionMetrics = new RegressionMetrics(predictedAndTrue)
    regressionMetrics: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@10cd08af

    scala> println("Mean Squared Error: " + regressionMetrics.meanSquaredError)
    Mean Squared Error: 0.08514250654661593

    scala> println("Root Mean Squared Error: " + regressionMetrics.rootMeanSquaredError)
    Root Mean Squared Error: 0.2917918891035457

    scala> import org.apache.spark.mllib.evaluation.RankingMetrics
    import org.apache.spark.mllib.evaluation.RankingMetrics

    scala> val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
    | val actual = actualWithIds.map(_._2)
    | (predicted.toArray, actual.toArray)
    | }
    predictedAndTrueForRanking: org.apache.spark.rdd.RDD[(Array[Int], Array[Int])] = MapPartitionsRDD[252] at map at <console>:52

    scala>

    scala> val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
    rankingMetrics: org.apache.spark.mllib.evaluation.RankingMetrics[Int] = org.apache.spark.mllib.evaluation.RankingMetrics@41142fe

    scala> println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
    Mean Average Precision = 0.06632563552768554

    scala> val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
    rankingMetrics: org.apache.spark.mllib.evaluation.RankingMetrics[Int] = org.apache.spark.mllib.evaluation.RankingMetrics@41142fe

    scala> println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
    Mean Average Precision = 0.06632563552768554

    scala> val MAPK2000 = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
    | val actual = actualWithIds.map(_._2).toSeq
    | avgPrecisionK(actual, predicted, 2000)
    | }.reduce(_+_) / allRecs.count
    MAPK2000: Double = 0.06632563552768551

    scala> println("Mean Average Precision = " + MAPK2000)
    Mean Average Precision = 0.06632563552768551

  • 相关阅读:
    Mysql存储引擎
    k8s集群故障二:节点为NotReady状态
    K8S集群二进制搭建3——部署Master Node
    K8S集群二进制搭建2——docker搭建
    K8S集群二进制搭建1——集群介绍及Etcd集群搭建
    政府会计
    其他收益、递延收益的区别
    未确认融资费用通俗理解
    非货币性资产交换的会计处理
    借营业收入贷营业成本投资收益
  • 原文地址:https://www.cnblogs.com/littlesuccess/p/5157230.html
Copyright © 2011-2022 走看看