zoukankan      html  css  js  c++  java
  • 电影推荐系统项目的数据处理部分

    这个项目的整体业务逻辑是通过Spring进行搭建,并部署在Tomcat上的。业务产生的数据一部分被存储到mongoDB并用于spark sql和ml的离线计算。另一部分被传送到Flume,经kafka到达spark streaming进行实时计算。还有一部分数据存储到redis,同样运用到spark streaming上。本文主要关注spark相关的部分。项目的原始实现主要基于RDD,而且有不少低效的代码实现。本文在此基础上对80%的spark相关代码进行了重写,使新的实现在运行效率上提高了两倍以上、内存使用减少了几倍、代码量也减少近一半。

    项目架构

    综合业务:Spring、Tomcat

    数据存储:业务数据MongoDB、搜索服务数据ES、缓存数据Redis

    离线和实施推荐:Spark DF、ML、Streaming

    消息服务:Kafka

    1. 【数据加载】

      数据加载服务,主要用于项目的数据初始化,用于将三个数据集(Movies【电影的数据集】、Rating【用户对于电影的评分】、Tags【用户对于电影的标签】)初始化到Mongodb数据库以及ElasticSearch里面。

    2. 【离线推荐】

      • 通过Azkaban周期性的调度【离线统计服务】和【离线推荐服务】。
      • 【离线统计服务】
        • 最热电影统计算法  =>  RateMoreMovies表中
        • 当前最热电影统计算法   =>  RateMoreRecentlyMovies 表中
        • 电影的平均评分算法   =>  AvgMovies 表中
        • 电影类别TOP10推荐   =>   GenresTopMovies表中。
      • 【离线推荐服务】
        • 通过Spark ALS算法计算模型的训练
        • 基于Model产生电影相似度矩阵   =>  MovieRecs表中
        • 基于Model产生用户推荐矩阵    =>  UserRecs表中
    3. 【实时推荐】

      • 当一个用户完成了对电影的评分之后,触发实时推荐,后台服务将评分数据实时写入到LOG日志中。
      • Kafka会通过kafkaStream程序将log队列中的数据进行格式化,然后将数据推送到recommender的队列中
      • Spark Streaming实时推荐程序读取kafka中推送过来的数据,配合读取Redis缓存中的数据,运行实时推荐算法【基于模型的推荐】,把结果数据写入到MongoDB的StraemRecs表中
    4. 【推荐结果的聚合】

      • 综合业务服务会根据一定的比例聚合 【离线推荐服务(ALS的协同过滤)】、【基于内容的推荐(基于ElasticSearch More like this功能)】、【实时推荐服务(基于模型的推荐)】这些结果。
      • 聚合完成之后,将结果返回到用户端。

    前期工作:数据加载

    利用Spark SQL将数据分别导入到 MongoDB 和 ElasticSearch

    目标

    • MongoDB

      1. 将Movie【电影数据集】/ Rating【用户对电影的评分数据集】/ Tag【用户对电影的标签数据集】数据集分别加载到MongoDB数据库中的Movie / Rating / Tag表中
    • ElasticSearch

      1. 需要将Movie【电影数据集】加载到ElasticSearch名叫Movie的Index中。需要将Tag数据和Movie数据融合。

    步骤

    1. 先新建一个Maven项目,将依赖添加好。
    2. 分析数据集Movie、Rating、Tag
    3. Spark DF加载数据集
    4. 将DF加载到MongoDB中:
      • 将原来的Collection全部删除
      • 通过DF的write方法将数据写入
      • 创建数据库索引
      • 关闭MongoDB连接
    5. 将DF加载到ElasticSearch中:
      • 将存在的Index删除掉,然后创建新的Index
      • 通过DF的write方法将数据写入
    6. 关闭Spark集群

    数据格式说明

    // MONGO_MOVIE_COLLECTION,包含电影ID、描述、时长、发行日期、拍摄日期、语言、类型、演员、导演
    case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String, shoot: String, language: String, genres: String, actors: String, directors: String)
    
    // MONGO_RATING_COLLECTION,包含用户ID、电影ID、用户对电影的评分、用户评分时间
    case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)
    
    // MONGO_TAG_COLLECTION,包含用户ID、电影ID、标签的具体内容、用户打标签时间
    case class Tag(uid: Int, mid: Int, tag: String, timestamp: Int)
    

    加载数据到MongoDB

    // 由于上面三种数据的加载处理步骤一样,这里就只展示其中之一的MovieRating。
    // 先构建schema,尽管spark有推断功能,但是为了稳定,还是自己定义schema比较好。
    val ratingSchema =  StructType(Array(
      StructField("uid", IntegerType, true),
      StructField("mid", IntegerType, true),
      StructField("score", DoubleType, true),
      StructField("timestamp", IntegerType, true)
      ))
    
    // 读取数据
    val ratingDF = spark.read
      .format("csv")
      .schema(ratingSchema)
      .load(RATING_DATA_PATH)
    
    // 新建一个到MongoDB的连接
    val mongoClient = MongoClient(MongoClientURI(mongodbConfig.uri))
    // 如果MongoDB中有对应的数据库,那么应该删除
    mongoClient(mongodbConfig.db)(MONGO_RATING_COLLECTION).dropCollection()
    
    // 写入数据
    ratingDF
          .write
          .option("uri", mongodbConfig.uri)
          .option("collection", MONGO_RATING_COLLECTION)
          .mode("overwrite")
          .format(MONGO_DRIVER_CLASS)
          .save()
    
    // 对数据表建索引
    mongoClient(mongodbConfig.db)(MONGO_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
    
    // 关闭MongoDB连接
    mongoClient.close()
    

    将数据写入ES

    这里先讲Tag数据进行转换,并与movie表合并,最后将这个表加载到ES。

    // 对tag数据进行处理,使之变为下面格式
    /**
    * |MID | Tags |
    * |1   | tag1|tag2|tag3|tag4....|
    */
    val newTag = tagDF.groupBy($"mid")
      .agg(concat_ws("|", collect_set($"tag")).as("tags"))
      .select("mid", "tags")
    
    // 需要将处理后的Tag数据,和Moive数据融合,产生新的Movie数据
    val joinExpression = movieDF.col("mid") === newTag.col("mid")
    val movieWithTagsDF = movieDF.join(newTag, joinExpression, "left")
    
    // 需要将新的Movie数据保存到ES中
    //新建一个配置
    val settings = Settings.builder()
      .put("cluster.name", eSConfig.clustername)
      .build()
    
    //新建一个ES的客户端
    val esClient = new PreBuiltTransportClient(settings)
    
    //需要将 TransportHosts 添加到 esClient 中
    val REGEX_HOST_PORT = "(.+):(\d+)".r
    eSConfig.transportHosts.split(",")
      .foreach {
        // 通过正则表达提取
        case REGEX_HOST_PORT(host: String, port: String) =>
          // 将所有的 es节点 的地址都加入到 esClient 中
          esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
      }
    
    //需要清除掉ES中遗留的数据
    if (esClient.admin().indices().exists(
      new IndicesExistsRequest(eSConfig.index)).actionGet().isExists) {
      esClient.admin().indices().delete(new DeleteIndexRequest(eSConfig.index))
    }
    esClient.admin().indices().create(new CreateIndexRequest(eSConfig.index))
    
    //将数据写入到ES中
    movieDF
      .write
      .option("es.nodes", eSConfig.httpHosts)
      .option("es.http.timeout", "100m")
      .option("es.mapping.id", "mid") // es地址中取得的id值会被赋值到 _id ,这和 mid 不一样,所以用 mapping 做转换
      .mode("overwrite")
      .format(ES_DRIVER_CLASS)
      .save(eSConfig.index + "/" + ES_TYPE)
    

    离线推荐

    统计推荐

    目标

    1. 优质电影

      获取所有历史数据中评分最多的电影的集合,统计每个电影的评分数 => RateMoreMovies

    2. 热门电影

      认为按照月来统计,这个月中评分数量最多的电影我们认为是热门电影,统计每个月中的每个电影的评分数量 => RateMoreRecentlyMovie

    3. 统计电影的平均评分

      将Rating数据集中所有的用户评分数据进行平均,计算每一个电影的平均评分 => AverageMovies

    4. 统计每种类别电影的TOP10电影

      将每种类别的电影中评分最高的10个电影分别计算出来, => GenresTopMovies

    步骤

    1. 新建一个项目StaticticsRecommender

    2. 统计所有历史数据中电影的评分个数

      通过Rating数据集,用mid进行groupby操作,count计算总数

    3. 统计以月为单位的电影的评分个数

      • 利用from_unixtime对timestamp进行转换,结果格式为yyyyMM
      • 通过groupby 年月,mid 来完成统计
    4. 统计每个电影评分得分
      通过Rating数据集,用户mid进行group by操作,avg计算评分

    5. 统计每种类别中评分最高的10个电影

      • 需要通过JOIN操作将电影的平均评分数据和Movie数据集进行合并,产生MovieWithScore数据集
      • 利用split和explode将MovieWithScore拆开为以一种Genre为开头的MovieWithScore
      • 此处有两种方案(这里展示第一种,但最好的是建立Aggregator,在ALS离线推荐中介绍):
        • 通过Genre作为Key,进行groupByKey操作,将相同电影类别的电影进行聚集并按评分取TOP10。(TopN利用PriorityQueue实现)
        • 利用repartitionAndSortWithinPartitions、mapPartition和groupSorted(轮子)实现分Genre的TOP10。
      • 上面数据转化为DF后输出到MongoDB中

    离线计算由原来的RDD改为基于Spark DataFrame的实现,而DF本身比RDD具有更多的默认优化,比如Tungsten运用了堆外内存,减少GC开销、序列化比RDD的Java默认序列化更加高效、全代码生成等优势。上述是效率提升是理论上的,下面是实际上的。

    首先,DF相比于RDD能这大大地减少了代码量。下面的功能就是简单地统计每部电影的评分数、以月为单位统计每部电影的评分数、每部电影平均分、不同类型电影的评分top10(即分组topn)。这里需要连接电影数据集和评分集,前者包含了电影类型,后者包含平均分。原代码使用RDD的groupByKey,这是一种低效的操作,因为它不会进行map-side聚合,而且分组后使用map,直接调用sortWith对每个分组进行排序,再用slice进行取top10。这种实现并不好,在Scala中运用sortWith虽然方便,但是效率很低,它比toArray.sort慢不少。slice也是,用take更高效。但实际上这里RDD的实现也不需要用全排序再取前10。利用aggregateByKey加优先队列,即堆排的实现,效率更高。其用时由原来的20多秒变为6秒左右。当然,在DF中可用windowfunction来实现topn,但是windowfunction的底层实现是先shuffle然后sort,最后根据所定义的窗口进行相应的聚合。这一过程同样没有map-side聚合,所以尽管数据量少时速度更快,但在数据量大时或许不是一个好的topn实现。其实我觉得最好的topn实现应该是自定义一个aggregator,这个在离线推荐部分会提到。

    // 统计所有历史数据中电影的评分个数
    val rateMoreMoviesDF = ratingDF
      .groupBy($"mid")
      .agg(count($"mid"))
    
    // 统计以月为单位的电影评分个数
    val dateFormat = "yyyyMM"
    val rateMoreRecentLyMovies = ratingDF
      .withColumn("yearmouth", from_unixtime($"timestamp", dateFormat))
      .groupBy($"yearmouth", $"mid")
      .agg(count($"mid") as "count")
      .select($"mid", $"count", $"yearmouth")
    
    // 统计每个电影的平均得分
    val averageMoviesDF = ratingDF
      .select($"mid", $"score")
      .groupBy($"mid")
      .agg(avg($"score").as("avg"))
    
    // 计算不同类别电影的Top10
    val joinExpression = movieDF.col("mid") === averageMoviesDF.col("mid")
    // 用 inner join 将没有评分的电影会除去
    val movieWithScoreRDD = movieDF
      .join(averageMoviesDF, joinExpression, "inner")
      .withColumn("splitted_genres", split($"genres", "\|"))
      .withColumn("single_genre", explode($"splitted_genres"))
      .map(row => {
        (row.getAs[String]("single_genre"), (row.getAs[Int]("mid"), row.getAs[Double]("avg")))
      })
      .rdd
    
    val ord = new Ordering[(Int, Double)] {
      override def compare(x: (Int, Double), y: (Int, Double)): Int = {
        val compare2 = x._2.compareTo(y._2)
        if (compare2 != 0) -compare2 else 0
      }
    }
    
    // 利用aggregateByKey和PriorityQueue实现map-side aggregation
    val res = movieWithScoreRDD
      .aggregateByKey(new mutable.PriorityQueue[(Int, Double)]()(ord))((acc, v) => {
      if (acc.length < 10) {
        acc.enqueue(v)
        acc
      } else {
        acc += ord.min(acc.dequeue(), v)
      }
    },
      (acc1, acc2) => {
        val acc0 = acc1 ++ acc2
        while (acc0.length > 10) {
          acc0.dequeue()
        }
        acc0
      }).toDF()
    

    ALS离线推荐

    目标

    1. 训练ALS推荐模型并计算出属于用户各自的Top20电影推荐
    2. 计算电影相似度矩阵

    步骤

    1. 训练ALS推荐模型

      • 实例化ALS模型,按常规的模型训练流程训练模型后调用recommendForAllUsers完成推荐。
    2. 计算电影相似度矩阵

      • 获取电影的特征矩阵,转换成DoubleMatrix
      • 电影的特征矩阵之间做笛卡尔积,通过余弦相似度计算两个电影的相似度
      • 将数据通过GroupBy处理后,输出
    3. ALS模型的参数选择

      • 通过计算ALS的均方根误差来判断参数的优劣程度

    ALS模型需要三列,用户id、电影id和评分,并从中计算出用户矩阵和电影矩阵。项目需要给每个用户推荐20部电影,此处我直接使用了2.2提供的recommendForAllUsers函数便直接得到了结果。但后续项目还要求求出电影的相似度矩阵,用于后续实时推荐计算。这个算法spark没有现成的实现,但是思路根recommendForAllUsers是一样的。就是先求电影矩阵自身的cross join,然后求每部电影对除其自身外的其他所有电影的余弦相似度。公式为两个向量的点积除以两个向量长度的乘积。然后取出每部电影中,余弦相似度前100的电影,这样子构建出电影的相似度矩阵。我根据spark的recommendForAll的源码,重新实现了这一功能,这比原代码的实现快上1倍,而且cross join产生的中间数据的内存也减少了几倍,由几G到了几十M。实际上这里也得出了一个cross join的优化方式.....然后就是取每部电影的前100相似的电影。这里同样是分组TopN,但源码使用了私有的TopNAggregator。同样我也实现了一个,这是我觉得在数据量大时最好的TopN实现。首先它基于DS,也享受了DF的部分优化(比如Catalyst、序列化等)这是优于rdd的aggregateByKey的部分,然后,它包含了map-side聚合,减少了shuffle期间需要传输的数据,这是优于windowfunc的部分。

    // 训练模型的一般步骤
    val alsModel = new ALS()
    val paramGrid = new ParamGridBuilder()
      .addGrid(alsModel.rank, Seq(5))
      .addGrid(alsModel.regParam, Seq(1.0))
      .build()
    
    val regEval = new RegressionEvaluator()
      .setLabelCol("score")
      .setPredictionCol("prediction")
      .setMetricName("rmse")
    
    val tvs = new TrainValidationSplit()
      .setTrainRatio(0.5)
      .setEstimatorParamMaps(paramGrid)
      .setEstimator(alsModel)
      .setEvaluator(regEval)
    
    val trainedModel = tvs.fit(trainData)
    val bestModel = trainedModel.bestModel.asInstanceOf[ALS]
    val fittedALS = bestModel.fit(trainData)
    
    // 2.2.0 新增方法,直接可以对训练集中的 user 或 item 进行推荐
    val perUserPredictions = fittedALS.recommendForAllUsers(20)
    
    // 计算电影相似度矩阵,Spark2.2只有根据用户推荐电影,或者反过来,并没有更具电影推荐电影的,但是思路其实是一样的,这里根据spark源码中recommendForAllUsers的原理实现电影相似度的计算。
    // 提取item因子
    val movieFeatures = fittedALS.itemFactors
    // cross join优化的预处理
    val ready2Crossjoin = movieFeatures.as[(Int, Array[Float])]
      .mapPartitions(_.grouped(4096))
    
    val ratings = ready2Crossjoin
      .crossJoin(ready2Crossjoin)
      .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
      .flatMap {
        case (mf1Iter, mf2Iter) =>
          val m1 = mf1Iter.size
          val m2 = math.min(mf2Iter.size, 100)
          var i = 0
          val output = new Array[(Int, Int, Float)](m1 * m2)
          val pq = mutable.PriorityQueue[(Int, Float)]()(Ordering.by(-_._2))
          val vectorOp = new F2jBLAS
          mf1Iter.foreach { case (m1Id, mf1Factor) =>
            mf2Iter.foreach { case (m2Id, mf2Factor) =>
              if (m1Id == m2Id) {
                // do nothing
              } else {
                val simScore = consinSim(ALSRank, vectorOp, mf1Factor, mf2Factor)
                if (pq.length < m2) {
                  pq.enqueue((m2Id, simScore))
                } else {
                  val temp = pq.dequeue()
                  pq += (if (temp._2 > simScore) temp else (m2Id, simScore))
                }
              }
            }
            pq.foreach { case (mf2Id, score) =>
              output(i) = (m1Id, mf2Id, score)
              i += 1
            }
            pq.clear()
          }
          output.toSeq
      }
    
    // TopNAggregator在下面进行介绍。
    val topNAggregator = new TopNAggregator[Int, Int, Float](10, Ordering.by(-_._2))
    val rowRes = ratings.as[(Int, Int, Float)]
      .groupByKey(_._1)
      .agg(topNAggregator.toColumn)
      .toDF("moviceID", "similar_movieID")
    
    // 用于转换数据格式,实际上是将“_2”中的FloatType转换为DoubleType
    val arrayType = ArrayType(
      new StructType()
        .add("_1", IntegerType)
        .add("_2", DoubleType)
    )
    
    // 得出结果
    val res = rowRes.select($"moviceID", $"similar_movieID".cast(arrayType))
    

    TopNAggregator的实现

    class TopNAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag](num: Int, ord: Ordering[(K2, V)])
      extends Aggregator[(K1, K2, V), mutable.PriorityQueue[(K2, V)], Array[(K2, V)]] {
    
      override def zero: mutable.PriorityQueue[(K2, V)] = new mutable.PriorityQueue[(K2, V)]()(ord)
    
      override def reduce(q: mutable.PriorityQueue[(K2, V)],
                           a: (K1, K2, V)): mutable.PriorityQueue[(K2, V)] = {
        if (q.size < num) {
          q += ((a._2, a._3))
        } else {
          q += ord.min((a._2, a._3), q.dequeue)
        }
      }
    
      override def merge(q1: mutable.PriorityQueue[(K2, V)],
                          q2: mutable.PriorityQueue[(K2, V)]): mutable.PriorityQueue[(K2, V)] = {
        q1 ++= q2
        while (q1.length > num) {
          q1.dequeue()
        }
        q1
      }
    
      override def finish(r: mutable.PriorityQueue[(K2, V)]): Array[(K2, V)] = {
        r.toArray.sorted(ord.reverse)
      }
    
      override def bufferEncoder: Encoder[mutable.PriorityQueue[(K2, V)]] = {
        Encoders.kryo[mutable.PriorityQueue[(K2, V)]]
      }
    
      override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
    }
    

    实时推荐

    日志预处理:构建kafka流,从大量日志数据中过滤出特定前缀的用户评分log,并传到recommender topic让下面的spark streaming接收。

    1. 构建LogProcessor实现Processor接口,实现对于数据的处理
    2. 构建StreamsConfig配置数据
    3. 构建TopologyBuilder来设置数据处理拓扑关系,就是输入、处理器和输出的关系。
    4. 构建KafkaStreams来启动整个处理

    前提

    1. 在Redis集群中存储了每一个用户最近对电影的K次评分。实时算法可以快速获取。
    2. 离线推荐算法已经将电影相似度矩阵提前计算到了MongoDB中。
    3. Kafka已经获取到了用户实时的评分数据。

    读取MonogDB中的相似度矩阵然后广播。读取kafka stream,提取里面的uid、mid、score评分、timestamp,即某个用户对某部电影的进行评分从而产生的日志,然后foreachRDD,对每条日志执行下面三个计算。从被广播的相似度矩阵中获取这次被评分的电影P最相似的K个电影(当然,实现还会从评分集中获取用户已看过的电影,并将这些电影过滤掉)这些作为推荐的候选电影。然后获取用户最近N次电影评分,实际是从redis中读取,这些数据用于调整候选电影的优先级别。后面更多就是利用Scala实现算法的思想了,和spark等技术相关性不大。首先新建一个数组存储候选电影的评分、两个map存储候选电影的增强和减弱因子。然后对每个候选电影进行遍历,遍历中再遍历用户最近评分的电影。针对每个候选电影,看用户最近评分的电影中有没有与它相似的,有的话返回其在相似度表中的分数,没有就返回0.0。然后如果相似度大于0.6,就将这个相似度作为权重,乘以用户对其相似电影的评分。得出候选电影的初步评分。然后根据用户对该相似电影的评分,如果大于3,则候选电影的增强因子+1,否则在减弱因子中-1。最后计算所有部候选电影的初步评分的均值,并加上其增强因子的对数,减去其减弱因子的对数,得出最终得分,并把这个得分表插入到MongoDB用于用户评分后的推荐。

    // 获取当前最近的M次电影评分。实际是从redis中读取,结果为Array[(Int, Double)],int是mid,double是评分
    val userRecentlyRatings = getUserRecentRatings(ConnHelper.jedis, uid, MAX_USER_RATINGS_COUNT)
    
    // 获取这次被评分的电影P最相似的K个电影。结果为Array[Int]。思路是先从广播中的map获取相似电影Array[(Int, Double)],然后从MONGO_RATING_COLLECTION中获取用户已看过的电影,得到Array[Int],最后从相似的电影中filter用户没看过的,且只返回mid。
    // simMovies实际上就是待推荐电影(仅仅根据相似度表得出的结果)。从电影相似度表中找出与用户本次评分的电影相似的电影,并去掉用户已经看过的。
    val simMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, mid, uid, simMoviesBV.value)
    
    // 计算待选电影的推荐优先级别
    val streamRecs = computeMovieScores(simMoviesBV.value, userRecentlyRatings, simMovies)
    // 具体实现如下
    def computeMovieScores(simMovies: Map[Int, Array[(Int, Double)]], userRecentlyRatings: Array[(Int, Double)],
                           topSimMovie: Array[Int]): Array[(Int, Double)] ={
      // 用于保存每个待选电影和最近评分的每一个电影的权重得分
      val score = ArrayBuffer[(Int, Double)]()
    
      // 用于保存每个电影的增强因子
      val increMap = mutable.HashMap[Int, Int]()
    
      // 用于保存每个电影的减弱因子
      val decreMap = mutable.HashMap[Int, Int]()
    
      for (topSimMovie <- topSimMovie; userRecentlyRating <- userRecentlyRatings) {
        // 针对每个topSimMovie,看与它相似的电影中有没有用户最近评分的电影,有的话返回其在相似度表中的分数,没有就返回0.0。具体实现看下面1
        val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie)
        if (simScore > 0.6) {
          // topSimMovie的相似电影中如果有被用户最近评过分的电影,且在相似度表的分数大于0.6,就将这个分数 * 用户对topSimMovie相似电影中的某部电影的分数,并存储到score
          score += ((topSimMovie, simScore * userRecentlyRating._2))
          // 根据用户最近的评分对每一个topSimMovie的评分进行调整。
          if (userRecentlyRating._2 > 3) {
            increMap(topSimMovie) = increMap.getOrElse(topSimMovie, 0) + 1
          } else {
            decreMap(topSimMovie) = decreMap.getOrElse(topSimMovie, 0) + 1
          }
        }
      }
      // 最后分组聚合score,同时利用increMap和decreMap对分数进行调整
      score.groupBy(_._1)
        .map{case (mid, sims) => (mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))}
        .toArray
    }
    
    // 1
    def getMoviesSimScore(simMovies: Map[Int, Array[(Int, Double)]], userRatingMovie: Int, topSimMovie: Int): Double ={
      simMovies.get(topSimMovie) match {
        case Some(sim) => // sim:与某部电影相似的一列电影,包含mid和score
          var res = 0.0
          sim.foreach(smid =>
            res = if (smid._1 == userRatingMovie) smid._2 else 0.0
          )
          res
    
        case None => 0.0
      }
    }
    
    // 最后将结果存储到MongoDB
    
  • 相关阅读:
    观察者(Observer)模式
    Stragety Pattern(策略模式)
    数据库设计范式深入浅出
    建造者(Builder)模式
    吉杰,以及快乐男声
    言情小说通用情节[转]
    过年的任务
    将一家创业公司三年之内推动上市是1999年的思维方式
    修改系统时间格式?解决now()
    经济类吴晓波的《大败局》,韩德强的《碰撞》。几年前看的了,现在还很有印象。
  • 原文地址:https://www.cnblogs.com/code2one/p/9927026.html
Copyright © 2011-2022 走看看