zoukankan      html  css  js  c++  java
  • 糖豆实时推荐系统设计与实现

    1.实时推荐系统与相关工作

    1.1 原因

    实时计算能够及时捕获用户短时兴趣,同时能够快速反馈分发当前系统的用户兴趣内容。大量实践以及发表的文章都显示了推荐系统实时化,对推荐精准度的提升的有效性和必要性。

    1.2 腾讯架构与实现

    实时推荐相关工作非常多,腾讯和北大合作的两篇SIGMOD文章是比较实际和详细的实现,采用的计算框架能够支持大规模数据的实时推荐,以下将会分开简述以下两篇文章。

    2015年

    Huang发表了基于Storm和KV存储的大规模实时推荐系统 (TencentRec: Real-time Stream Recommendation in Practice)

    1. 实现了一系列经典推荐算法的实时版本
    2. 实现了数种实时算法提高推荐精度
    3. 广泛应用于业务有效提高

    腾讯采用使用storm原因,支持实时数据流式计算,良好的可扩展性、可容错性,采用简单编程模型。
    文章核心包括实时增量计算的ItemCF,以及用户隐式反馈计算、实时剪枝算法、基于用户画像的数据稀疏性策略。应用在多个业务上都有不同程度的提升,最明显的是腾讯视频的全局表现提升高达30%。

    全文核心应该是下图六道公式,阐述腾讯如何具体实现的增量itemcf。

    文章中的co-rating,其实就是我们常说的user bias. 公式3和4解决了用户隐式反馈问题,细节的计算可以参考2016的文章,实际是一个log函数融合了用户的浏览、点击、分享、购买等行为,转化成rating.


    corating.png

    请注意公式4,由于他们定义了corating,实际是将相似度的增量计算从L2范数的计算转化成了L1范数计算.(当Rup取x的时候,y=1/x)。

    可扩展的增量计算


    itemcf.png

    initemcf.png

    2016年

    腾讯视频的推荐应用(Real-time Video Recommendation Exploration)

    1. 实时处理、大规模数据下的准确率和可扩展性。
    2. 开发了一个基于矩阵分解的大规模在线协同过滤算法,以及一系列的自适应更新策略。
    3. 通过增加包括视频类别、时间因素影响、用户画像剪枝以及训练等方法,提高实时TopN推荐的精度。

    在我们看来,全文核心在于实时计算的数据流转,如下图所示:


    tecvideo.png

    基于storm的实时计算topology图:


    topo.png

    2. 糖豆的设计与实现

    2.1 架构

    糖豆整体推荐框架,从离线,近线,在线三套计算流程组合而成。在线流程基于Spark Streaming框架实现,部署在近线集群。 在线推荐框架实时根据用户行为,生成实时推荐列表,从而满足用户瞬时兴趣,提高推荐系统的推荐新鲜度。简单架构图如下:


    糖豆实时架构.png

    2.2 基于Spark Streaming的实现

    2.2.1. 计算流程

    实时计算流程如下图所示:


    实时计算流程图


    分解步骤:

    1. Spark Streaming 读取Kafka,原始日志ETL
    2. 提取用户隐式反馈,生成候选集tuple (uid,vid)
    3. 每天凌晨会将离线计算好的ItemCF模型结果集导入Redis。itemcf数据结构是一个similarity vid list。
    4. 实时维护看过视频set,对看过视频的处理候选集tuple过滤该用户看过的视频
    5. 实时更新推荐过视频set,候选集tuple过滤当天已经被推荐过的视频
    6. 候选集写入Redis推荐list

    python实现:

    if __name__ == "__main__":
        print sys.argv
        reload(sys)
        sys.setdefaultencoding('utf-8')
        sc = SparkContext(appName="real_time_etl")
        #20秒
        ssc = StreamingContext(sc, 15)
        brokers = "kafka-servers:9092"
        topic = "logstash"
        #读取kafka
        kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
       #解析日志、过滤无关数据、读取相似视频
        lines = kvs.map(lambda x : readJson(x[1])).filter(lambda x: x is not None).map(lambda x: getTopkfromRedis(x))
        #lines.pprint()
         #写入推荐结果
        lines.foreachRDD(lambda rdd: list2Redis(rdd))  
        ssc.start()
        ssc.awaitTermination()

    2.2.2 监控

    部署在集群Master节点的监控脚本会每30s扫描一次实时计算代码进程,如果发现进程被failed,会自动拉起实时计算Spark Steaming进程。如果进程拉起失败会触发邮件、短信报警

    #! /bin/sh
    
    MOBILE="your phone numbers"
    RT_HOME=/home/realtime/recommend.py
    
    DIR=/data/rtdamon
    PID_FILE=$DIR/.run/rt-litetl-damon.pid
    LOG_FILE=$DIR/.log/rt-litetl-damon.log
    t=$(date -d "today" +"%Y-%m-%d %H:%M:%S")
    
    source /etc/profile 
    echo $PID_FILE $LOG_FILE
    
    if [ -e "$PID_FILE" ];
    then
            pid=`cat $PID_FILE`
            echo $pid
            damon_process_exists=`ps v -p $pid | grep "rt-litetl-damon.sh" | grep -v grep|grep -v <defunct> `
            echo "damon process exists : $process_exists"
            if [ -n "$damon_process_exists" ]
            then
                    echo "Process rt-litetl-damon.sh is running! $t" >> $LOG_FILE
                    exit
            fi
    fi
    
    pid=$$
    echo "$pid" > $PID_FILE
    
    while :
    do
            process_exists=`ps -ef|grep "$RT_HOME"|grep "spark"|grep -v grep|wc -l`
            echo "process exists : $process_exists" >>$LOG_FILE
            if [ "$process_exists" == "0" ]; then
    
    
    /hadoop/spark/bin/spark-submit  --master yarn --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 --py-files /hadoop/user/rt/redis.zip --num-executors 10 --executor-cores 7 --executor-memory 6g /home/realtime/recommend.py>>/data/rtlog/rtrecommed.log  2>&1 &
        /usr/bin/php -f /data/rtdamon/yunsms.class.php "$MOBILE" "recommend.py"
                    echo "realtime recommendation process already restarted at $t" >> $LOG_FILE
            fi
    
            #sleep `expr 3600 * 3`
            sleep `expr 60 * 1`
    done

    2.3 收益

    根据我们的AB测试数据来看,整体CTR提升25%。用推荐系统的A版对比无推荐的B版,用户观看时长提升47%。


    recabdata.png

    3. 问题与改进

      1. 较多代码逻辑集中在Redis。目前Redis无灾备措施,同时IO和负载也会出现Peak。
      2. Spark Streaming 目前实时级别在分钟级。需要升级成storm的秒、毫秒级别。
      3. 需要用户点击等行为才会生产数据,容易召回不足。
  • 相关阅读:
    MyCat分库分表-安装
    oracle 字符串格式转化 与 今天 /本周 /本月 查询
    java 接口开发时 后台无法获取前端传过来的参数值
    orace 异常 ORA-01830: 日期格式图片在转换整个输入字符串之前结束
    java json数据返回值中文乱码 出现???
    特殊的日子 2018年总结(一个人的胡言乱语)
    写在2016年的第365天,记录我的2016
    app mui框架 安卓手机app禁止截屏
    oracle 查询函数wm_concat,decode,COALESCE
    后台获取select的值,给页面添加默认值 【js】待续...
  • 原文地址:https://www.cnblogs.com/ventlam/p/7268267.html
Copyright © 2011-2022 走看看