zoukankan      html  css  js  c++  java
  • 如何将平时所学的代码知识,用在实际测试场景中

    需求如下:

     第一个:每30张卡片最多包含2张过去14天没有登录的用户,卡片尽量不要出现在前15张的位置

    # 需求:每30张卡片最多包含2张过去14天没有登录的用户,卡片尽量不要出现在前15张的位置
    from knight_util.get_db_token import tk_
    import requests
    from knight_util.util import KnightUtil
    from knight_util.write_txt import write_log
    from knight_util.get_current_time import current_time
    import threading
    import json
    m1 = KnightUtil().mysql_()
    tk = tk_(2397).get('EPAL_WEB')
    # 创建线程锁
    mutex = threading.Lock()
    
    
    def get_user_id_list():
        headers = {
            "_tk_": tk,
            "content-type": "application/json",
            "user-agent": "Mozilla / 5.0(Windows NT 10.0;Win64;x64)"
        }
        data = {"limit": 30, "likeUserIds": [], "dislikeUserIds": [], "excludeUserIds": []}
        url = "https://test-community.gamesegirl.com/user/meets/match/v-3.6"
        response = requests.post(url=url, json=data, headers=headers).json()
        # response = json.loads(response_)
        if response['status'] == 'OK':
            response_length = len(response['content']['data'])
            userid_list = [response['content']['data'][i]['userId'] for i in range(response_length)]
            print(f"总共卡片数:{len(userid_list)}")
            print(f"接口返回的所有meet卡片数据{userid_list}")
            return userid_list
    
    
    def get_t_user_id():
        """获取t_user表里过去14天没有登录的用户id"""
        start_time = current_time("sub_time", 14)
        print(f"查询开始时间:{start_time}")
        end_time = current_time("add_time", 1)
        print(f"查询结束时间:{end_time}")
        # sql = '''SELECT id FROM community.t_user t1 WHERE DATE_FORMAT(t1.last_access_time,'%Y-%m-%d %H:%i:%S') BETWEEN '2020-12-30 00:00:00' AND  '2021-01-13 23:59:59';'''
        sql = '''SELECT id FROM community.t_user t1 WHERE DATE_FORMAT(t1.last_access_time,'%Y-%m-%d %H:%i:%S') BETWEEN '{}' AND  '{}';'''.format(
            start_time, end_time)
        # 锁定
        mutex.acquire()
        m1.execute(sql)
        # 释放
        mutex.release()
        id_list = [data[0] for data in m1.fetchall()]
        print("在去14天登录的用户数据(含今天)")
        print(id_list)
        return id_list
    
    
    def get_last_access_time(id_):
        """获取最后一次访问时间"""
        sql = '''SELECT t1.last_access_time FROM community.t_user t1 WHERE id={}'''.format(id_)
        mutex.acquire()
        m1.execute(sql)
        mutex.release()
        for data in m1.fetchall():
            # print(data[0])
            return data[0]
    
    
    def run():
        db_list = get_t_user_id()  # 数据库数据
        api_list = get_user_id_list()  # 接口数据
        # 没在数据库里面返回的数据
        not_in_list = [id_ for id_ in api_list if id_ not in db_list]
        # 在数据库里面返回的数据
        in_list = [id_ for id_ in api_list if id_ in db_list]
        expire_user_dict = {}
        for j in in_list:
            expire_user_dict.update({f"在过去14天登录的用户ID:{j})": f"最后一次访问时间{get_last_access_time(j)}"})
        print(f"过去14天登录过的用户{in_list}")
        print(expire_user_dict)
        write_log(expire_user_dict)
        last_access_time_dict = {}
        for i in not_in_list:
            last_access_time_dict.update({f"在过去14天没有登录的用户ID{i}": f"最后一次访问时间{get_last_access_time(i)}"})
        # 利用索引找到卡片位置,因索引从0开始,所以返回结果需要加1
        for x in not_in_list:
            print(f"卡片出现位置在:{api_list.index(x) + 1}处")
            write_log(f"卡片出现位置在:{api_list.index(x) + 1}处")
        print(not_in_list)
        print(last_access_time_dict)
        write_log(last_access_time_dict)
    
    
    if __name__ == '__main__':
        run()
        # lock = threading.Lock()
        # for i in range(5):
        #     t1 = threading.Thread(target=run)
        #     # t1.setDaemon(True)
        #     t1.start()
            
    需求:每30张卡片最多包含2张欢迎度小于等于10%的用户,尽量不要出现在前15张
    # 需求:每30张卡片最多包含2张欢迎度小于等于10%的用户,尽量不要出现在前15张
    from knight_util.get_db_token import tk_
    import requests
    from knight_util.Util import KnightUtil
    
    m1 = KnightUtil().mysql_()
    tk = tk_(2397).get('EPAL_WEB')
    
    
    def get_user_id_list():
        headers = {
            "_tk_": tk,
            "content-type": "application/json",
            "user-agent": "Mozilla / 5.0(Windows NT 10.0;Win64;x64)"
        }
        data = {"limit": 30, "likeUserIds": [], "dislikeUserIds": [], "excludeUserIds": []}
        url = "https://test-community.gamesegirl.com/user/meets/match/v-3.6"
        response = requests.post(url=url, json=data, headers=headers).json()
        if response['status'] == 'OK':
            response_length = len(response['content']['data'])
            userid_list = [response['content']['data'][i]['userId'] for i in range(response_length)]
            print(f"总共卡片数:{len(userid_list)}")
            print(f"接口返回的所有meet卡片数据{userid_list}")
            return userid_list
    
    
    def get_t_user_id():
        """获取欢迎度小于10%的"""
    
        sql = '''SELECT user_id,popularity FROM community.user_meets um WHERE um.popularity <=0.10;'''
        m1.execute(sql)
        id_list = [data[0] for data in m1.fetchall()]
        print(f"欢迎度小于10%的所有数据{id_list}")
        return id_list
    
    
    def get_popularity(user_id):
        sql = '''SELECT popularity FROM community.user_meets WHERE user_id={};'''.format(user_id)
        m1.execute(sql)
        for data in m1.fetchall():
            return data[0]
    
    
    def get_region_name(user_id):
        sql = '''SELECT country,region_name FROM egirl.t_dict_area_info WHERE user_id={};'''.format(user_id)
        m1.execute(sql)
        for data in m1.fetchall():
            print(data)
            return data
    
    
    def run():
        db_list = get_t_user_id()  # 数据库数据
        api_list = get_user_id_list()  # 接口数据
        # 欢迎度大于10
        popularity_greater = [id_ for id_ in api_list if id_ not in db_list]
        print(f"接口返回欢迎度大于10%的数据{popularity_greater}")
        # 欢迎度小于10
        popularity_less = [id_ for id_ in api_list if id_ in db_list]
        print(f"接口返回欢迎度小于10%的数据{popularity_less}")
        popularity_dict = {}
        for j in popularity_less:
            popularity_dict.update({f"欢迎度userID是:{j}": get_popularity(j)})
        print(popularity_dict)
        # 利用索引找到卡片位置,因索引从0开始,所以返回结果需要加1
        for x in popularity_less:
            print(f"卡片出现位置在:{api_list.index(x) + 1}处")
        # 查是否同城
        for t in popularity_less:
            get_region_name(t)
    
    
    if __name__ == '__main__':
        run()
    每30张卡片最多包含2张非真人图片,尽量不要出现在前15张
    # 每30张卡片最多包含2张非真人图片,尽量不要出现在前15张
    from knight_util.get_db_token import tk_
    import requests
    from knight_util.Util import KnightUtil
    from knight_util.write_txt import write_log
    
    m1 = KnightUtil().mysql_()
    tk = tk_(2397).get('EPAL_WEB')
    
    
    def get_user_id_list():
        headers = {
            "_tk_": tk,
            "content-type": "application/json",
            "user-agent": "Mozilla / 5.0(Windows NT 10.0;Win64;x64)"
        }
        data = {"limit": 30, "likeUserIds": [], "dislikeUserIds": [], "excludeUserIds": []}
        url = "https://test-community.gamesegirl.com/user/meets/match/v-3.6"
        response = requests.post(url=url, json=data, headers=headers).json()
        if response['status'] == 'OK':
            response_length = len(response['content']['data'])
            userid_list = []
            for i in range(response_length):
                userid_list.append(response['content']['data'][i]['userId'])
            print(userid_list)
            print(f"总共卡片数:{len(userid_list)}")
            return userid_list
    
    
    def is_person(user_id):
        """是否真人"""
        sql = f"SELECT real_person_img FROM community.user_meets WHERE user_id={user_id} AND user_img_status='PASS' AND status='ENABLED';"
        m1.execute(sql)
        for data in m1.fetchall():
            return data[0]
    
    
    def get_real_person_code(user_id):
        """获取是否是真人图片状态码"""
        sql = '''SELECT real_person_img FROM community.user_meets WHERE user_id={} '''.format(user_id)
        m1.execute(sql)
        for data in m1.fetchall():
            return data[0]
    
    
    def run():
        userid_dict = {}
        id_list = get_user_id_list()
        # 是真人
        real_people = []
        # 不是真人
        not_real_people = []
        for i in id_list:
            if is_person(i) == 0:
                real_people.append(i)
                # userid_dict.update({f"user_id:{i}": f"是真人"})
            elif is_person(i) == 1:
                not_real_people.append(i)
                # userid_dict.update({f"user_id:{i}": f"不是真人"})
        print(f"是真人图片的userid:{real_people}")
        print(f"不是真人图片的userid:{not_real_people}")
        dic = {}
        for i in not_real_people:
            dic.update({i: get_real_person_code(i)})
        print(f"不是真人图片的数据:{dic}")
        for x in not_real_people:
            print(f"卡片出现位置在:{id_list.index(x) + 1}处")
    
    
    if __name__ == '__main__':
        run()

     需求如图:

    代码如下:

    # 智能图片正向场景测试
    from knight_util.MysqlUtil import MysqlPool
    import requests
    from multiprocessing import Pool
    from knight_util.Util import KnightUtil
    from knight_util.sso_token import query_meet_token
    import random
    from knight_util.write_txt import write_log
    
    mysql = MysqlPool()
    redis = KnightUtil().redis_()
    # TODO 全局变量user_id
    user_id = [3801]
    token_list = query_meet_token(user_id[0])
    
    
    def query_redis_exposureCount():
        """查询曝光度"""
        redis_data = redis.get("meets:image:calculate:{}".format(user_id[0]))
        img_dict = {}
        for i in range(len(eval(redis_data))):
            img_dict.update({eval(redis_data)[i]['userImg']: eval(redis_data)[i]['exposureCount']})
            write_log(eval(redis_data)[i])
        count_list = [j for j in img_dict.values()]
        return sum(count_list)
    
    
    def batch_like(userid_list=user_id):
        """批量喜欢"""
        # 查计算中的图片
        sql = "SELECT user_img FROM community.user_meets_image WHERE user_id={} AND calculate_status='CALCULATING';".format(
            userid_list[0])
        data = mysql.fetch_all(sql)
        # 计算中的图片
        # user_img_list = [i['user_img'] for i in data]
        user_img_list = ['https://community-oss.epal.gg/data/community/meet/pt/1611841764302.jpeg']
        # 删除数据触发列表
        triggering_condition_list = [20, 80, 130, 180, 230, 270]
        like_num = 0
        while True:
            for tk in token_list:
                like_num += 1
                print(f"第{like_num}次请求开始")
                write_log(f"第{like_num}次请求开始")
                # 喜欢状态
                like_status_list = ["like", "dislike"]
                # like_status_list = ["like", "dislike", "superlike"]
                # 随机喜欢不喜欢
                status = random.choice(like_status_list)
                # 随机正在计算中的图片
                img_url = random.choice(user_img_list)
                # 请求url, token, 参数
                headers = {
                    "_tk_": tk
                }
                url = "https://test-community.gamesegirl.com/user/meets/likeOrDislike"
                data1 = {"likeUserIds": userid_list,
                         "dislikeUserIds": [],
                         "likeUserIdAndImg": [{"userId": userid_list[0],
                                               "imgUrl": img_url}],
                         "disLikeUserIdAndImg": []}
                data2 = {"likeUserIds": [],
                         "dislikeUserIds": userid_list,
                         "likeUserIdAndImg": [],
                         "disLikeUserIdAndImg": [{"userId": userid_list[0],
                                                  "imgUrl": img_url}]}
    
                if status == "like":
                    # 喜欢
                    try:
                        response1 = requests.post(url=url, json=data1, headers=headers).json()
                        print(response1)
                        if response1['status'] == 'OK':
                            print(f"喜欢成功:{img_url}")
                            print(f"曝光总次数:{query_redis_exposureCount()}")
    
                            # 总曝光数等于300时结束运行
                            if query_redis_exposureCount() == 299:
                                print("结束计算")
                                return
                            # 满足条件时,删除meet关系数据
                            if like_num in triggering_condition_list:
                                del_sql = "DELETE FROM community.user_relation WHERE user_id={} OR to_user_id={};".format(
                                    userid_list[0], userid_list[0])
                                print(f"共删除{mysql.delect(del_sql)}条数据")
    
                    except Exception as e:
                        raise e
                # 超级喜欢
                elif status == "superlike":
                    try:
                        super_url = "https://test-community.gamesegirl.com/user/meets/super/like"
                        data3 = {"userId": userid_list[0],
                                 "imgUrl": img_url}
                        response3 = requests.post(url=super_url, json=data3, headers=headers).json()
                        print(response3)
                        if response3['status'] == 'OK':
                            print(f"超级喜欢成功:{img_url}")
                            print(f"曝光总次数:{query_redis_exposureCount()}")
                            # # 删除超级喜欢数据
                            list_keys = redis.keys("1002:test:user:member:superLike:*")
                            if list_keys:
                                for key in list_keys:
                                    print(redis.delete(key))
                                    print("删除超级喜欢会员数据成功")
                            # 总曝光数等于298时结束运行
                            if query_redis_exposureCount() == 299:
                                print("结束计算")
                                return
                            # 满足条件时,删除meet关系数据
                            if like_num in triggering_condition_list:
                                del_sql = "DELETE FROM community.user_relation WHERE user_id={} OR to_user_id={};".format(
                                    userid_list[0], userid_list[0])
                                print(f"共删除{mysql.delect(del_sql)}条数据")
    
                    except Exception as e:
                        raise e
    
                else:
                    # 不喜欢
                    try:
                        response2 = requests.post(url=url, json=data2, headers=headers).json()
                        print(response2)
                        if response2['status'] == 'OK':
                            print(f"不喜欢成功:{img_url}")
                            print(f"曝光总次数:{query_redis_exposureCount()}")
                            # 总曝光数等于298时结束运行
                            if query_redis_exposureCount() == 299:
                                print("结束计算")
                                # print("开始写入最终结果")
                                return
                            # 满足条件时,删除meet关系数据
                            if like_num in triggering_condition_list:
                                del_sql = "DELETE FROM community.user_relation WHERE user_id={} OR to_user_id={};".format(
                                    userid_list[0], userid_list[0])
                                print(f"共删除{mysql.delect(del_sql)}条数据")
    
                    except Exception as e:
                        raise e
    
    
    if __name__ == '__main__':
        pool = Pool(6)
        pool.apply_async(batch_like)
        pool.close()
        pool.join()
        # a = query_redis_exposureCount()
        # print(a)

  • 相关阅读:
    移动端-纯css隐藏滚动条解决方案
    阻止点击穿透
    JS的防抖与节流
    go 自动安装项目依赖包
    git 修改远程仓库
    git 基础命令
    go 包govalidator
    go email
    windows下Redis的安装和使用
    go xorm,负载均衡
  • 原文地址:https://www.cnblogs.com/xiamaojjie/p/14291952.html
Copyright © 2011-2022 走看看