zoukankan      html  css  js  c++  java
  • Spark RDD 实现电影点评用户行为分析 (Scala)

    package com.xh.movies
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    import org.apache.log4j.{Level,Logger}
    
    /**
      * Movie-review user-behaviour analysis written against the Spark RDD API.
      *
      * Created by ssss on 3/11/2017.
      *
      * Notes:
      *  - TODO: compare this RDD implementation with the Dataset API.
      *  - Small lookup data (the user ids of one age bucket) is collected and broadcast to the
      *    executors instead of being joined, which avoids a shuffle.
      *  - A production deployment would rather store the data as Parquet; plain `::`-delimited
      *    text is used here because it is easy to read.
      *
      * Input files, all read from the data directory:
      *  1. "ratings.dat":     UserID::MovieID::Rating::Timestamp
      *  2. "users.dat":       UserID::Gender::Age::OccupationID::Zip-code
      *  3. "movies.dat":      MovieID::Title::Genres
      *  4. "occupations.dat": OccupationID::OccupationName
      *
      * Usage: `MovieReviewsSystemUserBehaviorAnalysis [masterUrl] [dataPath]`.
      */
    object MovieReviewsSystemUserBehaviorAnalysis {

      /** Master URL used when none is given: 4 local cores, for study and testing. */
      private val DefaultMasterUrl = "local[4]"

      /** Data directory used when none is given. */
      private val DefaultDataPath = "data/medium/"

      /** Field separator of every input file. */
      private val FieldSeparator = "::"

      /** Movie whose raters are counted by analysis 0001. */
      private val TargetMovieId = "1139"

      /** Age bucket used by the "top n by age" analysis ("25" is the 25-34 bucket of users.dat). */
      private val TargetAgeBucket = "25"

      /** Number of movies reported by the average-rating, popularity and per-gender rankings. */
      private val TopN = 15

      /** Number of movies reported by the "top n by age" ranking. */
      private val TopNByAge = 19

      /**
        * Runs every analysis and prints the results.
        *
        * @param args optional positional arguments: args(0) = master URL (default `local[4]`),
        *             args(1) = data directory (default `data/medium/`; a missing trailing `/` is added)
        */
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR) // only print error logs

        val masterUrl = if (args.length > 0) args(0) else DefaultMasterUrl
        val dataPath = withTrailingSlash(if (args.length > 1) args(1) else DefaultDataPath)

        val sc = new SparkContext(new SparkConf().setMaster(masterUrl).setAppName("userAnalysis"))
        try {
          def load(fileName: String): RDD[Array[String]] = splitRecords(sc.textFile(dataPath + fileName))

          val users = load("users.dat")
          val movies = load("movies.dat")
          val occupations = load("occupations.dat")
          // Every analysis below scans the ratings, so parse them once and keep them cached.
          val ratings = load("ratings.dat").cache()

          // 0001: number of ratings of the target movie whose rater and occupation are both known.
          println(ratersOfMovie(ratings, users, occupations, TargetMovieId).count())

          // 0002: best movies by average rating. Ratings are summed as Double so the averages
          // are not truncated by integer division.
          println("top by average rating: ")
          topByAverageRating(ratings.map(r => (r(1), r(2).toDouble)), TopN).foreach(println)

          // Most popular movies, i.e. the ones rated by the most users.
          mostRatedMovies(ratings, TopN).foreach(println)

          // 0003: best movies by average rating per gender; the gender comes from users.dat.
          val genderByUser = users.map(u => (u(0), u(1)))
          val ratingsByGender = ratings
            .map(r => (r(0), (r(1), r(2).toDouble)))
            .join(genderByUser)
            .map { case (_, (movieRating, gender)) => (gender, movieRating) }
            .cache()
          topByAverageRating(ratingsByGender.filter(_._1 == "M").values, TopN).foreach(println)
          topByAverageRating(ratingsByGender.filter(_._1 == "F").values, TopN).foreach(println)
          ratingsByGender.unpersist()

          // Most rated movies among the users of one age bucket. The bucket's user ids are
          // collected to the driver and broadcast, so the ratings are filtered without a join.
          val ageBucket = TargetAgeBucket // local copy: the Spark closure only captures a String
          val bucketUserIds = users.filter(u => u(2) == ageBucket).map(_(0)).collect().toSet
          val bucketUserIdsBc = sc.broadcast(bucketUserIds)
          // Title lookup happens on the driver, after take(), so this map is never shipped to executors.
          val movieTitles = movies.map(m => (m(0), m(1))).collect().toMap
          val topByAge = mostRatedMovies(ratings.filter(r => bucketUserIdsBc.value.contains(r(0))), TopNByAge)
            .map { case (movieId, count) => (movieTitles.getOrElse(movieId, movieId), count) }
          println("top n by age: ")
          topByAge.foreach(println)

          waitForUser()
        } finally {
          // Always release the Spark resources, including when an analysis fails.
          sc.stop()
        }
      }

      /** Splits every `::`-delimited line into its fields. */
      private def splitRecords(lines: RDD[String]): RDD[Array[String]] = {
        val separator = FieldSeparator // local copy: the Spark closure only captures a String
        lines.map(_.split(separator))
      }

      /**
        * Ratings of one movie joined with the rater's profile.
        *
        * @param ratings     split lines of ratings.dat
        * @param users       split lines of users.dat
        * @param occupations split lines of occupations.dat
        * @param movieId     movie to select
        * @return (userId, (movieId, ((userId, gender, age), occupationName))) for every rating of
        *         `movieId` whose user and occupation are both present in the inputs
        */
      private def ratersOfMovie(
          ratings: RDD[Array[String]],
          users: RDD[Array[String]],
          occupations: RDD[Array[String]],
          movieId: String): RDD[(String, (String, ((String, String, String), String)))] = {
        val usersByOccupation = users.map(u => (u(3), (u(0), u(1), u(2))))
        val occupationNames = occupations.map(o => (o(0), o(1)))
        val profilesByUser = usersByOccupation.join(occupationNames).map {
          case (_, profile @ ((userId, _, _), _)) => (userId, profile)
        }
        ratings
          .map(r => (r(0), r(1)))
          .filter { case (_, ratedMovie) => ratedMovie == movieId }
          .join(profilesByUser)
      }

      /**
        * Ranks movies by their mean rating.
        *
        * @param movieRatings (movieId, rating) pairs
        * @param n            number of movies to return
        * @return up to `n` (movieId, averageRating) pairs, highest average first
        */
      private def topByAverageRating(movieRatings: RDD[(String, Double)], n: Int): Array[(String, Double)] =
        movieRatings
          .mapValues(rating => (rating, 1))
          .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) // (rating sum, rating count) per movie
          .mapValues { case (sum, count) => sum / count }
          .sortBy(_._2, ascending = false)
          .take(n)

      /**
        * Ranks movies by how many ratings they received.
        *
        * @param ratings split lines of ratings.dat (MovieID is field 1)
        * @param n       number of movies to return
        * @return up to `n` (movieId, ratingCount) pairs, most rated first
        */
      private def mostRatedMovies(ratings: RDD[Array[String]], n: Int): Array[(String, Int)] =
        ratings
          .map(r => (r(1), 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)
          .take(n)

      /** Appends "/" so file names can be concatenated to `dir`; an empty `dir` is returned unchanged. */
      private def withTrailingSlash(dir: String): String =
        if (dir.isEmpty || dir.endsWith("/")) dir else dir + "/"

      /**
        * Keeps the driver, and with it the Spark web UI, alive until ENTER is pressed.
        * `StdIn.readLine()` returns null at end of input, so a run without an interactive stdin
        * continues straight to shutdown instead of blocking forever.
        */
      private def waitForUser(): Unit = {
        println("Press ENTER to stop the SparkContext (the web UI stays up until then).")
        scala.io.StdIn.readLine()
      }
    }
    

  • 相关阅读:
    TCP和UDP的最完整的区别
    cluster模块实现多进程-让我的代理服务速度飞起来了
    redis多实例运行
    Nodejs实现代理服务器配置
    java统计程序运行的时间
    spring boot配置写法
    Redis: OOM command not allowed when used memory > 'maxmemory'
    最新版postgresql+pgbouncer安装
    spring boot 数据库连接池配置
    Spring BOOT PERFORMANCE
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501265.html
Copyright © 2011-2022 走看看