package cf
import breeze.numerics.{pow, sqrt}
import org.apache.spark.sql.SparkSession
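/**
 * User-based collaborative filtering (UserCF) over the badou.udata ratings table
 * (user_id, item_id, rating):
 *   1. compute user-user cosine similarity from co-rated items,
 *   2. keep each user's top-10 most similar users,
 *   3. collect those users' items and drop the ones the target user already rated,
 *   4. weight the remaining items by similarity * rating, sum per item, and keep the top 10.
 */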
object UserCF {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("test")
.master("local[2]")
.enableHiveSupport()
.getOrCreate()
val df = spark.sql("select user_id,item_id,rating from badou.udata")
// 1. Compute user-user similarity: cosine = a*b / (|a|*|b|)
import spark.implicits._
// Denominator |a| (and |b|): the square root of the sum of squared ratings for each user
val userScoreSum = df.rdd.map(x=>(x(0).toString,x(2).toString))
.groupByKey()
.mapValues(x=>sqrt(x.toArray.map(rating=>pow(rating.toDouble,2)).sum))
.toDF("user_id","rating_sqrt_sum")
// 1.1 item -> user inverted table: self-join the ratings on item_id so every row pairs two users who rated the same item
// val df3 = df.selectExpr("user_id").distinct().filter("cast(user_id as bigint)<=3")
// df3.join(df3.selectExpr("user_id as user_v")).show()
val df_v = df.selectExpr("user_id as user_v","item_id","rating as rating_v")
val df_decare = df.join(df_v,"item_id")
.filter("cast(user_id as long)<>cast(user_v as long)")
// For each shared item, multiply the two users' ratings -- one term of the dot product in the cosine numerator
val df_product = df_decare.selectExpr("user_id","user_v",
"cast(rating as double)*cast(rating_v as double) as prod")
// Sum the per-item products to get the complete numerator (dot product)
val df_sim_group = df_product.groupBy("user_id","user_v")
.agg("prod"->"sum").withColumnRenamed("sum(prod)","rating_dot")
val userScoreSum_v = userScoreSum.selectExpr("user_id as user_v",
"rating_sqrt_sum as rating_sqrt_sum_v")
val df_sim = df_sim_group.join(userScoreSum,"user_id")
.join(userScoreSum_v,"user_v")
.selectExpr("user_id","user_v",
"rating_dot/(rating_sqrt_sum*rating_sqrt_sum_v) as cosine_sim")
// 2. Collect candidate items from similar users
// 2.1 Keep the top 10 most similar users for each user
// keep sim as a Double so the sort below is numeric, not lexicographic
val df_nsim = df_sim.rdd.map(x=>(x(0).toString,(x(1).toString,x(2).toString.toDouble)))
.groupByKey()
.mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10))
.flatMapValues(x=>x).toDF("user_id","user_v_sim")
.selectExpr("user_id","user_v_sim._1 as user_v","user_v_sim._2 as sim")
// 2.2 Gather each user's item set, to be used for filtering
// Build the item set of user_id (the same table, re-aliased, also supplies user_v's item set)
val df_user_item = df.rdd.map(x=>(x(0).toString,x(1).toString+"_"+x(2).toString))
.groupByKey().mapValues(_.toArray).toDF("user_id","item_rating_arr")
val df_user_item_v = df_user_item.selectExpr("user_id as user_v",
"item_rating_arr as item_rating_arr_v")
val df_gen_item = df_nsim.join(df_user_item,"user_id")
.join(df_user_item_v,"user_v")
// 2.3 UDF that removes, from user_v's items, the movies user_id has already rated
import org.apache.spark.sql.functions._
val filter_udf = udf{(items:Seq[String],items_v:Seq[String])=>
val fMap = items.map{x=>
val l = x.split("_")
(l(0),l(1))
}.toMap
items_v.filter{x=>
val l=x.split("_")
!fMap.contains(l(0))
}
}
val df_filter_item = df_gen_item.withColumn("filtered_item",
filter_udf(col("item_rating_arr"),col("item_rating_arr_v")))
.select("user_id","sim","filtered_item")
/**df_filter_item:
* +-------+-------------------+--------------------+
|user_id| sim| filtered_item|
+-------+-------------------+--------------------+
| 71|0.33828954632615976|[705_5, 508_5, 20...|
| 753| 0.3968472894511972|[705_5, 508_5, 20...|
| 376|0.32635213497817583|[508_5, 20_5, 228...|
| 360| 0.4425631904462532|[705_5, 508_5, 20...|
| 607| 0.29815005758727|[705_5, 508_5, 20...|
| 392| 0.3704196358220336|[508_5, 20_5, 228...|
* */
// 2.4 Weight each candidate item: similarity * rating
val simRatingUDF = udf{(sim:Double,items:Seq[String])=>
items.map{x=>
val l = x.split("_")
l(0)+"_"+l(1).toDouble*sim
}
}
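// Apply the UDF: each surviving candidate item becomes the string "item_id_<rating*sim>"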
val itemSimRating = df_filter_item.withColumn("item_prod",
simRatingUDF(col("sim"),col("filtered_item")))
.select("user_id","item_prod")
/**itemSimRating:
* +-------+--------------------+
|user_id| item_prod|
+-------+--------------------+
| 71|[705_1.6914477316...|
| 753|[705_1.9842364472...|
| 376|[508_1.6317606748...|
| 360|[705_2.2128159522...|
| 607|[705_1.4907502879...|
* */
// 957964
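// Explode the array so each (user_id, "item_score") pair becomes its own row,
// then split the string back into item_id and score columns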
val userItemScore = itemSimRating.select(itemSimRating("user_id"),
explode(itemSimRating("item_prod"))).toDF("user_id","item_prod")
.selectExpr("user_id","split(item_prod,'_')[0] as item_id",
"cast(split(item_prod,'_')[1] as double) as score")
// 388485
// The same item can reach a user through several similar users with different scores; sum those scores per (user_id, item_id)
val userItemScoreSum = userItemScore.groupBy("user_id","item_id")
.agg("score"->"sum").withColumnRenamed("sum(score)","last_score")
// keep the score as a Double so the ranking below is numeric, not lexicographic
val df_rec = userItemScoreSum.rdd.map(x=>(x(0).toString,(x(1).toString,x(2).toString.toDouble)))
.groupByKey()
.mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10))
.flatMapValues(x=>x).toDF("user_id","item_sim")
.selectExpr("user_id","item_sim._1 as item","item_sim._2 as score")
}
}