Offline Data Analysis: User Interest Analysis (2-2) Offline/Real-Time Project Architecture | Project Workflow | Data Warehouse Construction (Advanced)

1. Offline vs. Real-Time Streaming Frameworks

For the Spark data-cleansing process, see: Log Analysis https://www.cnblogs.com/sabertobih/p/14070357.html

Real-time streaming differs from offline processing in the latency between data arrival and processing, not in the tooling, so Kafka and Spark Streaming can equally serve offline batch jobs, as the batch-read sketch below shows.
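As a minimal illustration of that point, here is a sketch of reading a Kafka topic in batch mode with Spark (the broker address and topic name are placeholders, and the spark-sql-kafka-0-10 package must be on the classpath):

    import org.apache.spark.sql.SparkSession

    object KafkaBatchReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("kafkaBatch").getOrCreate()

        // Batch (not streaming) read: consume what is currently in the topic, then stop.
        val df = spark.read.format("kafka")
          .option("kafka.bootstrap.servers", "192.168.16.150:9092") // assumed broker address
          .option("subscribe", "user_friends_raw")                  // hypothetical topic name
          .option("startingOffsets", "earliest")
          .option("endingOffsets", "latest")
          .load()

        // Kafka records arrive as binary key/value; cast to string before processing.
        df.selectExpr("cast(value as string) as line").show(5, truncate = false)
        spark.stop()
      }
    }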

Offline model training: the retraining cadence is driven by business needs; each run rebuilds the model from scratch.

Offline prediction: build the dm_final table in Hive via spark.sql -> Spark ML

Real-time prediction: build the dm_final feature rows from Kafka -> Spark ML (a sketch follows)
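A hedged sketch of the real-time path, assuming the RF model saved by the offline steps below ("d:/rfc") and a hypothetical Kafka topic carrying dm_final-style feature rows:

    import org.apache.spark.ml.classification.RandomForestClassificationModel
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object StreamingPredictSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("rtPredict").getOrCreate()

        // Load the offline-trained model once; reuse it for every micro-batch.
        val model = RandomForestClassificationModel.load("d:/rfc")

        val stream = spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "192.168.16.150:9092") // assumed broker
          .option("subscribe", "dm_final")                          // hypothetical topic
          .load()

        // Each micro-batch would be parsed into the dm_final columns and assembled
        // into a feature vector before model.transform; elided in this sketch.
        val handler = (batch: DataFrame, batchId: Long) => {
          batch.show(3) // placeholder for: model.transform(assemble(batch)).write ...
        }
        stream.writeStream
          .option("checkpointLocation", "/tmp/ckpt-rtPredict") // assumed checkpoint dir
          .foreachBatch(handler)
          .start()
          .awaitTermination()
      }
    }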

2. Offline Processing Approach (This Project)

(1) Building the ML Model

① Flume

Data collection: monitor newly added data in real time and ship it to Kafka (see: https://www.cnblogs.com/sabertobih/p/14115501.html)

② Kafka

Peak shaving (buffering bursts); real-time monitoring commands: https://www.cnblogs.com/sabertobih/p/14024011.html

③ Kafka -> Spark Streaming -> Kafka

Format conversion; see item 5 of: https://www.cnblogs.com/sabertobih/p/14136154.html

④ Kafka -> HBase

Because rowkeys are unique, a write with a duplicate rowkey simply overwrites the existing row, so the sink deduplicates itself; the sketch below illustrates the idea.

See item 6 of: https://www.cnblogs.com/sabertobih/p/14136154.html
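A minimal sketch of rowkey-based dedup, assuming the event_db:user_friends table and uf column family from the DDL in step ⑤, and an assumed ZooKeeper address:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes

    object RowkeyDedupSketch {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "192.168.16.150") // assumed ZooKeeper quorum
        val conn = ConnectionFactory.createConnection(conf)
        val table = conn.getTable(TableName.valueOf("event_db:user_friends"))

        // The rowkey is derived from the record content (userid + friendid), so a
        // replayed Kafka message produces the same rowkey and overwrites itself.
        def write(userid: String, friendid: String): Unit = {
          val put = new Put(Bytes.toBytes(userid + "_" + friendid))
          put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("userid"), Bytes.toBytes(userid))
          put.addColumn(Bytes.toBytes("uf"), Bytes.toBytes("friendid"), Bytes.toBytes(friendid))
          table.put(put)
        }
        write("1001", "2002")
        write("1001", "2002") // duplicate: same rowkey, overwritten rather than duplicated
        table.close()
        conn.close()
      }
    }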

⑤ HBase -> Hive

Create external mapping tables (the data stays on HDFS; Hive serves complex queries over large batches, while HBase serves ordered key-based access).

create database if not exists event_db;
set hivevar:db=event_db;
use ${db};
    create external table ${db}.users(
    userid string,
    birthday string,
    gender string,
    locale string,
    location string,
    timezone string,
    joinedAt string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,profile:birthday,profile:gender,region:locale,region:location,region:timezone,registration:joinedAt')
tblproperties('hbase.table.name'='event_db:users');
    
    ---userfriends
    create external table event_db.userfriends(
    ukey string,
    userid string,
    friendid string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,uf:userid,uf:friendid')
tblproperties('hbase.table.name'='event_db:user_friends');
    
    
    ---events_db
    create external table event_db.events(
    eventid string,
    startTime string,
    city string,
    dstate string,
    zip string,
    country string,
    lat string,
    lng string,
    userId string,
    features string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,schedule:startTime,location:city,location:state,location:zip,location:country,location:lat,location:lng,creator:userId,remark:features')
tblproperties('hbase.table.name'='event_db:events');
    ---eventAttendees
    create external table event_db.eventAttendees(
    ekey string,
    eventId string,
    userId string,
    status string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,euat:eventId,euat:userId,euat:status')
tblproperties('hbase.table.name'='event_db:event_attendees');
    ---train
    create external table event_db.train(
    tkey string,
    userId string,
    eventId string,
    invited string,
    etimestamp string,
    interested string,
    notinterested string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,eu:userId,eu:eventId,eu:invitedId,eu:timestamp,eu:interested,eu:notinterested')
tblproperties('hbase.table.name'='event_db:train');
    ---test
    create external table event_db.test(
    tkey string,
    tuser string,
    event string,
    invited string,
    ttimestamp string
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties('hbase.columns.mapping'=':key,eu:user,eu:event,eu:invited,eu:timestamp')
tblproperties('hbase.table.name'='event_db:test');
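Once the DDL has run, a quick sanity check (a sketch; `spark` is a Hive-enabled SparkSession built the same way as in the prediction code later in this post) confirms the mappings actually surface HBase data:

    // Each HBase-backed Hive table should now return rows:
    spark.sql("select * from event_db.users limit 3").show(truncate = false)
    spark.sql("select count(*) from event_db.userfriends").show()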

⑥ Build the data warehouse in Hive

(6-1) ODS layer: managed tables stored on HDFS

1. Stored as ORC for fast queries (details: https://www.jianshu.com/p/91f793a1bdb3)

2. Light cleansing happens along the way; the general pattern is:

-- pattern: materialize an HBase-mapped external table (here ${db}.hb_users) into an ORC managed table
create table ${db}.users
stored as ORC as
select * from ${db}.hb_users;
    
    drop database if exists ods_events cascade;
    create database ods_events;
    use ods_events;
    
    create table ods_events.ods_users
    stored as ORC as select * from event_db.users;
    create table ods_events.ods_eventsTable
    stored as ORC as select * from event_db.events;
    create table ods_events.ods_eventAttendees
    stored as ORC as select * from event_db.eventAttendees;
    create table ods_events.ods_userfriends
    stored as ORC as select * from event_db.userfriends;
    create table ods_events.ods_train
    stored as ORC as select * from event_db.train;
    create table ods_events.ods_test
    stored as ORC as select * from event_db.test;
    create table ods_events.ods_locale(
    localeid string,
    locale string
    )
row format delimited fields terminated by '\t'
    location 'hdfs://192.168.56.111:9000/party/data1/locale';

(6-2) DWD layer: data normalization

For example, encoding "male" and "female" as numbers, replacing null values with column averages, and similar preprocessing work.

-- data normalization
    drop database if exists dwd_events;
    create database dwd_events;
    use dwd_events;
    
-- fill null birthyear (and timezone/joinedat) with column averages
    create table dwd_users as 
    select userid,locale,location,
    case when cast(birthday as int) is null then avg_year else birthday end as birthyear,
    case gender when 'male' then -1 when 'female' then 1 else 0 end as gender,
    case when cast(timezone as int) is null then avg_timezone else timezone end as timezone,
    case when trim(joinedat)='' or lower(joinedat)='none' then avg_member else unix_timestamp(translate(joinedat,'TZ',' ')) end as members
    from
    (select * from ods_events.ods_users 
    cross join
    (
    select floor(avg(cast(birthday as int))) avg_year,
    floor(avg(cast(timezone as int))) avg_timezone,
    floor(avg(unix_timestamp(translate(joinedat,'TZ',' ')))) avg_member
    from ods_events.ods_users
    )tmp
    )a;
    
    create table dwd_events.dwd_events as 
    select eventid,unix_timestamp(translate(starttime,'TZ',' '))starttime,
    city,country,dstate as province,
    case when cast(lat as float) is null then avg_lat else lat end lat,
    case when cast(lng as float) is null then avg_lng else lng end lng,
    userid,features
    from 
    ods_events.ods_eventsTable 
    cross join
    (select round(avg(cast(lat as float)),3)avg_lat,
    round(avg(cast(lng as float)),3)avg_lng 
    from ods_events.ods_eventsTable
    )tmp;
    
    create table dwd_events.dwd_eventAttendees as select * from ods_events.ods_eventattendees;
    create table dwd_events.dwd_usersFriends as select * from ods_events.ods_userfriends;
    create table dwd_events.dwd_train as select tkey trid,userid,eventid,invited,etimestamp ttime,interested label from ods_events.ods_train;
    create table dwd_events.dwd_locale as select * from ods_events.ods_locale;
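For comparison, the same normalization can be sketched with the DataFrame API (a sketch, assuming a Hive-enabled SparkSession; only birthyear, gender, and timezone are shown; it mirrors, not replaces, the SQL above):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val users = spark.table("ods_events.ods_users")

    // Column averages, computed once, used to fill nulls.
    val avgs = users.agg(
      floor(avg(col("birthday").cast("int"))).as("avg_year"),
      floor(avg(col("timezone").cast("int"))).as("avg_timezone")
    ).first()

    val dwdUsers = users
      .withColumn("birthyear", coalesce(col("birthday").cast("int"), lit(avgs.getLong(0))))
      .withColumn("gender", when(col("gender") === "male", -1)
        .when(col("gender") === "female", 1).otherwise(0))
      .withColumn("timezone", coalesce(col("timezone").cast("int"), lit(avgs.getLong(1))))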

(6-3) DWS layer: lightly aggregated dimension tables

Aggregation approach:

    drop database if exists dws_events;
    create database dws_events;
    use dws_events;
    
    create temporary macro maxandmin(cdata int ,maxdata int,mindata int) (cdata-mindata)/(maxdata-mindata);
    create temporary macro calcage(y int) year(current_date())-y;
    
    -- ===================dws_temp_users================
    create table dws_events.dws_temp_users as
    select
    userid,
    locale,
    gender,
location,
maxandmin(cage,max_age,min_age) age,
maxandmin(timezone,max_timezone,min_timezone) timezone,
maxandmin(members,max_members,min_members) members
from (
select userid,
case when l.locale is null then 0 else l.localeid end locale,
gender,location,
calcage(birthyear) cage,min_age,max_age,
timezone,min_timezone,max_timezone,
members,min_members,max_members
from dwd_events.dwd_users u
left join dwd_events.dwd_locale l on lower(u.locale)=lower(l.locale)
cross join (
select min(calcage(birthyear)) min_age,max(calcage(birthyear)) max_age,
min(timezone) min_timezone,max(timezone) max_timezone,
min(members) min_members,max(members) max_members
from dwd_events.dwd_users
) b
) c;

-- ===================dws_temp_userEvent================
create table dws_events.dws_temp_userEvent as
select
u.userid,
case when uf.friendnum is null then 0 else uf.friendnum end friendnum,
case when invite_event_count is null then 0 else invite_event_count end invite_event_count,
case when attended_count is null then 0 else attended_count end attended_count,
case when not_attended_count is null then 0 else not_attended_count end not_attended_count,
case when maybe_attended_count is null then 0 else maybe_attended_count end maybe_attended_count,
case when joinnum is null then 0 else joinnum end event_count
from dwd_events.dwd_users u
left join (select userid,count(friendid) friendnum from dwd_events.dwd_usersFriends group by userid) uf
on u.userid=uf.userid
left join (select userid,
sum(case when status='invited' then 1 else 0 end) as invite_event_count,
sum(case when status='yes' then 1 else 0 end) as attended_count,
sum(case when status='no' then 1 else 0 end) as not_attended_count,
sum(case when status='maybe' then 1 else 0 end) as maybe_attended_count
from dwd_events.dwd_eventAttendees group by userid
) ea on u.userid=ea.userid
left join (select userid, count(eventid) joinnum from dwd_events.dwd_train group by userid) dt
on u.userid=dt.userid;

Why left join?

Because the DWS layer keys off userid from the full users table as the driving table, with the other tables as supplements => aggregated dimension tables (a toy example follows below).

In the final DM layer the fact table's ids are authoritative again, so inner join is used there.
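A self-contained toy example of that difference (hypothetical data):

    import org.apache.spark.sql.SparkSession

    object JoinChoiceSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("joins").getOrCreate()
        import spark.implicits._

        val users = Seq("u1", "u2", "u3").toDF("userid")          // full users table
        val stats = Seq(("u1", 5)).toDF("userid", "friendnum")    // stats exist for u1 only

        // DWS: left join keeps u2 and u3 with null friendnum (filled with 0 later).
        users.join(stats, Seq("userid"), "left").show()
        // DM: inner join keeps only the ids present on both sides.
        users.join(stats, Seq("userid"), "inner").show()
        spark.stop()
      }
    }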

-- ===================merge into dws_temp_userinfos================
create table dws_events.dws_temp_userinfos as
select
du.userid,du.locale,du.gender,du.location,du.age,du.timezone,du.members,ue.friendnum,
ue.invite_event_count,ue.attended_count,ue.not_attended_count,ue.maybe_attended_count,ue.event_count
from dws_events.dws_temp_users du
inner join dws_events.dws_temp_userEvent ue
on du.userid=ue.userid;
    
    -- ===================视图dws_view_city_level================
    create view dws_events.dws_view_city_level as
    select city,dense_rank()over(order by partynum desc) rank
    from
    (
    select city,count(eventid) partynum 
    from dwd_events.dwd_events
    group by city
)a;
    
    -- ===================视图dws_view_country_level================
    create view dws_events.dws_view_country_level as
    select country,dense_rank()over(order by partynum desc) rank
    from
    (select country,count(eventid) partynum from dwd_events.dwd_events
group by country)a;
    
    -- ===================dws_temp_eventinfos================
    create table dws_events.dws_temp_eventinfos as
    select
    de.eventid,
    de.userid as creator,
    de.starttime,
    de.city,
    de.province,
    de.country,
    month(from_unixtime(starttime,'yyyy-MM-dd')) event_month,
    dayofweek(from_unixtime(starttime,'yyyy-MM-dd')) event_dayofweek,
    hour(from_unixtime(starttime,'yyyy-MM-dd HH:mm:ss')) event_hour,
    case when event_invited_count is null then 0 else event_invited_count end event_invited_count,
    case when event_attended_count is null then 0 else event_attended_count end event_attended_count,
    case when event_not_att_count is null then 0 else event_not_att_count end event_not_att_count,
    case when event_maybe_count is null then 0 else event_maybe_count end event_maybe_count,
    case when c1.rank is null then 0 else c1.rank end city_level,
    case when t1.rank is null then 0 else t1.rank end country_level,
maxandmin(lat,mmtab.max_lat,mmtab.min_lat) lat_prec,
maxandmin(lng,mmtab.max_lng,mmtab.min_lng) lng_prec,
    features
    from
    dwd_events.dwd_events de 
    left join 
    (select eventid,
    sum(case when status='invited' then 1 else 0 end) as event_invited_count,
    sum(case when status='yes' then 1 else 0 end) as event_attended_count,
    sum(case when status='no' then 1 else 0 end) as event_not_att_count,
    sum(case when status='maybe' then 1 else 0 end) as event_maybe_count
    from dwd_events.dwd_eventAttendees 
    group by eventid) ea
    on de.eventid=ea.eventid 
    left join
    dws_events.dws_view_city_level c1
    on de.city=c1.city 
    left join
    dws_events.dws_view_country_level t1
    on de.country=t1.country 
    cross join
    (select min(lat) min_lat,max(lat) max_lat,min(lng) min_lng,max(lng) max_lng from dwd_events.dwd_events)mmtab;

    -- ===================dws_temp_train================
    create table dws_events.dws_temp_train as
    select * from dwd_events.dwd_train;
    
    -- ===================dws_temp_userfriends================
    create table dws_events.dws_temp_userfriends as
    select * from dwd_events.dwd_usersFriends;
    
    -- ===================dws_temp_eventattendees================
    create table dws_events.dws_temp_eventattendees as
    select * from dwd_events.dwd_eventattendees;
    
-- whether a given user was invited to a given event, and their reply
    -- ===================dws_temp_row_eventattendees================
    create table dws_events.dws_temp_row_eventattendees as
    select
    userid,
    eventid,
    max(case when status='invited' then 1 else 0 end) invited,
    max(case when status='yes' then 1 else 0 end) yes,
    max(case when status='maybe' then 1 else 0 end) maybe,
    max(case when status='no' then 1 else 0 end) no
    from
    dws_events.dws_temp_eventattendees
group by userid,eventid;
    
-- whether a friend of a given user was invited to a given event, and that friend's reply
    -- ===================dws_temp_uf_infos================
    create table dws_events.dws_temp_uf_infos as
    select
    uf.userid,
    uf.friendid,
    case when ea.eventid is null then 0 else ea.eventid end eventid,
    case when ea.invited is null then 0 else ea.invited end invited,
    case when ea.yes is null then 0 else ea.yes end yes,
    case when ea.maybe is null then 0 else ea.maybe end maybe,
    case when ea.no is null then 0 else ea.no end no
    from
    dws_events.dws_temp_userfriends uf
    left join
    dws_events.dws_temp_row_eventattendees ea
on uf.friendid=ea.userid;

     

    -- ===================dws_temp_uf_summary================
    create table dws_events.dws_temp_uf_summary as
    select
    a.*,
    uf_invited_count/b.friendnum as uf_invited_prec,
    uf_attended_count/b.friendnum as uf_attended_prec,
    uf_not_attended_count/b.friendnum as uf_not_attended_prec,
    uf_maybe_count/b.friendnum as uf_maybe_prec
    from
    (select
    ufi.userid,
    ufi.eventid,
    sum(ufi.invited) as uf_invited_count,
    sum(ufi.yes) as uf_attended_count,
    sum(ufi.no) as uf_not_attended_count,
    sum(ufi.maybe) as uf_maybe_count
    from
    dws_events.dws_temp_uf_infos ufi
    group by ufi.userid,ufi.eventid
    ) a
    inner join
    (select
    userid,
    count(friendid) friendnum
    from
    dws_events.dws_temp_userfriends uf
    group by userid
    ) b
on a.userid=b.userid;

     

-- whether a user in the training set is a friend of the creator of the event they attended
-- ==============dws_temp_isfriends==========

create table dws_events.dws_temp_isfriends stored as orc as
with t1 as (
select tr.userid,es.eventid,es.userid as creator
from dwd_events.dwd_train tr
inner join dwd_events.dwd_events es on tr.eventid=es.eventid
),
t2 as (
select t.userid,t.eventid,
case when uf.friendid is null then 0 else 1 end isfriend
from t1 t
left join dwd_events.dwd_usersFriends uf
on t.userid=uf.userid and t.creator=uf.friendid
),
t3 as (
select t.userid,t.eventid,
case when uf.friendid is null then 0 else 1 end isfriend
from t1 t
left join dwd_events.dwd_usersFriends uf
on t.userid=uf.friendid and t.creator=uf.userid
)
select t2.userid,t2.eventid,
case when t2.isfriend=1 or t3.isfriend=1 then 1 else 0 end isfriend
from t2
inner join t3 on t2.userid=t3.userid and t2.eventid=t3.eventid;

     

(6-4) DM layer: the wide table

• DM metrics:
user_interested                                ----- label
user_id                                        ----- not a feature
event_id                                       ----- not a feature
locale                                         ----- 1~x
gender                                         ----- -1,0,1
age                                            ----- 0~1
timezone                                       ----- 0~x
member_days                                    ----- 0~1
friend_count                                   ----- 0~1
invite_days                                    ----- 0~1
event_count                                    ----- 0~1
invited_event_count (times invited)            ----- 0~1
attended_count (accepted)                      ----- 0~1
not_attended_count (declined)                  ----- 0~1
maybe_attended_count (maybe)                   ----- 0~1
user_invited (invited to this event?)          ----- 0,1
uf_invited_count (friends invited)             ----- 0~1
uf_attended_count (friends who accepted)       ----- 0~1
uf_notattended_count (friends who declined)    ----- 0~1
uf_maybe_count (friends who said maybe)        ----- 0~1
uf_invited_prec (% of friends invited)         ----- 0~1
uf_attended_prec (% of friends who accepted)   ----- 0~1
uf_not_attended_prec (% who declined)          ----- 0~1
uf_maybe_prec (% who said maybe)               ----- 0~1
ahead_days (days between invite and event)     ----- 0~1
event_month (event month)                      ----- 1~12
event_dayofweek (event day of week)            ----- 1~7
event_hour (event hour)                        ----- 0~23
event_invited_count (people invited)           ----- 0~1
event_attended_count (people who accepted)     ----- 0~1
event_not_att_count (people who declined)      ----- 0~1
event_maybe_count (people who said maybe)      ----- 0~1
city_level (city rank)                         ----- 0~x
country_level (country rank)                   ----- 0~x
lat_prec (normalized latitude)                 ----- 0~1
lng_prec (normalized longitude)                ----- 0~1
creator_is_friend                              ----- 0,1
location_similar (user and event co-located?)  ----- 0,1
event_type (event category)                    ----- 0~x
    drop database if exists dm_events;
    create database dm_events;
    use dm_events;
-- ==================locationSimilar: user and event co-located?=================
create temporary macro locationSimilar(location String,city String,province String,country String)
case when instr(lower(location),lower(city))>0
      or instr(lower(location),lower(province))>0
      or instr(lower(location),lower(country))>0
     then 1 else 0 end;

-- ===================dm_usereventfinal================
drop table if exists dm_events.dm_usereventfinal;
create table dm_events.dm_usereventfinal as
select
tr.label,
tr.userid,
tr.eventid,
us.locale,
us.gender,
us.age,
us.timezone,
us.members as member_days,
floor((ei.starttime-unix_timestamp(ttime))/3600/24) invite_days,
us.friendnum as friend_count,
us.invite_event_count,
us.attended_count,
us.not_attended_count,
us.maybe_attended_count,
us.event_count,
tr.invited,
ufe.uf_invited_count,
ufe.uf_attended_count,
ufe.uf_not_attended_count,
ufe.uf_maybe_count,
ufe.uf_invited_prec,
ufe.uf_attended_prec,
ufe.uf_not_attended_prec,
ufe.uf_maybe_prec,
ei.event_month,
ei.event_dayofweek,
ei.event_hour,
ei.event_invited_count,
ei.event_attended_count,
ei.event_not_att_count,
ei.event_maybe_count,
ei.city_level,
ei.country_level,
ei.lat_prec,
ei.lng_prec,
ifr.isfriend as creator_is_friend,
locationSimilar(us.location,ei.city,ei.province,ei.country) as location_similar,
ei.features
from dws_events.dws_temp_train tr
inner join dws_events.dws_temp_userinfos us on tr.userid=us.userid
inner join dws_events.dws_temp_eventinfos ei on tr.eventid=ei.eventid
inner join dws_events.dws_temp_uf_summary ufe on tr.userid=ufe.userid and tr.eventid=ufe.eventid
inner join dws_events.dws_temp_isfriends ifr on tr.userid=ifr.userid and tr.eventid=ifr.eventid;

⑦ Train the KMeans model

Train a KMeans model on the features column of dm_usereventfinal (which comes from the event data) to categorize the events, writing the result to a new table dm_eventtype (a sketch follows the link below).

    链接:https://www.cnblogs.com/sabertobih/p/14183984.html
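The linked post has the full training code; below is only a hedged sketch of the step, assuming a Hive-enabled SparkSession. k=20 is an assumption, and the 101 c_* columns mirror the prediction code further down:

    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.DoubleType

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Split the comma-separated features string into c_0..c_100 columns.
    var df = spark.sql("select eventid, features from dm_events.dm_usereventfinal")
    val fts = split(col("features"), ",")
    val cols = (0 to 100).map { i =>
      df = df.withColumn(s"c_$i", fts.getItem(i).cast(DoubleType))
      s"c_$i"
    }

    // Assemble the c_* columns into one vector; VectorAssembler rejects nulls.
    val assembled = new VectorAssembler()
      .setInputCols(cols.toArray)
      .setOutputCol("feature")
      .transform(df.na.fill(0.0))

    val model = new KMeans().setK(20).setFeaturesCol("feature").fit(assembled) // k=20 is an assumption
    model.write.overwrite().save("d:/kmmodel2") // the path loaded by KMeansEventTypeHandler below

    // Event -> cluster assignments become dm_eventtype.
    model.transform(assembled).select("eventid", "prediction")
      .write.mode("overwrite").saveAsTable("dm_events.dm_eventtype")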

Join dm_usereventfinal with dm_eventtype to produce the new table dm_final

create table dm_final as
select
a.label,a.userid,a.eventid,a.locale,a.gender,a.age,a.timezone,a.member_days,a.invite_days,a.friend_count,a.invite_event_count,a.attended_count,a.not_attended_count,
a.maybe_attended_count,a.event_count,a.invited,a.uf_invited_count,a.uf_attended_count,a.uf_not_attended_count,a.uf_maybe_count,a.uf_invited_prec,a.uf_attended_prec,
a.uf_not_attended_prec,a.uf_maybe_prec,a.event_month,a.event_dayofweek,a.event_hour,a.event_invited_count,a.event_attended_count,a.event_not_att_count,a.event_maybe_count,
a.city_level,a.country_level,a.lat_prec,a.lng_prec,a.creator_is_friend,a.location_similar,
b.prediction as eventtype
from dm_events.dm_usereventfinal a
inner join dm_events.dm_eventtype b on a.eventid = b.eventid;

⑧ Train an RF (random forest) model on dm_final and predict the class (a training sketch follows the link)

    链接:https://www.cnblogs.com/sabertobih/p/14183984.html
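Likewise only a hedged sketch of the RF training step (hyperparameters are assumptions; the save path matches the RandomForestClassificationModel.load call in the prediction code):

    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DoubleType

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val data = spark.table("dm_events.dm_final")

    // userid/eventid identify rows; everything else except label is a feature.
    val featureCols = data.columns.filterNot(Set("label", "userid", "eventid"))
    val casted = data.select(data.columns.map(c => col(c).cast(DoubleType)): _*)

    val assembled = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("features")
      .transform(casted.na.fill(0.0))

    val model = new RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setNumTrees(100) // assumed hyperparameter
      .fit(assembled)
    model.write.overwrite().save("d:/rfc") // loaded later by RandomForestClassificationModel.load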

(2) Using the Model to Predict the Test Set

     

① [Scala] Use spark.sql against Hive, keeping the feature set identical to the one used in training

First make sure the Hive connection works; see: https://www.cnblogs.com/sabertobih/p/13772985.html

The same layered processing is repeated on the test data so that it ends up with the same feature table as the training set.

/**
   * Whether the user's location matches the event's city/province/country
   * (kept consistent with the Hive locationSimilar macro above).
   */
  val locationSimilar=udf(
    (location:String,city:String,province:String,country:String)=> {
      if (location == null) 0
      else {
        val loc = location.toLowerCase()
        if ((city != null && loc.indexOf(city.toLowerCase()) > -1) ||
          (province != null && loc.indexOf(province.toLowerCase()) > -1) ||
          (country != null && loc.indexOf(country.toLowerCase()) > -1)) {
          1
        } else {
          0
        }
      }
    }
  )
    
  /**
   * Max-min normalization.
   * Why Double? The ML stage needs numeric preprocessing anyway, so everything
   * ends up as DoubleType sooner or later.
   */
  val maxandmin = udf{ (cdata:Double,maxdata:Double,mindata:Double)=>{
    (cdata-mindata)/(maxdata-mindata)
  }}

  /**
   * Compute a user's age from birth year.
   */
  val calcage = udf{ (y:Double) =>{
    Calendar.getInstance().get(Calendar.YEAR)-y
  }}

  /**
   * Read a dwd-layer table.
   */
  def readHiveTable(spark:SparkSession,tableName:String) ={
    spark.sql(s"select * from $tableName")
  }

  /**
   * Read the test-set file into a DataFrame (testdf) for downstream use.
   */
  def readTestData(spark:SparkSession,path:String)={
    spark.read.format("csv").option("header","true").load(path)
      .withColumnRenamed("user","userid")
      .withColumnRenamed("event","eventid")
      .withColumnRenamed("timestamp","ttime")
  }

  /**
   * User base info; mirrors dws_temp_users.
   */
  def getUserbaseinfo(spark:SparkSession)={
    val sql =
      s"""select
         |userid,locale,gender,
         |location,
         |maxandmin(cage,max_age,min_age) age,
         |maxandmin(timezone,max_timezone,min_timezone) timezone,
         |maxandmin(members,max_members,min_members) members
         |from
         |(select userid,
         |case when l.locale is null then 0 else l.localeid end locale,
         |gender,location,
         |calcage(birthyear) cage,min_age,max_age,
         |timezone,min_timezone,max_timezone,
         |members,min_members,max_members
         |from dwd_events.dwd_users u
         |left join dwd_events.dwd_locale l
         |on lower(u.locale)=lower(l.locale)
         |cross join (select min(calcage(birthyear)) min_age
         |,max(calcage(birthyear)) max_age,min(timezone) min_timezone,
         |max(timezone) max_timezone, min(members) min_members,max(members) max_members
         |from dwd_events.dwd_users) b ) c""".stripMargin
    spark.sql(sql)
  }

  /**
   * User feedback info and counts; mirrors dws_temp_userEvent.
   */
  def getUserCall(spark:SparkSession,test:DataFrame)= {
    test.createOrReplaceTempView("view_testdata")
    val sql=
      """select
        |u.userid,
        |case when uf.friendnum is null then 0 else uf.friendnum end friendnum,
        |case when invited_event_count is null then 0 else invited_event_count end invited_event_count,
        |case when attended_count is null then 0 else attended_count end attended_count,
        |case when not_attended_count is null then 0 else not_attended_count end not_attended_count,
        |case when maybe_attended_count is null then 0 else maybe_attended_count end maybe_attended_count,
        |case when joinnum is null then 0 else joinnum end event_count
        |from
        |dwd_events.dwd_users u
        |left join
        |(select userid,count(friendid) friendnum from dwd_events.dwd_usersFriends group by userid) uf
        |on u.userid=uf.userid
        |left join (select userid,
        |sum(case when status='invited' then 1 else 0 end) as invited_event_count,
        |sum(case when status='yes' then 1 else 0 end) as attended_count,
        |sum(case when status='no' then 1 else 0 end) as not_attended_count,
        |sum(case when status='maybe' then 1 else 0 end) as maybe_attended_count
        |from dwd_events.dwd_eventAttendees group by userid ) ea on u.userid=ea.userid
        |left join
        |(select userid, count(eventid) joinnum
        |from view_testdata group by userid) dt
        |on u.userid=dt.userid""".stripMargin
    spark.sql(sql)
  }

  /**
   * Full user info; mirrors the dws_temp_userinfos merge.
   */
  def getUserinfos(spark: SparkSession,tdata:DataFrame)={
    // user base info
    val bdf = getUserbaseinfo(spark)
    // user feedback info
    val cdf = getUserCall(spark,tdata)
    // merge base info with feedback info
    bdf.join(cdf,Seq("userid"),"inner")
  }

  /**
   * City level, ranked by number of events hosted.
   */
  def getCityLevel(spark:SparkSession)={
    val sql=
      """select
        |city,dense_rank() over(order by partynum desc) rank
        |from
        |(select city,count(eventid) partynum
        |from dwd_events.dwd_events
        |group by city ) a""".stripMargin
    spark.sql(sql)
  }

  /**
   * Country level, ranked by number of events hosted.
   */
  def getCountryLevel(spark:SparkSession)={
    val sql=
      """select country,dense_rank() over(order by partynum desc) rank from
        |(select country,count(eventid) partynum from dwd_events.dwd_events group by country ) a""".stripMargin
    spark.sql(sql)
  }

  /**
   * Event info; mirrors dws_temp_eventinfos.
   */
  def getEventinfo(spark: SparkSession,cityLevel:DataFrame,countryLevel:DataFrame)={
    cityLevel.createOrReplaceTempView("city_level")
    countryLevel.createOrReplaceTempView("country_level")
    val sql=
      s"""select
         |de.eventid,
         |de.userid as creator,
         |de.starttime,de.city,de.province,de.country,
         |month(from_unixtime(starttime,'yyyy-MM-dd')) event_month,
         |dayofweek(from_unixtime(starttime,'yyyy-MM-dd')) event_dayofweek,
         |hour(from_unixtime(starttime,'yyyy-MM-dd HH:mm:ss')) event_hour,
         |case when event_invited_count is null then 0 else event_invited_count end event_invited_count,
         |case when event_attended_count is null then 0 else event_attended_count end event_attended_count,
         |case when event_not_att_count is null then 0 else event_not_att_count end event_not_att_count,
         |case when event_maybe_count is null then 0 else event_maybe_count end event_maybe_count,
         |case when cl.rank is null then 0 else cl.rank end city_level,
         |case when tl.rank is null then 0 else tl.rank end country_level,
         |maxandmin(lat,mmtab.max_lat,mmtab.min_lat) lat_prec,
         |maxandmin(lng,mmtab.max_lng,mmtab.min_lng) lng_prec,
         |de.features
         |from
         |dwd_events.dwd_events de left join
         |(select eventid,
         |sum(case when status='invited' then 1 else 0 end) as event_invited_count,
         |sum(case when status='yes' then 1 else 0 end) as event_attended_count,
         |sum(case when status='no' then 1 else 0 end) as event_not_att_count,
         |sum(case when status='maybe' then 1 else 0 end) as event_maybe_count
         |from dwd_events.dwd_eventAttendees group by eventid) ea
         |on de.eventid=ea.eventid left join
         |city_level cl on de.city = cl.city left join
         |country_level tl on de.country = tl.country cross join
         |(select min(lat) min_lat,max(lat) max_lat,min(lng) min_lng,max(lng) max_lng from dwd_events.dwd_events) mmtab""".stripMargin
    spark.sql(sql)
  }

  /**
   * Whether a given user was invited to a given event, and their reply;
   * mirrors dws_temp_row_eventattendees.
   */
  def getUserEventAttendees(spark:SparkSession)={
    val sql=
      """select
        |userid,eventid,
        |max(case when status='invited' then 1 else 0 end) invited,
        |max(case when status='yes' then 1 else 0 end) yes,
        |max(case when status='maybe' then 1 else 0 end) maybe,
        |max(case when status='no' then 1 else 0 end) no
        |from
        |dwd_events.dwd_eventAttendees group by userid,eventid""".stripMargin
    spark.sql(sql)
  }

  /**
   * Whether a user's friend was invited to a given event, and that friend's reply.
   */
  def getUserFriendEventAttendees(spark:SparkSession,uea:DataFrame)={
    uea.createOrReplaceTempView("view_row_ea")
    val sql=
      """select
        |uf.userid,
        |uf.friendid,
        |case when ea.eventid is null then 0 else ea.eventid end eventid,
        |case when ea.invited is null then 0 else ea.invited end invited,
        |case when ea.yes is null then 0 else ea.yes end yes,
        |case when ea.maybe is null then 0 else ea.maybe end maybe,
        |case when ea.no is null then 0 else ea.no end no
        |from
        |dwd_events.dwd_usersFriends uf left join view_row_ea ea on
        |uf.friendid = ea.userid""".stripMargin
    spark.sql(sql)
  }

  /**
   * Per-user, per-event friend statistics; all inputs are dws-level DataFrames.
   */
  def getUserFriendinfoSummary(spark:SparkSession,ufcall:DataFrame)={
    ufcall.createOrReplaceTempView("view_ufcall")
    val sql=
      """select a.*,
        |uf_invited_count/b.friendnum as uf_invited_prec,
        |uf_attended_count/b.friendnum as uf_attended_prec,
        |uf_not_attended_count/b.friendnum as uf_not_attended_prec,
        |uf_maybe_count/b.friendnum as uf_maybe_prec
        |from (
        |select
        |ufi.userid,ufi.eventid,
        |sum(ufi.invited) as uf_invited_count,
        |sum(ufi.yes) as uf_attended_count,
        |sum(ufi.no) as uf_not_attended_count,
        |sum(ufi.maybe) as uf_maybe_count
        |from
        |view_ufcall ufi group by ufi.userid,ufi.eventid) a
        |inner join (
        |select userid,count(friendid) friendnum
        |from dwd_events.dwd_usersFriends
        |group by userid
        |) b on a.userid=b.userid""".stripMargin
    spark.sql(sql)
  }

  /**
   * Whether the user is a friend of the event's creator.
   */
  def getCreatorIsFriend(spark:SparkSession,testdata:DataFrame) ={
    testdata.createOrReplaceTempView("view_testdata")
    val sql=
      """with
        |t1 as (select tr.userid,es.eventid,es.userid as creator
        |from view_testdata tr
        |inner join dwd_events.dwd_events es on tr.eventid=es.eventid ),
        |t2 as (select t.userid,t.eventid,
        |case when uf.friendid is null then 0 else 1 end isfriend
        |from t1 t left join dwd_events.dwd_usersFriends uf on t.userid=uf.userid and t.creator=uf.friendid ),
        |t3 as (select t.userid,t.eventid,
        |case when uf.friendid is null then 0 else 1 end isfriend
        |from t1 t left join dwd_events.dwd_usersFriends uf on t.userid=uf.friendid and t.creator=uf.userid)
        |select t2.userid,t2.eventid,
        |case when t2.isfriend=1 or t3.isfriend=1 then 1 else 0 end isfriend
        |from t2 inner join t3 on t2.userid=t3.userid and t2.eventid=t3.eventid""".stripMargin
    spark.sql(sql)
  }

  /**
   * Assemble the full test feature set.
   */
  def getDmUserEventData(spark: SparkSession,path:String)={
    println("Reading the test data set........")
    val testdf = readTestData(spark,path).cache()
    println("Building user info.........")
    val userdf = getUserinfos(spark,testdf)
    println("Ranking cities.........")
    val city_level = getCityLevel(spark).cache()
    println("Ranking countries.........")
    val country_level = getCountryLevel(spark).cache()
    println("Building event info.........")
    val eventdf= getEventinfo(spark,city_level,country_level)
    println("Per-user, per-event invitations and replies..........")
    val uea = getUserEventAttendees(spark).cache()
    println("Friends' replies..........")
    val ufea = getUserFriendEventAttendees(spark,uea).cache()
    println("Friend statistics............")
    val ufsummarydf = getUserFriendinfoSummary(spark,ufea)
    println("Is the user a friend of the creator?..........")
    val isFriend = getCreatorIsFriend(spark,testdf).cache()
    println("Assembling the final test feature set..............")
    import spark.implicits._
    testdf
      .join(userdf,Seq("userid"),"inner")
      .join(eventdf,Seq("eventid"),"inner")
      .join(ufsummarydf,Seq("userid","eventid"),"inner")
      .join(isFriend,Seq("userid","eventid"),"inner")
      .withColumnRenamed("members","member_days")
      .withColumn("invite_days",floor(($"starttime"-unix_timestamp($"ttime"))/3600/24))
      .withColumnRenamed("isfriend","creator_is_friend")
      .withColumn("location_similar",locationSimilar($"location",$"city",$"province",$"country"))
      .drop("ttime","location","creator","starttime","city","province","country")
  }

Categorize the test set's events with the trained KMeans model => obtain the dm_final-style feature set

    val spark = SparkSession.builder().master("local[*]").appName("test")
      .config("hive.metastore.uris","thrift://192.168.16.150:9083")
      .enableHiveSupport().getOrCreate()

    // register the UDFs
    spark.udf.register("locationSimilar",locationSimilar)
    spark.udf.register("maxandmin",maxandmin)
    spark.udf.register("calcage",calcage)
    // build the test feature set; in production, the test data would come from each Spark Streaming micro-batch
    val res = getDmUserEventData(spark,"e:/test.csv")
    // KMeans event categorization
    val eventType = KMeansEventTypeHandler.calcEventType(spark, res)
      .withColumnRenamed("eventid","etid")
    // join the event categories with the test feature set
    val finalRes = res.join(eventType, res("eventid")===eventType("etid"))
      .drop("features","etid") // without the rename, drop would remove both eventid columns
      .withColumnRenamed("prediction", "eventType")
      .distinct()

KMeansEventTypeHandler:

def calcEventType(spark:SparkSession,et:DataFrame)= {
    var tmp = et
    if (et == null) {
      tmp = spark.sql("select * from dm_events.dm_usereventfinal")
        .select("eventid","features")
    } else {
      tmp = tmp.select("eventid","features")
    }
    // split the features column into c_* columns
    val fts = split(tmp("features"), ",")
    // build the c_0 ~ c_100 column list
    val cols = ArrayBuffer[String]()
    for(cl <- 0 to 100){
      tmp = tmp.withColumn("c_"+cl,fts.getItem(cl).cast(DoubleType))
      cols.append("c_"+cl)
    }
    tmp = tmp.drop("features") // drop returns a new DataFrame; the result must be reassigned

    // assemble the c_* columns into a single feature vector
    val assembler = new VectorAssembler().setInputCols(cols.toArray).setOutputCol("feature")
    val df = assembler.transform(tmp)
    // load the trained KMeans model
    val model = KMeansModel.load("d:/kmmodel2")
    // classify each event
    model.transform(df).select("eventid","prediction")
  }

② Use the RF model => predict the label (interested?)

 val prediction = RandomForestUserInterestTest.interestedTest(spark,null) // in production, pass in the DataFrame built from the raw test set

    RandomForestUserInterestTest:

      def interestedTest(spark:SparkSession,finalRes:DataFrame)={
        var tmp = finalRes
        if(tmp == null){
          tmp = spark.sql("select * from dm_events.dm_testdata").cache()
        }
        val column: Array[String]  = tmp.columns
        val cls = column.map(f=>col(f).cast(DoubleType))
        val tab = tmp.select(cls:_*)
    // exclude userid and eventid from the feature columns
    val cs = column.filterNot(str => str == "userid" || str == "eventid")
        val ass = new VectorAssembler().setInputCols(cs).setOutputCol("features")
        val cpres = ass.transform(tab)
    
        val model = RandomForestClassificationModel.load("d:/rfc")
        model.transform(cpres).select("userid","eventid","prediction")
      }

③ Write the predictions to MySQL, along with the related dimension tables (for the data visualization step that follows)

    // write the predictions to MySQL
    MySqlWriter.writeData(prediction,"userinterested")
    // also write the dimension tables related to the predictions
    val users = spark.sql("select * from dwd_events.dwd_users")
    MySqlWriter.writeData(users,"users")
    val events = spark.sql("select * from dwd_events.dwd_events")
    MySqlWriter.writeData(events,"events")
    spark.stop()

MySqlWriter: writing a DataFrame to MySQL

    object MySqlWriter {
      def writeData(df:DataFrame,tableName:String)={
        val prop = new Properties()
        prop.put("driver","com.mysql.jdbc.Driver")
        prop.put("user","root")
        prop.put("password","ok")
        df.write.mode("overwrite")
          .jdbc("jdbc:mysql://192.168.16.150:3306/prediction",tableName,prop)
      }
    }
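For completeness, a small sketch (same connection properties as the writer) of reading a table back, e.g. to spot-check the results before wiring up the visualization:

    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("readBack").getOrCreate()
    val prop = new Properties()
    prop.put("driver","com.mysql.jdbc.Driver")
    prop.put("user","root")
    prop.put("password","ok")

    // Read the predictions back from MySQL for a quick sanity check.
    spark.read.jdbc("jdbc:mysql://192.168.16.150:3306/prediction","userinterested",prop).show(5)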
Original post: https://www.cnblogs.com/sabertobih/p/14170391.html