  • A Spark-Based Movie Recommendation System (Recommender System, Part 5)

    Part 4: Recommender System, Offline Recommendations

    • Building on the model obtained in Section 4, this module makes offline recommendations for users: the 5 movies each user is most likely to enjoy.

    A few notes

    1. There are two main modules: one generates recommendations for a single, arbitrarily chosen user; the other generates recommendations for all users and persists the results.
    2. The recommendation results can be stored in MySQL, HBase, or Hive (three variants).
    3. Any userId you look up must exist in the model, so it is best to take userIds directly from trainingData.

    1. Generating recommendations for a single user

    Start coding module one

    Step 1: Continuing in the existing project, create a config package and an AppConf trait.
    To keep the code from getting redundant, we extract a shared trait.

    package com.csylh.recommend.config
    
    import java.util.Properties
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * Description: trait that the ETL jobs and all subsequent jobs mix in
      *
      * @Author: 留歌36
      * @Date: 2019-07-17 16:53
      */
    trait AppConf {
      val localMasterURL = "local[2]"
      val clusterMasterURL = "spark://hadoop001:7077"
    
      // Program against SparkSession
      val spark = SparkSession.builder()
        //    .master(localMasterURL)
        .enableHiveSupport() // enable Hive access; hive-site.xml etc. must be placed under Spark's conf directory
        .getOrCreate()
    
      val sc = spark.sparkContext
    
      // The number of RDD partitions is best set to an integer multiple of the CPU cores allocated to the application; with 4 cores / 8 GB, 8 is fine
      // Question 1: why an integer multiple of the core count? So each wave of tasks keeps every core busy, leaving none idle in the final wave.
      // Question 2: data skew. A task that receives an oversized partition takes far longer, so consider during preprocessing whether the data may be skewed.
      val minPartitions = 8
    
      //  In production, be sure to set spark.sql.shuffle.partitions (default 200), i.e. the number of shuffle partitions
      val shuffleMinPartitions = "8"
      spark.sqlContext.setConf("spark.sql.shuffle.partitions",shuffleMinPartitions)
    
    
      // JDBC connection
      val jdbcURL = "jdbc:mysql://hadoop001:3306/recommend?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    
      val alsTable = "recommend.alsTab"
      val recResultTable = "recommend.similarTab"
      val top5Table = "recommend.top5Result"
      val userTable= "recommend.user"
      val ratingTable= "recommend.rating"
    
      val mysqlusername = "root"
      val mysqlpassword = "123456"
    
      val prop = new Properties
      prop.put("driver", "com.mysql.jdbc.Driver")
      prop.put("user", mysqlusername)
      prop.put("password", mysqlpassword)
    
    }
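
    Any job can now simply mix in the trait and inherit spark, sc, and the JDBC settings. A minimal sketch of the pattern (the object name here is hypothetical, just for illustration):

    package com.csylh.recommend.ml

    import com.csylh.recommend.config.AppConf

    // Hypothetical example job: extending AppConf brings spark, sc, prop, etc. into scope
    object ExampleJob extends AppConf {
      def main(args: Array[String]): Unit = {
        // Hive access works because enableHiveSupport() was set in AppConf
        spark.sql("show tables").show()
      }
    }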
    
    

    Step 2: Create Recommender

    package com.csylh.recommend.ml
    
    import com.csylh.recommend.config.AppConf
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    
    /**
      * Description: generate recommendations for a single user
      *
      * @Author: 留歌36
      * @Date: 2019-07-18 10:04
      */
    object Recommender extends AppConf{
      def main(args: Array[String]): Unit = {
        // userIds taken from trainingData are guaranteed to exist in the model.
        val users = spark.sql("select distinct(userId) from trainingData order by userId asc")
        // Pick an arbitrary user
        val index = 36
        val uid = users.take(index).last.getInt(0)
    
        val modelpath = "/tmp/BestModel/0.8521581387523667"
        val model = MatrixFactorizationModel.load(sc, modelpath)
        val rec = model.recommendProducts(uid, 5)
    
        val recmoviesid = rec.map(_.product)
    
        println("我为用户" + uid + "推荐了以下5部电影:")
        /**
          * 1
          */
        for (i <- recmoviesid) {
          val moviename = spark.sql(s"select title from movies where movieId=$i").first().getString(0)
          println(moviename)
        }
    
    //    /**
    //      * Approach 2: foreach
    //      */
    //    recmoviesid.foreach(x => {
    //      val moviename = spark.sql(s"select title from movies where movieId=$x").first().getString(0)
    //      println(moviename)
    //    })
    //
    //    /**
    //      * Approach 3: map (foreach is more idiomatic when only side effects are wanted)
    //      */
    //    recmoviesid.map(x => {
    //      val moviename = spark.sql(s"select title from movies where movieId=$x").first().getString(0)
    //      println(moviename)
    //    })
    
    
    
      }
    }
    

    Step 3: Package the project and upload it to the server
    mvn clean package -Dmaven.test.skip=true

    Step 4: Write the shell launch script

    [root@hadoop001 ml]# vim recommender.sh 
    export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop
    
    $SPARK_HOME/bin/spark-submit \
    --class com.csylh.recommend.ml.Recommender \
    --master spark://hadoop001:7077 \
    --name Recommender \
    --driver-memory 10g \
    --executor-memory 5g \
    /root/data/ml/movie-recommend-1.0.jar
    

    Step 5: Run sh recommender.sh

    [root@hadoop001 ml]# sh recommender.sh 
    19/10/20 21:39:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    19/10/20 21:40:07 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
    19/10/20 21:40:07 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
    19/10/20 21:40:07 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
    19/10/20 21:40:07 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.
    Recommended the following 5 movies for user 53:
    8 Murders a Day (2011)
    49 Pulses (2017)
    Styx - Caught In The Act (2007)
    The Change
    Earth's Natural Wonders (2016)
    [root@hadoop001 ml]# 
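
    The "not cached" warnings above can be addressed by caching the model's factor RDDs right after loading, which also speeds up repeated predictions. A small optional tweak (not in the original post):

    // Cache the user/product factor RDDs so repeated predictions don't recompute them
    model.userFeatures.cache()
    model.productFeatures.cache()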
    

    Note: val uid = users.take(index).last.getInt(0) takes the first index rows of users and keeps the last one, so index = 36 corresponds to uid = 53 here.
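
    If you want a genuinely random user rather than a fixed index, one minimal sketch (using standard RDD sampling; this call is not in the original post):

    // Sample one row at random from the users DataFrame and take its userId
    val randomUid = users.rdd.takeSample(withReplacement = false, num = 1)(0).getInt(0)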

    2. Generating recommendations for all users

    Start coding module two

    Step 1: Create RecommendForAllUsers under the ml package

    package com.csylh.recommend.ml
    
    import com.csylh.recommend.config.AppConf
    import com.csylh.recommend.entity.Result
    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation._
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    /**
      * Description: generate recommendations for all users and persist them
      *
      * @Author: 留歌36
      * @Date: 2019-07-18 10:42
      */
    object RecommendForAllUsers extends AppConf{
      def main(args: Array[String]): Unit = {
        val users = spark.sql("select distinct(userId) from trainingData order by userId asc")
        // Collect all user IDs as a local iterator
        val allusers = users.rdd.map(_.getInt(0)).toLocalIterator
    
        // Approach 1: works, but inefficient; users are processed one at a time
        val modelpath = "/tmp/BestModel/0.8521581387523667"
        val model = MatrixFactorizationModel.load(sc, modelpath)
        while (allusers.hasNext) {
          val rec = model.recommendProducts(allusers.next(), 5)  // yields Array[Rating]
          writeRecResultToMysql(rec, spark, sc)
          // writeRecResultToSparkSQL(rec): write into Spark SQL (DataFrame) + Hive, same as in the ETL step
          // writeRecResultToHbase(rec, sqlContext, sc)
        }
    
        // Approach 2: not viable here; it materializes the whole user-product matrix at once, which consumes too many resources
        // val recResult = model.recommendProductsForUsers(5)
    
        def writeRecResultToMysql(uid: Array[Rating], spark: SparkSession, sc: SparkContext) {
          val uidString = uid.map(x => x.user.toString() + ","
            + x.product.toString() + "," + x.rating.toString())
    
          import spark.implicits._
          val uidDFArray = sc.parallelize(uidString)
          val uidDF = uidDFArray.map(_.split(",")).map(x => Result(x(0).trim().toInt, x(1).trim.toInt, x(2).trim().toDouble)).toDF
          // Write to the MySQL database; connection settings live in AppConf
          uidDF.write.mode(SaveMode.Append).jdbc(jdbcURL, alsTable, prop)
        }
    
    //    // Write the recommendations to Phoenix + HBase (RDD saveToPhoenix); not recommended.
    //    val hbaseConnectionString = "localhost"
    //    val userTupleRDD = users.rdd.map { x => Tuple3(x.getInt(0), x.getInt(1), x.getDouble(2)) }
    //    // zkUrl must match the ZooKeeper URL configured for HBase; in local mode just use localhost
    //    userTupleRDD.saveToPhoenix("NGINXLOG_P", Seq("USERID", "MOVIEID", "RATING"), zkUrl = Some(hbaseConnectionString))
    //
    //    // Write the recommendations to Phoenix + HBase via DataFrame operations; not recommended.
    //    def writeRecResultToHbase(uid: Array[Rating], sqlContext: SQLContext, sc: SparkContext) {
    //      val uidString = uid.map(x => x.user.toString() + "|"
    //        + x.product.toString() + "|" + x.rating.toString())
    //      import sqlContext.implicits._
    //      // note: "|" must be escaped as "\\|" because String.split takes a regex
    //      val uidDF = sc.parallelize(uidString).map(_.split("\\|")).map(x => Result(x(0).trim().toInt, x(1).trim.toInt, x(2).trim().toDouble)).toDF
    //      // zkUrl must match the ZooKeeper URL configured for HBase
    //      uidDF.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "phoenix_rec", "zkUrl" -> "hadoop001:2181"))
    //    }
      }
    
    }
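
    The Result case class imported from com.csylh.recommend.entity is not shown in this post. A minimal sketch consistent with how it is constructed above (the field names are assumptions):

    package com.csylh.recommend.entity

    // Assumed shape: one row of a recommendation result (user, movie, predicted rating)
    case class Result(userId: Int, movieId: Int, rating: Double)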
    
    

    Step 2: Package the project and upload it to the server
    mvn clean package -Dmaven.test.skip=true

    Step 3: Write the shell launch script. Note the extra --packages flag compared with recommender.sh: this job writes to MySQL and therefore needs the JDBC driver on the classpath.

    [root@hadoop001 ml]# vim RecommendForAllUsers.sh 
    export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop
    
    $SPARK_HOME/bin/spark-submit \
    --class com.csylh.recommend.ml.RecommendForAllUsers \
    --master spark://hadoop001:7077 \
    --name RecommendForAllUsers \
    --driver-memory 10g \
    --executor-memory 5g \
    --packages "mysql:mysql-connector-java:5.1.38" \
    /root/data/ml/movie-recommend-1.0.jar
    
    

    Step 4: Run sh RecommendForAllUsers.sh
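
    After the job finishes, you can read the table back for a quick sanity check. A minimal sketch (reusing jdbcURL, alsTable, and prop from AppConf; illustrative only):

    // Read back the persisted recommendations and peek at a few rows
    val check = spark.read.jdbc(jdbcURL, alsTable, prop)
    check.show(5)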

    If you have any questions, feel free to leave a comment and discuss.
    More articles in this series, A Spark-Based Movie Recommendation System: https://blog.csdn.net/liuge36/column/info/29285
