zoukankan      html  css  js  c++  java
  • 用户画像:最喜爱的明星前3名

    package com.profile.main

    import java.math.BigDecimal
    import com.profile.comment.Comments
    import com.profile.tools._
    import org.apache.commons.lang3.StringUtils
    import scala.collection.mutable.ListBuffer
    import com.profile.main
    /**
     * One play record attributed to a single actor, rendered as "userId|actor|playtime".
     *
     * Used by user-profile job 7 (each user's top 3 favourite stars). The field names
     * double as DataFrame column names when an RDD of this class is converted with toDF().
     *
     * @param userId   user identifier taken from the log line
     * @param actor    a single actor name
     * @param playtime play time as an integer (the unit is not shown in this file)
     */
    case class most_love_actors(userId: String, actor: String, playtime: Int) {
      /** Pipe-separated rendering of the three fields, in declaration order. */
      override def toString: String = s"$userId|$actor|$playtime"
    }
    /**
     * User-profile job 7: each user's favourite actors, ranked by summed VOD play time.
     *
     * Pipeline:
     *  1. read one day of log lines (HDFS when a date argument is given, otherwise a
     *     hard-coded local development directory);
     *  2. keep lines containing both "vod" and "VideoPlay" and extract user id,
     *     program id and play time from them;
     *  3. look the program id up in the broadcast program map to get its actor string and
     *     emit one record per "|"-separated actor;
     *  4. sum play time per (userId, actor) and keep the Comments.top_N_love_actor
     *     largest totals for every user;
     *  5. hand the result to SparkTools.writeDataframeToPhoenixHbase.
     *
     * @author denghd
     * date 2017-11-08 16:16
     */
    object UserLoveActors {

      import scala.util.Try

      /**
       * Job entry point.
       *
       * @param args when exactly one argument is present it is used as the log date
       *             (documented by the original author as yyyy-MM-dd) and the input is read
       *             from HDFS; otherwise the date comes from DateTools.getYestodayDate and the
       *             input is read from a fixed local path.
       */
      def main(args: Array[String]): Unit = {
        val dateGiven = args.length == 1
        val date = if (dateGiven) args(0) else DateTools.getYestodayDate // yyyy-MM-dd
        val sc = SparkTools.getSparkContext
        val vodProgramMap = sc.broadcast(JdbcTools.getLoveVodProgramMap) // VOD programs
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._

        // Backslashes must be escaped inside an ordinary string literal.
        val rdd =
          if (dateGiven) ReadData.readDataFromHdfs(sc, Comments.ilogslave_log_hdfs_path + date)
          else ReadData.readDataFromLocal(sc, "E:\\logs\\2017-11-07")

        // One most_love_actors record per (log line, actor of the played program).
        val actorPlays = rdd
          .filter(line => line.contains("vod") && line.contains("VideoPlay"))
          .flatMap { line =>
            val userId = LogTools.getKeywords(line, Comments.UserId)
            val rawProgramId = LogTools.getKeywords(line, Comments.ProgramID)
            val rawPlayTime = LogTools.getKeywords(line, Comments.PlayS)

            // An empty or malformed program id falls back to 0, as the empty case always did.
            val programId = Try(new BigDecimal(rawProgramId)).getOrElse(new BigDecimal(0))
            val actors = vodProgramMap.value.get(programId)

            if (StringUtils.isBlank(actors)) {
              // Unknown program or no actor information: nothing to attribute.
              Nil
            } else {
              // A play time that is not an integer drops the record instead of failing the job.
              // split takes a regex, so the pipe has to be escaped; an actor string without
              // a pipe yields a single element.
              for {
                playTime <- Try(rawPlayTime.toInt).toOption.toList
                actor <- actors.split("\\|").toList
              } yield most_love_actors(userId, actor, playTime)
            }
          }
        actorPlays.toDF().registerTempTable("most_love_actors") // userId+"|"+actor+"|"+playtime

        // Total play time per (userId, actor), tagged with the processing date.
        val most_love_actors_df = sqlContext.sql(
          s"select '$date' as date,userId,actor,sum(playtime) as playtime " +
            "from most_love_actors group by userId,actor")
        most_love_actors_df.toDF("date", "userId", "actor", "playtime").show(10)

        val topN = Comments.top_N_love_actor
        most_love_actors_df.rdd
          .map(r => (r.getAs[String]("userId"), (r.getAs[String]("actor"), r.getAs[Long]("playtime"))))
          .groupByKey()
          .flatMap { case (userId, totals) =>
            // Sorted ascending by play time, so the topN largest totals sit at the tail.
            totals.toSeq.sortBy(_._2).takeRight(topN).map { case (actor, total) =>
              (userId, actor, total)
            }
          }
          .toDF("userId", "actor", "playtime")
          // Re-registers the same table name, replacing the per-record table registered above.
          .registerTempTable("most_love_actors")

        val most_love_actors_df2 =
          sqlContext.sql(s"select '$date' as date,userId,actor,playtime from most_love_actors")
        SparkTools.writeDataframeToPhoenixHbase(most_love_actors_df2, Comments.hbase_t_user_most_love_actor)
      }

    }
  • 相关阅读:
    Java练习 标准输入,输出,以及switch判断
    Java练习 标准输入,输出,以及if else判断
    Java 语句和流程控制
    Java的运算符,Java的表达式
    理解 Linux 的硬链接与软链接(转)
    第一范式、第二范式、第三范式详解(转自知乎)
    TCP/IP协议图解
    POSIX条件变量
    自旋锁与读写锁
    POSIX信号量与互斥锁实现生产者消费者模型
  • 原文地址:https://www.cnblogs.com/hd-zg/p/7874561.html
Copyright © 2011-2022 走看看