zoukankan      html  css  js  c++  java
  • 用sparkR, 分析上亿条订单数据的脚本。

    上周我们这个10人的小团队开发的推荐拉新系统, 日拉新人数已接近4万人。过去几个月这个系统从无到有, 拉新从日增几千稳步增长到日增几万, 同事们几个月来,每天工作13个小时以上,洗澡时间都没有, 有时就住在公司, 回家怕吵到家人,只能睡客厅地板, 周日也不能保证休息。 大家的全力投入,不懈努力才能有这个结果。 非常感慨团队产生的的化学反应, 和惊人的生产效率。 产品稳定后,最近全面转入大数据分析, 和机器学习阶段, 开始做真正的增长黑客实践。 spark, R, scala都是刚刚开始深入地学习,没几天, 还好有数据, 学的快!, 不休息, 连做梦都是在做分析数据的工作, 日进千里啊。 

    刚开始用spark-sql的时候, 如果做一个复杂的查询,写一长串sql, 谁都看不懂,拆成小sql, 就要保存中间结果, 效率低下。 用了几天后, 开始切入sparkR和Scala , 发现效率比直接用spark-sql高太多了, 代码可读性也强太多。此外善用cahe,也可以有效提高效率。

    下面都是干货。废话不多少, 只希望帮到你。

    工作目标: 分析一下新手券分享的拉新效果和人数,需要对最近15日的订单大概2亿多条订单纪录, 以及300万左右的领券纪录, 几十万笔的返利信息做全库查询 , 这在msql上是不可能完成的任务。 对spark+hive来说, 也很耗时, 但一个小时内可以搞定。

    用R写了一下查询脚本, 稍后准备改成scala的。 两者都是调用spark api, 区别应该只在语法上。 

    用15个节点的spark跑这个查询脚本, 大概需要半个多小时才能出来结果。代码是最完整,最准确的文档, 提纲挈领的总结以后得空再总结。 

    ############################statistics.R################################

    #领券日期参数, 修改统计日参数
    date_parameter <- "2016-07-11"
    dayCount_parameter = 1


    hiveContext <- sparkRHive.init(sc)
    sql(hiveContext, "use honeycomb_bh_db")

    #通过hiveSql 获得想要的并集集合并且缓存下来 sql date_add
    ##程序执行阶段1: 数据准备。。。。。
    acquired_users_sql <-"select * from sc_t_acquire_record where sc_t_acquire_record.year=2016 and sc_t_acquire_record.month=07 and to_date(ct_time)='STARTDATE'"
    all_order_sql <- "select * from sc_t_order_all_info As a where a.year=2016 and a.month=07 and to_date(a.create_time)>='STARTDATE' and to_date(a.create_time)<=date_add(date('STARTDATE'),14) and product_id=210"
    rebate_order_sql <- "select * from sc_t_order_rebate_info As a where a.year=2016 and a.month=07 and to_date(a.create_time)>='STARTDATE' and to_date(a.create_time)<=date_add(date('STARTDATE'),7) and product_id=210"

    acquired_users_sql<-sub(pattern='STARTDATE', replacement=date_parameter, acquired_users_sql)
    all_order_sql<-gsub(pattern='STARTDATE', replacement=date_parameter, all_order_sql)
    rebate_order_sql<-gsub(pattern='STARTDATE', replacement=date_parameter, rebate_order_sql)


    #当天领券绑定的用户集合
    acquired_users <-sql(hiveContext,acquired_users_sql)
    cache(acquired_users)

    #15日内的全订单集合
    all_orders <-sql(hiveContext,all_order_sql)

    #7日内返利的订单集合
    rebated_orders <- sql(hiveContext,rebate_order_sql)

    #第0日领券后到14日结束前, 有打车纪录的
    acquired_users_with_orders<-join(acquired_users,all_orders, acquired_users$presentee_mobile==all_orders$passenger_phone, "left_outer")
    acquired_users_with_orders <- filter(acquired_users_with_orders, "passenger_phone is not null")


    mobiles_acquired_users <-distinct(select(acquired_users_with_orders, "presentee_mobile"))
    #write.json(acquired_users_with_orders, "file:///home/rd/spark/bin/20160711_users_convertion.json")

    #第0日领券后~第7日结束前,被返利的领券用户
    orders_rebated_within_8days <- join(acquired_users,rebated_orders, acquired_users$presentee_mobile==rebated_orders$passenger_phone, "left_outer")
    orders_rebated_within_8days <- filter(orders_rebated_within_8days, "passenger_phone is not null")

    cache(orders_rebated_within_8days)
    results <- data.frame("name" = c("frist"), "value" = c(0),stringsAsFactors=FALSE)

    ##程序执行阶段2: 开始利用spark进行集合运算。。。。。

    #第0日到第7日结束前, 券有效期内打过车的领券用户订单数据
    rules<- "to_date(a.create_time)>='STARTDATE' and to_date(a.create_time)<=date_add(date('STARTDATE'),7)"
    rules<-gsub(pattern='STARTDATE', replacement=date_parameter, rules)
    orders_within_8days = filter(acquired_users_with_orders, rules)
    mobiles_with_orders_within_8days <- distinct(select(orders_within_8days, "presentee_mobile"))


    #第8日到第14日结束前, 券过期后, 打过车的领券用户订单数据
    rules<- "to_date(a.create_time)>=date_add(date('STARTDATE'),8) and to_date(a.create_time)<=date_add(date('STARTDATE'),15)"
    rules<-gsub(pattern='STARTDATE', replacement=date_parameter, rules)
    orders_after_8days = filter(acquired_users_with_orders, rules)
    mobiles_with_orders_after_8days <- distinct(select(orders_after_8days, "presentee_mobile"))


    #第0日到第7日结束前, 被返利信息纪录的领券用户
    mobiles_user_reabted <-distinct(select(orders_rebated_within_8days, "presentee_mobile"))

    #券0~7天有效期内首单后未被返利的用户
    mobiles_my_team_losted <- except(mobiles_with_orders_within_8days, mobiles_user_reabted)

    #第8日券有效期过后, 14日内, 有成交纪录被sic统计方法, 统计进来的用户
    mobiles_after_7days_countedBySicheng <-except(mobiles_with_orders_after_8days, mobiles_user_reabted)

    #券0~7天有效期内首单后未被返利的用户, 第8日到第14日成单, 被sic统计转化的用户
    mobiles_my_team_losted_countedBySicheng <-intersect(mobiles_my_team_losted, mobiles_with_orders_after_8days)


    #第8日券有效期过后, 14日内, 思成没有统计的首单用户
    mobiles_both_losted <- except(mobiles_my_team_losted, mobiles_after_7days_countedBySicheng)

    #券0~7天有效期内首单后未被返利, 后7天没打车的用户
    mobile_first_order_withno_coupon_no_futher_order_after_7days <- except(mobiles_my_team_losted, mobiles_with_orders_after_8days)

    #7日内没打车, 后7日打车的用户
    mobiles_with_order_invoked_coupon <- except(mobiles_with_orders_after_8days, mobiles_with_orders_within_8days)

    #领券后15天里打车的用户, 由于业务特性,可以重复领券 这个存在重复统计。
    mobiles_converted = acquired_users_with_orders

    #程序运行阶段: 输出结果。。。
    results<-rbind(results, c("领新手券的用户数量", nrow(distinct(select(acquired_users, "presentee_mobile")))))
    results<-rbind(results, c("领新手券后15日转化的用户数量", nrow(mobiles_acquired_users)))
    results<-rbind(results, c("领新手券7日内打车用券转化的用户数量", nrow(mobiles_user_reabted)))
    results<-rbind(results, c("新手券有效期过期后7日内打车转化用户", nrow(mobiles_after_7days_countedBySicheng)))
    results<-rbind(results, c("sic统计方法统计的转化用户数", nrow(mobiles_user_reabted)+nrow(mobiles_after_7days_countedBySicheng)))
    results<-rbind(results, c("7日内首单未用新手券的人数", nrow(mobiles_my_team_losted)))
    results<-rbind(results, c("7日内首单未用新手券, 后7日内没打车的人数", nrow(mobiles_both_losted)))
    results<-rbind(results, c("7日内首单未用新手券, 后7日内有打车的人数", nrow(mobiles_my_team_losted_countedBySicheng)))

    results<-rbind(results, c("领新手券后7日内未打车, 后7日又打车的人数", nrow(mobiles_with_order_invoked_coupon)))
    results

  • 相关阅读:
    golang切片使用append追加内容导致切片值异常问题
    golang对通道进行select,case生效异常问题
    golang defer未按预期顺序执行
    GO实现无锁队列
    mysql建表报错:Specified key was too long
    右键快捷键
    算法与数据结构基础<三>----数据结构基础之栈和队列加强之用栈实现队列
    ios从入门到放弃之C基础巩固-----数组、字符串
    获取股票数据【使用JQData查询行情数据、财务指标、估值指标】
    IDEA配置.gitignore不生效的问题
  • 原文地址:https://www.cnblogs.com/realzjx/p/5716309.html
Copyright © 2011-2022 走看看