zoukankan      html  css  js  c++  java
  • Spark Scala DataFrame join 操作

    package com.xh.movies
    
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    /**
      * Created by xxxxx on 3/15/2017.
      */
    /**
      * Demonstrates the same join/aggregation on MovieLens data twice:
      * once with the DataFrame API and once with Spark SQL over temp views.
      *
      * Counts, per gender, how many users rated movie 3578.
      */
    object DataFrameJoin {

      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local").setAppName("dataframe")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
        val sc = spark.sparkContext // RDD-style entry point alongside the SparkSession

        // Input formats (MovieLens "::"-delimited):
        //   ratings.dat: UserID::MovieID::Rating::Timestamp
        //   users.dat:   UserID::Gender::Age::OccupationID::Zip-code
        // NOTE(review): users are read from data/medium but ratings from data/large —
        // joining across two different dataset sizes is almost certainly unintended;
        // confirm the paths before relying on the counts below.
        val userRDD: RDD[(String, String)] =
          sc.textFile("data/medium/users.dat")
            .map(_.split("::"))
            .map(fields => (fields(0), fields(1))) // (UserID, Gender)
        val ratingRDD: RDD[(String, String)] =
          sc.textFile("data/large/ratings.dat")
            .map(_.split("::"))
            .map(fields => (fields(0), fields(1))) // (UserID, MovieID)

        // Explicit schemas: every column a nullable StringType.
        val schemaUser = StructType(
          "UserID::Gender".split("::").map(StructField(_, StringType, nullable = true)))
        val schemaRating = StructType(
          "UserID::MovieID".split("::").map(StructField(_, StringType, nullable = true)))

        val userRows: RDD[Row] = userRDD.map { case (id, gender) => Row(id, gender) }
        val ratingRows: RDD[Row] = ratingRDD.map { case (id, movie) => Row(id, movie) }

        val userDataFrame = spark.createDataFrame(userRows, schemaUser)
        val ratingDataFrame = spark.createDataFrame(ratingRows, schemaRating)

        // --- DataFrame API: raters of movie 3578, counted by gender ---
        ratingDataFrame.filter("movieid = 3578")
          .join(userDataFrame, "userid")
          .select("movieid", "gender")
          .groupBy("gender")
          .count()
          .show(10)
        // Observed output (from the original run):
        //   +------+-----+
        //   |gender|count|
        //   +------+-----+
        //   |     F|  319|
        //   |     M|  896|
        //   +------+-----+

        // --- Spark SQL API: same aggregation via temp views ---
        // registerTempTable is deprecated; createOrReplaceTempView is its replacement.
        userDataFrame.createOrReplaceTempView("users")
        ratingDataFrame.createOrReplaceTempView("ratings")

        val sql =
          "select count(*) as count, gender from users u join ratings r on u.userid = r.userid " +
            "where movieid = 3578 group by gender order by 2"
        spark.sql(sql).show()

        // Fix: the original ended with `while (true) {}`, which busy-spun a CPU core
        // forever and made sc.stop() unreachable (it was presumably there to keep the
        // Spark UI alive). Shut the session down cleanly instead; stopping the
        // SparkSession also stops the underlying SparkContext.
        spark.stop()
      }

    }
    


    
    

    
    
    
                
    
  • 相关阅读:
    NLP(七):textCNN
    pandas(一):选取部分(行、列)写入到另一个文件
    (二) PCA的数学原理
    NLP(六):BiLSTM_Attention实现
    python(三):collection模块
    tensorflow(二十五):Tensorboard可视化
    推荐系统(二):基于pytorch的textdeepfm
    引用相关
    带缺省参数的重复声明
    运算符优先级
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501262.html
Copyright © 2011-2022 走看看