zoukankan      html  css  js  c++  java
  • SparkMLlib—协同过滤推荐算法,电影推荐系统,物品喜好推荐



    相关内容原文地址:
    博客园:Lemon_Qin:MLlib-协同过滤
    博客园:大数据和AI躺过的坑:Spark MLlib协同过滤算法
    CSDN:Running_Tiger:Spark MLlib协同过滤推荐算法实现
    CSDN:小俊同学:推荐算法+Spark MLlib代码Demo



    一、协同过滤

    协同过滤(Collaborative Filtering,简称CF,WIKI上的定义是:简单来说是利用某个兴趣相投、拥有共同经验之群体的喜好来推荐感兴趣的资讯给使用者,个人透过合作的机制给予资讯相当程度的回应(如评分)并记录下来以达到过滤的目的,进而帮助别人筛选资讯,回应不一定局限于特别感兴趣的,特别不感兴趣资讯的纪录也相当重要。

    协同过滤常被应用于推荐系统。这些技术旨在补充用户—商品关联矩阵中所缺失的部分。

    MLlib 当前支持基于模型的协同过滤,其中用户和商品通过一小组隐性因子进行表达,并且这些因子也用于预测缺失的元素。MLLib 使用交替最小二乘法(ALS) 来学习这些隐性因子。

    协同过滤是推荐系统的常用方法。可以填充user-item相关矩阵中的缺失值。MLlib支持基于模型的协同过滤,即使用能够预测缺失值的一个隐藏因素集合来表示用户和产品。MLlib使用交替做小二乘法(alternating least squares, ALS)学习隐藏因子。MLlib算法中的参数如下:

    • numBlocks 并行计算的block数(-1为自动配置)
    • rank 模型中隐藏因子数
    • iterations 算法迭代次数
    • lambda ALS中的正则化参数
    • implicitPrefs 使用显示反馈ALS变量或隐式反馈
    • alpha ALS隐式反馈变化率用于控制 the baseline confidence in preference observations

    在这里插入图片描述

    1.1 显示vs隐式反馈

    基于矩阵分解的协同过滤的标准方法一般将用户商品矩阵中的元素作为用户对商品的显性偏好。
    用户对物品或者信息的偏好,根据应用本身的不同,可能包括用户对物品的评分、用户查看物品的记录、用户的购买记录等。其实这些用户的偏好信息可以分为两类:

    • 显式的用户反馈:类是用户在网站上自然浏览或者使用网站以外,显式地提供反馈信息,例如用户对物品的评分或者对物品的评论。
    • 这类是用户在使用网站是产生的数据,隐式地反映了用户对物品的喜好,例如用户购买了某物品,用户查看了某物品的信息,等等。

    真实的案例中通常只有隐式反馈(例如,查看,点击,购买,喜欢,分享等)。

    显式的用户反馈能准确地反映用户对物品的真实喜好,但需要用户付出额外的代价;而隐式的用户行为,通过一些分析和处理,也能反映用户的喜好,只是数据不是很精确,有些行为的分析存在较大的噪音。但只要选择正确的行为特征,隐式的用户反馈也能得到很好的效果,只是行为特征的选择可能在不同的应用中有很大的不同,例如在电子商务的网站上,购买行为其实就是一个能很好表现用户喜好的隐式反馈。

    推荐引擎根据不同的推荐机制可能用到数据源中的一部分,然后根据这些数据,分析出一定的规则或者直接对用户对其他物品的喜好进行预测计算。这样推荐引擎可以在用户进入时给他推荐他可能感兴趣的物品。

    1.2 实例介绍

    将使用协同过滤算法对GroupLens Research(http://grouplens.org/datasets/movielens/)提供的数据进行分析,该数据为一组从20世纪90年末到21世纪初由MovieLens用户提供的电影评分数据,这些数据中包括电影评分、电影元数据(风格类型和年代)以及关于用户的人口统计学数据(年龄、邮编、性别和职业等)。根据不同需求该组织提供了不同大小的样本数据,不同样本信息中包含三种数据:评分、用户信息和电影信息。

    对这些数据分析进行如下步骤:

    1. 装载如下两种数据:
      • a)装载样本评分数据,其中最后一列时间戳除10的余数作为key,Rating为值;
      • b)装载电影目录对照表(电影ID->电影标题)
    2. 将样本评分表以key值切分成3个部分,分别用于训练 (60%,并加入用户评分), 校验 (20%), and 测试 (20%)
    3. 训练不同参数下的模型,并再校验集中验证,获取最佳参数下的模型
    4. 用最佳模型预测测试集的评分,计算和实际评分之间的均方根误差
    5. 根据用户评分的数据,推荐前十部最感兴趣的电影(注意要剔除用户已经评分的电影)

    1.2.1 数据说明

    在MovieLens提供的电影评分数据分为三个表:评分、用户信息和电影信息,在该系列提供的附属数据提供大概6000位读者和100万个评分数据,具体位置为/data/class8/movielens/data目录下,对三个表数据说明可以参考该目录下README文档。

    评分数据说明(ratings.data)

    该评分数据总共四个字段,格式为UserID::MovieID::Rating::Timestamp,分为为用户编号::电影编号::评分::评分时间戳,其中各个字段说明如下:

    • 用户编号范围1~6040
    • 电影编号1 -3952
    • 电影评分为五星评分,范围0~5
    • 评分时间戳单位秒
    • 每个用户至少有20个电影评分

    ratings.dat的数据样本如下所示:

    1::1193::5::978300760
    1::661::3::978302109
    1::914::3::978301968
    1::3408::4::978300275
    1::2355::5::978824291
    1::1197::3::978302268
    1::1287::5::978302039
    1::2804::5::978300719

    用户信息(users.dat)

    用户信息五个字段,格式为UserID::Gender::Age::Occupation::Zip-code,分为为用户编号::性别::年龄::职业::邮编,其中各个字段说明如下:

    • 用户编号范围1~6040
    • 性别,其中M为男性,F为女性
    • 不同的数字代表不同的年龄范围,如:25代表25~34岁范围
    • 职业信息,在测试数据中提供了21中职业分类
    • 地区邮编

    users.dat的数据样本如下所示:

    1::F::1::10::48067
    2::M::56::16::70072
    3::M::25::15::55117
    4::M::45::7::02460
    5::M::25::20::55455
    6::F::50::9::55117
    7::M::35::1::06810
    8::M::25::12::11413

    电影信息(movies.dat)

    电影数据分为三个字段,格式为MovieID::Title::Genres,分为为电影编号::电影名::电影类别,其中各个字段说明如下:

    • 电影编号1~3952
    • 由IMDB提供电影名称,其中包括电影上映年份
    • 电影分类,这里使用实际分类名非编号,如:Action、Crime等

    movies.dat的数据样本如下所示:

    1::Toy Story (1995)::Animation|Children’s|Comedy
    2::Jumanji (1995)::Adventure|Children’s|Fantasy
    3::Grumpier Old Men (1995)::Comedy|Romance
    4::Waiting to Exhale (1995)::Comedy|Drama
    5::Father of the Bride Part II (1995)::Comedy
    6::Heat (1995)::Action|Crime|Thriller
    7::Sabrina (1995)::Comedy|Romance
    8::Tom and Huck (1995)::Adventure|Children’s

    程序代码

    import java.io.File
    import scala.io.Source
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd._
    import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
    
    object MovieLensALS {
      def main(args: Array[String]) {
    
        // 屏蔽不必要的日志显示在终端上
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
        if (args.length != 2) {
          println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class week7.MovieLensALS " +
            "week7.jar movieLensHomeDir personalRatingsFile")
          sys.exit(1)
        }
    
        // 设置运行环境
        val conf = new SparkConf().setAppName("MovieLensALS").setMaster("local[4]")
        val sc = new SparkContext(conf)
    
        // 装载用户评分,该评分由评分器生成
        val myRatings = loadRatings(args(1))
        val myRatingsRDD = sc.parallelize(myRatings, 1)
    
         // 样本数据目录
        val movieLensHomeDir = args(0)
    
         // 装载样本评分数据,其中最后一列Timestamp取除10的余数作为key,Rating为值,即(Int,Rating)
        val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
          val fields = line.split("::")
          (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
        }
    
         // 装载电影目录对照表(电影ID->电影标题)
        val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
          val fields = line.split("::")
          (fields(0).toInt, fields(1))
        }.collect().toMap
    
         val numRatings = ratings.count()
        val numUsers = ratings.map(_._2.user).distinct().count()
        val numMovies = ratings.map(_._2.product).distinct().count()
    
        println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.")
    
         // 将样本评分表以key值切分成3个部分,分别用于训练 (60%,并加入用户评分), 校验 (20%), and 测试 (20%)
        // 该数据在计算过程中要多次应用到,所以cache到内存
        val numPartitions = 4
        val training = ratings.filter(x => x._1 < 6)
          .values
          .union(myRatingsRDD) //注意ratings是(Int,Rating),取value即可
          .repartition(numPartitions)
          .cache()
    
        val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
          .values
          .repartition(numPartitions)
          .cache()
    
        val test = ratings.filter(x => x._1 >= 8).values.cache()
    
        val numTraining = training.count()
        val numValidation = validation.count()
        val numTest = test.count()
    
        println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
    
        // 训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模型
        val ranks = List(8, 12)
        val lambdas = List(0.1, 10.0)
        val numIters = List(10, 20)
    
        var bestModel: Option[MatrixFactorizationModel] = None
        var bestValidationRmse = Double.MaxValue
        var bestRank = 0
        var bestLambda = -1.0
        var bestNumIter = -1
        for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
          val model = ALS.train(training, rank, numIter, lambda)
          val validationRmse = computeRmse(model, validation, numValidation)
          println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
            + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
          if (validationRmse < bestValidationRmse) {
            bestModel = Some(model)
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lambda
            bestNumIter = numIter
          }
        }
    
        // 用最佳模型预测测试集的评分,并计算和实际评分之间的均方根误差
        val testRmse = computeRmse(bestModel.get, test, numTest)
    
         println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda  + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")
    
         // create a naive baseline and compare it with the best model
        val meanRating = training.union(validation).map(_.rating).mean
    
        val baselineRmse =
    
          math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
    
        val improvement = (baselineRmse - testRmse) / baselineRmse * 100
    
        println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")
    
         // 推荐前十部最感兴趣的电影,注意要剔除用户已经评分的电影
        val myRatedMovieIds = myRatings.map(_.product).toSet
        val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
        val recommendations = bestModel.get
          .predict(candidates.map((0, _)))
          .collect()
          .sortBy(-_.rating)
          .take(10)
    
        var i = 1
        println("Movies recommended for you:")
        recommendations.foreach { r =>
          println("%2d".format(i) + ": " + movies(r.product))
          i += 1
        }
    
      sc.stop()
      }
    
      /** 校验集预测数据和实际数据之间的均方根误差 **/
      def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
        val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
        val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
          .join(data.map(x => ((x.user, x.product), x.rating)))
          .values
        math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
      }
    
      /** 装载用户评分文件 **/
      def loadRatings(path: String): Seq[Rating] = {
        val lines = Source.fromFile(path).getLines()
        val ratings = lines.map { line =>
          val fields = line.split("::")
          Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
        }.filter(_.rating > 0.0)
        if (ratings.isEmpty) {
          sys.error("No ratings provided.")
        } else {
          ratings.toSeq
        }
      }
    }
    

    二、协同过滤推荐算法——推荐系统代码

    2.1 训练数据

    1,1,1
    1,2,1
    2,1,1
    2,3,1
    3,3,1
    3,4,1
    4,2,1
    4,4,1
    5,1,1
    5,2,1
    5,3,1
    6,4,1
    

    2.2 实战代码

    相似度计算类ItemSimilarity,支持同现相似度、余弦相似度、欧氏距离相似度。

    import scala.math._
    import org.apache.spark.rdd.RDD
    
    /**
      * 物品相似度计算类
      * 通过设置模型参数后执行Similarity方法,进行相似度计算,返回物品与物品相似度RDD
      * 相似度计算支持:同现相似度、余弦相似度、欧氏距离相似度
      *
      */
    
    /**
      * 用户评分
      *
      * @param userid 用户
      * @param itemid 评分物品
      * @param pref   评分
      */
    case class ItemPref(
                         val userid: String,
                         val itemid: String,
                         val pref: Double
                       ) extends Serializable
    
    /**
      * 用户推荐
      *
      * @param userid 用户
      * @param itemid 推荐物品
      * @param pref   评分
      */
    case class UserRecomm(
                           val userid: String,
                           val itemid: String,
                           val pref: Double
                         ) extends Serializable
    
    /**
      * 相似度
      *
      * @param itemid1 物品
      * @param itemid2 物品
      * @param similar 相似度
      */
    case class ItemSimi(
                         val itemid1: String,
                         val itemid2: String,
                         val similar: Double
                       ) extends Serializable
    
    /**
      * 相似度计算
      * 支持同现相似度、余弦相似度、欧氏距离相似度
      */
    class ItemSimilarity extends Serializable {
    
      /**
        * 相似度计算
        *
        * @param user_rdd 用户评分
        * @param stype    计算相似度方式
        * @return 返回物品相似度
        */
      def Similarity(user_rdd: RDD[ItemPref], stype: String): (RDD[ItemSimi]) = {
        val simil_rdd = stype match {
          case "cooccurrence" => ItemSimilarity.CooccurrenceSimilarity(user_rdd)
          case "cosine" => ItemSimilarity.CosineeSimilarity(user_rdd)
          case "euclidean" => ItemSimilarity.EuclideanDistanceSimilarity(user_rdd)
          case _ => ItemSimilarity.CooccurrenceSimilarity(user_rdd)
        }
        simil_rdd
      }
    
    }
    
    object ItemSimilarity {
      /**
        * 同现相似度矩阵计算
        * w(i,j)=N(i)∩N(j)/sqrt(N(i)*N(j))
        *
        * @param user_rdd 用户评分
        * @return 返回物品相似度
        */
      def CooccurrenceSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
        //1.数据准备
        val user_rdd1: RDD[(String, String, Double)] = user_rdd.map(f => (f.userid, f.itemid, f.pref))
        val user_rdd2: RDD[(String, String)] = user_rdd1.map(f => (f._1, f._2))
        //2.(用户,物品)笛卡尔积操作=>物品与物品组合
        val user_rdd3: RDD[(String, (String, String))] = user_rdd2.join(user_rdd2)
        val user_rdd4: RDD[((String, String), Int)] = user_rdd3.map(f => (f._2, 1))
        //3.(物品,物品,频次)
        val user_rdd5: RDD[((String, String), Int)] = user_rdd4.reduceByKey((x, y) => x + y)
        //4.对角矩阵
        val user_rdd6: RDD[((String, String), Int)] = user_rdd5.filter(f => f._1._1 == f._1._2)
        //5.非对角矩阵
        val user_rdd7: RDD[((String, String), Int)] = user_rdd5.filter(f => f._1._1 != f._1._2)
        //6.计算同现相似度(物品1,物品2,同现频次)
        val user_rdd8: RDD[(String, ((String, String, Int), Int))] = user_rdd7.
          map(f => (f._1._1, (f._1._1, f._1._2, f._2))).join(user_rdd6.map(f => (f._1._1, f._2)))
        val user_rdd9: RDD[(String, (String, String, Int, Int))] = user_rdd8.map(f => (f._2._1._2, (f._2._1._1, f._2._1._2, f._2._1._3, f._2._2)))
        val user_rdd10: RDD[(String, ((String, String, Int, Int), Int))] = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
        val user_rdd11: RDD[(String, String, Int, Int, Int)] = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
        val user_rdd12: RDD[(String, String, Double)] = user_rdd11.map(f => (f._1, f._2, (f._3 / sqrt(f._4 * f._5))))
        //7.结果返回
        user_rdd12.map(f => ItemSimi(f._1, f._2, f._3))
      }
    
      /**
        * 余弦相似度矩阵计算
        * T(x,y)=∑x(i)y(i)/sqrt(∑(x(i)*y(i))*∑(y(i)*y(i)))
        *
        * @param user_rdd 用户评分
        * @return 返回物品相似度
        */
      def CosineeSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
        //1.数据准备
        val user_rdd1: RDD[(String, String, Double)] = user_rdd.map(f => (f.userid, f.itemid, f.pref))
        val user_rdd2: RDD[(String, (String, Double))] = user_rdd1.map(f => (f._1, (f._2, f._3)))
        //2.(用户,物品,评分)笛卡尔积操作=>(物品1,物品2,评分1,评分2)组合
        val user_rdd3: RDD[(String, ((String, Double), (String, Double)))] = user_rdd2.join(user_rdd2)
        val user_rdd4: RDD[((String, String), (Double, Double))] = user_rdd3.map(f => ((f._2._1._1, f._2._2._1), (f._2._1._2, f._2._2._2)))
        //3.(物品1,物品2,评分1,评分2,)组合=>(物品1,物品2,评分1*评分2)组合并累加
        val user_rdd5: RDD[((String, String), Double)] = user_rdd4.map(f => (f._1, f._2._1 * f._2._2)).reduceByKey(_ + _)
        //4.对角矩阵
        val user_rdd6: RDD[((String, String), Double)] = user_rdd5.filter(f => f._1._1 == f._1._2)
        //5.非对角矩阵
        val user_rdd7: RDD[((String, String), Double)] = user_rdd5.filter(f => f._1._1 != f._1._2)
        //6.计算相似度
        val user_rdd8: RDD[(String, ((String, String, Double), Double))] = user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).join(user_rdd6.map(f => (f._1._1, f._2)))
        val user_rdd9: RDD[(String, (String, String, Double, Double))] = user_rdd8.map(f => (f._2._1._2, (f._2._1._1, f._2._1._2, f._2._1._3, f._2._2)))
        val user_rdd10: RDD[(String, ((String, String, Double, Double), Double))] = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
        val user_rdd11: RDD[(String, String, Double, Double, Double)] = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
        val user_rdd12: RDD[(String, String, Double)] = user_rdd11.map(f => (f._1, f._2, (f._3 / sqrt(f._4 * f._5))))
        //7.结果返回
        user_rdd12.map(f => ItemSimi(f._1, f._2, f._3))
      }
    
      /**
        * 欧氏距离相似度矩阵计算
        * d(x,y)=sqrt(∑((x(i)-y(i))*(x(i)-y(i))))
        * sim(x,y)=n/(1+d(x,y))
        *
        * @param user_rdd 用户评分
        * @return 返回物品相似度
        */
      def EuclideanDistanceSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
        //1.数据准备
        val user_rdd1: RDD[(String, String, Double)] = user_rdd.map(f => (f.userid, f.itemid, f.pref))
        val user_rdd2: RDD[(String, (String, Double))] = user_rdd1.map(f => (f._1, (f._2, f._3)))
        //2.(用户,物品,评分)笛卡尔积操作=>(物品1,物品2,评分1,评分2)组合
        val user_rdd3: RDD[(String, ((String, Double), (String, Double)))] = user_rdd2 join user_rdd2
        val user_rdd4: RDD[((String, String), (Double, Double))] = user_rdd3.map(f => ((f._2._1._1, f._2._2._1), (f._2._1._2, f._2._2._2)))
        //3.(物品1,物品2,评分1,评分2)组合=>(物品1,物品2,评分1-评分2)组合并累加
        val user_rdd5: RDD[((String, String), Double)] = user_rdd4.map(f => (f._1, (f._2._1 - f._2._2) * (f._2._1 - f._2._2))).reduceByKey(_ + _)
        //4.(物品1,物品2,评分1,评分2)组合=>(物品1,物品2,1)组合计算物品1和物品2的重叠度
        val user_rdd6: RDD[((String, String), Int)] = user_rdd4.map(f => (f._1, 1)).reduceByKey(_ + _)
        //5.非对角矩阵
        val user_rdd7: RDD[((String, String), Double)] = user_rdd5.filter(f => f._1._1 != f._1._2)
        //6.相似度计算
        val user_rdd8: RDD[((String, String), (Double, Int))] = user_rdd7.join(user_rdd6)
        val user_rdd9: RDD[(String, String, Double)] = user_rdd8.map(f => (f._1._1, f._1._2, f._2._2 / (1 + sqrt(f._2._1))))
        //7.结果返回
        user_rdd9.map(f => ItemSimi(f._1, f._2, f._3))
      }
    
    }
    

    推荐计算类RecommendedItem,推荐计算根据物品相似度和用户评分进行推荐物品计算,并过滤用户已有物品及过滤最大过滤推荐数量。

    import org.apache.spark.rdd.RDD
    import scala.collection.mutable
    
    /**
      * 物品推荐计算类
      * 通过设置模型参数,执行Recommend方法进行推荐计算,返回用户的推荐物品RDD
      * 推荐计算根据物品相似度和用户评分进行推荐物品计算,并过滤用户已有物品及过滤最大过滤推荐数量
      */
    class RecommendedItem {
    
      /**
        * 用户推荐计算
        *
        * @param items_similar 物品相似度
        * @param user_prefer   用户评分
        * @param r_number      推荐数量
        * @return 返回用户推荐物品
        */
      def Recommend(items_similar: RDD[ItemSimi], user_prefer: RDD[ItemPref], r_number: Int): (RDD[UserRecomm]) = {
        //1.数据准备
        val rdd_app1_R1: RDD[(String, String, Double)] = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
        val user_prefer1: RDD[(String, String, Double)] = user_prefer.map(f => (f.userid, f.itemid, f.pref))
        //2.矩阵计算(i行j列join)
        val rdd_app1_R2: RDD[(String, ((String, Double), (String, Double)))] = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).join(user_prefer1.map(f => (f._2, (f._1, f._3))))
        //3.矩阵计算(i行j列相乘)
        val rdd_app1_R3: RDD[((String, String), Double)] = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
        //4.矩阵计算(用户:元素累加求和)
        val rdd_app1_R4: RDD[((String, String), Double)] = rdd_app1_R3.reduceByKey((x, y) => x + y)
        //5.矩阵计算(用户:对结果过滤已有物品)
        val rdd_app1_R5: RDD[(String, (String, Double))] = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
        //6.矩阵计算(用户:用户对结果排序,过滤)
        val rdd_app1_R6: RDD[(String, Iterable[(String, Double)])] = rdd_app1_R5.groupByKey()
        val rdd_app1_R7: RDD[(String, Iterable[(String, Double)])] = rdd_app1_R6.map(f => {
          val i2: mutable.Buffer[(String, Double)] = f._2.toBuffer
          val i2_2: mutable.Buffer[(String, Double)] = i2.sortBy(_._2)
          if (i2_2.length > r_number) i2_2.remove(0, (i2_2.length - r_number))
          (f._1, i2_2.toIterable)
        })
        val rdd_app1_R8: RDD[(String, String, Double)] = rdd_app1_R7.flatMap(f => {
          val id2: Iterable[(String, Double)] = f._2
          for (w <- id2) yield (f._1, w._1, w._2)
        })
        rdd_app1_R8.map(f => UserRecomm(f._1, f._2, f._3))
      }
    
      /**
        * 用户推荐计算
        *
        * @param items_similar 物品相似度
        * @param user_prefer   用户评分
        * @return 返回用户推荐物品
        */
      def Recommend(items_similar: RDD[ItemSimi], user_prefer: RDD[ItemPref]): (RDD[UserRecomm]) = {
        //1.数据准备
        val rdd_app1_R1: RDD[(String, String, Double)] = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
        val user_prefer1: RDD[(String, String, Double)] = user_prefer.map(f => (f.userid, f.itemid, f.pref))
        //2.矩阵计算(i行和j列join)
        val rdd_app1_R2: RDD[(String, ((String, Double), (String, Double)))] = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).join(user_prefer1.map(f => (f._2, (f._1, f._3))))
        //3.矩阵计算(i行j列元素相乘)
        val rdd_app1_R3: RDD[((String, String), Double)] = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
        //4.矩阵计算(用户:元素累加求和)
        val rdd_app1_R4: RDD[((String, String), Double)] = rdd_app1_R3.reduceByKey((x, y) => x + y)
        //5.矩阵计算(用户:对结果过滤已有物品)
        val rdd_app1_R5: RDD[(String, (String, Double))] = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
        //6.矩阵计算(用户:用户对结果排序,过滤)
        val rdd_app1_R6: RDD[(String, String, Double)] = rdd_app1_R5.map(f => (f._1, f._2._1, f._2._2)).sortBy(f => (f._1, f._3))
        rdd_app1_R6.map(f => UserRecomm(f._1, f._2, f._3))
      }
    
    }
    
    

    基于物品推荐类ItemCF。

    package itemCF
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    //基于物品推荐
    object ItemCF {
      def main(args: Array[String]): Unit = {
        //1.构建Spark对象
        val conf: SparkConf = new SparkConf().setAppName("ItemCF").setMaster("local[2]")
        val sc = new SparkContext(conf)
        Logger.getRootLogger.setLevel(Level.WARN)
    
        //2.读取数据
        val data_path = "hdfs://node-1:9000/spark_data/sample_itemCF.txt"
        val data: RDD[String] = sc.textFile(data_path)
        val user_data: RDD[ItemPref] = data.map(_.split(",")).map(f => (ItemPref(f(0), f(1), f(2).toDouble))).cache()
    
        //3.建立模型
        val mysimil = new ItemSimilarity()
        val simil_rdd1: RDD[ItemSimi] = mysimil.Similarity(user_data,"cooccurrence")
        val recommd = new RecommendedItem
        val recommd_rdd1: RDD[UserRecomm] = recommd.Recommend(simil_rdd1,user_data,30)
    
        //4.打印结果
        println(s"物品相似度矩阵(物品i,物品j,相似度):${simil_rdd1.count()}")
        simil_rdd1.collect().foreach{ ItemSimi =>
          println(ItemSimi.itemid1+","+ItemSimi.itemid2+","+ItemSimi.similar)
        }
        println(s"用户推荐列表(用户,物品,推荐值):${recommd_rdd1.count()}")
        recommd_rdd1.collect().foreach{
          UserRecomm=> println(UserRecomm.userid+","+UserRecomm.itemid+","+UserRecomm.pref)
        }
    
        sc.stop()
      }
    }
    
    

    2.3 运行结果(亲测可行)

    物品相似度矩阵(物品i,物品j,相似度)10
    2,4,0.3333333333333333
    3,4,0.3333333333333333
    4,2,0.3333333333333333
    3,2,0.3333333333333333
    1,2,0.6666666666666666
    4,3,0.3333333333333333
    2,3,0.3333333333333333
    1,3,0.6666666666666666
    2,1,0.6666666666666666
    3,1,0.6666666666666666
    用户推荐列表(用户,物品,推荐值)11
    4,3,0.6666666666666666
    4,1,0.6666666666666666
    6,2,0.3333333333333333
    6,3,0.3333333333333333
    2,4,0.3333333333333333
    2,2,1.0
    5,4,0.6666666666666666
    3,2,0.6666666666666666
    3,1,0.6666666666666666
    1,4,0.3333333333333333
    1,3,1.0
    
    Process finished with exit code 0
    

    三、Spark MLlib推荐算法

    训练数据:

    196    242    3    881250949
    186    302    3    891717742
    22    377    1    878887116
    244    51    2    880606923
    166    346    1    886397596
    298    474    4    884182806
    115    265    2    881171488
    253    465    5    891628467
    
    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.{SparkConf, SparkContext}
    /**
      * 协同过滤最小二乘法demo,基于用户推荐:根据用户的相似度来为某个用户推荐物品
      */
    object UserCFDemo {
     
      /**
        * 解析String,获取Rating
        * @param str
        * @return
        */
      def parseRating(str:String):Rating={
        val fields = str.split("	")
        Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
      }
     
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("UserCFDemo").setMaster("local[*]")
        val sc = new SparkContext(conf)
        //用户评价数据:用户ID  影片ID  星级  时间戳
        val ratingData = sc.textFile("E:\test\ml-100k\u.data")
        //读取数据,生成RDD并转换成Rating对象
        val ratingsRDD = ratingData.map(parseRating(_))
        //隐藏因子数(也即隐藏特征,理论讲因子数越多效果越好,但太多也会造成训练模型和保存时所需的内存开销,所以一般可取50~200之间)
        val rank=50
        //最大迭代次数(每次迭代都能降低评价矩阵的重建误差,但一般经少数次迭代后ALS模型已能收敛成一个比较合理的好模型)
        val maxIter=10
        //正则化因子(该参数控制模型的正则化过程,从而控制模型的过拟合情况)
        val lambda=0.01
        //训练模型
        val model = ALS.train(ratingsRDD,rank,maxIter,lambda)
     
        //从电影ID到标题的映射
        val movies = sc.textFile("E:\test\ml-100k\u.item")
        val titles = movies.map(line=>line.split("\|")).map(array=>(array(0).toInt,array(1))).collectAsMap()
     
        //推荐物品(电影)数量
        val K=10
        //用户1
        val user1=66
        //推荐结果
        val topKRecs = model.recommendProducts(user1,K)
        println("用户"+user1)
        topKRecs.foreach(rec=>{
          val movie = titles(rec.product)
          val rating = rec.rating
          println(s"推荐电影:$movie ,预测评分:$rating")
        })
      }
    }
    

    上面代码假设为用户ID=66的用户推荐10部电影,推荐结果以及该用户对推荐电影的预测评分,并按预测评分从高到低排序如下所示:
    在这里插入图片描述

    四、基于物品的Spark MLlib代码

    训练数据:

    196    242    3    881250949
    186    302    3    891717742
    22    377    1    878887116
    244    51    2    880606923
    166    346    1    886397596
    298    474    4    884182806
    115    265    2    881171488
    253    465    5    891628467
    
    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.jblas.DoubleMatrix
    /**
      * 协同过滤demo2,基于物品的推荐:根据物品的相似度给某个用户推荐物品
      */
    object ItemCFDemo {
     
      def parseRating(str:String):Rating={
        val fields = str.split("	")
        Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
      }
     
      /**
        * 计算两个向量的余弦相似度,1为最相似,0为不相似,-1为相反
        * 余弦相似度=向量的点积/各向量范数的乘积     值域为[-1,1]
        * @param vec1 向量1
        * @param vec2 向量2
        * @return
        */
      def cosineSimilarity(vec1:DoubleMatrix,vec2:DoubleMatrix)={
        vec1.dot(vec2)/(vec1.norm2()*vec2.norm2())
      }
     
     
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ItemCFDemo")
        val sc = new SparkContext(conf)
        //构建ALS模型
        val ratingData = sc.textFile("E:\test\ml-100k\u.data")
        val ratingsRDD = ratingData.map(parseRating(_))
        //四个参数:评级RDD、
        val model = ALS.train(ratingsRDD,50,10,0.01)
     
        //从电影ID到标题的映射
        val movies = sc.textFile("E:\test\ml-100k\u.item")
        val titles = movies.map(line=>line.split("\|")).map(array=>(array(0).toInt,array(1))).collectAsMap()
     
        //获取给定物品在模型中对应的因子,并构建成向量
        val itemId=567
        val itemFactor: Array[Double] = model.productFeatures.lookup(itemId).head
        val itemVector = new DoubleMatrix(itemFactor)
     
        //求各个物品的余弦相似度
        val sims = model.productFeatures.map {
          case (id, factor) =>
            val factorVector = new DoubleMatrix(factor)
            val sim = cosineSimilarity(factorVector, itemVector)
            (id, sim)
        }
        //取出余弦相似度最高的10个,即为跟给定物品最相似的10种物品
        val sortedSims: Array[(Int, Double)] = sims.top(10)(Ordering.by[(Int,Double),Double]{case (id,similarity)=>similarity})
     
        println("与"+titles(itemId)+"最为相似的10部电影:")
        sortedSims.map{case (id,sim)=>(titles(id),sim)}.foreach(tuple=>println("电影:"+tuple._1+",相似度:"+tuple._2))
      }
    }
    

    基于物品的推荐,就要找出一样相似的那些物品,如上代码为物品ID=567(代码也实现了电影ID到电影名称的映射,该电影名称为《Wes Craven’s New Nightmare》)的电影找出相似的10部电影,并按照相似度从高到低排序,输出结果:

    在这里插入图片描述

    推荐模型效果的评估

    如何知道训练出来的模型是一个好模型?这就需要某种方法来评估它的预测结果。

    评估指标(evaluation metric)指那些衡量模型预测能力或准确度的方法,提供了同一模型在不同参数下,又或是不同模型之间进行比较的标准方法。通过这些指标,人们可以从待选的模型中找出表现最好的那个模型。

    均方差(Mean Squared Error,MSE)直接衡量“用户-物品”评级矩阵的重建误差。它也是一些模型里所采用的的最小化目标函数,特别是许多矩阵分解类方法,比如ALS。因此,它常用于显式评级的情形。它的定义为各平方误差的和与总数目的商。其中平方误差是指预测到的评级与真实评级的差值的平方。公式:
    在这里插入图片描述
    均方根误差(Root Mean Squared Error,RMSE)的使用也很普遍,其计算只需在MSE上取平方根即可,它等同于求预计评级和实际评级的差值的标准差,即:
    在这里插入图片描述
    代码如下:

    import learn.recommend.ItemCFDemo.parseRating
    import org.apache.spark.mllib.evaluation.RegressionMetrics
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.recommendation.{ALS, Rating}
     
    /**
      * 均方误差测试:均方误差越小,模型越好
      */
    object MSEDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ItemCFDemo")
        val sc = new SparkContext(conf)
        val ratingData = sc.textFile("E:\test\ml-100k\u.data")
        val ratingsRDD = ratingData.map(parseRating(_))
        //训练ALS模型
        val model = ALS.train(ratingsRDD,50,10,0.01)
     
     
        val usersProducts = ratingsRDD.map{
          case Rating(user, product, rating)=>(user,product)
        }
        //获取所有预测评级
        val predictions = model.predict(usersProducts).map {
          case Rating(user, product, rating) => ((user, product), rating)
        }
     
        //所有真实评级
        val ratings = ratingsRDD.map{case Rating(user,product,rating)=>((user,product),rating)}
     
        //关联两个RDD,得到((user,product),(真实评级,预测评级)) RDD
        val ratingsAndPredictions = ratings.join(predictions)
     
    //    val MSE = ratingsAndPredictions.map{
    //      case ((user,product),(actual,predicted))=>
    //        math.pow((actual-predicted),2)}.reduce(_+_)/ratingsAndPredictions.count
     
        val predictedAndTrue = ratingsAndPredictions.map {
          case ((user, product), (predicted, real)) => (predicted, real)
        }
        //求解MSE和RMSE
        val regressionMetrics = new RegressionMetrics(predictedAndTrue)
        val MSE = regressionMetrics.meanSquaredError
        val RMSE = regressionMetrics.rootMeanSquaredError
        println("Mean Squared Error="+MSE)
        println("Root Mean Squared Error="+RMSE)
      }
    }
    

    输出结果:

    Mean Squared Error=0.0838857007108934
    Root Mean Squared Error=0.2896302827932421

    从定义可知,MSE(或RMSE)越小,则说明模型越好,越贴合实际。

  • 相关阅读:
    面向对象之绑定方法与非绑定方法
    Django-admin源码解析
    单例模式
    Django-admin管理工具
    Django-CBV
    叠加装饰器,三元表达,生成,调用,递归
    迭代器与生成器
    有参装饰器
    储备知识与装饰器
    文件修改
  • 原文地址:https://www.cnblogs.com/aixing/p/13327220.html
Copyright © 2011-2022 走看看