zoukankan      html  css  js  c++  java
  • 离线数据分析之 人物兴趣取向分析(2-1)数据探索

    一、上传文件 

    hdfs dfs -mkdir -p /party/data
    hdfs dfs -put /opt/data/event_data/*.csv /party/data

    二、Data Exploration

    【数据探索:数据有没有毛病,保证质量】 

    开启spark

    ./spark-shell

    1.去头的两种方法

    ## rdd去头
    val rdd = spark.sparkContext.textFile("hdfs://192.168.56.111:9000/party/data/users.csv").cache()
    val head = rdd.first();
    rdd.filter(x=>x!=head).foreach(println(_))
    
    ## df去头
    val dfUsers = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.111:9000/party/data/users.csv").cache()
    dfUsers.show(false)

    2.去重user_id

    ## 去重user_id
    import org.apache.spark.sql.expressions.Window
    import spark.implicits._
    val wnd = Window.partitionBy("user_id").orderBy(desc("user_id"))
    dfUsers.select($"user_id",row_number().over(wnd).as("rank"))
    .filter("rank=1").show(false)

    3.birthday是否符合格式

    => 正则匹配过滤null+用均值填充null => 为了保证正态分布

    .filter("friends_1 is not null").cache()
    // 另写作:filter($"friends_1".isNotNull)

    方法1:crossJoin(一列) => 自动多加一列

            when-otherwise 

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    // 1920-2020
    val test = udf{
    (year:String)=>{
    val reg = "(19[2-9][0-9])|(20[0-2][0-9])"
    year.matches(reg)    
    }
    }
    
    // when-otherwise
    val df1 = dfUsers.filter(test($"birthyear")).agg(floor(avg($"birthyear")).as("avgyear"))
    dfUsers.crossJoin(df1).withColumn("birthyear",when(test($"birthyear"),$"birthyear").otherwise($"avgyear")).show()

    方法2:利用cast转型时none自动转为null,avg时自动跳过null   

    val dfUsers = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.111:9000/party/data/users.csv").cache()
    import org.apache.spark.sql.types._
    import spark.implicits._
    // 把birthday转成 数字or null => 过滤null+不实数据,求出avg
    val dfuser1 = dfUsers.withColumn("birthyear_",$"birthyear".cast(IntegerType)).filter($"birthyear_".isNotNull && $"birthyear_" > lit(1920))
    val dfavg = dfuser1.select(avg("birthyear_").cast(IntegerType).as("avg_year"))
    dfavg.show
    // 将null 和 不实数据 => 填补成avg数据
    val dfFinalUser = dfUsers.withColumn("birthyear",$"birthyear".cast(IntegerType)).crossJoin(dfavg).withColumn("birthyear",when($"birthyear".isNull or $"birthyear" < lit(1920),$"avg_year").otherwise($"birthyear")).drop("avg_year")
    dfFinalUser.show

    4.gender => 只能 “male/female/unknown” 

    // 查看有哪些分类
    dfUsers.select("gender").distinct.show
    // (如何查出null值?)
    正确:
    dfUsers.filter("gender is null ").show()
    错误:可能是留下""的,不等同于null,所以查不出来
    dfUsers.filter("gender!= 'male' and gender!='female' ").show()
    // 使用when-otherwise转分类
    val myuser = dfUsers.withColumn("gender",when($"gender".isNull,"unknown").otherwise($"gender"))
    // 验证是否改分类成功
    myuser.select("gender").distinct.show

    5. 验证locale和文件一一对应   

    【使用主表(目标表)left join 从表 => 看null值】

    外连接查询结果 = 内连接结果+主表中有而从表没有的记录

    //方法一:适用于两表列名称相同
    val rdd1 = sc.textFile("hdfs://192.168.56.111:9000/party/data/locale.txt")
    val df1 = rdd1.map(x=>{
    val strs = x.split("	")
    (strs(0),strs(1))
    }).toDF("id","locale")
    dfUsers
    .join(df1,Seq("locale"),"left")
      .filter("id is null")
      .select($"locale").distinct
      .show(false)
    // 方法二:适用于两边列名不一样
    val rdd1 = sc.textFile("hdfs://192.168.56.111:9000/party/data/locale.txt")
    val df1 = rdd1.map(x=>{
    val strs = x.split("	")
    (strs(0),strs(1))
    }).toDF("id","localeName")
    dfUsers
    .alias("u").join(df1.alias("f"),$"u.locale" === $"f.localeName","left")
    .filter("id is null")
    .select($"locale").distinct
    .show(false)

    6.ETL每个环节都要验证count数目正确  

    val events = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.111:9000/party/data/events.csv")
    events.count
    res25: Long = 3137972

    7.没有举办方的活动(left join)   

    filter(“xxx is null ”) 需要先.withColumn("xxx",$"xxx".cast(对应的Type)) => 把None 转化为 null才能判断出来!

    (一)是否需要写withColumn("xxx",$"xxx".cast(对应的Type))?

    => 1. 如果需求是distinct count:是否需要值为null的记录数?

               如果需要null的记录 =>  不需要;(none和null都会变成1个记录)

               如果要去除null的记录 => withColumn转成null后用filter过滤null的值(否则无法过滤none的值)

    => 2.如果需求是”null转均值“:需要

    注意:转Type的时候要注意是Integer还是Long

    (二) 两种left join的区别(结合9的图):

     方法1:适用于判断字段相同

        方法2:适用于判断字段不同

    events.select($"user_id",$"event_id")
    .join(dfUsers,Seq("user_id"),"left")
    .filter("user_id is not null and locale is null") 
    .select($"event_id").distinct.count

    8.哪个用户举办的活动最多(分组count)  

    events.groupBy($"user_id").agg(count("event_id").alias("num")).orderBy($"num".desc).show()

    9.多少event中host的user_id 不在users.csv里面(left join)    

    events.alias("e1").join(dfUsers.alias("u1"),$"e1.user_id" === $"u1.user_id","left_outer").filter($"u1.user_id".isNull).select($"e1.user_id").distinct.count

    等同于 

    events.select($"user_id",$"event_id")
    //.withColumn("user_id",$"user_id".cast(IntegerType)) 此举有误 .join(dfUsers,Seq("user_id"),"left") .filter("user_id is not null and locale is null") .select($"user_id").distinct.count

    关于灰色字段为什么加上去就错,因为

    => 应该是LongType,转成Int失败会转成null

    10.把 user 和 friends 的关系表 => 转成一一对应的(行转列lateral view explode)   

    见:https://www.cnblogs.com/sabertobih/p/13589760.html

    11.friend_id 非空的记录数

    distinct 会把所有null变成1个

    val f1 = friends.select($"user".alias("user_id"),explode(split($"friends"," ")).alias("friends_1"))
    .filter("friends_1 is not null")
    f1.distinct.count

    12.表结构转换(多个行转列合并) 

    实现需求如下:

    方法1:spark sql

    val dfeventAttendees = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.111:9000/party/data/event_attendees.csv").cache()
    dfeventAttendees.createOrReplaceTempView("eventAttendees")
    val dfyes = spark.sql("""
    select 
    event,userid,"yes" as type 
    from 
    eventAttendees
    lateral view explode(split(yes,' '))userid as userid
    """)
    val dfinvited = spark.sql("""
    select 
    event,userid,"invited" as type 
    from 
    eventAttendees
    lateral view explode(split(invited,' '))userid as userid
    """)
    val dfmaybe = spark.sql("""
    select 
    event,userid,"maybe" as type 
    from 
    eventAttendees
    lateral view explode(split(maybe,' '))userid as userid
    """)
    val dfno = spark.sql("""
    select 
    event,userid,"no" as type 
    from 
    eventAttendees
    lateral view explode(split(no,' '))userid as userid
    """)
    dfyes.union(dfinvited).union(dfmaybe).union(dfno).filter("userid is not null").distinct.count

    >>> 11245008

    或者直接写在一条大sql中,每一个加distinct 然后 union all 然后整体 where userid is not null

    方法2:spark sql API

    val yes = df.select($"event_id",explode(split($"yes"," ")).alias("user_id"),lit("yes").alias("status"))
    // 省略其他
    yes.union(dfinvited).union(dfmaybe).union(dfno).filter($"user_id".isNotNull).distinct.count

    方法3:rdd

    注意用rdd分割csv有 .map(x=>x.split(",",-1)) 操作。具体见:https://i.cnblogs.com/posts/edit;postId=13706382

    rdd过滤数组中空字符串: .filter(!_.trim.equals("") 

    // 先去首行,再map割
    val rddpre = sc.textFile("hdfs://192.168.56.111:9000/party/data/event_attendees.csv") val head = rddpre.first() val rdd = rddpre.filter(x=>x!=head)
              .
    map(x=>x.split(",",-1)).cache
    val rdd1
    = rdd.flatMap(x=>x(1).split(" ").filter(!_.trim.equals("")).map(y=>(x(0),y,"yes")))
    val rdd2
    = rdd.flatMap(x=>x(2).split(" ").filter(!_.trim.equals("")).map(y=>(x(0),y,"maybe")))
    val rdd3
    = rdd.flatMap(x=>x(3).split(" ").filter(!_.trim.equals("")).map(y=>(x(0),y,"invited")))
    val rdd4
    = rdd.flatMap(x=>x(4).split(" ").filter(!_.trim.equals("")).map(y=>(x(0),y,"no")))
    rdd1.union(rdd2).union(rdd3).union(rdd4).distinct.count

    13.dfeventAttendees中去除event

    val dfeventAttendees = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.111:9000/party/data/event_attendees.csv").cache()
    val events = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.111:9000/party/data/events.csv")
    dfeventAttendees.alias("e1").join(events.alias("e2"),$"e1.event" === $"e2.event_id","left_outer" ).filter($"e2.event_id".isNull).select($"e1.event").distinct.count
    >>> 280

    14.users和event举办人重叠(user和event交集)

    dfUsers.alias("e1").join(events.alias("e2"),$"e1.user_id" === $"e2.user_id","inner" ).select($"e1.user_id").distinct.count
    >>> 569

    15. 去重5种方法,选取时间戳最大的行

    1)distinct

    2)groupby方法,但只能显示user+event列

    val train = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.111:9000/party/data/train.csv")
    train.groupBy("user","event").agg(max(unix_timestamp($"timestamp").as("time")).as("ts")).count
    >>> 15220

    * 如何显示其他列?

    3)窗口函数

    import org.apache.spark.sql.expressions.Window
    val wnd = Window.partitionBy($"user",$"event").orderBy($"stime".desc)
    // 直接用withColumn,不用select无数列 train2.withColumn("rank",row_number().
    over(wnd)).filter("rank=1").count >>> 15220

    4)优化方法【适用面广】

    以下以为时间需要转成timestamp才能用max取最大值,其实直接max(时间)也可以

    val train2 = train.select($"user",$"event",$"invited",unix_timestamp($"timestamp").alias("stime"),$"interested",$"not_interested")
    // select写的不好,改成 =>
    val train2 = train.withColumn("stime",unix_timestamp($"timestamp"))
    
    train2.createOrReplaceTempView("train2")
    spark.sql("""
    select 
    count(1)
    from 
    train2 a
    where exists(
    select 1 
    from
    (
    select max(stime) as stime,user,event 
    from train2 group by user,event
    )b
    where b.user = a.user and b.event = a.event and a.stime = b.stime
    ) 
    """).show
    >>> 15220

     5)独有方法dropDuplicates【最方便】

    去重(取第一个),并保留其他列

    缺点:没办法决定顺序

    train.dropDuplicates("user","event").count
    >>> 15220

    保留最大时间戳,取第一个(取最大时间戳)

    repartition : 根据xx重新分区 + sortWithinPartition : 分区内排序

    train.repartition($"user",$"event").sortWithinPartitions($"timestamp".desc).dropDuplicates("user","event").count

     

  • 相关阅读:
    PHP开发者的MySQL错误
    shell编程技术和实例《linux0.01内核分析与操作系统设计》
    函数问题1 init_EMUX
    sizeof问题
    C语言读书心得
    《深入浅出MFC》读书感受
    计算机专业学习多年的苦恼
    一个完整的字符设备驱动程序导读
    学习书籍选择
    鼠标滑动、文本添加(倒计时)
  • 原文地址:https://www.cnblogs.com/sabertobih/p/14070357.html
Copyright © 2011-2022 走看看