zoukankan      html  css  js  c++  java
  • 推荐系统-05-Spark电影推荐、评估与部署

    一、新建scala项目

    二、构造程序


    代码如下

    package xyz.pl8
    
    import java.io.File
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.evaluation.RegressionMetrics
    import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating, ALS}
    import org.apache.spark.rdd.RDD
    
    import scala.util.Random
    
    /**
     * Command-line MovieLens recommender built on Spark MLlib's ALS matrix factorization.
     *
     * The program loads `ratings.dat` and `movies.dat`, asks the user to rate a few popular
     * movies on the console, tunes an ALS model on a train/validation split, reports the
     * test RMSE against a mean-rating baseline and finally prints personal recommendations.
     *
     * Expects exactly one program argument: the directory (or URI) that holds the two data files.
     */
    object MovieLensALS {

      /** User id under which the ratings typed in on the console are stored. */
      private val PersonalUserId = 0

      /** Prompt printed before the questionnaire and again after every rejected answer. */
      private val RatingPrompt = "Please rate the following movie(1-5(best) or 0 if not seen: )"

      /** A trained model together with the hyper-parameters and validation RMSE it achieved. */
      private final case class TunedModel(
          model: MatrixFactorizationModel,
          rank: Int,
          iterations: Int,
          lambda: Double,
          rmse: Double)

      /**
       * Keeps asking for a score for `title` until an integer in 0..5 is entered.
       *
       * Only a malformed number triggers a re-prompt. An `EOFException` raised by `readInt` on a
       * closed input stream is deliberately not caught: retrying on it would loop forever.
       */
      @scala.annotation.tailrec
      private def readScore(title: String): Int = {
        println(title + " :")
        val parsed =
          try Some(scala.io.StdIn.readInt())
          catch { case _: NumberFormatException => None }
        parsed match {
          case Some(score) if score >= 0 && score <= 5 => score
          case _ =>
            println(RatingPrompt)
            readScore(title)
        }
      }

      /**
       * 1. Rating elicitation: asks the console user to rate each of the given movies.
       *
       * @param movies (movieId, title) pairs to present, in order
       * @return one `Rating` for user 0 per movie scored 1-5; movies answered with 0 are skipped
       * @throws RuntimeException if no movie received a score between 1 and 5
       */
      def elicitateRating(movies: Seq[(Int, String)]): Seq[Rating] = {
        println(RatingPrompt)
        val ratings = movies.flatMap { case (movieId, title) =>
          val score = readScore(title)
          // A score of 0 means "not seen" and contributes no rating.
          if (score > 0) Some(Rating(PersonalUserId, movieId, score.toDouble)) else None
        }
        if (ratings.isEmpty) sys.error("No ratings provided!") else ratings
      }

      /**
       * 2. RMSE computation: root mean squared error of `model` on `data`.
       *
       * Predictions and observed ratings are joined on (user, product), so only pairs for
       * which the model returns a prediction take part in the metric.
       */
      def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
        val predicted = model
          .predict(data.map(r => (r.user, r.product)))
          .map(p => ((p.user, p.product), p.rating))
        val observed = data.map(r => ((r.user, r.product), r.rating))
        new RegressionMetrics(predicted.join(observed).values).rootMeanSquaredError
      }

      /**
       * Trains one ALS model per (rank, iterations, lambda) combination and returns the one
       * with the lowest validation RMSE, or `None` when no combination yields a comparable
       * (non-NaN) RMSE.
       */
      private def selectBestModel(trainSet: RDD[Rating], validationSet: RDD[Rating]): Option[TunedModel] = {
        val ranks = List(8, 12)
        val iterations = List(10, 20)
        val lambdas = List(0.1, 10.0)
        val grid = for (rank <- ranks; iter <- iterations; lambda <- lambdas) yield (rank, iter, lambda)

        grid.foldLeft(Option.empty[TunedModel]) { case (best, (rank, iter, lambda)) =>
          val model = ALS.train(trainSet, rank, iter, lambda)
          val validationRmse = computeRmse(model, validationSet)
          println(s"RMSE(validation) = $validationRmse with ranks=$rank, iter=$iter, Lambda=$lambda")

          // NaN never compares as smaller, so it must not be accepted as the first candidate either.
          val improves = !validationRmse.isNaN && best.forall(validationRmse < _.rmse)
          if (improves) Some(TunedModel(model, rank, iter, lambda, validationRmse)) else best
        }
      }

      /** Runs the whole load / elicit / train / evaluate / recommend pipeline on `sc`. */
      private def run(sc: SparkContext, movieLensHomeDir: String): Unit = {
        // Join with "/" instead of java.io.File: File collapses the "//" of URIs such as
        // hdfs://host/dir and uses the OS separator, both of which break Hadoop paths.
        def dataFile(name: String): String = movieLensHomeDir.stripSuffix("/") + "/" + name

        // 3.2 Load ratings data and know your data
        // ratings.dat format: UserID::MovieID::Rating::Timestamp
        // Each rating is keyed by (timestamp % 10), which buckets the data into 10 groups.
        // The RDD is reused by several actions below, so it is cached after parsing.
        val ratings = sc.textFile(dataFile("ratings.dat")).map { line =>
          val fields = line.split("::")
          (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
        }.persist()

        // movies.dat format: MovieID::Title::Genres, collected as movieId -> title
        val movies = sc.textFile(dataFile("movies.dat")).map { line =>
          val fields = line.split("::")
          (fields(0).toInt, fields(1))
        }.collectAsMap()

        val numRatings = ratings.count()
        val numUsers = ratings.map(_._2.user).distinct().count()
        val numMovies = ratings.map(_._2.product).distinct().count()
        println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")

        // 3.3 Elicitate personal rating
        // Take the 50 most-rated movies and sample roughly 20% of them with a fixed seed.
        val topMovies = ratings.map(_._2.product).countByValue().toSeq.sortBy(-_._2).take(50).map(_._1)
        val random = new Random(0)
        val selectMovies = topMovies
          .filter(_ => random.nextDouble() < 0.2)
          // Ids that are rated but missing from movies.dat have no title to show; skip them.
          .flatMap(movieId => movies.get(movieId).map(title => (movieId, title)))

        val myRatings = elicitateRating(selectMovies)
        val myRatingsRDD = sc.parallelize(myRatings, 1)

        // 3.4 Split data into train(60%), validation(20%) and test(20%)
        val numPartitions = 10
        // Buckets 0-5 plus the personal ratings form the training set.
        val trainSet = ratings.filter(_._1 < 6).map(_._2)
          .union(myRatingsRDD)
          .repartition(numPartitions)
          .persist()
        val validationSet = ratings.filter(x => x._1 >= 6 && x._1 < 8).map(_._2).persist()
        val testSet = ratings.filter(_._1 >= 8).map(_._2).persist()

        val numTrain = trainSet.count()
        val numValidation = validationSet.count()
        val numTest = testSet.count()
        println(s"Training data: $numTrain Validation data: $numValidation Test data: $numTest")

        // 3.5 Train model and optimize model with validation set
        val best = selectBestModel(trainSet, validationSet).getOrElse(
          sys.error("No model produced a valid validation RMSE; is the validation split empty?"))

        // 3.6 Evaluate model on test set
        val testRmse = computeRmse(best.model, testSet)
        println(s"The best model was trained with rank=${best.rank}, Iter=${best.iterations}, " +
          s"Lambda=${best.lambda} and compute RMSE on test is $testRmse")

        // 3.7 Create a baseline and compare it with best model
        // The baseline predicts the mean rating of train + validation for every test rating.
        val meanRating = trainSet.union(validationSet).map(_.rating).mean()
        val baselineRmse =
          new RegressionMetrics(testSet.map(r => (meanRating, r.rating))).rootMeanSquaredError
        // A positive value means the model's test RMSE is lower than the baseline's.
        val improvement = (baselineRmse - testRmse) / baselineRmse * 100
        println(f"The best model improves the baseline by $improvement%1.2f%%.")

        // 3.8 Make a personal recommendation, excluding movies the user has already rated
        val ratedIds = myRatings.map(_.product).toSet
        val candidates = sc.parallelize(movies.keys.filterNot(ratedIds).toSeq)
        val recommendations = best.model
          .predict(candidates.map(movieId => (PersonalUserId, movieId)))
          .sortBy(-_.rating)
          .take(50)

        println("Movies recommended for you:")
        recommendations.zipWithIndex.foreach { case (recommendation, index) =>
          println(f"$index%2d :" + movies(recommendation.product))
        }
      }

      /**
       * 3. Main entry point.
       *
       * @param args exactly one element: the directory containing ratings.dat and movies.dat
       */
      def main(args: Array[String]): Unit = {
        // 3.1 Setup env
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

        if (args.length != 1) {
          System.err.println("Usage: movieLensHomeDir")
          sys.exit(1)
        }

        val conf = new SparkConf()
          .setAppName("MovieLensALS")
          .set("spark.executor.memory", "500m")
        val sc = new SparkContext(conf)

        // Stop the context even when the pipeline fails.
        try run(sc, args(0))
        finally sc.stop()
      }
    }
    
    

    导入引用库

    三、打包部署

    程序运行时,需要指定输入数据路径,数据包含了ratings.dat和movies.dat,数据都包含在了一个数据包。点击下载, 然后解压。
    配置运行参数

    • 点击edit configuration,在左侧点击该项目。在右侧VM options中输入“-Dspark.master=local”,指示本程序本地单线程运行

    • 在Program arguments中指定上面解压的路径。
      然后,在IDEA上选择MovieLensALS右键选择运行,即可运行了。
      按照引导,输入自己的评价后,最后输出形式如下:

      The best model was trained with rank=12, Iter=20, Lambda=0.1 and compute RMSE on test is 0.868464888081759
      The best model improves the baseline by 22.01%.
      Movies recommended for you:
      0 :Julien Donkey-Boy (1999)
      1 :Love Serenade (1996)
      2 :Catwalk (1995)

    四、HADOOP集群部署

    导出jar包设置


    选对main类后,点击OK确定, 这个时候配置已经完成了, 我们就可以进行编译 jar文件了, 选择菜单Build->Build Artifacts..., 生成的文件路径为/out/artifacts/MovieLensALS_jar/MovieLensALS.jar

    准备HADOOP环境

    假设我们的HADOOP环境已经搭建成功。 接下来我们要把需要计算的数据文件上传到hadoop; 首先,在hadoop上面创建文件夹,命令如下:

    hdfs dfs -mkdir -p /recommendation/data
    

    上传数据文件命令如下:

    hdfs dfs -put *.dat /recommendation/data
    

    这时候我们可以通过命令查看上传是否成功

    hdfs dfs -cat /recommendation/data/users.dat
    

    运行


    在上面红框中,指定了生成的jar文件名, 所在路径, 以及MainClass。下面就是通过spark执行:

    /usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --master local  --class "xyz.pl8.MovieLensALS" /home/hartifacts/movielensals_jar/movielensals.jar /recommendation/data
    
  • 相关阅读:
    Android 实现Path2.0中绚丽的的旋转菜单
    Android SQLite数据库增删改查操作
    Android addRule()
    Android 实现全屏、无标题栏
    微信公众号开发教程
    HEAP CORRUPTION DETECTED
    Introduction to gaussian filter 高斯滤波器
    Windows 7硬盘安装CentOS 6.4 双系统 (WIN7硬盘安装Linux(Fedora 16,CentOS 6.2,Ubuntu 12.04))
    使用Scala操作Mongodb
    数字三角——递归、递归、内存搜索
  • 原文地址:https://www.cnblogs.com/freebird92/p/9047575.html
Copyright © 2011-2022 走看看