zoukankan      html  css  js  c++  java
  • 91家纺网本地版本定稿,thread版本

    import re
    from odps import ODPS
    from threading import Thread
    import threading
    from urllib import parse
    from datetime import datetime

    import random 
    import requests
    import time

    from scrapy import Selector
    from models import *

    store_list_urls = []     # (unused) collected store-list urls
    product_list_urls = []   # (unused) collected product-list urls

    domain = "http://www.91jf.com/"
    store_domain = "http://www.91jf.com/default.php?act=corp&sort=list&page="
    store_url_domain = 'http://www.91jf.com/default.php?act=store_goods&storeid=' # prefix joined with a store id to build the store's goods-list url
    category_url = "http://www.91jf.com/default.php?act=categorygoodslist&category_id=" # prefix joined with a category id to build a product-list url
    stor_url_aptitude = 'http://www.91jf.com/default.php?act=corpcert&id=' # prefix joined with a store id to build the store's certification url

    def get_nodes_json():
        """Fetch the site front page and return the raw ``<a>`` tags of the
        left-hand category menu as a list of HTML strings.

        Returns an empty list when the menu cannot be found.
        """
        left_menu_text = requests.get("http://www.91jf.com/").text
        sel = Selector(text=left_menu_text)
        all_divs = sel.xpath("//div[@class='class_child_li']//a[@href]").extract()
        # lxml leaves '&' escaped as '&amp;' in the extracted markup; decode
        # it so the hrefs are usable.  (The original line was the no-op
        # typo ``replace("&", "&")``.)
        return [node.replace("&amp;", "&") for node in all_divs]

    # Parse the top-level category menu and persist the series ids used to
    # build the crawler's entry urls.
    def process_nodes_list(url):
        """Scrape the category tree at *url* and upsert it into Catalogue.

        Top-level menu headings are stored with ``category_id == 0``; their
        children are stored with ``catalogue_level == 2`` and ``father_id``
        pointing at the parent row.  Existing rows are left untouched.
        """
        menu_text = requests.get(url).text
        sel = Selector(text=menu_text)
        nodes_list = sel.xpath("//div[@class='index_g_class']/ul/li")
        for item in nodes_list:
            title = ''.join(item.xpath("./div[@class='class_menu']/span/text()").extract())

            catalogue = Catalogue()
            catalogue.catalogue_name = title           # series name
            catalogue.series_level = 0                 # top-level marker
            catalogue_id_0 = 0                         # top-level rows carry id 0
            catalogue.category_id = catalogue_id_0
            catalogue.create_time = datetime.now()     # crawl timestamp
            existed_id = Catalogue.select().where(
                (Catalogue.catalogue_name == title) &
                (Catalogue.category_id == catalogue_id_0))
            if not existed_id:
                catalogue.save(force_insert=True)

            _id = Catalogue.get(Catalogue.catalogue_name == title)._id  # parent row id
            for series_name in item.xpath('.//div[@class="class_child_li"]//li'):
                series_name_0 = ''.join(series_name.xpath('.//span/text()').extract())

                href = ''.join(series_name.xpath(".//a[@href]").extract())
                # The original pattern was 'd.?d' -- the regex backslashes
                # were stripped in transit; restore r'\d.?\d' so the numeric
                # category id is actually matched.
                category_id = re.search(r'\d.?\d', href).group()

                child = Catalogue()
                child.series_level = 0                 # kept from original
                child.category_id = category_id        # child series id
                child.catalogue_name = series_name_0   # child series name
                child.catalogue_level = 2              # child level marker
                child.father_id = _id                  # parent Catalogue id
                child.create_time = datetime.now()

                existed_id = Catalogue.select().where(
                    (Catalogue.catalogue_name == series_name_0) &
                    (Catalogue.category_id == category_id))
                if not existed_id:
                    child.save(force_insert=True)

    # Build the outermost product-list urls from the category ids stored in
    # the Catalogue table.
    def get_catalogue_url():
        """Return the crawl entry urls (sales-sorted, descending) for every
        level-2 catalogue row."""
        rows = Catalogue.select().where(Catalogue.catalogue_level == 2)
        return [
            category_url + str(row.category_id) + "&okey=salenum&order=desc"
            for row in rows
        ]

    # Scrape one product-list page, upload its products to ODPS, then follow
    # the "next page" link.
    def parse_product_data(url):
        """Parse every product tile on the listing page at *url*.

        Tiles whose price is hidden ("consult the seller") or whose sales
        count is below 1 are skipped.  The collected Product rows are
        uploaded to ODPS in one batch, then the function recurses into the
        next page while the current page still contained sold items.
        """
        res_text = requests.get(url).text
        sel = Selector(text=res_text)
        res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
        flag_num = 0      # sales count of the last tile parsed; gates pagination
        goods_list = []
        for item in res_li:
            # Product id is embedded in the tile's href (e.g. ...&id=1234").
            # The original regex read 'id=.*d"' -- its backslash was lost;
            # restored to r'id=.*\d"'.  The original quote-strip had decayed
            # into the syntax error replace(""","").
            raw_href = ''.join(item.xpath('./div[contains(@class,"pro_pic_box")]/a[@href]').extract())
            product_id = re.search(r'id=.*\d"', raw_href).group()
            product_id = int(product_id.replace("id=", "").replace('"', ""))

            name = ''.join(item.xpath("./div[@class='row row-2 title']/a/text()").extract())

            price = ''.join(item.xpath('./div[@id="goods_detail_b"]/div[@class="row row-1"]/div[@class="g_price fm2"]/strong/text()').extract())
            try:
                price = float(price)
            except ValueError:
                # Price hidden behind membership / "ask the seller".
                print("价格会员可见|价格请咨询商家")
                continue

            sales_num = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract())
            sales_num = int(sales_num.split('销量:')[1])
            flag_num = sales_num
            if sales_num < 1:
                continue

            raw_store = ''.join(item.xpath("./div[@class='row row-3 c']/a[@href]").extract())
            store_id = re.search(r'id=.*\d"', raw_store).group()
            store_id = int(store_id.replace("id=", "").replace('"', ""))

            product = Product()
            product.product_id = product_id
            product.name = name
            product.price = price
            product.sales_num = sales_num
            product.store_id = store_id
            product.create_time = datetime.now()
            goods_list.append(product)

        Spider_91JIAFAN().up_product_to_odps(goods_list)

        # Pagination: more than two anchors means a real "next" link exists;
        # flag_num > 0 means the page still had selling items, so keep going.
        next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
        if len(next_page) > 2 and flag_num > 0:
            url_next = re.search(r'".*\d"', next_page[-1]).group()
            url_next = url_next.replace("&amp;", "&").replace('"', "")
            parse_product_data(parse.urljoin(domain, url_next))

    # Collect single-product links from a sales-sorted listing page and push
    # them into redis; recurse through pagination.
    def parse_data_last(url):
        """Walk the listing page at *url* and queue every product url whose
        sales count is positive into redis ('91jiafan:catalogue_url')."""
        flag_num = 0      # sales count of the last tile; gates pagination
        # Retry the fetch until it succeeds (this host drops connections).
        while True:
            try:
                res_text = requests.get(url).text
            except Exception:
                time.sleep(3)
                print('间隔休眠时间,再次处理')
            else:
                break
        sel = Selector(text=res_text)
        res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
        for item in res_li:
            sales_num = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract())
            sales_num = int(sales_num.split('销量:')[1])
            flag_num = sales_num

            # Regexes originally read '".*d"'; the stripped backslash is
            # restored, and the broken replace(""","") is now a quote strip.
            data = item.xpath("./div[@class='pro_pic_box']/a").extract()
            link = re.search(r'".*\d"', data[0]).group()
            link = link.replace("&amp;", "&").replace('"', "")
            data_url = parse.urljoin(domain, link)   # sales-sorted product url

            if sales_num > 0:
                r.lpush('91jiafan:catalogue_url', data_url)

        # Follow "next page" while the current page still had sold items.
        next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
        if len(next_page) > 2 and flag_num > 0:
            url_next = re.search(r'".*\d"', next_page[-1]).group()
            url_next = url_next.replace("&amp;", "&").replace('"', "")
            parse_data_last(parse.urljoin(domain, url_next))


    # Scrape a single product page: base price, attribute text, sales stats.
    def parse_product_attributes(url):
        """Fetch the product detail page at *url*, compute a base price,
        collect the attribute description plus the ajax sales statistics,
        and upload one Product_attributes row to ODPS.

        Pages whose price requires consulting the seller are skipped, as
        are empty pages.
        """
        product_id = url.split('id=')[1]   # id slice reused for the ajax call
        res_text = requests.get(url).text
        sel = Selector(text=res_text)
        # is_price value 0 means the price is public.  Discount tiers can
        # still cause per-batch price differences; those are deliberately
        # averaged away below.
        Is_price = sel.xpath("//input[contains(@id,'is_price')]").extract()
        if len(Is_price) < 1:
            print("页面数据为空")
            return
        # Regex originally read 'd' -- backslash restored to r'\d'.
        if re.search(r'\d', Is_price[0]).group() != '0':
            return   # price only available from the seller

        datas = sel.xpath("//div[contains(@class,'show_all')]/table[1]//tr")
        price_base = 0.0
        if datas:
            # Average the tier prices into a base price.
            for row in datas:
                price = row.xpath("./input[last()-1]").extract()
                price = re.search('value=".*"', price[0]).group()
                price = re.search(r'\d.*\d', price).group()
                price_base += float(price)
            price_base /= len(datas)
        else:
            price_base = sel.xpath("//span[@class='price_num fl']/text()").extract()[1]
            price_base = float(price_base.replace(" ", ""))

        # Product description text.
        attributes_list = sel.xpath("//span[contains(@class,'attributes-list')]//li/text()").extract()
        str_attributes = ' '.join(attributes_list).replace("&nbsp;", " ")

        # Ajax endpoint returning purchase statistics; parse the JSON once
        # (the original called response.json() three times).
        url_sales = parse.urljoin(domain, 'default.php?act=evallist')
        payload = {
            'id': product_id,
            'page': '0',
            'info_type': 'sale'
        }
        stats = requests.post(url_sales, data=payload).json()

        product_attributes = Product_attributes()
        product_attributes.product_id = int(product_id)
        product_attributes.price_base = price_base
        product_attributes.attributes = str_attributes
        product_attributes.buyer_num = stats.get("member")           # buyer count
        product_attributes.sale_num = stats.get('num')               # units sold
        product_attributes.buyer_rate = stats.get('re_buyer_rate')   # repeat-buy rate
        product_attributes.create_time = datetime.now()

        Spider_91JIAFAN().up_product_attributes_to_odps([product_attributes])

    # Resolve the store id that owns the product page at *url*.
    def parse_store_id(url):
        """Return the numeric store id found on the product page, or 0 when
        the page is empty or its price requires consulting the seller."""
        res_text = requests.get(url).text
        sel = Selector(text=res_text)
        store_id = 0
        # is_price value 0 means the price (and hence the store link) is public.
        Is_price = sel.xpath("//input[contains(@id,'is_price')]").extract()
        if len(Is_price) < 1:
            print("页面数据为空")
        else:
            # Regex originally read 'd' -- backslash restored to r'\d'.
            is_value = re.search(r'\d', Is_price[0])
            if is_value.group() == '0':
                raw = ''.join(sel.xpath('//span[@class="container_title_span"]/a[@href]').extract())
                # 'storeid=d*"' had lost its backslash; restored to r'storeid=\d*"'.
                # The broken replace(""","") is now a plain quote strip.
                raw = re.search(r'storeid=\d*"', raw).group()
                store_id = int(raw.split('storeid=')[1].replace('"', ""))
        return store_id

    # Scrape a store profile page (url built from store_url_domain + id).
    def parse_store_data(url):
        """Parse the store page at *url* and upload one Store row to ODPS.

        Very short responses (<= 10 chars) are treated as empty pages and
        skipped.
        """
        res_text = requests.get(url).text
        sel = Selector(text=res_text)
        if len(res_text) <= 10:
            return
        store_name = ''.join(sel.xpath('//span[contains(@class,"container_title_span")]/a[@href]/text()').extract())

        raw = ''.join(sel.xpath('//span[@class="container_title_span"]/a[@href]').extract())
        # Regex originally read 'storeid=d*"'; backslash restored, and the
        # broken replace(""","") is now a plain quote strip.
        raw = re.search(r'storeid=\d*"', raw).group()
        store_id = int(raw.split('storeid=')[1].replace('"', ""))

        store_level = ''
        store_place = ''
        store_describe = ''
        store_supply = ''
        store_service = ''

        store_data = sel.xpath('//ul[contains(@class,"gy_info_list")]/li/text()').extract()
        if len(store_data) > 3:
            # NOTE(review): the original stripped two space-like characters
            # here (likely a regular space and a non-breaking space mangled
            # in transit) -- kept as a plain-space strip; confirm on live data.
            store_level = store_data[2].replace(" ", "")   # store grade
            store_place = store_data[3].replace(" ", "")   # store address

        store_aptitude = stor_url_aptitude + str(store_id)   # certification url

        temp_datas = sel.xpath('//li[contains(@class,"evaluate")]//div[@style]//text()').extract()
        if len(temp_datas) == 6:
            store_describe = temp_datas[0] + ':' + temp_datas[1]   # description score
            store_supply = temp_datas[2] + ':' + temp_datas[3]     # supply score
            store_service = temp_datas[4] + ':' + temp_datas[5]    # service score

        store = Store()
        store.store_id = store_id
        store.store_name = store_name
        store.store_level = store_level
        store.store_place = store_place
        store.store_aptitude = store_aptitude
        store.store_describe = store_describe
        store.store_supply = store_supply
        store.store_service = store_service
        store.create_time = datetime.now()

        Spider_91JIAFAN().up_store_to_odps([store])

    class ParseproductThread(Thread):
        """Worker: pop product urls from redis, scrape their attributes and
        queue the owning store's url for the store workers."""
        def run(self):
            while True:
                data = r.lpop('91jiafan:catalogue_url')
                if data is None:
                    # Queue drained -- back off instead of busy-looping.
                    # (The original bare except conflated this case with
                    # real scrape errors and slept 120s for both.)
                    time.sleep(120)
                    print("data is null")
                    continue
                try:
                    parse_product_attributes(data)
                    store_id = parse_store_id(data)
                    r.lpush('91jiafan:store_id_url', store_url_domain + str(store_id))
                except Exception as e:
                    # Keep the worker alive on scrape errors but surface them.
                    print(e)

    class Parse_storedata_Thread(Thread):
        """Worker: pop store urls from redis and scrape each store page."""
        def run(self):
            while True:
                data = r.lpop('91jiafan:store_id_url')
                if data is None:
                    # Queue drained -- back off instead of busy-looping.
                    # (The original bare except conflated this case with
                    # real scrape errors and slept 120s for both.)
                    time.sleep(120)
                    print("data is null")
                    continue
                try:
                    parse_store_data(data)
                except Exception as e:
                    # Keep the worker alive on scrape errors but surface them.
                    print(e)


    class parse_91_url_Thread(Thread):
        """Seeder thread: walks every catalogue entry url and pushes the
        qualifying product links into redis."""
        def run(self):
            # Build the final crawl urls, then queue products from each one.
            for entry_url in get_catalogue_url():
                parse_data_last(entry_url)

    class parse_91_productdata_Thread(Thread):
        """Seeder thread: scrapes the product listing pages themselves and
        uploads the tile data to ODPS."""
        def run(self):
            # Extract the product-list rows from every catalogue entry url.
            for entry_url in get_catalogue_url():
                parse_product_data(entry_url)

    class Spider_91JIAFAN():
        """Uploader for the three crawl result tables on ODPS.

        The three public ``up_*_to_odps`` methods only differ in the table
        they target and the columns they serialize; the shared retry/write
        loop lives in :meth:`_upload`.
        """
        def __init__(self):
            self.o = ODPS(
                access_id=ODPS_ID,
                secret_access_key=ODPS_KEY,
                project=ODPS_PROJECT,
                endpoint=ODPS_ENDPOINT
                )
            self.table_name_0 = '91_product'
            self.t0 = self.o.get_table(self.table_name_0)  # product rows
            self.table_name_1 = '91_product_attributes'
            self.t1 = self.o.get_table(self.table_name_1)  # product attribute rows
            self.table_name_2 = '91_store'
            self.t2 = self.o.get_table(self.table_name_2)  # store rows

        def _upload(self, table, table_name, record):
            """Write *record* into today's partition of *table*.

            Retries forever on failure, matching the original endless retry
            loops that were copy-pasted into each uploader.
            """
            part_col = datetime.strftime(datetime.now(), '%Y%m%d')
            while True:
                try:
                    with table.open_writer(partition=f'partition_date={part_col}', create_partition=True) as writer:
                        writer.write(record)
                        print(f'{table_name} - 成功上传{len(record)}条数据', str(datetime.now()))
                except Exception as e:
                    print(e.args)
                    print(f'{table_name} - 上传ODPS失败,正在重试...')
                    continue
                else:
                    break

        #上传商品信息
        def up_product_to_odps(self, goods_list):
            """Upload Product rows to the '91_product' table."""
            record = [
                [
                    goods.product_id,
                    goods.name,
                    goods.price,
                    goods.sales_num,
                    goods.store_id,
                    goods.create_time
                ]
                for goods in goods_list
            ]
            self._upload(self.t0, self.table_name_0, record)

        #上传商品描述信息
        def up_product_attributes_to_odps(self, goods_list):
            """Upload Product_attributes rows to '91_product_attributes'."""
            record = [
                [
                    goods.product_id,
                    goods.price_base,
                    goods.attributes,
                    goods.buyer_num,
                    goods.sale_num,
                    goods.buyer_rate,
                    goods.create_time
                ]
                for goods in goods_list
            ]
            self._upload(self.t1, self.table_name_1, record)

        #上传商户信息
        def up_store_to_odps(self, stores_list):
            """Upload Store rows to the '91_store' table."""
            record = [
                [
                    stores.store_id,
                    stores.store_name,
                    stores.store_level,
                    stores.store_place,
                    stores.store_aptitude,
                    stores.store_describe,
                    stores.store_supply,
                    stores.store_service,
                    stores.create_time
                ]
                for stores in stores_list
            ]
            self._upload(self.t2, self.table_name_2, record)

    if __name__ == "__main__":
        #start_time = datetime.now()
        process_nodes_list(domain)
        parse_91_url_thread = parse_91_url_Thread()
        parse_91_productdata_thread = parse_91_productdata_Thread()
        
        parse_91_url_thread.start()
        parse_91_productdata_thread.start()
        
        
        for i in range(10):
            parse_product_thread = ParseproductThread()
            parse_product_thread.start()

        
        for i in range(8):
            parse_storedata_thread = Parse_storedata_Thread()
            parse_storedata_thread.start()
        
        #end_time = datetime.now()
        #print("一共使用时间:",end_time - start_time)
        
  • 相关阅读:
    Winform程序及dll打包成一个可执行的exe
    DotfuscatorPro防止反编译&ILSpy反编译
    C# 7-zip 压缩和解压缩
    ASP.NET MVC使用JWT代替session,实现单点登陆
    C#动态实体集的反序列化(动态JSON反序列化)
    FTP服务安装及使用
    未能加载文件或程序集“Microsoft.Web.Infrastructure, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35”或它的某一个依赖项。系统找不到指定的文件。
    项目在服务上运行出现中文水印乱码问题解决(第二篇)
    图片加水印信息处理及中文乱码解决
    MongoDB 索引
  • 原文地址:https://www.cnblogs.com/dog-and-cat/p/13355213.html
Copyright © 2011-2022 走看看