zoukankan      html  css  js  c++  java
  • ES全量索引校验-python

    import unittest
    import json
    import requests
    import ddt
    from pymysql import connect
     
     
    def getMySqlData():
        try:
            db = connect(host="xx.xx.xx.xx", user="xxx", password="xxx", db="gb_goods", port=3306,
                         charset="utf8")
            # db = connect(host="localhost", user="root", password="root", db="test", port=3306, charset="utf8")
            cur = db.cursor()
            # sql需替换测试环境的sql
            sql = "SELECT gie.good_sn,gie.v_wh_code AS goodsSn FROM goods_info_extend_s_7   gie WHERE gie.goods_status IN (2,4,5) AND gie.platform IN (1,2,4) AND gie.site_code = 'GB' GROUP BY gie.good_sn, gie.v_wh_code;"
            # sql = "select goods_sn from kp_goods order by goods_sn"
            # 分表轮询
            # sql_1 = "select goods_sn from price_%s where ..."
            # for table_num in range(10):
            #     sql_2 = sql_1 % table_num
     
            cur.execute(sql)
            result = cur.fetchall()
            cur.close()
            db.close()
        except Exception as e:
            print(e)
        return result
     
     
    @ddt.ddt
    class MyTest(unittest.TestCase):
     
        @ddt.data(*getMySqlData())
        def test_gb(self, data):
            goodsSn = data[0]
            whCode = data[1]
            goodsId = f"{goodsSn}#{whCode}"
            print(goodsId)
            url_es = "http://10.4.4.80:9200/gearbest20200212201449/sku/_search"
            url_ai = "http://10.4.4.80:9200/GB_daily_full/sku/_search"
            headers = {"Content-Type": "application/json"}
            params = {"query": {"term": {"goodsId": {"value": goodsId}}}}
            # params = {"query": {"term": {"goodsSn": {"value": goodsSn}}}}
     
            # 调用内部的方法时,方法前面加 self.
            es_goodsInfo, goodsId_es = self.gb_index(url_es, params, headers)
            ai_goodsInfo, goodsId_ai = self.gb_index(url_ai, params, headers)
     
            # 判断两个字典数据是否一致
            if es_goodsInfo != ai_goodsInfo:
                self.writeGoodsSn(goodsId_es)
                # print("数据一致")
     
        # 数据写入txt方法
        def writeGoodsSn(self, goodsId):
     
            file = r"D:goodsId_GB.txt"
            with open(file, "a+") as f:
                f.write(goodsId + "
    ")
            print("数据写入成功...")
     
        # 内嵌对象排序方法
        def func_sort(self, jsonArray, list, sort_key1, sort_key2):
            # jsonArray 排序,先按sort_key1排序,再按sort_key2排序
            jsonArray.sort(key=lambda x: (x[sort_key1], x[sort_key2]))
            # 字典类型转换为 JSON 对象,再将 JSON 对象类型转换为 Python 字典
            data = json.loads(json.dumps(jsonArray))
            # 将字典以指定key的顺序写入list
            newList = []
            for info in data:
                dict = {}
                for i in range(len(list)):
                    dict[list[i]] = info[list[i]]
                newList.append(dict)
            return newList
     
        # 请求索引数据
        def gb_index(self, url, params, headers):
            # data:字典对象    json: json字符串
            r = requests.post(url, data=json.dumps(params), headers=headers)
            print("响应对象:", r.json())
     
            r_json = r.json()  # 返回字典类型,可以通过键名获取响应的值
            if len(r_json["hits"]["hits"]) > 0:
                # 定义一个字典,存储sku对应的各字段数据
                goods_info = {}
                # GB索引字段
                goods_info["week2SalesVolume"] = r_json["hits"]["hits"][0]["_source"]["week2SalesVolume"]
                goods_info["payEndTime"] = r_json["hits"]["hits"][0]["_source"]["payEndTime"]
                goods_info["appSwellAmount"] = r_json["hits"]["hits"][0]["_source"]["appSwellAmount"]
                goods_info["appPriceType"] = r_json["hits"]["hits"][0]["_source"]["appPriceType"]
                goods_info["goodsId"] = r_json["hits"]["hits"][0]["_source"]["goodsId"]
                goods_info["youtube"] = r_json["hits"]["hits"][0]["_source"]["youtube"]
                # skuAttrs
                # skuAttrs = r_json["hits"]["hits"][0]["_source"]["skuAttrs"]
                # if len(skuAttrs) > 0:
                #     list = ["attrValueKey", "attrValue", "attrKey", "attrType", "attrName"]
                #     skuAttrs_list = self.func_sort(skuAttrs, list, list[3], list[0])
                #     goods_info["skuAttrs"] = skuAttrs_list
                # else:
                #     goods_info["skuAttrs"] = skuAttrs
     
                goods_info["week2Sales"] = r_json["hits"]["hits"][0]["_source"]["week2Sales"]
                goods_info["shopPrice"] = r_json["hits"]["hits"][0]["_source"]["shopPrice"]
                # grossMargin 暂写固定值
                goods_info["grossMargin"] = "0"
                goods_info["baseScore2"] = r_json["hits"]["hits"][0]["_source"]["baseScore2"]
                goods_info["subTitle"] = r_json["hits"]["hits"][0]["_source"]["subTitle"]
                goods_info["exposureFlag"] = r_json["hits"]["hits"][0]["_source"]["exposureFlag"]
                goods_info["originalUrl"] = r_json["hits"]["hits"][0]["_source"]["originalUrl"]
                goods_info["discount"] = r_json["hits"]["hits"][0]["_source"]["discount"]
                goods_info["whCode"] = r_json["hits"]["hits"][0]["_source"]["whCode"]
                goods_info["baseScore5"] = r_json["hits"]["hits"][0]["_source"]["baseScore5"]
                goods_info["payStartTime"] = r_json["hits"]["hits"][0]["_source"]["payStartTime"]
                goods_info["recommendedLevel"] = r_json["hits"]["hits"][0]["_source"]["recommendedLevel"]
                goods_info["thumbExtendUrl"] = r_json["hits"]["hits"][0]["_source"]["thumbExtendUrl"]
                goods_info["appStatus"] = r_json["hits"]["hits"][0]["_source"]["appStatus"]
                goods_info["passAvgScore"] = r_json["hits"]["hits"][0]["_source"]["passAvgScore"]
                goods_info["stockFlag"] = r_json["hits"]["hits"][0]["_source"]["stockFlag"]
                goods_info["brandName"] = r_json["hits"]["hits"][0]["_source"]["brandName"]
                goods_info["appDisplayPrice"] = r_json["hits"]["hits"][0]["_source"]["appDisplayPrice"]
                goods_info["centerWord"] = r_json["hits"]["hits"][0]["_source"]["centerWord"]
                goods_info["goodsTitle"] = r_json["hits"]["hits"][0]["_source"]["goodsTitle"]
                goods_info["brandCode"] = r_json["hits"]["hits"][0]["_source"]["brandCode"]
                goods_info["appDeposit"] = r_json["hits"]["hits"][0]["_source"]["appDeposit"]
                goods_info["vWhCode"] = r_json["hits"]["hits"][0]["_source"]["vWhCode"]
                goods_info["catId"] = r_json["hits"]["hits"][0]["_source"]["catId"]
                goods_info["priceRates"] = r_json["hits"]["hits"][0]["_source"]["priceRates"]
                # skuDescAttrs
                # skuDescAttrs = r_json["hits"]["hits"][0]["_source"]["skuDescAttrs"]
                # if len(skuDescAttrs) > 0:
                #     list = ["attrValueKey", "attrValue", "attrKey", "attrName"]
                #     skuDescAttrs_list = self.func_sort(skuDescAttrs, list, list[2], list[0])
                #     goods_info["skuDescAttrs"] = skuDescAttrs_list
                # else:
                #     goods_info["skuDescAttrs"] = skuDescAttrs
     
                goods_info["totalFavoriteCount"] = r_json["hits"]["hits"][0]["_source"]["totalFavoriteCount"]
                goods_info["baseScore4"] = r_json["hits"]["hits"][0]["_source"]["baseScore4"]
                goods_info["appPayEndTime"] = r_json["hits"]["hits"][0]["_source"]["appPayEndTime"]
                goods_info["urlTitle"] = r_json["hits"]["hits"][0]["_source"]["urlTitle"]
     
                # categories
                categories = r_json["hits"]["hits"][0]["_source"]["categories"]
                if len(categories) > 0:
                    list = ["level", "catId", "catName", "isDefault"]
                    categories_list = self.func_sort(categories, list, list[1], list[3])
                    goods_info["categories"] = categories_list
                else:
                    goods_info["categories"] = categories
     
                goods_info["firstUpTime"] = r_json["hits"]["hits"][0]["_source"]["firstUpTime"]
     
                # labelFlags
                # labelFlags = r_json["hits"]["hits"][0]["_source"]["labelFlags"]
                # if len(labelFlags) > 0:
                #     list = ["platform","type","labelId"]
                #     sort_key = "labelId"
                #     labelFlags_list = self.func_sort(labelFlags,list,sort_key)
                #     goods_info["labelFlags"] = labelFlags_list
                # else:
                #     goods_info["labelFlags"] = labelFlags
     
                goods_info["appPayStartTime"] = r_json["hits"]["hits"][0]["_source"]["appPayStartTime"]
                goods_info["expiredTime"] = r_json["hits"]["hits"][0]["_source"]["expiredTime"]
                goods_info["exposureSalesVolume"] = r_json["hits"]["hits"][0]["_source"]["exposureSalesVolume"]
                goods_info["appDiscount"] = r_json["hits"]["hits"][0]["_source"]["appDiscount"]
                goods_info["baseScore1"] = r_json["hits"]["hits"][0]["_source"]["baseScore1"]
                # coupons
                # coupons = r_json["hits"]["hits"][0]["_source"]["coupons"]
                # if len(coupons) > 0:
                #     list=["code","platforms"]
                #     sort_key = "code"
                #     coupons_list = self.func_sort(coupons,list,sort_key)
                #     goods_info["coupons"] = list
                # else:
                #     goods_info["coupons"] = coupons
     
                goods_info["lang"] = r_json["hits"]["hits"][0]["_source"]["lang"]
                goods_info["isCod"] = r_json["hits"]["hits"][0]["_source"]["isCod"]
                goods_info["searchWords"] = r_json["hits"]["hits"][0]["_source"]["searchWords"]
                # defaultWh 暂写固定值
                goods_info["defaultWh"] = "0"
                goods_info["mStatus"] = r_json["hits"]["hits"][0]["_source"]["mStatus"]
                goods_info["dailyRate"] = r_json["hits"]["hits"][0]["_source"]["dailyRate"]
                goods_info["exposureSalesRate"] = r_json["hits"]["hits"][0]["_source"]["exposureSalesRate"]
                goods_info["sortOrder"] = r_json["hits"]["hits"][0]["_source"]["sortOrder"]
     
                shopGroups = r_json["hits"]["hits"][0]["_source"]["shopGroups"]
                if len(shopGroups) > 0:
                    list = ["groupId", "level", "groupName", "path"]
                    shopGroups_list = self.func_sort(shopGroups, list, list[1], list[0])
                    goods_info["shopGroups"] = shopGroups_list
                else:
                    goods_info["shopGroups"] = shopGroups
     
                goods_info["passTotalNum"] = r_json["hits"]["hits"][0]["_source"]["passTotalNum"]
                goods_info["shopCode"] = r_json["hits"]["hits"][0]["_source"]["shopCode"]
                goods_info["appExpiredTime"] = r_json["hits"]["hits"][0]["_source"]["appExpiredTime"]
                goods_info["isTort"] = r_json["hits"]["hits"][0]["_source"]["isTort"]
                goods_info["baseScore3"] = r_json["hits"]["hits"][0]["_source"]["baseScore3"]
                goods_info["priceType"] = r_json["hits"]["hits"][0]["_source"]["priceType"]
                goods_info["createTime"] = r_json["hits"]["hits"][0]["_source"]["createTime"]
                goods_info["swellAmount"] = r_json["hits"]["hits"][0]["_source"]["swellAmount"]
                goods_info["saleMark"] = r_json["hits"]["hits"][0]["_source"]["saleMark"]
                goods_info["deposit"] = r_json["hits"]["hits"][0]["_source"]["deposit"]
                goods_info["goodsModelWord"] = r_json["hits"]["hits"][0]["_source"]["goodsModelWord"]
                # appDefaultWh 暂写固定值
                goods_info["appDefaultWh"] = "0"
                goods_info["goodsWebSku"] = r_json["hits"]["hits"][0]["_source"]["goodsWebSku"]
                goods_info["imgExtendUrl"] = r_json["hits"]["hits"][0]["_source"]["imgExtendUrl"]
                # activityIds
                # goods_info["activityIds"] = r_json["hits"]["hits"][0]["_source"]["activityIds"]
                goods_info["displayPrice"] = r_json["hits"]["hits"][0]["_source"]["displayPrice"]
                goods_info["goodsWebSpu"] = r_json["hits"]["hits"][0]["_source"]["goodsWebSpu"]
     
                goods_info["isPlatform"] = r_json["hits"]["hits"][0]["_source"]["isPlatform"]
                goods_info["webStatus"] = r_json["hits"]["hits"][0]["_source"]["webStatus"]
                goods_info["goodsSn"] = r_json["hits"]["hits"][0]["_source"]["goodsSn"]
                goods_info["imgUrl"] = r_json["hits"]["hits"][0]["_source"]["imgUrl"]
                # tags
                # goods_info["tags"] = r_json["hits"]["hits"][0]["_source"]["tags"]
                yesterdaySales = r_json["hits"]["hits"][0]["_source"]["yesterdaySales"]
                if yesterdaySales > 0:
                    goods_info["yesterdaySales"] = yesterdaySales
                else:
                    goods_info["yesterdaySales"] = 0
     
                    # activities
                # activities = r_json["hits"]["hits"][0]["_source"]["activities"]
                # if len(activities) > 0:
                #     list = ["activityId","activityType"]
                #     sort_key = "activityId"
                #     activities_list = self.func_sort(activities,list,sort_key)
                #     goods_info["activities"] = activities_list
                # else:
                #     goods_info["activities"] = r_json["hits"]["hits"][0]["_source"]["activities"]
     
                # 邮件专享价数据
                goods_info["mailPrice"] = r_json["hits"]["hits"][0]["_source"]["mailPrice"]
                goods_info["appMailPriceDiscount"] = r_json["hits"]["hits"][0]["_source"]["appMailPriceDiscount"]
                goods_info["appMailPriceActive"] = r_json["hits"]["hits"][0]["_source"]["appMailPriceActive"]
                goods_info["mailPriceActive"] = r_json["hits"]["hits"][0]["_source"]["mailPriceActive"]
                goods_info["mailPriceCipherText"] = r_json["hits"]["hits"][0]["_source"]["mailPriceCipherText"]
                goods_info["mailPriceDiscount"] = r_json["hits"]["hits"][0]["_source"]["mailPriceDiscount"]
     
                goodsId = r_json["hits"]["hits"][0]["_source"]["goodsId"]
                print("goods_info:", goods_info)
                return goods_info, goodsId
            else:
                print("无sku数据")
                return 0, 0
     
     
    if __name__ == '__main__':
        unittest.main()
  • 相关阅读:
    Scrum:The Definition of Done —— 作业有没有写完呢?
    中兴通讯 可视化devops 牛啊 屠亚奇
    qunar-dns
    通过业务系统的重构实践DDD
    通过业务系统的重构实践DDD
    一键部署Kubernetes高可用集群
    springboot系列
    Ubuntu · Docker —— 从入门到实践
    容器化操作系统概览
    基于 CentOS7 的 Kubernetes 集群
  • 原文地址:https://www.cnblogs.com/wakey/p/12689398.html
Copyright © 2011-2022 走看看