  • Offline data analysis: user interest prediction (2-2) offline/real-time project architecture | project pipeline | data warehouse construction (advanced)

    1. Offline vs. real-time streaming frameworks

    The Spark data-cleaning process is covered in Log Analysis: https://www.cnblogs.com/sabertobih/p/14070357.html

    Real-time streaming differs from offline processing in the latency between when data arrives and when it is processed, not in the tooling; Kafka and Spark Streaming can just as well serve offline batch jobs.
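
    For example, Spark can consume the very same Kafka topic either as a bounded batch or as an unbounded stream. A minimal sketch using the Structured Streaming Kafka source (broker address and topic name are placeholders, not this project's actual config):

    import org.apache.spark.sql.SparkSession
    
    object KafkaBatchVsStream {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("batchVsStream").getOrCreate()
    
        // offline: read whatever currently sits in the topic, once, as a bounded batch
        val batchDf = spark.read.format("kafka")
          .option("kafka.bootstrap.servers", "192.168.16.150:9092") // placeholder broker
          .option("subscribe", "user_friends")                      // placeholder topic
          .option("startingOffsets", "earliest")
          .option("endingOffsets", "latest")
          .load()
    
        // real-time: the same source, consumed continuously as a stream
        val streamDf = spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "192.168.16.150:9092")
          .option("subscribe", "user_friends")
          .load()
      }
    }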

    Offline model training: the retraining cadence is driven by requirements, and every run rebuilds the model from scratch.

    Offline prediction: spark.sql builds the dm_final table in Hive -> Spark ML

    Real-time prediction: build the dm_final table from Kafka -> Spark ML

    2. Offline processing approach (this project)

    (1) Building the ML model

    ① Flume

    Data collection: watch for newly appended data in real time and ship it to Kafka (see: https://www.cnblogs.com/sabertobih/p/14115501.html)

    ② Kafka

    Peak shaving (buffering); commands for monitoring topic data in real time: https://www.cnblogs.com/sabertobih/p/14024011.html
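
    Besides the CLI commands in the linked post, a topic can also be spot-checked programmatically. A minimal consumer sketch (broker, group id, and topic name are placeholders):

    import java.time.Duration
    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    
    object TopicTail {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "192.168.16.150:9092")
        props.put("group.id", "monitor")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(Collections.singletonList("user_friends"))
        while (true) {
          // print offset and value of each newly arriving record
          val records = consumer.poll(Duration.ofSeconds(1))
          records.forEach(r => println(s"${r.offset()} ${r.value()}"))
        }
      }
    }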

    ③ Kafka -> Spark Streaming -> Kafka

    Format conversion; see item 5 of: https://www.cnblogs.com/sabertobih/p/14136154.html
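
    The linked post has the project's actual code; as an illustration of the shape of this step, a minimal DStream sketch that reads a raw topic, flattens each record, and writes the result back to a second topic (topic names and the flattening rule are placeholders):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    object KafkaReshape {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaReshape")
        val ssc = new StreamingContext(conf, Seconds(5))
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.16.150:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "reshape",
          "auto.offset.reset" -> "earliest")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](Seq("user_friends_raw"), kafkaParams))
    
        stream.map(_.value())
          .filter(_.contains(","))
          .flatMap { line =>
            // e.g. explode "userid,f1 f2 f3" into one "userid,friendid" record per friend
            val Array(user, friends) = line.split(",", 2)
            friends.split(" ").map(f => s"$user,$f")
          }
          .foreachRDD { rdd =>
            rdd.foreachPartition { it =>
              // one producer per partition, not per record
              val props = new Properties()
              props.put("bootstrap.servers", "192.168.16.150:9092")
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
              val producer = new KafkaProducer[String, String](props)
              it.foreach(v => producer.send(new ProducerRecord[String, String]("user_friends", v)))
              producer.close()
            }
          }
        ssc.start()
        ssc.awaitTermination()
      }
    }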

    ④ Kafka -> HBase

    Because rowkeys are unique, a write with a duplicate rowkey automatically overwrites the existing row, which deduplicates the data.

    See item 6 of: https://www.cnblogs.com/sabertobih/p/14136154.html
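
    To make the dedup property concrete, a minimal HBase-client sketch (ZooKeeper address and the sample values are placeholders; the table, column family, and qualifiers are the ones mapped below). Because the rowkey is derived from the record content, a re-consumed duplicate message produces the same Put, and the second write simply overwrites the first:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    
    object RowkeyDedup {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "192.168.16.150")      // placeholder address
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        val conn = ConnectionFactory.createConnection(conf)
        val table = conn.getTable(TableName.valueOf("event_db:user_friends"))
    
        // rowkey built from the record content (userid + friendid), so duplicates collide
        val put = new Put(Bytes.toBytes("12345-67890"))
        put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), Bytes.toBytes("12345"))
        put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("friendid"), Bytes.toBytes("67890"))
        table.put(put)
        table.put(put) // same rowkey: overwrites instead of creating a second row
    
        table.close()
        conn.close()
      }
    }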

    ⑤ HBase -> Hive

    Create external mapping tables (the data lives on HDFS; Hive serves complex queries over large batches, while HBase serves ordered key-based access).

    create database if not exists event_db;
    set hivevar:db=event_db;
    use ${db};
    create external table ${db}.users(
    userid string,
    birthday string,
    gender string,
    locale string,
    location string,
    timezone string,
    joinedAt string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,profile:birthday,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt')
    tblproperties('hbase.table.name'='event_db:users');
    
    -- userfriends
    create external table event_db.userfriends(
    ukey string,
    userid string,
    friendid string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,uf:userid,uf:friendid')
    tblproperties('hbase.table.name'='event_db:user_friends');
    
    
    -- events
    create external table event_db.events(
    eventid string,
    startTime string,
    city string,
    dstate string,
    zip string,
    country string,
    lat string,
    lng string,
    userId string,
    features string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,schedule:startTime,location:city,location:state,location:zip,location:country,location:lat,location:lng,creator:userId,remark:features')
    tblproperties('hbase.table.name'='event_db:events');
    -- eventAttendees
    create external table event_db.eventAttendees(
    ekey string,
    eventId string,
    userId string,
    status string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,euat:eventId,euat:userId,euat:status')
    tblproperties('hbase.table.name'='event_db:event_attendees');
    -- train
    create external table event_db.train(
    tkey string,
    userId string,
    eventId string,
    invited string,
    etimestamp string,
    interested string,
    notinterested string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,eu:userId,eu:eventId,eu:invitedId,eu:timestamp,eu:interested,eu:notinterested')
    tblproperties('hbase.table.name'='event_db:train');
    -- test
    create external table event_db.test(
    tkey string,
    tuser string,
    event string,
    invited string,
    ttimestamp string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,eu:user,eu:event,eu:invited,eu:timestamp')
    tblproperties('hbase.table.name'='event_db:test');

    ⑥ Build the data warehouse in Hive

    (6-1) ODS layer: internal tables whose data sits on HDFS

    1. Stored as ORC, which is fast to query (details: https://www.jianshu.com/p/91f793a1bdb3)

    2. Light cleaning is done along the way

    -- general pattern: materialize an HBase-mapped table into an ORC-backed internal table
    create table ${db}.users
    stored as ORC as
    select * from ${db}.hb_users;
    
    drop database if exists ods_events cascade;
    create database ods_events;
    use ods_events;
    
    create table ods_events.ods_users
    stored as ORC as select * from event_db.users;
    create table ods_events.ods_eventsTable
    stored as ORC as select * from event_db.events;
    create table ods_events.ods_eventAttendees
    stored as ORC as select * from event_db.eventAttendees;
    create table ods_events.ods_userfriends
    stored as ORC as select * from event_db.userfriends;
    create table ods_events.ods_train
    stored as ORC as select * from event_db.train;
    create table ods_events.ods_test
    stored as ORC as select * from event_db.test;
    create table ods_events.ods_locale(
    localeid string,
    locale string
    )
    row format delimited fields terminated by '\t'
    location 'hdfs://192.168.56.111:9000/party/data1/locale';

    (6-2) DWD layer: data normalization

    For example: encode "male"/"female" as numbers, replace nulls with column averages, and similar preprocessing.

    -- data normalization
    drop database if exists dwd_events;
    create database dwd_events;
    use dwd_events;
    
    -- fill null birthyear/timezone/joinedat with the column average
    create table dwd_users as 
    select userid,locale,location,
    case when cast(birthday as int) is null then avg_year else birthday end as birthyear,
    case gender when 'male' then -1 when 'female' then 1 else 0 end as gender,
    case when cast(timezone as int) is null then avg_timezone else timezone end as timezone,
    case when trim(joinedat)='' or lower(joinedat)='none' then avg_member else unix_timestamp(translate(joinedat,'TZ',' ')) end as members
    from
    (select * from ods_events.ods_users 
    cross join
    (
    select floor(avg(cast(birthday as int))) avg_year,
    floor(avg(cast(timezone as int))) avg_timezone,
    floor(avg(unix_timestamp(translate(joinedat,'TZ',' ')))) avg_member
    from ods_events.ods_users
    )tmp
    )a;
    
    create table dwd_events.dwd_events as 
    select eventid,unix_timestamp(translate(starttime,'TZ',' '))starttime,
    city,country,dstate as province,
    case when cast(lat as float) is null then avg_lat else lat end lat,
    case when cast(lng as float) is null then avg_lng else lng end lng,
    userid,features
    from 
    ods_events.ods_eventsTable 
    cross join
    (select round(avg(cast(lat as float)),3)avg_lat,
    round(avg(cast(lng as float)),3)avg_lng 
    from ods_events.ods_eventsTable
    )tmp;
    
    create table dwd_events.dwd_eventAttendees as select * from ods_events.ods_eventattendees;
    create table dwd_events.dwd_usersFriends as select * from ods_events.ods_userfriends;
    create table dwd_events.dwd_train as select tkey trid,userid,eventid,invited,etimestamp ttime,interested label from ods_events.ods_train;
    create table dwd_events.dwd_locale as select * from ods_events.ods_locale;

    (6-3) DWS layer: lightly aggregated dimension tables

    The aggregation approach:

    drop database if exists dws_events;
    create database dws_events;
    use dws_events;
    
    create temporary macro maxandmin(cdata int ,maxdata int,mindata int) (cdata-mindata)/(maxdata-mindata);
    create temporary macro calcage(y int) year(current_date())-y;
    
    -- ===================dws_temp_users================
    create table dws_events.dws_temp_users as
    select
    userid,
    locale,
    gender,
    location,
    maxandmin(cage,max_age,min_age) age,
    maxandmin(timezone,max_timezone,min_timezone) timezone,
    maxandmin(members,max_members,min_members) members
    from (
    select userid,
    case when l.locale is null then 0 else l.localeid end locale,
    gender, location,
    calcage(birthyear) cage, min_age, max_age,
    timezone, min_timezone, max_timezone,
    members, min_members, max_members
    from dwd_events.dwd_users u
    left join dwd_events.dwd_locale l
    on lower(u.locale)=lower(l.locale)
    cross join (
    select min(calcage(birthyear)) min_age, max(calcage(birthyear)) max_age,
    min(timezone) min_timezone, max(timezone) max_timezone,
    min(members) min_members, max(members) max_members
    from dwd_events.dwd_users
    ) b
    ) c;
    
    -- ===================dws_temp_userEvent================
    create table dws_events.dws_temp_userEvent as
    select
    u.userid,
    case when uf.friendnum is null then 0 else uf.friendnum end friendnum,
    case when invite_event_count is null then 0 else invite_event_count end invite_event_count,
    case when attended_count is null then 0 else attended_count end attended_count,
    case when not_attended_count is null then 0 else not_attended_count end not_attended_count,
    case when maybe_attended_count is null then 0 else maybe_attended_count end maybe_attended_count,
    case when joinnum is null then 0 else joinnum end event_count
    from dwd_events.dwd_users u
    left join (select userid,count(friendid) friendnum from dwd_events.dwd_usersFriends group by userid) uf
    on u.userid=uf.userid
    left join (select userid,
    sum(case when status='invited' then 1 else 0 end) as invite_event_count,
    sum(case when status='yes' then 1 else 0 end) as attended_count,
    sum(case when status='no' then 1 else 0 end) as not_attended_count,
    sum(case when status='maybe' then 1 else 0 end) as maybe_attended_count
    from dwd_events.dwd_eventAttendees
    group by userid) ea
    on u.userid=ea.userid
    left join (select userid, count(eventid) joinnum from dwd_events.dwd_train group by userid) dt
    on u.userid=dt.userid;

    Why left join?

    Because the DWS layer takes userid from the full users table as the driving side, with the other tables as auxiliaries => aggregated dimension tables.

    In the DM layer, by contrast, the fact table's ids drive the joins, so inner join is used there.
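
    A toy Spark sketch of that choice (hypothetical values): left join keeps users that never appear in the side table and lets their counts default to 0, while inner join would silently drop them.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    
    object JoinChoice {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("joinChoice").getOrCreate()
        import spark.implicits._
    
        val users = Seq("u1", "u2", "u3").toDF("userid")
        val friendCounts = Seq(("u1", 5)).toDF("userid", "friendnum")
    
        // dimension building: every user survives, missing counts become 0
        users.join(friendCounts, Seq("userid"), "left")
          .withColumn("friendnum", coalesce($"friendnum", lit(0)))
          .show() // u1 -> 5, u2 -> 0, u3 -> 0
    
        // fact-driven joins: only matching ids survive
        users.join(friendCounts, Seq("userid"), "inner").show() // only u1
      }
    }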

    -- ===================合并dws_temp_userinfos================
    create table dws_events.dws_temp_userinfos as 
    select 
    du.userid,du.locale,du.gender,du.location,du.age,du.timezone,du.members,ue.friendnum,
    ue.invite_event_count,ue.attended_count,ue.not_attended_count,ue.maybe_attended_count,ue.event_count
    from dws_events.dws_temp_users du 
    inner join dws_events.dws_temp_userEvent ue 
    on du.userid=ue.userid;
    
    -- ===================视图dws_view_city_level================
    create view dws_events.dws_view_city_level as
    select city,dense_rank()over(order by partynum desc) rank
    from
    (
    select city,count(eventid) partynum 
    from dwd_events.dwd_events
    group by city
    )a;
    
    -- ===================视图dws_view_country_level================
    create view dws_events.dws_view_country_level as
    select country,dense_rank()over(order by partynum desc) rank
    from
    (select country,count(eventid) partynum from dwd_events.dwd_events
    group by country)a;
    
    -- ===================dws_temp_eventinfos================
    create table dws_events.dws_temp_eventinfos as
    select
    de.eventid,
    de.userid as creator,
    de.starttime,
    de.city,
    de.province,
    de.country,
    month(from_unixtime(starttime,'yyyy-MM-dd')) event_month,
    dayofweek(from_unixtime(starttime,'yyyy-MM-dd')) event_dayofweek,
    hour(from_unixtime(starttime,'yyyy-MM-dd HH:mm:ss')) event_hour,
    case when event_invited_count is null then 0 else event_invited_count end event_invited_count,
    case when event_attended_count is null then 0 else event_attended_count end event_attended_count,
    case when event_not_att_count is null then 0 else event_not_att_count end event_not_att_count,
    case when event_maybe_count is null then 0 else event_maybe_count end event_maybe_count,
    case when c1.rank is null then 0 else c1.rank end city_level,
    case when t1.rank is null then 0 else t1.rank end country_level,
    maxandmin(lat,mmtab.max_lat,mmtab.min_lat) lat_prec,
    maxandmin(lng,mmtab.max_lng,mmtab.min_lng) lng_prec,
    features
    from
    dwd_events.dwd_events de 
    left join 
    (select eventid,
    sum(case when status='invited' then 1 else 0 end) as event_invited_count,
    sum(case when status='yes' then 1 else 0 end) as event_attended_count,
    sum(case when status='no' then 1 else 0 end) as event_not_att_count,
    sum(case when status='maybe' then 1 else 0 end) as event_maybe_count
    from dwd_events.dwd_eventAttendees 
    group by eventid) ea
    on de.eventid=ea.eventid 
    left join
    dws_events.dws_view_city_level c1
    on de.city=c1.city 
    left join
    dws_events.dws_view_country_level t1
    on de.country=t1.country 
    cross join
    (select min(lat) min_lat,max(lat) max_lat,min(lng) min_lng,max(lng) max_lng from dwd_events.dwd_events)mmtab;

    -- ===================dws_temp_train================
    create table dws_events.dws_temp_train as
    select * from dwd_events.dwd_train;
    
    -- ===================dws_temp_userfriends================
    create table dws_events.dws_temp_userfriends as
    select * from dwd_events.dwd_usersFriends;
    
    -- ===================dws_temp_eventattendees================
    create table dws_events.dws_temp_eventattendees as
    select * from dwd_events.dwd_eventattendees;
    
    -- per (user, event): was the user invited, and what was the reply
    -- ===================dws_temp_row_eventattendees================
    create table dws_events.dws_temp_row_eventattendees as
    select
    userid,
    eventid,
    max(case when status='invited' then 1 else 0 end) invited,
    max(case when status='yes' then 1 else 0 end) yes,
    max(case when status='maybe' then 1 else 0 end) maybe,
    max(case when status='no' then 1 else 0 end) no
    from
    dws_events.dws_temp_eventattendees
    group by userid,eventid;
    
    -- per (user, friend, event): was the friend invited to the event, and what was the friend's reply
    -- ===================dws_temp_uf_infos================
    create table dws_events.dws_temp_uf_infos as
    select
    uf.userid,
    uf.friendid,
    case when ea.eventid is null then 0 else ea.eventid end eventid,
    case when ea.invited is null then 0 else ea.invited end invited,
    case when ea.yes is null then 0 else ea.yes end yes,
    case when ea.maybe is null then 0 else ea.maybe end maybe,
    case when ea.no is null then 0 else ea.no end no
    from
    dws_events.dws_temp_userfriends uf
    left join
    dws_events.dws_temp_row_eventattendees ea
    on uf.friendid=ea.userid;
    

    -- ===================dws_temp_uf_summary================
    create table dws_events.dws_temp_uf_summary as
    select
    a.*,
    uf_invited_count/b.friendnum as uf_invited_prec,
    uf_attended_count/b.friendnum as uf_attended_prec,
    uf_not_attended_count/b.friendnum as uf_not_attended_prec,
    uf_maybe_count/b.friendnum as uf_maybe_prec
    from
    (select
    ufi.userid,
    ufi.eventid,
    sum(ufi.invited) as uf_invited_count,
    sum(ufi.yes) as uf_attended_count,
    sum(ufi.no) as uf_not_attended_count,
    sum(ufi.maybe) as uf_maybe_count
    from
    dws_events.dws_temp_uf_infos ufi
    group by ufi.userid,ufi.eventid
    ) a
    inner join
    (select
    userid,
    count(friendid) friendnum
    from
    dws_events.dws_temp_userfriends uf
    group by userid
    ) b
    on a.userid=b.userid;
    

    -- for each (user, event) pair in the training set: is the user a friend of the event's creator?
    -- ==============dws_temp_isfriends==========

    create table dws_events.dws_temp_isfriends stored as orc as
    with t1 as (
    select tr.userid,es.eventid,es.userid as creator
    from dwd_events.dwd_train tr
    inner join dwd_events.dwd_events es on tr.eventid=es.eventid
    ),
    t2 as (
    select t.userid,t.eventid,
    case when uf.friendid is null then 0 else 1 end isfriend
    from t1 t
    left join dwd_events.dwd_usersFriends uf
    on t.userid=uf.userid and t.creator=uf.friendid
    ),
    t3 as (
    select t.userid,t.eventid,
    case when uf.friendid is null then 0 else 1 end isfriend
    from t1 t
    left join dwd_events.dwd_usersFriends uf
    on t.userid=uf.friendid and t.creator=uf.userid
    )
    select t2.userid,t2.eventid,
    case when t2.isfriend=1 or t3.isfriend=1 then 1 else 0 end isfriend
    from t2
    inner join t3 on t2.userid=t3.userid and t2.eventid=t3.eventid;

    (6-4) DM layer: the wide table

    • DM features:
    user_interested      ----label
    user_id              ----id, not a feature
    event_id             ----id, not a feature
    locale                                         -----1~x
    gender                                         ----- -1,0,1
    age                                            -----0~1
    timezone                                       -----0~x
    member_days                                    -----0~1
    friend_count                                   -----0~1
    invite_days                                    -----0~1
    event_count                                    -----0~1
    invited_event_count (events invited to)        -----0~1
    attended_count (accepted)                      -----0~1
    not_attended_count (declined)                  -----0~1
    maybe_attended_count (maybe)                   -----0~1
    user_invited (invited to this event?)          -----0,1
    uf_invited_count (friends invited)             -----0~1
    uf_attended_count (friends going)              -----0~1
    uf_notattended_count (friends not going)       -----0~1
    uf_maybe_count (friends maybe going)           -----0~1
    uf_invited_prec (% of friends invited)         -----0~1
    uf_attended_prec (% of friends going)          -----0~1
    uf_not_attended_prec (% not going)             -----0~1
    uf_maybe_prec (% maybe)                        -----0~1
    ahead_days (gap between invitation and event)  -----0~1
    event_month (event month)                      -----1~12
    event_dayofweek (event day of week)            -----1~7
    event_hour (event hour)                        -----0~23
    event_invited_count (people invited)           -----0~1
    event_attended_count (people going)            -----0~1
    event_not_att_count (people not going)         -----0~1
    event_maybe_count (people maybe going)         -----0~1
    city_level (city rank)                         -----0~x
    country_level (country rank)                   -----0~x
    lat_prec (latitude, min-max scaled)            -----0~1
    lng_prec (longitude, min-max scaled)           -----0~1
    creator_is_friend                              -----0,1
    location_similar (user and event co-located?)  -----0,1
    event_type (event category)                    -----0~x
    drop database if exists dm_events;
    create database dm_events;
    use dm_events;
    -- ==================locationSimilar: same locality?=================
    create temporary macro locationSimilar(location string,city string,province string,country string)
    case when instr(lower(location),lower(city))>0
           or instr(lower(location),lower(province))>0
           or instr(lower(location),lower(country))>0
         then 1 else 0 end;
    
    -- ===================dm_usereventfinal================
    drop table if exists dm_events.dm_usereventfinal;
    create table dm_events.dm_usereventfinal as
    select
    tr.label,
    tr.userid,
    tr.eventid,
    us.locale,
    us.gender,
    us.age,
    us.timezone,
    us.members as member_days,
    floor((ei.starttime-unix_timestamp(ttime))/3600/24) invite_days,
    us.friendnum as friend_count,
    us.invite_event_count,
    us.attended_count,
    us.not_attended_count,
    us.maybe_attended_count,
    us.event_count,
    tr.invited,
    ufe.uf_invited_count,
    ufe.uf_attended_count,
    ufe.uf_not_attended_count,
    ufe.uf_maybe_count,
    ufe.uf_invited_prec,
    ufe.uf_attended_prec,
    ufe.uf_not_attended_prec,
    ufe.uf_maybe_prec,
    ei.event_month,
    ei.event_dayofweek,
    ei.event_hour,
    ei.event_invited_count,
    ei.event_attended_count,
    ei.event_not_att_count,
    ei.event_maybe_count,
    ei.city_level,
    ei.country_level,
    ei.lat_prec,
    ei.lng_prec,
    ifr.isfriend as creator_is_friend,
    locationSimilar(us.location,ei.city,ei.province,ei.country) as location_similar,
    ei.features
    from dws_events.dws_temp_train tr
    inner join dws_events.dws_temp_userinfos us on tr.userid=us.userid
    inner join dws_events.dws_temp_eventinfos ei on tr.eventid=ei.eventid
    inner join dws_events.dws_temp_uf_summary ufe on tr.userid=ufe.userid and tr.eventid=ufe.eventid
    inner join dws_events.dws_temp_isfriends ifr on tr.userid=ifr.userid and tr.eventid=ifr.eventid;

    ⑦ Train the KMeans model

    Use the features column of dm_usereventfinal (which originates from the events data) to train a KMeans model that assigns each event a category, producing the new table dm_eventtype.

    Link: https://www.cnblogs.com/sabertobih/p/14183984.html
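
    The linked post has the full training code; below is a minimal sketch of the idea. k=20, the na.fill(0.0), and the output table name are assumptions made here; the features string is expanded into c_0~c_100 exactly as in calcEventType further below, and the model is saved to the d:/kmmodel2 path that the prediction side loads.

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.split
    import org.apache.spark.sql.types.DoubleType
    
    object KMeansTrain {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("kmeansTrain")
          .config("hive.metastore.uris", "thrift://192.168.16.150:9083")
          .enableHiveSupport().getOrCreate()
    
        var tmp = spark.sql("select eventid, features from dm_events.dm_usereventfinal")
        // expand the comma-separated features string into numeric columns c_0 ~ c_100
        val fts = split(tmp("features"), ",")
        val cols = ArrayBuffer[String]()
        for (cl <- 0 to 100) {
          tmp = tmp.withColumn("c_" + cl, fts.getItem(cl).cast(DoubleType))
          cols.append("c_" + cl)
        }
        val assembled = new VectorAssembler().setInputCols(cols.toArray).setOutputCol("feature")
          .transform(tmp.na.fill(0.0))
    
        // k is a tuning choice; 20 is only an assumption for this sketch
        val model = new KMeans().setK(20).setFeaturesCol("feature").fit(assembled)
        model.write.overwrite().save("d:/kmmodel2")
    
        // persist the event -> cluster assignment as dm_events.dm_eventtype
        model.transform(assembled).select("eventid", "prediction")
          .write.mode("overwrite").saveAsTable("dm_events.dm_eventtype")
      }
    }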

        Join dm_usereventfinal with dm_eventtype to produce the new table dm_final

    create table dm_final as
    select
    a.label,a.userid,a.eventid,a.locale,a.gender,a.age,a.timezone,a.member_days,a.invite_days,a.friend_count,a.invite_event_count,a.attended_count,a.not_attended_count,
    a.maybe_attended_count,a.event_count,a.invited,a.uf_invited_count,a.uf_attended_count,a.uf_not_attended_count,a.uf_maybe_count,a.uf_invited_prec,a.uf_attended_prec,
    a.uf_not_attended_prec,a.uf_maybe_prec,a.event_month,a.event_dayofweek,a.event_hour,a.event_invited_count,a.event_attended_count,a.event_not_att_count,a.event_maybe_count,
    a.city_level,a.country_level,a.lat_prec,a.lng_prec,a.creator_is_friend,a.location_similar,
    b.prediction as eventtype
    from dm_events.dm_usereventfinal a
    inner join dm_events.dm_eventtype b on a.eventid = b.eventid;

    ⑧ Use dm_final to train the RF model and predict the class

    Link: https://www.cnblogs.com/sabertobih/p/14183984.html
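
    Again, see the link for the full version; a minimal sketch of the idea (the 0.8/0.2 split, numTrees=50, and na.fill(0.0) are assumptions made here; the model is saved to the d:/rfc path that the prediction side loads):

    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DoubleType
    
    object RandomForestTrain {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("rfTrain")
          .config("hive.metastore.uris", "thrift://192.168.16.150:9083")
          .enableHiveSupport().getOrCreate()
    
        val raw = spark.sql("select * from dm_events.dm_final")
        // cast every column to Double, as the prediction side does
        val tab = raw.select(raw.columns.map(c => col(c).cast(DoubleType)): _*).na.fill(0.0)
    
        // everything except the ids and the label becomes a feature
        val featureCols = tab.columns.filterNot(Set("label", "userid", "eventid"))
        val assembled = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
          .transform(tab)
    
        val Array(train, test) = assembled.randomSplit(Array(0.8, 0.2))
        val model = new RandomForestClassifier()
          .setLabelCol("label").setFeaturesCol("features").setNumTrees(50)
          .fit(train)
        model.write.overwrite().save("d:/rfc")
      }
    }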

     (2) Using the model to predict on the test dataset

    ① [scala] Drive Hive with spark.sql; keep the features identical to the ones used for training

    First make sure the connection works; see: https://www.cnblogs.com/sabertobih/p/13772985.html

    The same layered processing is repeated here: in the end we must arrive at the same feature table.

    import java.util.Calendar
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._
    
    /**
       * Is the user located near the event?
       * (kept consistent with the Hive locationSimilar macro used at training time:
       * does the event's city/province/country appear inside the user's location string?)
       */
      val locationSimilar=udf(
        (location:String,city:String,province:String,country:String)=> {
          if (location != null && (
            (city != null && location.toLowerCase().indexOf(city.toLowerCase()) > -1) ||
            (province != null && location.toLowerCase().indexOf(province.toLowerCase()) > -1) ||
            (country != null && location.toLowerCase().indexOf(country.toLowerCase()) > -1))) {
            1
          } else {
            0
          }
        }
      )
    
      /**
       * Min-max normalization.
       * // Why Double? The ML stage needs numeric preprocessing anyway, so everything becomes DoubleType sooner or later
       */
      val maxandmin = udf{
        (cdata:Double,maxdata:Double,mindata:Double)=>{
          (cdata-mindata)/(maxdata-mindata)
        }
      }
    
      /**
       * Compute the user's age
       */
      val calcage = udf{
        (y:Double) =>{
          Calendar.getInstance().get(Calendar.YEAR)-y
        }
      }
    
      /**
       * Read any dwd-layer table
       */
      def readHiveTable(spark:SparkSession,tableName:String) ={
        spark.sql(s"select * from $tableName")
      }
    
      /**
       * Read the test-set file
       */
      // what for? it becomes the testdf used everywhere below
      def readTestData(spark:SparkSession,path:String)={
        spark.read.format("csv").option("header","true").load(path)
          .withColumnRenamed("user","userid")
          .withColumnRenamed("event","eventid")
          .withColumnRenamed("timestamp","ttime")
      }
    
      /**
       * Get the users' basic profile features
       */
      // corresponds to dws_temp_users
      def getUserbaseinfo(spark:SparkSession)={
        val sql = """select
          |userid,locale,gender,
          |location,
          |maxandmin(cage,max_age,min_age) age,
          |maxandmin(timezone,max_timezone,min_timezone) timezone,
          |maxandmin(members,max_members,min_members) members
          |from
          |(select userid,
          |case when l.locale is null then 0 else l.localeid end locale,
          |gender,location,
          |calcage(birthyear) cage,min_age,max_age,
          |timezone,min_timezone,max_timezone,
          |members,min_members,max_members
          |from dwd_events.dwd_users u
          |left join dwd_events.dwd_locale l
          |on lower(u.locale)=lower(l.locale)
          |cross join (select min(calcage(birthyear)) min_age
          |,max(calcage(birthyear)) max_age,min(timezone) min_timezone,
          |max(timezone) max_timezone, min(members) min_members,max(members) max_members
          |from dwd_events.dwd_users) b ) c""".stripMargin
        spark.sql(sql)
      }
    
      /**
       * Get the users' feedback statistics
       */
      // corresponds to dws_temp_userEvent
      def getUserCall(spark:SparkSession,test:DataFrame)= {
        test.createOrReplaceTempView("view_testdata")
        val sql="""select
          |u.userid,
          |case when uf.friendnum is null then 0 else uf.friendnum end friendnum,
          |case when invited_event_count is null then 0 else invited_event_count end invited_event_count,
          |case when attended_count is null then 0 else attended_count end attended_count,
          |case when not_attended_count is null then 0 else not_attended_count end not_attended_count,
          |case when maybe_attended_count is null then 0 else maybe_attended_count end maybe_attended_count,
          |case when joinnum is null then 0 else joinnum end event_count
          |from
          |dwd_events.dwd_users u
          |left join
          |(select userid,count(friendid) friendnum from dwd_events.dwd_usersFriends group by userid) uf
          |on u.userid=uf.userid
          |left join (select userid,
          |sum(case when status='invited' then 1 else 0 end) as invited_event_count,
          |sum(case when status='yes' then 1 else 0 end) as attended_count,
          |sum(case when status='no' then 1 else 0 end) as not_attended_count,
          |sum(case when status='maybe' then 1 else 0 end) as maybe_attended_count
          |from dwd_events.dwd_eventAttendees group by userid ) ea on u.userid=ea.userid
          |left join
          |(select userid, count(eventid) joinnum
          |from view_testdata group by userid) dt
          |on u.userid=dt.userid""".stripMargin
        spark.sql(sql)
      }
    
      /**
       * Build the complete user-info dataset
       */
      // corresponds to merging into dws_temp_userinfos
      def getUserinfos(spark: SparkSession,tdata:DataFrame)={
        // users' basic profile features
        val bdf = getUserbaseinfo(spark)
        // users' feedback statistics
        val cdf = getUserCall(spark,tdata)
        // merge the two
        bdf.join(cdf,Seq("userid"),"inner")
      }
    
      /**
       * City levels
       */
      def getCityLevel(spark:SparkSession)={
        val sql="""
          |select
          |city,dense_rank() over(order by partynum desc) rank
          |from
          |(select city,count(eventid) partynum
          |from dwd_events.dwd_events
          |group by city ) a""".stripMargin
        spark.sql(sql)
      }
    
      /**
       * Country levels
       */
      def getCountryLevel(spark:SparkSession)={
        val sql="""select country,dense_rank() over(order by partynum desc) rank from
          |(select country,count(eventid) partynum from dwd_events.dwd_events group by country ) a""".stripMargin
        spark.sql(sql)
      }
    
      // corresponds to dws_temp_eventinfos
      def getEventinfo(spark: SparkSession,cityLevel:DataFrame,countryLevel:DataFrame)={
        cityLevel.createOrReplaceTempView("city_level")
        countryLevel.createOrReplaceTempView("country_level")
        val sql="""select
          |de.eventid,
          |de.userid as creator,
          |de.starttime,de.city,de.province,de.country,
          |month(from_unixtime(starttime,'yyyy-MM-dd')) event_month,
          |dayofweek(from_unixtime(starttime,'yyyy-MM-dd')) event_dayofweek,
          |hour(from_unixtime(starttime,'yyyy-MM-dd HH:mm:ss')) event_hour,
          |case when event_invited_count is null then 0 else event_invited_count end event_invited_count,
          |case when event_attended_count is null then 0 else event_attended_count end event_attended_count,
          |case when event_not_att_count is null then 0 else event_not_att_count end event_not_att_count,
          |case when event_maybe_count is null then 0 else event_maybe_count end event_maybe_count,
          |case when cl.rank is null then 0 else cl.rank end city_level,
          |case when tl.rank is null then 0 else tl.rank end country_level,
          |maxandmin(lat,mmtab.max_lat,mmtab.min_lat) lat_prec,
          |maxandmin(lng,mmtab.max_lng,mmtab.min_lng) lng_prec,
          |de.features
          |from
          |dwd_events.dwd_events de left join
          |(select eventid,
          |sum(case when status='invited' then 1 else 0 end) as event_invited_count,
          |sum(case when status='yes' then 1 else 0 end) as event_attended_count,
          |sum(case when status='no' then 1 else 0 end) as event_not_att_count,
          |sum(case when status='maybe' then 1 else 0 end) as event_maybe_count
          |from dwd_events.dwd_eventAttendees group by eventid) ea
          |on de.eventid=ea.eventid left join
          |city_level cl on de.city = cl.city left join
          |country_level tl on de.country = tl.country cross join
          |(select min(lat) min_lat,max(lat) max_lat,min(lng) min_lng,max(lng) max_lng from dwd_events.dwd_events) mmtab""".stripMargin
        spark.sql(sql)
      }
    
      /**
       * Whether a given user was invited to a given event, and the reply
       */
      // corresponds to dws_temp_row_eventattendees
      def getUserEventAttendees(spark:SparkSession)={
        val sql="""select
          |userid,eventid,
          |max(case when status='invited' then 1 else 0 end) invited,
          |max(case when status='yes' then 1 else 0 end) yes,
          |max(case when status='maybe' then 1 else 0 end) maybe,
          |max(case when status='no' then 1 else 0 end) no
          |from
          |dwd_events.dwd_eventAttendees group by userid,eventid""".stripMargin
        spark.sql(sql)
      }
    
      /**
       * Whether a user's friend was invited to a given event, and that friend's reply
       */
      def getUserFriendEventAttendees(spark:SparkSession,uea:DataFrame)={
        uea.createOrReplaceTempView("view_row_ea")
        val sql="""select
          |uf.userid,
          |uf.friendid,
          |case when ea.eventid is null then 0 else ea.eventid end eventid,
          |case when ea.invited is null then 0 else ea.invited end invited,
          |case when ea.yes is null then 0 else ea.yes end yes,
          |case when ea.maybe is null then 0 else ea.maybe end maybe,
          |case when ea.no is null then 0 else ea.no end no
          |from
          |dwd_events.dwd_usersFriends uf left join view_row_ea ea on
          |uf.friendid = ea.userid""".stripMargin
        spark.sql(sql)
      }
    
      /**
       * Per (user, event): statistics over that user's friends
       */
      // everything passed in here is dws-level data
      def getUserFriendinfoSummary(spark:SparkSession,ufcall:DataFrame)={
        ufcall.createOrReplaceTempView("view_ufcall")
        val sql="""select a.*,
          |uf_invited_count/b.friendnum as uf_invited_prec,
          |uf_attended_count/b.friendnum as uf_attended_prec,
          |uf_not_attended_count/b.friendnum as uf_not_attended_prec,
          |uf_maybe_count/b.friendnum as uf_maybe_prec
          |from (
          |select
          |ufi.userid,ufi.eventid,
          |sum(ufi.invited) as uf_invited_count,
          |sum(ufi.yes) as uf_attended_count,
          |sum(ufi.no) as uf_not_attended_count,
          |sum(ufi.maybe) as uf_maybe_count
          |from
          |view_ufcall ufi group by ufi.userid,ufi.eventid) a
          |inner join (
          |select userid,count(friendid) friendnum
          |from dwd_events.dwd_usersFriends
          |group by userid
          |) b on a.userid=b.userid""".stripMargin
        spark.sql(sql)
      }
    
      // is the user a friend of the event's creator?
      def getCreatorIsFriend(spark:SparkSession,testdata:DataFrame) ={
        testdata.createOrReplaceTempView("view_testdata")
        val sql="""with
          |t1 as (select tr.userid,es.eventid,es.userid as creator
          |from view_testdata tr
          |inner join dwd_events.dwd_events es on tr.eventid=es.eventid ),
          |t2 as (select t.userid,t.eventid,
          |case when uf.friendid is null then 0 else 1 end isfriend
          |from t1 t left join dwd_events.dwd_usersFriends uf on t.userid=uf.userid and t.creator=uf.friendid ),
          |t3 as (select t.userid,t.eventid,
          |case when uf.friendid is null then 0 else 1 end isfriend
          |from t1 t left join dwd_events.dwd_usersFriends uf on t.userid=uf.friendid and t.creator=uf.userid)
          |select t2.userid,t2.eventid,
          |case when t2.isfriend=1 or t3.isfriend=1 then 1 else 0 end isfriend
          |from t2 inner join t3 on t2.userid=t3.userid and t2.eventid=t3.eventid""".stripMargin
        spark.sql(sql)
      }
    
      // assemble the full test feature set
      def getDmUserEventData(spark: SparkSession,path:String)={
        println("loading the test dataset........")
        val testdf = readTestData(spark,path).cache()
        println("building user info.........")
        val userdf = getUserinfos(spark,testdf)
        println("computing city levels.........")
        val city_level = getCityLevel(spark).cache()
        println("computing country levels.........")
        val country_level = getCountryLevel(spark).cache()
        println("building event info.........")
        val eventdf= getEventinfo(spark,city_level,country_level)
        println("per user and event: invitation and reply..........")
        val uea = getUserEventAttendees(spark).cache()
        println("friends' replies..........")
        val ufea = getUserFriendEventAttendees(spark,uea).cache()
        println("friends' statistics............")
        val ufsummarydf = getUserFriendinfoSummary(spark,ufea)
        println("is the user a friend of the event creator..........")
        val isFriend = getCreatorIsFriend(spark,testdf).cache()
        println("assembling the test feature set..............")
        import spark.implicits._
        testdf
          .join(userdf,Seq("userid"),"inner")
          .join(eventdf,Seq("eventid"),"inner")
          .join(ufsummarydf,Seq("userid","eventid"),"inner")
          .join(isFriend,Seq("userid","eventid"),"inner")
          .withColumnRenamed("members","member_days")
          .withColumn("invite_days",floor(($"starttime"-unix_timestamp($"ttime"))/3600/24))
          .withColumnRenamed("isfriend","creator_is_friend")
          .withColumn("location_similar",locationSimilar($"location",$"city",$"province",$"country"))
          .drop("ttime","location","creator","starttime","city","province","country")
      }

       Classify the test dataset's events (using the trained KMeans model) => obtain dm_final

        val spark = SparkSession.builder().master("local[*]").appName("test")
          .config("hive.metastore.uris","thrift://192.168.16.150:9083")
          .enableHiveSupport().getOrCreate()
    
        // register the UDFs so they can also be called inside spark.sql strings
        spark.udf.register("locationSimilar",locationSimilar)
        spark.udf.register("maxandmin",maxandmin)
        spark.udf.register("calcage",calcage)
        // build the test feature set; in production the test data comes from each Spark Streaming RDD batch
        val res = getDmUserEventData(spark,"e:/test.csv")
        // run the KMeans event classification
        val eventType = KMeansEventTypeHandler.calcEventType(spark, res)
          .withColumnRenamed("eventid","etid")
        // join the event categories onto the test feature set
        val finalRes = res.join(eventType, res("eventid")===eventType("etid"))
          .drop("features","etid") // without the rename, drop would remove both eventid columns
          .withColumnRenamed("prediction", "eventType")
          .distinct()

    KMeansEventTypeHandler:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.ml.clustering.KMeansModel
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.split
    import org.apache.spark.sql.types.DoubleType
    
    def calcEventType(spark:SparkSession,et:DataFrame)= {
        var tmp = et
        if (et == null) {
          // no input given: fall back to the training wide table
          tmp = spark.sql("select * from dm_events.dm_usereventfinal")
            .select("eventid","features")
        } else {
          tmp = tmp.select("eventid","features")
        }
        // split the comma-separated features string into columns c_0 ~ c_100
        val fts = split(tmp("features"), ",")
        val cols = ArrayBuffer[String]()
        for(cl <- 0 to 100){
          tmp = tmp.withColumn("c_"+cl,fts.getItem(cl).cast(DoubleType))
          cols.append("c_"+cl)
        }
        tmp = tmp.drop("features") // DataFrames are immutable: without the reassignment the drop is lost
    
        // assemble the c_* columns into a single vector column
        val assembler = new VectorAssembler().setInputCols(cols.toArray).setOutputCol("feature")
        val df = assembler.transform(tmp)
        // load the trained kmeans model and classify
        val model = KMeansModel.load("d:/kmmodel2")
        model.transform(df).select("eventid","prediction")
      }

    ② Apply the RF model => predict the label (interested?)

     val prediction = RandomForestUserInterestTest.interestedTest(spark,null) // in a real run, pass in the DataFrame built from the raw test set

    RandomForestUserInterestTest:

      import org.apache.spark.ml.classification.RandomForestClassificationModel
      import org.apache.spark.ml.feature.VectorAssembler
      import org.apache.spark.sql.{DataFrame, SparkSession}
      import org.apache.spark.sql.functions.col
      import org.apache.spark.sql.types.DoubleType
    
      def interestedTest(spark:SparkSession,finalRes:DataFrame)={
        var tmp = finalRes
        if(tmp == null){
          tmp = spark.sql("select * from dm_events.dm_testdata").cache()
        }
        // cast every column to Double for the ML pipeline
        val column: Array[String] = tmp.columns
        val cls = column.map(f=>col(f).cast(DoubleType))
        val tab = tmp.select(cls:_*)
        // exclude userid and eventid from the feature columns
        val cs = column.filter(str => str != "userid" && str != "eventid")
        val ass = new VectorAssembler().setInputCols(cs).setOutputCol("features")
        val cpres = ass.transform(tab)
    
        // load the trained random forest and predict the label
        val model = RandomForestClassificationModel.load("d:/rfc")
        model.transform(cpres).select("userid","eventid","prediction")
      }

    ③ Write the predictions to MySQL, together with the dimension tables they reference (for the next step: data visualization)

        // write the predictions to MySQL
        MySqlWriter.writeData(prediction,"userinterested")
        // also write the dimension tables related to the predictions to MySQL
        val users = spark.sql("select * from dwd_events.dwd_users")
        MySqlWriter.writeData(users,"users")
        val events = spark.sql("select * from dwd_events.dwd_events")
        MySqlWriter.writeData(events,"events")
        spark.stop()

    MySqlWriter: write a DataFrame to MySQL

    import java.util.Properties
    import org.apache.spark.sql.DataFrame
    
    object MySqlWriter {
      def writeData(df:DataFrame,tableName:String)={
        val prop = new Properties()
        prop.put("driver","com.mysql.jdbc.Driver")
        prop.put("user","root")
        prop.put("password","ok")
        df.write.mode("overwrite")
          .jdbc("jdbc:mysql://192.168.16.150:3306/prediction",tableName,prop)
      }
    }
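
    As a quick sanity check after the write, the table can be read back over the same JDBC connection (a sketch, reusing the connection settings above):

    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    
    object MySqlReadCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("readCheck").getOrCreate()
        val prop = new Properties()
        prop.put("driver", "com.mysql.jdbc.Driver")
        prop.put("user", "root")
        prop.put("password", "ok")
        spark.read.jdbc("jdbc:mysql://192.168.16.150:3306/prediction", "userinterested", prop)
          .show(10)
      }
    }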