zoukankan      html  css  js  c++  java
  • 推荐系统-0X-电影推荐与结果评估

    import spark.sql
    import org.apache.spark.sql.types._
    import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    import org.apache.spark.mllib.recommendation.Rating
    
    // Data preprocessing: record types for the parsed movie and user lines.
    /** One movie record: integer id, title, and a sequence of genre strings. */
    case class Movie(
      movieId: Int,
      title: String,
      genres: Seq[String]
    )
    /** One user record: integer id, gender, age, occupation (as an Int) and zip string. */
    case class User(
      userId: Int,
      gender: String,
      age: Int,
      occupation: Int,
      zip: String
    )
    
    /**
      * Parses one "::"-separated line into a [[Movie]].
      *
      * The line must contain exactly three fields: id, title and genres.
      * The genres field is split on '|' so that every genre becomes its own
      * element of `genres` (a single genre yields a one-element sequence).
      *
      * @param str raw input line, e.g. "1::Toy Story (1995)::Animation|Comedy"
      * @return the parsed movie
      * @throws AssertionError if the line does not have exactly three fields
      * @throws NumberFormatException if the id field is not an integer
      */
    def parseMovie(str: String): Movie = {
      val fields = str.split("::")
      assert(fields.size == 3)
      // split(Char) treats '|' literally (no regex alternation).
      Movie(fields(0).toInt, fields(1), fields(2).split('|').toSeq)
    }
    
    /**
      * Parses one "::"-separated line into a [[User]].
      *
      * The line must contain exactly five fields, in order:
      * user id, gender, age, occupation, zip.
      *
      * @param str raw input line
      * @return the parsed user
      */
    def parseUser(str: String): User = {
      val parts = str.split("::")
      assert(parts.size == 5)
      User(
        userId = parts(0).toInt,
        gender = parts(1),
        age = parts(2).toInt,
        occupation = parts(3).toInt,
        zip = parts(4)
      )
    }
    
    /**
      * Parses one "::"-separated line into a `Rating`.
      *
      * The line must contain exactly four fields; the first three are used as
      * user id, product id and rating value. The fourth field is required by the
      * size check but is not read.
      *
      * The rating value is parsed as a Double so that fractional ratings such as
      * "3.5" are accepted; integer ratings parse to the same value as before.
      *
      * @param str raw input line
      * @return the parsed rating
      * @throws AssertionError if the line does not have exactly four fields
      * @throws NumberFormatException if a used field is not numeric
      */
    def parseRating(str: String): Rating = {
      val fields = str.split("::")
      assert(fields.size == 4)
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }
    // Load the ratings file into an RDD of raw lines (the source could equally be on Hadoop).
    val ratingText = sc.textFile("file:/home/hadoop/ml-1m/ratings.dat")
    ratingText.first()
    
    // Parse every raw line into a Rating and cache the parsed RDD for reuse.
    val ratingRDD = ratingText.map(line => parseRating(line)).cache()
    // Print a few summary counts: ratings, distinct rated products, distinct rating users.
    println(s"Total number of ratings : ${ratingRDD.count()}")
    println(s"Total number of movies rated : ${ratingRDD.map(r => r.product).distinct().count()}")
    println(s"Total number of users who rated moives:${ratingRDD.map(r => r.user).distinct().count()}")
    
    // Convert the ratings RDD into a DataFrame.
    val ratingDF = ratingRDD.toDF()
    // Likewise, load and parse the movie records.
    val movieDF = {
      val movieLines = sc.textFile("file:/home/hadoop/ml-1m/movies.dat")
      movieLines.map(line => parseMovie(line)).toDF()
    }
    // Likewise, load and parse the user records.
    val userDF = {
      val userLines = sc.textFile("file:/home/hadoop/ml-1m/users.dat")
      userLines.map(line => parseUser(line)).toDF()
    }
    // Show the schema of each DataFrame, in the order ratings, movies, users.
    Seq(ratingDF, movieDF, userDF).foreach(df => df.printSchema())
    
    // Register each DataFrame as a temporary view so it can be queried with SQL.
    // createOrReplaceTempView is the replacement for registerTempTable, which is
    // deprecated since Spark 2.0.
    ratingDF.createOrReplaceTempView("ratings")
    movieDF.createOrReplaceTempView("movies")
    userDF.createOrReplaceTempView("users")
    
    // Per-product max rating, min rating and distinct-user count, joined to the
    // movie title and ordered by user count descending. sql(...) returns a DataFrame.
    val result = sql(
      """select title, rmax, rmin, ucnt from 
    (select product, max(rating) as rmax, min(rating) as rmin, count(distinct user) as ucnt from ratings group by product) ratingsCNT
    join movies on product=movieId
    order by ucnt desc"""
    )
    result.show()
    
    // The ten users with the most ratings, as a DataFrame of (user, cnt).
    val mostActiveUser = sql(
      """select user, count(*) as cnt 
    from ratings group by user order by cnt desc limit 10 """
    )
    mostActiveUser.show()
    // Titles of the movies that user 4169 rated above 4, as a DataFrame.
    // The binding is never reassigned, so it is a val rather than a var.
    // NOTE(review): `result` is also defined by the earlier per-movie statistics
    // query; redefining a name is accepted only when statements are entered one
    // by one in the REPL. Rename one of them if this is compiled as a program.
    val result = sql("""select title from ratings join movies on movieId=product
    where user=4169 and rating>4""")
    result.show()
    
    // ALS (alternating least squares) section.
    // Split the ratings RDD into a training set (weight 0.8) and a test set
    // (weight 0.2) using a fixed seed, and cache both parts.
    val split = ratingRDD.randomSplit(weights = Array(0.8, 0.2), seed = 0L)
    val trainingSet = split.head.cache()
    val testSet = split.last.cache()
    // Run a count on each part.
    trainingSet.count()
    testSet.count()
    
    // Train the ALS model on the training set. Rank is the number of features
    // shared between the U and V factor matrices.
    // A fixed seed (the same value used for the train/test split) makes the
    // trained model, and therefore the error figures computed from it, reproducible.
    val model = (new ALS()
      .setRank(20)
      .setIterations(10)
      .setSeed(0L)
      .run(trainingSet))
    
    // Use the model to recommend 5 products for user 4169; the result is an Array[Rating].
    val recomForTopUser = model.recommendProducts(4169, 5)
    // Build a movieId -> title lookup. The DataFrame is converted with .rdd before
    // mapping, and the columns are read with typed getters so the map is
    // Map[Int, String] instead of Map[Any, Any].
    val movieTitle = movieDF.rdd.map(row => (row.getInt(0), row.getString(1))).collectAsMap()
    // Keep the (title, predicted rating) pairs in recomResult; previously the val
    // captured the Unit returned by foreach.
    val recomResult = recomForTopUser.map(rating => (movieTitle(rating.product), rating.rating))
    recomResult.foreach(println)
    
    // Project every test rating onto its (user, product) pair.
    val testUserProduct = testSet.map(r => (r.user, r.product))
    // Predict ratings for the (user, product) pairs of the test set.
    val testUserProductPredict = model.predict(testUserProduct)
    // Preview the first 10 predictions, one per line. The separator is written
    // as the escape "\n"; a raw line break inside a normal string literal does
    // not compile.
    testUserProductPredict.take(10).mkString("\n")
    	
    // Key the given test ratings by (user, product).
    val testSetPair = testSet.map(r => ((r.user, r.product), r.rating))
    
    // Key the predicted ratings the same way so the two RDDs can be joined.
    val predictionPair = testUserProductPredict.map(p => ((p.user, p.product), p.rating))
    // Join given and predicted ratings on (user, product), then average the
    // absolute difference between the two to obtain the mean absolute error (MAE).
    val joinTestPredict = testSetPair.join(predictionPair)
    val mae = joinTestPredict.map {
      case (_, (given, predicted)) => math.abs(given - predicted)
    }.mean()
    
    // FP: keep the joined pairs whose given rating is low (<= 1) while the
    // predicted rating is high (>= 4), then count them.
    // Uses the short-circuit && rather than the non-short-circuit & on booleans.
    val fp = joinTestPredict.filter {
      case (_, (ratingT, ratingP)) =>
        ratingT <= 1 && ratingP >= 4
    }
    fp.count()
    
    import org.apache.spark.mllib.evaluation._
    // Reorder every joined pair to (predicted rating, given rating).
    val ratingTP = joinTestPredict.map { case (_, (given, predicted)) => (predicted, given) }
    // Cross-check the mean absolute error using RegressionMetrics.
    val evaluator = new RegressionMetrics(ratingTP)
    evaluator.meanAbsoluteError
    
  • 相关阅读:
    Android状态栏白底黑字,只需两步轻松搞定
    MyBatis注解
    MyBatis延迟加载和缓存
    MyBatis关联查询
    mybatis智能标签1
    Mybatis智能标签
    增删改查
    初始MyBatis
    第7章:Servlet 基础
    第3章 JSP数据交互(二)
  • 原文地址:https://www.cnblogs.com/freebird92/p/9038301.html
Copyright © 2011-2022 走看看