import re
from threading import Thread
import threading
from urllib import parse
from datetime import datetime
import random
import requests
import time
from scrapy import Selector
from models import *
store_list_urls = []
product_list_urls = []
domain = "http://www.91jf.com/"
store_domain = "http://www.91jf.com/default.php?act=corp&sort=list&page="
store_url_domain = 'http://www.91jf.com/default.php?act=store_goods&storeid=' # 用于拼接商户id和url
category_url = "http://www.91jf.com/default.php?act=categorygoodslist&category_id=" # 用来拼接商品的url
stor_url_aptitude = 'http://www.91jf.com/default.php?act=corpcert&id=' # 用于拼接商户资质的url
def get_nodes_json():
left_menu_text = requests.get("http://www.91jf.com/").text
#write_txt(left_menu_text)
#etree.HTML(res0.text)
sel = Selector(text=left_menu_text)
all_divs = sel.xpath("//div[@class='class_child_li']//a[@href]").extract()
if all_divs:
nodes_lists = []
for i in range(len(all_divs)):
nodes_str = all_divs[i]
nodes_str = nodes_str.replace("&","&") # 此处&由于被转义成&导致需要重新进行处理
nodes_lists.append(nodes_str)
return nodes_lists
return []
# 获取一级目录数据,保存商品系列ID,用来拼接爬虫入口的url
def process_nodes_list(url):
menu_text = requests.get(url).text
sel = Selector(text=menu_text)
nodes_list = sel.xpath("//div[@class='index_g_class']/ul/li")
for item in nodes_list:
title = item.xpath("./div[@class='class_menu']/span/text()").extract()
title = ''.join(title)
#主目录的名称
catalogue_name = title
catalogue = Catalogue()
catalogue.catalogue_name = catalogue_name # 系列名称
catalogue.series_level = 0 # 系列等级
catalogue_id_0 = 0 # 系列catalogue_id
catalogue.category_id = catalogue_id_0 # 系列catalogue_id
existed_id = Catalogue.select().where((Catalogue.catalogue_name==catalogue_name) & (Catalogue.category_id == catalogue_id_0))
if existed_id:
#catalogue.save()
pass
else:
catalogue.save(force_insert=True)
print("插入商品目录成功")
_id = Catalogue.get(Catalogue.catalogue_name==title)._id # 此处获取父节点的id
series_names = item.xpath('.//div[@class="class_child_li"]//li')
for series_name in series_names:
catalogue_0 = Catalogue()
catalogue_0.catalogue_name = title # 系列名称
catalogue_0.series_level = 0 # 系列等级
series_name_0 = series_name.xpath('.//span/text()').extract()
series_name_0 = ''.join(series_name_0)
category_id = series_name.xpath(".//a[@href]").extract()
category_id = ''.join(category_id)
category_id = re.search('d.?d',category_id).group()
catalogue_0.category_id = category_id # 次级产品系列ID
catalogue_0.catalogue_name = series_name_0 # 次级产品系列的名称
catalogue_0.catalogue_level = 2 # 次级产品系列的等级
catalogue_0.father_id = _id # 父节点的ID
existed_id = Catalogue.select().where((Catalogue.catalogue_name==series_name_0) & (Catalogue.category_id == category_id))
if existed_id:
#catalogue_0.save()
pass
else:
catalogue_0.save(force_insert=True)
#根据catalogue存储的数据来获取category_id拼接商品最外层的url链接
def get_catalogue_url():
url_list = []
#catalogue = Catalogue()
id_data = Catalogue.select().where(Catalogue.catalogue_level==2)
for item in id_data:
url = category_url + str(item.category_id) + "&okey=salenum&order=desc"
url_list.append(url)
#id_data = Catalogue.get(Catalogue.series_level_0==1).category_id
return url_list
def parse_product(url):
#获取商品的详情以及销售数量
res_text = requests.get(url).text
sel = Selector(text=res_text)
res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
flag_num = 0
for item in res_li:
product_id = item.xpath('./div[contains(@class,"pro_pic_box")]/a[@href]').extract() # 产品ID
product_id = re.search('id=.*d"',''.join(product_id))
product_id = product_id.group().replace("id=","")
product_id = product_id.replace(""","")
product_id = int(product_id)
name = item.xpath("./div[@class='row row-2 title']/a/text()").extract() # 产品名字
name = ''.join(name)
price = item.xpath('./div[@id="goods_detail_b"]/div[@class="row row-1"]/div[@class="g_price fm2"]/strong/text()').extract() # 显示价格
price = ''.join(price)
try:
price = float(price)
except:
print("价格会员可见|价格请咨询商家")
continue
sales_num = item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract() # 销售数量
sales_num= ''.join(sales_num)
sales_num = sales_num.split('销量:')[1]
sales_num = int(sales_num)
flag_num = sales_num
if sales_num < 1:
continue
store_id = item.xpath("./div[@class='row row-3 c']/a[@href]").extract()
store_id = re.search('id=.*d"',''.join(store_id))
store_id = store_id.group().replace("id=","")
store_id = store_id.replace(""","")
store_id = int(store_id)
merchant = item.xpath("./div[@id='goods_detail_b']/div[2]/p[2]/text()").extract() # 商家
merchant = ''.join(merchant)
main_Products = item.xpath("./div[@id='goods_detail_b']/div[2]/p[3]/text()").extract() # 主营
main_Products = ''.join(main_Products)
merchant_Place = item.xpath("./div[@id='goods_detail_b']/div[2]/p[4]/text()").extract() # 地址
merchant_Place = ''.join(merchant_Place)
product = Product()
product.product_id = product_id
product.name = name
product.price = price
product.sales_num = sales_num
product.store_id = store_id
product.merchant = merchant
product.main_Products = main_Products
product.merchant_Place = merchant_Place
existed_name = Product.select().where(Product.product_id==product_id)
if existed_name:
pass
#product.save()
else:
product.save(force_insert=True)
next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
if len(next_page) > 2 and flag_num > 0:
url_next = re.search('".*d"',next_page[-1])
url_next = url_next.group().replace("&","&") # 此处&由于被转义成&导致需要重新进行处理
url_next = url_next.replace(""","")
url_next = parse.urljoin(domain,url_next)
#print(url_next)
parse_product(url_next)
else:
pass
#获取商品链接,上一级url为商品详情页
def parse_data_last(url):
#store_id_list = []
flag_num = 0
#获取商品的详情标签
while True:
try:
res_text = requests.get(url).text
except:
time.sleep(3)
print('间隔休眠时间,再次处理')
else:
break
sel = Selector(text=res_text)
res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
for item in res_li:
sales_num = item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract() # 销售数量
sales_num= ''.join(sales_num)
sales_num = sales_num.split('销量:')[1]
sales_num = int(sales_num)
flag_num = int(sales_num)
data = item.xpath("./div[@class='pro_pic_box']/a").extract()
data = re.search('".*d"',data[0])
data = data.group().replace("&","&")
data = data.replace(""","")
data_url = parse.urljoin(domain,data) # 链接为销量排序之后的单个商品链接,传出链接
print("开始获取商品:{}".format(data_url))
if sales_num > 0:
r.lpush('91jiafan:catalogue_url',data_url) # 此处存储商品的url,判断条件为销售数量大于0
#此处代码用来切到下一页链接数据,商品的详情排布页
next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
if len(next_page) > 2 and flag_num > 0:
url_next = re.search('".*d"',next_page[-1])
url_next = url_next.group().replace("&","&") # 此处&由于被转义成&导致需要重新进行处理
url_next = url_next.replace(""","")
url_next = parse.urljoin(domain,url_next)
parse_data_last(url_next)
#获取商品详细数据
def parse_product_data(url):
#获取商品的详情以及销售数量
product_id = url.split('id=')[1] # 对商品id进行切片处理,用来获取ajax数据
res_text = requests.get(url).text
sel = Selector(text=res_text)
#筛选规则,当is_price之后的value属性值为0的时候,说明不需要咨询商家,同时需要注意的是,商品会有打折批次数量的差异导致价格差异,
#这一点需要根据具体的显示页面来处理,现在忽略,由于可能存在打折段的数据差异,所以暂时不考虑
Is_price = sel.xpath("//input[contains(@id,'is_price')]").extract()#取到的数据用来判断价格是否需要咨询商家
if len(Is_price) < 1:
print("页面数据为空")
else:
is_value = re.search('d',Is_price[0])
if is_value.group() == '0': # 0表示商品价格不需要咨询商户
datas = sel.xpath("//div[contains(@class,'show_all')]/table[1]//tr")
price_base = 0.0
if datas:
#price_base 商品基准价格
for item in range(len(datas)):
price = datas[item].xpath("./input[last()-1]").extract()
price = re.search('value=".*"',price[0])
price = re.search('d.*d',price[0])
price = price.group()
price_base = price_base + float(price)
price_base = price_base / len(datas) # 商品基准价格计算
else:
price_base = sel.xpath("//span[@class='price_num fl']/text()").extract()[1]
price_base = price_base.replace(" ","")
price_base = float(price_base)
#print(type(price_base))
#print(price_base)
#此处获取商品的描述信息
attributes_list = sel.xpath("//span[contains(@class,'attributes-list')]//li/text()").extract()
str_attributes = ' '.join(attributes_list)
str_attributes = str_attributes.replace(" "," ") # 商品信息描述
#此处发送请求获取商品购买数据
url_sales = parse.urljoin(domain,'default.php?act=evallist')
data = {
'id': product_id,
'page': '0',
'info_type': 'sale'
}
response = requests.post(url_sales, data=data)
buyer_num = response.json().get("member") # 购买人数
sale_num = response.json().get('num') # 销售数量
buyer_rate = response.json().get('re_buyer_rate') # 商品复购率
product_id = int(product_id) # 此处对商品ID进行转换
product_attributes = Product_attributes()
product_attributes.product_id = product_id
product_attributes.price_base = price_base
product_attributes.attributes = str_attributes
product_attributes.buyer_num = buyer_num
product_attributes.sale_num = sale_num
product_attributes.buyer_rate = buyer_rate
existed_id = Product_attributes.select().where(Product_attributes.product_id==product_id)
if existed_id:
pass
#product_attributes.save()
else:
print("开始保存商品详细信息:{}".format(url))
product_attributes.save(force_insert=True)
else :
price = "价格请咨询商家"
#获取商户详细数据,处理逻辑为根据单个商品目录来获取对应的商户id
def parse_store_id(url):
#print(url) # 打印当前商品页的url用来定位
res_text = requests.get(url).text
sel = Selector(text=res_text)
store_id = 0
#筛选规则,当is_price之后的value属性值为0的时候,说明不需要咨询商家,同时需要注意的是,商品会有打折批次数量的差异导致价格差异,
#这一点需要根据具体的显示页面来处理,现在忽略,由于可能存在打折段的数据差异,所以暂时不考虑
Is_price = sel.xpath("//input[contains(@id,'is_price')]").extract()#取到的数据用来判断价格是否需要咨询商家
if len(Is_price) < 1:
print("页面数据为空")
else:
is_value = re.search('d',Is_price[0])
if is_value.group() == '0': # 0表示商品价格不需要咨询商户
store_id = sel.xpath('//span[@class="container_title_span"]/a[@href]').extract()
store_id = ''.join(store_id)
store_id = re.search('storeid=d*"',store_id)
store_id = store_id.group()
store_id = store_id.split('storeid=')[1]
store_id = store_id.replace(""","")
store_id = int(store_id) # 商户的id
else :
pass
return store_id
#根据store_id拼接的url用来抓取商户的数据
def parse_store_data(url):
res_text = requests.get(url).text
sel = Selector(text=res_text)
if len(res_text) > 10:
store_name = sel.xpath('//span[contains(@class,"container_title_span")]/a[@href]/text()').extract()
store_name = ''.join(store_name) # 商户的名字
store_id = sel.xpath('//span[@class="container_title_span"]/a[@href]').extract()
store_id = ''.join(store_id)
store_id = re.search('storeid=d*"',store_id)
store_id = store_id.group()
store_id = store_id.split('storeid=')[1]
store_id = store_id.replace(""","")
store_id = int(store_id) # 商户的id
store_level = ''
store_place = ''
store_describe = ''
store_supply = ''
store_service = ''
store_data = sel.xpath('//ul[contains(@class,"gy_info_list")]/li/text()').extract()
if len(store_data) > 3:
store_level = store_data[2] # 商户等级
store_level = store_level.replace(" ","")
store_level = store_level.replace("
","")
store_place = store_data[3] # 商户地址
store_place = store_place.replace(" ","")
store_aptitude = stor_url_aptitude + str(store_id) # 商户的资质
temp_datas = sel.xpath('//li[contains(@class,"evaluate")]//div[@style]//text()').extract()
if len(temp_datas) == 6:
store_describe = temp_datas[0] + ':' + temp_datas[1] # 商户描述
store_supply = temp_datas[2] + ':' + temp_datas[3] # 商户供货
store_service = temp_datas[4] + ':' + temp_datas[5] # 商户服务
store = Store()
store.store_id = store_id
store.store_name = store_name
store.store_level = store_level
store.store_place = store_place
store.store_aptitude = store_aptitude
store.store_describe = store_describe
store.store_supply = store_supply
store.store_service = store_service
existed_id = Store.select().where(Store.store_id==store_id)
if existed_id:
pass
#store.save()
else:
print("开始获取商户信息:{}".format(store_id))
store.save(force_insert=True)
class ParseproductThread(Thread):
def run(self):
while(1):
try:
data = r.lpop('91jiafan:catalogue_url')
print("开始处理商品:{}".format(data))
parse_product_data(data)
store_id = parse_store_id(data)
store_id_url = store_url_domain + str(store_id)
r.lpush('91jiafan:store_id_url',store_id_url)
except:
time.sleep(120)
print("data is null")
class Parse_storedata_Thread(Thread):
def run(self):
while(1):
try:
data = r.lpop('91jiafan:store_id_url')
print("开始处理商户:{}".format(data))
parse_store_data(data)
except:
time.sleep(120)
print("data is null")
class parse_91_url_Thread(Thread):
def run(self):
#获取最终需要抓取的url
url_list = get_catalogue_url()
for url in url_list:
parse_data_last(url)
class parse_91_productdata_Thread(Thread):
def run(self):
#提取商品列表页的数据
url_list = get_catalogue_url()
for url in url_list:
parse_product(url)
if __name__ == "__main__":
#start_time = datetime.now()
process_nodes_list(domain)
parse_91_url_thread = parse_91_url_Thread()
parse_91_productdata_thread = parse_91_productdata_Thread()
parse_91_url_thread.start()
parse_91_productdata_thread.start()
for i in range(10):
parse_product_thread = ParseproductThread()
parse_product_thread.start()
for i in range(8):
parse_storedata_thread = Parse_storedata_Thread()
parse_storedata_thread.start()
#end_time = datetime.now()
#print("一共使用时间:",end_time - start_time)