  • Similar-Crowd Profiling Algorithm


    This article was published by week in the Tencent Cloud+ Community column.

    I. Data Sources

    1. The similar-crowd data is stored in TDW. Data dictionary:

    CREATE TABLE sim_people_tdw_tbl(
        uid STRING COMMENT 'reader id',
        sim_uids STRING COMMENT 'sim_uids',
        sim_num BIGINT COMMENT 'sim_num',
        update_date STRING COMMENT 'update_date'
    )
    
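    Field        Type    Meaning
    uid          STRING  User identifier
    sim_uids     STRING  Users whose reading preferences resemble those of uid, formatted as user_id:shared_reads (the number of reads the two users have in common); entries are separated by commas
    sim_num      BIGINT  Number of similar users
    update_date  STRING  Data date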
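    The `sim_uids` format can be illustrated with a small parser. `parse_sim_uids` is a hypothetical helper, not part of the original code; this sketch assumes the `user_id:shared_reads` layout from the table, converts counts with `float()` as `split_uid_similarity` in the core code does, and skips empty or malformed entries.

```python
def parse_sim_uids(sim_uids):
    """Parse a sim_uids string such as "1_757155427:8,1_291083852:3".

    Returns {similar_uid: shared_read_count}. Hypothetical helper for
    illustration; empty or malformed entries are skipped.
    """
    result = {}
    for entry in sim_uids.split(","):
        parts = entry.strip().split(":")
        if len(parts) != 2:
            continue  # empty string or missing ":" separator
        uid, count = parts
        try:
            # float() matches split_uid_similarity in the core code
            result[uid] = float(count)
        except ValueError:
            continue  # non-numeric count
    return result


print(parse_sim_uids("1_757155427:8,1_291083852:3"))
# {'1_757155427': 8.0, '1_291083852': 3.0}
```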

    2. Basic user profiles are stored in MongoDB

    [Figure: basic user profile]

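    Field                                     Meaning
    _id                                       User ID
    profile (offline) / positive (real-time)  Positive profile (likes). Dimensions are separated by semicolons and the entries within a dimension by commas; each entry has the form key_id:weight. In order, the dimensions are level-1 category, level-2 category, keyword, topic, and reading source
    negative                                  Negative profile (dislikes); same format as the positive profile
    update_time                               Update time
    cityCode or city                          City code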
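    A minimal sketch of reading the positive-profile string described above. The function `parse_profile` and the dimension labels in `DIMENSIONS` are illustrative assumptions, not names from the original system; weights are parsed as `Decimal`, matching the core code.

```python
from decimal import Decimal, InvalidOperation

# Dimension order from the field table; these names are illustrative labels.
DIMENSIONS = ["category1", "category2", "keyword", "topic", "source"]


def parse_profile(profile):
    """Parse "k:w,k:w;k:w;..." into {dimension_name: {key_id: Decimal(weight)}}."""
    parsed = {}
    for index, dimension in enumerate(profile.split(";")):
        if index >= len(DIMENSIONS):
            break  # ignore any dimensions beyond the documented five
        features = {}
        for item in dimension.split(","):
            parts = item.split(":")
            if len(parts) != 2:
                continue  # empty dimension or malformed item
            key_id, weight = parts
            try:
                features[key_id] = Decimal(weight)
            except InvalidOperation:
                continue  # non-numeric weight
        parsed[DIMENSIONS[index]] = features
    return parsed


example = parse_profile("101:0.8,102:0.3;2001:0.6;nba:1.2;;src_1:0.5")
print(example["category1"])  # {'101': Decimal('0.8'), '102': Decimal('0.3')}
print(example["topic"])      # {}
```

    Empty dimensions (two consecutive semicolons) come back as empty dicts, so the dimension positions stay aligned.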

    3. Similar-crowd profiles are also stored in MongoDB


    II. Overall Approach

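    Because the TESLA cluster cannot operate on MongoDB directly, the similar-crowd data in TDW is first exported to HDFS through the Luozi (洛子) system and then merged with the existing crowd profiles in MongoDB.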
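    The merge works on similarity scores. The sketch below mirrors the rules of `merge_sim_users` in the core code using plain dicts; the name `merge_similarities` and the parameter values in the example are illustrative. Old similarities are multiplied by a decay factor; a user found in both lists gets the sum, capped at the upper bound, and is dropped if it is still below the lower bound; a user found only in the old list is kept only if its decayed score reaches the lower bound.

```python
def merge_similarities(new_sims, old_sims, decay_factor, low, high):
    """Combine fresh similarity scores with decayed historical ones.

    new_sims, old_sims: {uid: similarity}. Illustrative sketch only.
    """
    merged = dict(new_sims)  # fresh scores are taken as-is
    for uid, old_score in old_sims.items():
        decayed = old_score * decay_factor
        if uid not in merged:
            # Only seen historically: keep it if still similar enough
            if decayed >= low:
                merged[uid] = decayed
        else:
            # Seen in both: add up, cap at `high`, drop if still below `low`
            combined = min(merged[uid] + decayed, high)
            if combined < low:
                del merged[uid]
            else:
                merged[uid] = combined
    return merged


print(merge_similarities({"a": 8.0, "b": 2.0}, {"a": 10.0, "c": 20.0, "d": 4.0},
                         decay_factor=0.5, low=5.0, high=100.0))
# {'a': 13.0, 'b': 2.0, 'c': 10.0}
```

    As in the core code, a score that appears only in the new data ("b" above) is neither capped nor filtered.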

    [Figure: overall workflow]

    III. Algorithm Flow

    [Figure: algorithm flowchart]
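    The central step of the flow, turning the similar users' profiles into one crowd profile, can be sketched as follows. It mirrors `cluster_profile_compute` and `accumulate_dimension_profile` in the core code: each similar user is weighted by similarity / (similarity + 10), and the users' tag weights are summed per dimension. The helper names and the pre-parsed `{uid: [per-dimension dict, ...]}` input shape are assumptions made for illustration.

```python
from decimal import Decimal


def user_weight(similarity):
    """Weight of one similar user: s / (s + 10), as in cluster_profile_compute."""
    s = Decimal(similarity)
    return s / (s + 10)


def accumulate_crowd_profile(sim_profiles, similarities):
    """sim_profiles: {uid: [{tag: weight}, ...]}, one dict per dimension.
    similarities: {uid: similarity}.
    Returns {dimension_index: {tag: weighted sum}}."""
    crowd = {}
    for uid, dimensions in sim_profiles.items():
        if uid not in similarities:
            continue  # not in this user's similar crowd
        rate = user_weight(similarities[uid])
        for index, features in enumerate(dimensions):
            bucket = crowd.setdefault(index, {})
            for tag, weight in features.items():
                bucket[tag] = bucket.get(tag, Decimal(0)) + Decimal(weight) * rate
    return crowd


sims = {"u1": 10, "u2": 40}  # weights 0.5 and 0.8
profiles = {"u1": [{"sports": "1.0"}, {"nba": "2.0"}],
            "u2": [{"sports": "0.5", "tech": "1.0"}, {}]}
crowd = accumulate_crowd_profile(profiles, sims)
print(crowd[0]["sports"])  # 0.90
```

    The weight grows with similarity but stays below 1; a similarity of 10 gives 0.5, and 40 gives 0.8.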

    IV. Core Code

    #! /usr/bin/python2.7
    # -*- coding: utf8 -*-
    import decimal
    import time
    import math
    import sys
    import os
    import param_map
    from pymongo.collection import Collection
    from decimal import Decimal
    import datetime
    
    reload(sys)
    sys.setdefaultencoding("utf-8")
    sys.path.append("../")
    from utils import mongoUtils, confUtils
    
    decimal.getcontext().prec = 6
    BATCH_NUM = 100000
    
    # Only basic profiles updated within the last 30 days are used (Unix timestamp cutoff)
    now_time = datetime.datetime.now()
    delta = datetime.timedelta(days=30)
    delta30 = now_time - delta
    time_limit = int(time.mktime(delta30.timetuple()))
    print(time_limit)
    
    
    def split_uid_similarity(uid_num_str):
        """
        Split a "uid:similarity" string and return its two parts.
        :param uid_num_str: string of the form "uid:similarity"
        :return: uid, similarity (float)
        """
        uid_num = uid_num_str.split(":")
        return uid_num[0], float(uid_num[1])
    
    
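    def split_uid_sim_user(user_hd):
        """
        Split one line of the HDFS export into the uid and its similar users.
        :param user_hd: line of the form "uid<TAB>uid1:n1,uid2:n2,..."
        :return: uid, similar-user string
        """
        uid_sim_user = user_hd.strip().split("\t")
        return uid_sim_user[0], uid_sim_user[1]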
    
    
    def dimension_profile_limit(dimension_profile, min_i, max_i, limit, cluster_profile_str):
        """
        Keep the top `limit` tags of one dimension, clamp weights above max_i,
        drop tags below min_i, and append the rest to cluster_profile_str.
        :param dimension_profile: tags of one dimension, {tag_id: weight}
        :param min_i: tags whose (clamped) weight is below min_i are dropped
        :param max_i: weights above max_i are clamped to max_i
        :param limit: maximum number of tags to keep
        :param cluster_profile_str: crowd profile string built so far
        :return: cluster_profile_str followed by this dimension's "tag:weight,..." items
        """
        if not dimension_profile:
            return cluster_profile_str
        # Sort by weight, descending, and keep the first `limit` tags
        top_tags = sorted(dimension_profile.iteritems(), key=lambda c: c[1], reverse=True)[:limit]
        items = []
        for tag_id, tag_value in top_tags:
            tag_value = min(tag_value, max_i)
            if tag_value >= min_i:
                items.append(str(tag_id) + ":" + str(tag_value))
        # Joining avoids a trailing comma, so nothing (such as the preceding ";") has to be stripped afterwards
        return cluster_profile_str + ",".join(items)
    
    
    def cluster_profile_dic2list(cluster_profile, dimension_param_dic):
        """
        Apply the per-dimension thresholds to a crowd profile and convert it from dict to list.
        :param cluster_profile: crowd profile, {dimension index: {tag_id: weight}}
        :param dimension_param_dic: per-dimension thresholds, {str(dimension index): {"min", "max", "limit"}}
        :return: list of "tag:weight,..." strings, one per dimension
        """
        cluster_profile_str = ""
        if len(cluster_profile) == 0:
            return None
        # Iterate in dimension order; Python 2 dict iteration order is not guaranteed
        for key in sorted(cluster_profile):
            dimension_profile = cluster_profile[key]
            # Look up the thresholds of this dimension
            dimension_param = dimension_param_dic.get(str(key))
            if dimension_param is not None:
                min_i = dimension_param.get("min")
                max_i = dimension_param.get("max")
                limit = dimension_param.get("limit")
                if dimension_profile is not None:
                    cluster_profile_str = dimension_profile_limit(dimension_profile, min_i, max_i, limit,
                                                                  cluster_profile_str)
            # Append a semicolon whether or not the dimension produced any tags
            cluster_profile_str = cluster_profile_str + ";"
        cluster_profile_list = cluster_profile_str[:-1].split(";")
        return cluster_profile_list
    
    
    def sim_users_dic2list(cluster_dic, sim_users_max_size):
        """
        Keep at most sim_users_max_size of the most similar users and convert dict -> list.
        :param cluster_dic: similar users, {uid: similarity}
        :param sim_users_max_size: maximum number of similar users to keep
        :return: list of "uid:similarity" strings, and the truncated dict
        """
        # Sort by similarity, descending, and keep the top entries
        top_users = sorted(cluster_dic.iteritems(), key=lambda b: b[1], reverse=True)[:sim_users_max_size]
        new_cluster_dic = dict(top_users)
        # An empty dict yields [] rather than [""], which split_uid_similarity could not parse
        sim_users_list = [key + ":" + str(value) for key, value in top_users]
        return sim_users_list, new_cluster_dic
    
    
    class ClusterProfileComputer(object):
        cf = confUtils.getConfig("../conf/setting.conf")
    
        def __init__(self, environment):
            self.xw_database, self.xw_client = mongoUtils.getMongodb("XW")
            self.pac_database, self.pac_client = mongoUtils.getMongodb("PAC")
            self.om_database, self.om_client = mongoUtils.getMongodb("OM")
            item = "LOCAL_SIM_USERS_PATH" if environment == "local" else "SIM_USERS_PATH"
            self.sim_users_path = confUtils.getFilePath(self.cf, "SIM_USERS", item)
            self.decay_factor = param_map.SIM_USERS_PARAM.get("decay_factor")
            self.sim_users_max_size = param_map.SIM_USERS_PARAM.get("sim_users_max_size")
            self.similarity_low = param_map.SIM_USERS_PARAM.get("similarity_low")
            self.similarity_high = param_map.SIM_USERS_PARAM.get("similarity_high")
    
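        @staticmethod
        def basic_cursor2dic(platform, mongodb_cursor):
            """
            Store the basic profiles read from MongoDB in a dict keyed by uid.
            :param platform: platform name; PAC documents store the uid in "name", the others in "_id"
            :param mongodb_cursor: cursor over basic profile documents
            :return: {uid: profile document}
            """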
            users_profile_map = {}
            for user_profile in mongodb_cursor:
                _uid = user_profile["name"] if platform == "PAC" else user_profile["_id"]
                users_profile_map[_uid] = user_profile
            return users_profile_map
    
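        @staticmethod
        def get_sim_users_profile(all_users_profile, users_similarity):
            """
            Look up the basic profiles of a user's similar users.
            :param all_users_profile: basic profiles of one platform, {uid: profile document}
            :param users_similarity: list of "uid:similarity" strings
            :return: profiles of the similar users that have a basic profile
            """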
            rs = []
            for uid_similarity in users_similarity:
                uid, similarity = split_uid_similarity(uid_similarity)
                profile = all_users_profile.get(uid)
                if profile is not None:
                    rs.append(profile)
            return rs
    
        def dump_basic_profile(self, all_uid, batch_num, platform, profile_collection):
            # type: (list, int, str, Collection) -> dict
            """
            Fetch the basic profiles of all users on one platform, batch_num IDs per query.
            :param all_uid: list of user IDs
            :param batch_num: number of IDs per $in query
            :param platform: platform name; PAC documents are keyed by "name", the others by "_id"
            :param profile_collection: MongoDB collection holding the platform's basic profiles
            :return: {uid: basic profile}
            """
            rs = {}
            key = "name" if platform == "PAC" else "_id"
            # Query the basic profiles (which contain no similar users), skipping those older than time_limit
            for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))):
                cursor = profile_collection.find({"$and": [{key: {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}},
                                                           {"update_time": {"$gt": time_limit}}]}, no_cursor_timeout=True)
                rs.update(self.basic_cursor2dic(platform, cursor))
                cursor.close()
            return rs
    
        def compute_single_file(self, path, xw_profile_collection, pac_profile_collection, om_profile_collection):
            """
            Compute and store the crowd profiles of all users listed in one HDFS export file.
            """
            all_uid_list = []
            uid_sim_map = {}
            # Example entry: uid_sim_map["1_291083852"] = ["1_757155427:8"]
            with open(path) as users:
                for user_str in users:
                    if not user_str.strip():
                        continue  # skip blank lines
                    # Read the uid and its similar users from the HDFS export
                    uid_hf, sim_users_hd = split_uid_sim_user(user_str)
                    uid_sim_map[uid_hf] = sim_users_hd.split(",")
                    all_uid_list.append(uid_hf)
            print("uid_sim_map : %d" % len(uid_sim_map))
            # Fetch every user's basic profile on each platform (basic profiles contain no similar users)
            platform_basic_profile_dic = {
                "XW": self.dump_basic_profile(all_uid_list, BATCH_NUM, "XW", xw_profile_collection),
                "PAC": self.dump_basic_profile(all_uid_list, BATCH_NUM, "PAC", pac_profile_collection),
                "OM": self.dump_basic_profile(all_uid_list, BATCH_NUM, "OM", om_profile_collection),
            }
            # Fetch the existing crowd profiles
            cluster_profile_collection = self.xw_database.get_collection(
                param_map.MONGODB_CLUSTER_PROFILE_MAP["Cluster"])  # type: Collection
            old_cluster_profile_map = dump_cluster_profile_history(self, all_uid_list, cluster_profile_collection,
                                                                   BATCH_NUM)
            print("dump cluster profile %d records" % len(old_cluster_profile_map))
            for uid, new_sim_users in uid_sim_map.items():
                print("uid = %s" % uid)
                # Merge the new and old similar users, applying the decay factor to the old similarities
                users_similarity_dic = merge_sim_users(uid, new_sim_users, self.decay_factor, self.similarity_low,
                                                       self.similarity_high, old_cluster_profile_map)
                # Keep the most similar users and convert dict -> list for storage in MongoDB
                sim_users_list, users_similarity_dic = sim_users_dic2list(users_similarity_dic, self.sim_users_max_size)
                print("similar people len: %d" % len(sim_users_list))
                # Results are keyed by platform name because Python 2 dict iteration order is arbitrary
                platform_cluster_profile = {}
                for platform_name, platform_basic_profile in platform_basic_profile_dic.items():
                    # Profiles of this user's similar users on the platform
                    sim_users_profile_list = self.get_sim_users_profile(platform_basic_profile, sim_users_list)
                    cluster_profile_dic = cluster_profile_compute(platform_name, sim_users_profile_list,
                                                                  users_similarity_dic)
                    # Apply the per-dimension thresholds and convert dict -> list for storage in MongoDB
                    platform_cluster_profile[platform_name] = cluster_profile_dic2list(cluster_profile_dic,
                                                                                       param_map.DIMENSION_PARAM)

                # Keep the original create_time if this user already has a crowd profile
                now = int(time.time())
                old_profile = old_cluster_profile_map.get(uid)
                create_time = old_profile.get("create_time", now) if old_profile is not None else now
                document = {"_id": uid, "sim_users": sim_users_list,
                            "xw_cluster_profile": platform_cluster_profile["XW"],
                            "pac_cluster_profile": platform_cluster_profile["PAC"],
                            "om_cluster_profile": platform_cluster_profile["OM"],
                            "create_time": create_time,
                            "update_time": now}
                # Collection.save() is deprecated in PyMongo 3.0 and removed in 4.0; upsert by _id instead
                cluster_profile_collection.replace_one({"_id": uid}, document, upsert=True)
            print("end")
    
        def run(self):
            # Similar-crowd files exported from HDFS (only the attempt_* output files are read)
            xw_profile_collection = self.xw_database.get_collection(param_map.MONGODB_PROFILE_MAP["XW"])
            pac_profile_collection = self.pac_database.get_collection(param_map.MONGODB_PROFILE_MAP["PAC"])
            om_profile_collection = self.om_database.get_collection(param_map.MONGODB_PROFILE_MAP["OM"])
            for dir_path, dir_names, file_names in os.walk(self.sim_users_path):
                print(dir_names)
                for file_name in file_names:
                    if "attempt_" in file_name:
                        print(file_name)
                        path = os.path.join(dir_path, file_name)
                        self.compute_single_file(path, xw_profile_collection, pac_profile_collection, om_profile_collection)
    
    
    def accumulate_dimension_profile(cluster_dimension_feature, user_dimension, ratio):
        """
        Add one dimension of a user's profile to the same dimension of the crowd profile.
        :param cluster_dimension_feature: that dimension of the crowd profile, {tag: Decimal weight}
        :param user_dimension: the user's features for that dimension, "tag:weight,tag:weight,..."
        :param ratio: the user's weight, similarity / (similarity + 10), in the interval (1/3, 10/11)
        :return: the updated dimension of the crowd profile
        """
        if user_dimension != "":
            for feature in user_dimension.split(","):
                atom = feature.split(":")
                if len(atom) == 2:
                    k, w = atom
                    # Weighted sum: existing crowd weight (0 if the tag is new) + user weight * ratio
                    cluster_dimension_feature[k] = Decimal(w) * ratio + cluster_dimension_feature.get(k, Decimal(0))
        return cluster_dimension_feature
    
    
    def dump_cluster_profile_history(self, all_uid, collection, batch_num):
        rs = {}
        for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))):
            cursor = collection.find({'_id': {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}},
                                     no_cursor_timeout=True)
            rs.update(cluster_cursor2dic(cursor))
            cursor.close()
        return rs
    
    
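    def cluster_cursor2dic(mongodb_cursor):
        """
        Store the crowd profiles read from MongoDB in a dict keyed by uid.
        :param mongodb_cursor: cursor over crowd profile documents
        :return: {uid: crowd profile document}
        """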
        users_profile_map = {}
        for user_profile in mongodb_cursor:
            _uid = user_profile["_id"]
            users_profile_map[_uid] = user_profile
        return users_profile_map
    
    
    def merge_sim_users(uid_hdf, sim_users_new, decay_factor, similarity_low, similarity_high, old_cluster_profile_dic):
        """
        Merge the new similar users with those stored in the old crowd profile.
        :param uid_hdf: user ID
        :param sim_users_new: latest similar users, list of "uid:similarity"
        :param decay_factor: decay factor applied to the old similarities
        :param similarity_low: minimum similarity
        :param similarity_high: maximum similarity
        :param old_cluster_profile_dic: old crowd profiles, {uid: document}
        :return: merged similar users, {uid: similarity}
        """
        cluster_union_dic = {}
        # Load the new uids and similarities into the dict
        for user_similarity in sim_users_new:
            _uid, similarity = split_uid_similarity(user_similarity)
            cluster_union_dic[_uid] = similarity

        # Old crowd profile previously read from MongoDB
        old = old_cluster_profile_dic.get(uid_hdf)
        if old is not None:
            for uid_similarity_old in old.get('sim_users', []):
                uid_similarity_old_list = uid_similarity_old.split(":")
                if len(uid_similarity_old_list) != 2:
                    continue
                sim_uid_old = uid_similarity_old_list[0]
                try:
                    # float() raises ValueError on a malformed number
                    weight_old = float(uid_similarity_old_list[1]) * float(decay_factor)
                except ValueError:
                    continue
                if sim_uid_old not in cluster_union_dic:
                    # Only in the old profile: keep it if the decayed similarity is still high enough
                    if weight_old >= similarity_low:
                        cluster_union_dic[sim_uid_old] = weight_old
                else:
                    # In both: add the decayed old similarity, cap it, and drop the user if still too low
                    weight_new = min(weight_old + cluster_union_dic[sim_uid_old], similarity_high)
                    if weight_new < similarity_low:
                        del cluster_union_dic[sim_uid_old]
                    else:
                        cluster_union_dic[sim_uid_old] = weight_new
        return cluster_union_dic
    
    
    def cluster_profile_compute(platform, sim_users_profile_array, sim_users_dic):
        # type: (str, list, dict) -> dict
        """
        Compute the crowd profile from the similar users' profiles.
        :param platform: platform name
        :param sim_users_profile_array: profiles of the similar users, read from MongoDB
        :param sim_users_dic: similarity of each similar user, {uid: similarity}
        :return: crowd profile, {dimension index: {tag: weight}}
        """
        cluster_profile_rs = {}
        key = "name" if platform == "PAC" else "_id"
        for sim_user_obj in sim_users_profile_array:
            sim_user_id = sim_user_obj.get(key)
            # Similarity between the user and this similar user
            similarity = sim_users_dic.get(sim_user_id)
            if similarity is not None:
                sim_num = Decimal(similarity)
                # Weight of this similar user: similarity / (similarity + 10)
                rate = sim_num / (10 + sim_num)
                # Offline positive profile of this similar user
                profile = sim_user_obj.get("profile") or ""
                for i, u_dimension in enumerate(profile.split(";")):
                    # Update dimension i of the crowd profile
                    dimension_feature = cluster_profile_rs.get(i, {})
                    cluster_profile_rs[i] = accumulate_dimension_profile(dimension_feature, u_dimension, rate)
        return cluster_profile_rs
    
    
    if __name__ == "__main__":
        if len(sys.argv) == 2:
            env = sys.argv[1]
        else:
            env = "local"
        computer = ClusterProfileComputer(env)
        computer.run()
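
    After the weighted sums are computed, `cluster_profile_dic2list` and `dimension_profile_limit` keep the top `limit` tags of each dimension, clamp weights above `max`, and drop tags below `min`. The sketch below restates that step with plain floats; `format_crowd_profile` is a hypothetical name, and the threshold values are invented because `param_map.DIMENSION_PARAM` is not shown in the article.

```python
def format_crowd_profile(crowd, dimension_params):
    """crowd: {dim_index: {tag: weight}}.
    dimension_params: {str(dim_index): {"min": ..., "max": ..., "limit": ...}}.
    Returns one "tag:weight,..." string per dimension, in dimension order."""
    result = []
    for index in sorted(crowd):
        params = dimension_params.get(str(index))
        if params is None:
            result.append("")  # no thresholds configured: empty dimension
            continue
        # Highest weights first, keep at most `limit` tags
        top = sorted(crowd[index].items(), key=lambda kv: kv[1], reverse=True)[:params["limit"]]
        items = []
        for tag, weight in top:
            weight = min(weight, params["max"])  # clamp to the upper bound
            if weight >= params["min"]:
                items.append("%s:%s" % (tag, weight))
        result.append(",".join(items))
    return result


crowd = {0: {"a": 3.0, "b": 0.05, "c": 1.2, "d": 0.8}, 1: {"x": 0.5}}
params = {"0": {"min": 0.1, "max": 2.0, "limit": 3}, "1": {"min": 0.1, "max": 1.0, "limit": 5}}
print(format_crowd_profile(crowd, params))  # ['a:2.0,c:1.2,d:0.8', 'x:0.5']
```

    Building each dimension with `",".join` means a dimension whose tags are all filtered out simply becomes an empty string.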
    


    This article is published on Tencent Cloud+ Community with the author's permission. Original link: https://cloud.tencent.com/developer/article/1159230?fromSource=waitui

  • Original article: https://www.cnblogs.com/qcloud1001/p/9359404.html