zoukankan      html  css  js  c++  java
  • spark实现UserCF

    spark实现UserCF

    package cf
    
    import breeze.numerics.{pow, sqrt}
    import org.apache.spark.sql.SparkSession
    
    object UserCF {
      def main(args: Array[String]): Unit = {
    //    多行转一行的快捷键:ctrl shift j
        val spark = SparkSession
          .builder()
          .appName("test")
          .master("local[2]")
          .enableHiveSupport()
          .getOrCreate()
    
        val df = spark.sql("select user_id,item_id,rating from badou.udata")
    //   1. 计算相似用户 cosine = a*b/(|a|*|b|)
        import spark.implicits._
    //    求分母部分|a| (|b|)
        val userScoreSum = df.rdd.map(x=>(x(0).toString,x(2).toString))
          .groupByKey()
          .mapValues(x=>sqrt(x.toArray.map(rating=>pow(rating.toDouble,2)).sum))
          .toDF("user_id","rating_sqrt_sum")
    
    //    1.1 item->user倒排表
    //    val df3 = df.selectExpr("user_id").distinct().filter("cast(user_id as bigint)<=3")
    //    df3.join(df3.selectExpr("user_id as user_v")).show()
        val df_v = df.selectExpr("user_id as user_v","item_id","rating as rating_v")
        val df_decare = df.join(df_v,"item_id")
          .filter("cast(user_id as long)<>cast(user_v as long)")
    //    计算两个用户在一个item下的评分的乘积,cosine公式的分子中的一部分
        val df_product = df_decare.selectExpr("user_id","user_v",
          "cast(rating as double)*cast(rating_v as double) as prod")
    
    //    求和,计算完整的分子部分
        val df_sim_group = df_product.groupBy("user_id","user_v")
          .agg("prod"->"sum").withColumnRenamed("sum(prod)","rating_dot")
    
        val userScoreSum_v = userScoreSum.selectExpr("user_id as user_v",
        "rating_sqrt_sum as rating_sqrt_sum_v")
    
        val df_sim = df_sim_group.join(userScoreSum,"user_id")
          .join(userScoreSum_v,"user_v")
          .selectExpr("user_id","user_v",
          "rating_dot/(rating_sqrt_sum*rating_sqrt_sum_v) as cosine_sim")
    
    //    2.获取相似用户的物品集合
    //    2.1 取得前n个相似用户
        val df_nsim = df_sim.rdd.map(x=>(x(0).toString,(x(1).toString,x(2).toString)))
          .groupByKey()
          .mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10))
          .flatMapValues(x=>x).toDF("user_id","user_v_sim")
          .selectExpr("user_id","user_v_sim._1 as user_v","user_v_sim._2 as sim")
    
    //    2.2 获取用户的物品集合进行过滤
    //     获取user_id物品集合(同样能把user_v的物品集合取到)
        val df_user_item = df.rdd.map(x=>(x(0).toString,x(1).toString+"_"+x(2).toString))
          .groupByKey().mapValues(_.toArray).toDF("user_id","item_rating_arr")
    
    
        val df_user_item_v = df_user_item.selectExpr("user_id as user_v",
          "item_rating_arr as item_rating_arr_v")
    
        val df_gen_item = df_nsim.join(df_user_item,"user_id")
          .join(df_user_item_v,"user_v")
    //    2.3 用一个udf过滤相似用户user_v中包含user_id已经打过分的电影
        import org.apache.spark.sql.functions._
        val filter_udf = udf{(items:Seq[String],items_v:Seq[String])=>
          val fMap = items.map{x=>
            val l = x.split("_")
            (l(0),l(1))
          }.toMap
          items_v.filter{x=>
            val l=x.split("_")
            fMap.getOrElse(l(0),-1) == -1
          }
        }
    
        val df_filter_item = df_gen_item.withColumn("filtered_item",
          filter_udf(col("item_rating_arr"),col("item_rating_arr_v")))
          .select("user_id","sim","filtered_item")
        /**df_filter_item:
          * +-------+-------------------+--------------------+
            |user_id|                sim|       filtered_item|
            +-------+-------------------+--------------------+
            |     71|0.33828954632615976|[705_5, 508_5, 20...|
            |    753| 0.3968472894511972|[705_5, 508_5, 20...|
            |    376|0.32635213497817583|[508_5, 20_5, 228...|
            |    360| 0.4425631904462532|[705_5, 508_5, 20...|
            |    607|   0.29815005758727|[705_5, 508_5, 20...|
            |    392| 0.3704196358220336|[508_5, 20_5, 228...|
          * */
    //    2.4 公式计算 ①相似度*②rating
        val simRatingUDF = udf{(sim:Double,items:Seq[String])=>
          items.map{x=>
            val l = x.split("_")
            l(0)+"_"+l(1).toDouble*sim
          }
        }
        val itemSimRating = df_filter_item.withColumn("item_prod",
          simRatingUDF(col("sim"),col("filtered_item")))
          .select("user_id","item_prod")
        /**itemSimRating:
          * +-------+--------------------+
            |user_id|           item_prod|
            +-------+--------------------+
            |     71|[705_1.6914477316...|
            |    753|[705_1.9842364472...|
            |    376|[508_1.6317606748...|
            |    360|[705_2.2128159522...|
            |    607|[705_1.4907502879.../
          * */
    //       957964
        val userItemScore = itemSimRating.select(itemSimRating("user_id"),
          explode(itemSimRating("item_prod"))).toDF("user_id","item_prod")
          .selectExpr("user_id","split(item_prod,'_')[0] as item_id",
            "cast(split(item_prod,'_')[1] as double) as score")
    //   388485
    //    同一个用户,通过不同的相似用户产生相同的item,对应不一样的打分 sum求和重复的item的分值
        val userItemScoreSum = userItemScore.groupBy("user_id","item_id")
          .agg("score"->"sum").withColumnRenamed("sum(score)","last_score")
    
        val df_rec = userItemScoreSum.rdd.map(x=>(x(0).toString,(x(1).toString,x(2).toString)))
          .groupByKey()
          .mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10))
          .flatMapValues(x=>x).toDF("user_id","item_sim")
          .selectExpr("user_id","item_sim._1 as item","item_sim._2 as score")
    
      }
    
    }
    View Code
  • 相关阅读:
    Sonar+IDEA + Maven的集成
    My97DatePicker IE9中,显示全部为1
    Datatable+jeditable+Java 结合使用实现表格的单行刷新
    Datatables表格控件的使用相关网站及遇到的问题
    xss 小练习
    主页面布局 随浏览器大小变化而变化
    使用angular中自定义的directive实现删除确认框
    使用css和js完成模态弹窗功能
    ckplayer插件的使用
    使用JQuery进行表格分页查询
  • 原文地址:https://www.cnblogs.com/xumaomao/p/12757710.html
Copyright © 2011-2022 走看看