第四部分-推荐系统-离线推荐
- 本模块基于第4节得到的模型,开始为用户做离线推荐,推荐用户最有可能喜爱的5部电影。
说明几点
1.主要分为两个模块。其一是为 单个随机用户 做推荐,其二是为 所有用户做推荐,并将推荐结果进行保存
2. 其中所有推荐的结果保存在 MySQL中,HBase,Hive中 <三种版本>。
3. 其中取得的userid一定要存在于模型中, 这样就建议直接从trainingData中取。
1.为某一用户产生推荐结果
开始模块一Coding
步骤一: 继续在前面的项目中,新建config包,新建AppConf接口
为了代码不要那么冗余,我们抽离一个接口出来
package com.csylh.recommend.config
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
* Description: 后续ETL操作或者其他操作必须要实现的trait
*
* @Author: 留歌36
* @Date: 2019-07-17 16:53
*/
trait AppConf {
val localMasterURL = "local[2]"
val clusterMasterURL = "spark://hadoop001:7077"
// 面向SparkSession编程
val spark = SparkSession.builder()
// .master(localMasterURL)
.enableHiveSupport() //开启访问Hive数据, 要将hive-site.xml等文件放入Spark的conf路径
.getOrCreate()
val sc = spark.sparkContext
// 设置RDD的partitions 的数量一般以集群分配给应用的CPU核数的整数倍为宜, 4核8G ,设置为8就可以
// 问题一:为什么设置为CPU核心数的整数倍?
// 问题二:数据倾斜,拿到数据大的partitions的处理,会消耗大量的时间,因此做数据预处理的时候,需要考量会不会发生数据倾斜
val minPartitions = 8
// 在生产环境中一定要注意设置spark.sql.shuffle.partitions,默认是200,及需要配置分区的数量
val shuffleMinPartitions = "8"
spark.sqlContext.setConf("spark.sql.shuffle.partitions",shuffleMinPartitions)
//jdbc连接
val jdbcURL = "jdbc:mysql://hadoop001:3306/recommend?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
val alsTable = "recommend.alsTab"
val recResultTable = "recommend.similarTab"
val top5Table = "recommend.top5Result"
val userTable= "recommend.user"
val ratingTable= "recommend.rating"
val mysqlusername = "root"
val mysqlpassword = "123456"
val prop = new Properties
prop.put("driver", "com.mysql.jdbc.Driver")
prop.put("user", mysqlusername)
prop.put("password", mysqlpassword)
}
步骤二: 新建Recommender
package com.csylh.recommend.ml
import com.csylh.recommend.config.AppConf
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
/**
* Description: 为某一用户产生推荐结果
*
* @Author: 留歌36
* @Date: 2019-07-18 10:04
*/
object Recommender extends AppConf{
def main(args: Array[String]): Unit = {
// 从trainingData中取得的userid 一定存在于模型中。
val users = spark.sql("select distinct(userId) from trainingData order by userId asc")
// 取随意一个用户
val index = 36
val uid = users.take(index).last.getInt(0)
val modelpath = "/tmp/BestModel/0.8521581387523667"
val model = MatrixFactorizationModel.load(sc, modelpath)
val rec = model.recommendProducts(uid, 5)
val recmoviesid = rec.map(_.product)
println("我为用户" + uid + "推荐了以下5部电影:")
/**
* 1
*/
for (i <- recmoviesid) {
val moviename = spark.sql(s"select title from movies where movieId=$i").first().getString(0)
println(moviename)
}
// /**
// * 2
// */
// recmoviesid.foreach(x => {
// val moviename = spark.sql(s"select title from movies where movieId=$x").first().getString(0)
// println(moviename)
// })
//
// /**
// * 3
// */
// recmoviesid.map(x => {
// val moviename = spark.sql(s"select title from movies where movieId=$x").first().getString(0)
// println(moviename)
// })
}
}
步骤二:将创建的项目进行打包上传到服务器
mvn clean package -Dmaven.test.skip=true
步骤三:编写shell 执行脚本
[root@hadoop001 ml]# vim recommender.sh
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop
$SPARK_HOME/bin/spark-submit
--class com.csylh.recommend.ml.Recommender
--master spark://hadoop001:7077
--name Recommender
--driver-memory 10g
--executor-memory 5g
/root/data/ml/movie-recommend-1.0.jar
步骤四:执行 sh recommender.sh 即可
[root@hadoop001 ml]# sh recommender.sh
19/10/20 21:39:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/10/20 21:40:07 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
19/10/20 21:40:07 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
19/10/20 21:40:07 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
19/10/20 21:40:07 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.
我为用户53推荐了以下5部电影:
8 Murders a Day (2011)
49 Pulses (2017)
Styx - Caught In The Act (2007)
The Change
Earth's Natural Wonders (2016)
[root@hadoop001 ml]#
说明: val uid = users.take(index).last.getInt(0) //index=36 ,对应的uid=53
2.为所有用户产生推荐结果
开始模块二Coding
步骤一: 在ml包下新建RecommendForAllUsers
package com.csylh.recommend.ml
import com.csylh.recommend.config.AppConf
import com.csylh.recommend.entity.Result
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation._
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* Description: TODO
*
* @Author: 留歌36
* @Date: 2019-07-18 10:42
*/
object RecommendForAllUsers extends AppConf{
def main(args: Array[String]): Unit = {
val users = spark.sql("select distinct(userId) from trainingData order by userId asc")
// 取得所有的用户ID
val allusers = users.rdd.map(_.getInt(0)).toLocalIterator
// 方法1,可行,但是效率不高,一条条取
val modelpath = "/tmp/BestModel/0.8521581387523667"
val model = MatrixFactorizationModel.load(sc, modelpath)
while (allusers.hasNext) {
val rec = model.recommendProducts(allusers.next(), 5) // 得到Array[Rating]
writeRecResultToMysql(rec, spark, sc)
// writeRecResultToSparkSQL(rec),写入到SPARK-SQL(DataFrame)+hive,同ETL。
// writeRecResultToHbase(rec, sqlContext, sc)
}
// 方法2,不可行,因为一次将矩阵表全部加载到内存,消耗资源太大
// val recResult = model.recommendProductsForUsers(5)
def writeRecResultToMysql(uid: Array[Rating], spark: SparkSession, sc: SparkContext) {
val uidString = uid.map(x => x.user.toString() + ","
+ x.product.toString() + "," + x.rating.toString())
import spark.implicits._
val uidDFArray = sc.parallelize(uidString)
val uidDF = uidDFArray.map(_.split(",")).map(x => Result(x(0).trim().toInt, x(1).trim.toInt, x(2).trim().toDouble)).toDF
// 写入mysql数据库,数据库配置在 AppConf中
uidDF.write.mode(SaveMode.Append).jdbc(jdbcURL, alsTable, prop)
}
// // 把推荐结果写入到phoenix+hbase,通过DF操作,不推荐。
// val hbaseConnectionString = "localhost"
// val userTupleRDD = users.rdd.map { x => Tuple3(x.getInt(0), x.getInt(1), x.getDouble(2)) }
// // zkUrl需要按照hbase配置的zookeeper的url来设置,本地模式就写localhost
// userTupleRDD.saveToPhoenix("NGINXLOG_P", Seq("USERID", "MOVIEID", "RATING"), zkUrl = Some(hbaseConnectionString))
//
// // 把推荐结果写入到phoenix+hbase,通过DF操作,不推荐。
// def writeRecResultToHbase(uid: Array[Rating], sqlContext: SQLContext, sc: SparkContext) {
// val uidString = uid.map(x => x.user.toString() + "|"
// + x.product.toString() + "|" + x.rating.toString())
// import sqlContext.implicits._
// val uidDF = sc.parallelize(uidString).map(_.split("|")).map(x => Result(x(0).trim().toInt, x(1).trim.toInt, x(2).trim().toDouble)).toDF
// // zkUrl需要按照hbase配置的zookeeper的url来设置
// uidDF.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "phoenix_rec", "zkUrl" -> "hadoop001:2181"))
// }
}
}
步骤二:将创建的项目进行打包上传到服务器
mvn clean package -Dmaven.test.skip=true
步骤三:编写shell 执行脚本
[root@hadoop001 ml]# vim RecommendForAllUsers.sh
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop
$SPARK_HOME/bin/spark-submit
--class com.csylh.recommend.ml.RecommendForAllUsers
--master spark://hadoop001:7077
--name RecommendForAllUsers
--driver-memory 10g
--executor-memory 5g
--packages "mysql:mysql-connector-java:5.1.38"
/root/data/ml/movie-recommend-1.0.jar
步骤四:执行 sh RecommendForAllUsers.sh 即可
有任何问题,欢迎留言一起交流~~
更多文章:基于Spark的电影推荐系统:https://blog.csdn.net/liuge36/column/info/29285