import re
from odps import ODPS
from threading import Thread
import threading
from urllib import parse
from datetime import datetime
import random
import requests
import time
from scrapy import Selector
from models import *
# Module-level URL accumulators; both start empty and are not referenced again
# in the visible part of this file.
store_list_urls = []
product_list_urls = []
# Site root; used as the base for parse.urljoin() when resolving relative links.
domain = "http://www.91jf.com/"
# Merchant directory listing URL ending in "page=" (presumably a page number is
# appended by a caller -- not used in the visible code, confirm).
store_domain = "http://www.91jf.com/default.php?act=corp&sort=list&page="
store_url_domain = 'http://www.91jf.com/default.php?act=store_goods&storeid=' # prefix; a merchant id is appended to build the merchant page URL
category_url = "http://www.91jf.com/default.php?act=categorygoodslist&category_id=" # prefix; a category id is appended to build a product-listing URL
stor_url_aptitude = 'http://www.91jf.com/default.php?act=corpcert&id=' # prefix; a merchant id is appended to build the merchant qualification URL
def get_nodes_json():
    """Return the serialised HTML of every category link on the site home page.

    The home page is downloaded and every ``<a href=...>`` element found below
    a ``<div class="class_child_li">`` is returned as an HTML string.

    Returns:
        list[str]: one HTML fragment per link, with ``&amp;`` turned back into
        ``&``; an empty list when the page contains no such link.

    Raises:
        requests.RequestException: if the home page cannot be fetched within
        the timeout.
    """
    # A timeout keeps a stalled connection from blocking the caller forever.
    left_menu_text = requests.get("http://www.91jf.com/", timeout=30).text
    sel = Selector(text=left_menu_text)
    anchors = sel.xpath("//div[@class='class_child_li']//a[@href]").extract()
    # Serialised attribute values carry "&" as the entity "&amp;"; undo that so
    # the query strings inside the fragments are usable as-is.
    return [anchor.replace("&amp;", "&") for anchor in anchors]
def process_nodes_list(url):
    """Scrape the category menu at *url* and insert unseen entries as Catalogue rows.

    For every top-level menu entry a parent row is stored (``category_id`` 0),
    then one child row per sub-category, carrying the numeric id taken from the
    sub-category link and the parent's ``_id`` as ``father_id``. A row is only
    inserted when no row with the same name and category id exists yet.

    Args:
        url: address of the page that contains the ``index_g_class`` menu.

    Raises:
        requests.RequestException: if the page cannot be fetched within the
        timeout.
    """
    menu_text = requests.get(url, timeout=30).text
    sel = Selector(text=menu_text)
    for item in sel.xpath("//div[@class='index_g_class']/ul/li"):
        # Name of the top-level category.
        title = ''.join(item.xpath("./div[@class='class_menu']/span/text()").extract())
        top_category_id = 0  # top-level rows are always stored with category_id 0
        parent_exists = Catalogue.select().where(
            (Catalogue.catalogue_name == title) & (Catalogue.category_id == top_category_id)
        )
        if not parent_exists:
            catalogue = Catalogue()
            catalogue.catalogue_name = title
            # NOTE(review): the parent sets `series_level` while children also set
            # `catalogue_level`; confirm against the Catalogue model which is intended.
            catalogue.series_level = 0
            catalogue.category_id = top_category_id
            catalogue.create_time = datetime.now()  # crawl time
            catalogue.save(force_insert=True)
        # Primary key of the parent row, used to link the children to it.
        _id = Catalogue.get(Catalogue.catalogue_name == title)._id

        for series_name in item.xpath('.//div[@class="class_child_li"]//li'):
            series_title = ''.join(series_name.xpath('.//span/text()').extract())
            link_html = ''.join(series_name.xpath(".//a[@href]").extract())
            # The id is the first "digit, optional char, digit" run in the link HTML.
            id_match = re.search(r'\d.?\d', link_html)
            if id_match is None:
                # Without an id no listing URL can be built for this entry.
                print("skip sub-category without numeric id: %r" % series_title)
                continue
            category_id = id_match.group()

            child_exists = Catalogue.select().where(
                (Catalogue.catalogue_name == series_title) & (Catalogue.category_id == category_id)
            )
            if child_exists:
                continue
            catalogue_0 = Catalogue()
            catalogue_0.series_level = 0
            catalogue_0.category_id = category_id      # sub-category id
            catalogue_0.catalogue_name = series_title  # sub-category name
            catalogue_0.catalogue_level = 2            # marks a sub-category row
            catalogue_0.father_id = _id                # parent row id
            catalogue_0.create_time = datetime.now()   # crawl time
            catalogue_0.save(force_insert=True)
def get_catalogue_url():
    """Build the sales-sorted listing URL for every level-2 Catalogue row.

    Returns:
        list[str]: ``category_url`` + the row's ``category_id`` + the
        ``okey=salenum`` / ``order=desc`` query suffix, one entry per row whose
        ``catalogue_level`` equals 2, in query order.
    """
    leaf_rows = Catalogue.select().where(Catalogue.catalogue_level==2)
    sort_suffix = "&okey=salenum&order=desc"
    return [category_url + str(row.category_id) + sort_suffix for row in leaf_rows]
def _listing_link_id(link_html):
    """Return the integer id embedded in a serialised ``<a href="...id=N">`` element.

    Raises:
        ValueError: if no ``id=...<digit>"`` span is present or the extracted
        text is not an integer.
    """
    match = re.search(r'id=.*\d"', link_html)
    if match is None:
        raise ValueError("no id found in link: %r" % (link_html,))
    # Drop the "id=" marker and the closing quote captured by the pattern.
    return int(match.group().replace("id=", "").replace('"', ""))


def parse_product_data(url):
    """Scrape product rows from a listing page and all following pages, uploading each page.

    For every ``goods_offset`` item the product id, name, displayed price, sales
    count and merchant id are read. Items whose price is not numeric, whose
    sales count is below 1, or whose fields cannot be parsed are skipped. Each
    page's products are uploaded through ``Spider_91JIAFAN.up_product_to_odps``.
    Crawling continues with the link in the last pagination anchor while the
    page has more than two pagination links and the last item seen had a sales
    count above zero.

    Pages are walked in a loop rather than by recursion, so long listings cannot
    hit the interpreter's recursion limit; a page already visited ends the walk.

    Args:
        url: address of the first listing page.

    Raises:
        requests.RequestException: if a page cannot be fetched within the timeout.
    """
    uploader = Spider_91JIAFAN()  # one ODPS client for the whole listing
    visited = set()
    while url not in visited:
        visited.add(url)
        res_text = requests.get(url, timeout=30).text
        sel = Selector(text=res_text)
        res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
        flag_num = 0  # sales count of the last item processed on this page
        goods_list = []
        for item in res_li:
            product_html = ''.join(item.xpath('./div[contains(@class,"pro_pic_box")]/a[@href]').extract())
            try:
                product_id = _listing_link_id(product_html)  # product id
            except ValueError as exc:
                print("skip item: %s" % exc)
                continue

            name = ''.join(item.xpath("./div[@class='row row-2 title']/a/text()").extract())  # product name
            price = ''.join(item.xpath('./div[@id="goods_detail_b"]/div[@class="row row-1"]/div[@class="g_price fm2"]/strong/text()').extract())  # displayed price
            try:
                price = float(price)
            except ValueError:
                # Price is members-only or must be requested from the merchant.
                print("价格会员可见|价格请咨询商家")
                continue

            sales_text = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract())  # sales count
            try:
                sales_num = int(sales_text.split('销量:')[1])
            except (IndexError, ValueError):
                print("skip item: sales count not found in %r" % sales_text)
                continue
            flag_num = sales_num
            if sales_num < 1:
                continue

            store_html = ''.join(item.xpath("./div[@class='row row-3 c']/a[@href]").extract())
            try:
                store_id = _listing_link_id(store_html)  # merchant id
            except ValueError as exc:
                print("skip item: %s" % exc)
                continue

            product = Product()
            product.product_id = product_id
            product.name = name
            product.price = price
            product.sales_num = sales_num
            product.store_id = store_id
            product.create_time = datetime.now()
            goods_list.append(product)

        uploader.up_product_to_odps(goods_list)

        next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
        if len(next_page) <= 2 or flag_num <= 0:
            break
        next_match = re.search(r'".*\d"', next_page[-1])
        if next_match is None:
            break
        # "&" is serialised as "&amp;" inside the attribute; restore it, then
        # strip the surrounding quotes captured by the pattern.
        url_next = next_match.group().replace("&amp;", "&").replace('"', "")
        url = parse.urljoin(domain, url_next)
def parse_data_last(url):
    """Walk a sales-sorted listing and queue the URL of every product that has sales.

    Each page is fetched (retrying every 3 seconds while the request fails);
    for every ``goods_offset`` item with a sales count above zero the absolute
    product URL is pushed onto the ``91jiafan:catalogue_url`` list of ``r``.
    Crawling continues with the link in the last pagination anchor while the
    page has more than two pagination links and the last item seen had a sales
    count above zero.

    Pages are walked in a loop rather than by recursion, so long listings cannot
    hit the interpreter's recursion limit; a page already visited ends the walk.

    Args:
        url: address of the first listing page.

    NOTE(review): ``r`` is not defined in the visible part of this file; it is
    presumably a Redis-style client provided by ``from models import *`` -- confirm.
    """
    visited = set()
    while url not in visited:
        visited.add(url)
        while True:
            try:
                # The timeout turns a stalled connection into a retry.
                res_text = requests.get(url, timeout=30).text
            except requests.RequestException:
                # Only request failures are retried; KeyboardInterrupt and
                # programming errors are no longer swallowed.
                time.sleep(3)
                print('间隔休眠时间,再次处理')
            else:
                break

        sel = Selector(text=res_text)
        res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
        flag_num = 0  # sales count of the last item processed on this page
        for item in res_li:
            sales_text = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract())  # sales count
            try:
                sales_num = int(sales_text.split('销量:')[1])
            except (IndexError, ValueError):
                print("skip item: sales count not found in %r" % sales_text)
                continue
            flag_num = sales_num

            links = item.xpath("./div[@class='pro_pic_box']/a").extract()
            link_match = re.search(r'".*\d"', links[0]) if links else None
            if link_match is None:
                print("skip item: product link not found")
                continue
            # Restore "&" from "&amp;" and strip the quotes captured by the pattern.
            link = link_match.group().replace("&amp;", "&").replace('"', "")
            data_url = parse.urljoin(domain, link)  # absolute URL of the product page
            if sales_num > 0:
                # Only products that have sold at least once are queued.
                r.lpush('91jiafan:catalogue_url', data_url)

        # Move on to the next listing page, if there is one.
        next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
        if len(next_page) <= 2 or flag_num <= 0:
            break
        next_match = re.search(r'".*\d"', next_page[-1])
        if next_match is None:
            break
        url_next = next_match.group().replace("&amp;", "&").replace('"', "")
        url = parse.urljoin(domain, url_next)
def parse_product_attributes(url):
    """Scrape one product page and upload its base price, description and sales figures.

    Nothing is uploaded when the page has no ``is_price`` input or when the
    first digit in that input is not ``0`` (the price must then be requested
    from the merchant). Otherwise the base price is the mean of the tier prices
    in the ``show_all`` table or, when that table has no rows, the second
    ``price_num fl`` span. Buyer count, sales count and repeat-purchase rate
    come from a POST to ``default.php?act=evallist``. The result is uploaded
    through ``Spider_91JIAFAN.up_product_attributes_to_odps``.

    Quantity-based discount tiers are only averaged, not stored individually.

    Args:
        url: product page URL; the text after the first ``id=`` is the product id.

    Raises:
        IndexError: if *url* contains no ``id=``.
        ValueError: if a price or the product id is not numeric.
        requests.RequestException: if a request fails or times out.
    """
    product_id = url.split('id=')[1]  # also sent as the "id" field of the sales request
    res_text = requests.get(url, timeout=30).text
    sel = Selector(text=res_text)

    # The is_price input tells whether the price is public (first digit 0).
    Is_price = sel.xpath("//input[contains(@id,'is_price')]").extract()
    if len(Is_price) < 1:
        print("页面数据为空")
        return
    is_value = re.search(r'\d', Is_price[0])
    if is_value is None or is_value.group() != '0':
        # Price must be requested from the merchant; nothing to record.
        return

    datas = sel.xpath("//div[contains(@class,'show_all')]/table[1]//tr")
    if datas:
        # Base price = mean of the tier prices, one per table row.
        price_total = 0.0
        for row in datas:
            inputs = row.xpath("./input[last()-1]").extract()
            value_attr = re.search('value=".*"', inputs[0]) if inputs else None
            # First digit through last digit of the value; also accepts a
            # one-digit price, which a two-digit-minimum pattern would miss.
            number = re.search(r'\d(?:.*\d)?', value_attr.group()) if value_attr else None
            if number is None:
                print("skip product %s: tier price not found" % product_id)
                return
            price_total = price_total + float(number.group())
        price_base = price_total / len(datas)
    else:
        price_texts = sel.xpath("//span[@class='price_num fl']/text()").extract()
        if len(price_texts) < 2:
            print("skip product %s: price not found" % product_id)
            return
        price_base = float(price_texts[1].replace(" ", ""))

    # Product description: the attribute list items joined by spaces.
    attributes_list = sel.xpath("//span[contains(@class,'attributes-list')]//li/text()").extract()
    # NOTE(review): the original call replaced " " with " " (a no-op), presumably
    # a mangled non-breaking space; normalising U+00A0 here -- confirm.
    str_attributes = ' '.join(attributes_list).replace("\xa0", " ")

    # Sales figures come from a separate AJAX endpoint.
    url_sales = parse.urljoin(domain, 'default.php?act=evallist')
    data = {
        'id': product_id,
        'page': '0',
        'info_type': 'sale'
    }
    response = requests.post(url_sales, data=data, timeout=30)
    sales_info = response.json()  # decode the body once instead of three times

    product_attributes = Product_attributes()
    product_attributes.product_id = int(product_id)
    product_attributes.price_base = price_base
    product_attributes.attributes = str_attributes
    product_attributes.buyer_num = sales_info.get("member")          # number of buyers
    product_attributes.sale_num = sales_info.get('num')              # units sold
    product_attributes.buyer_rate = sales_info.get('re_buyer_rate')  # repeat-purchase rate
    product_attributes.create_time = datetime.now()
    Spider_91JIAFAN().up_product_attributes_to_odps([product_attributes])
def parse_store_id(url):
    """Return the merchant id shown on a product page, or 0 when it is unavailable.

    The id is only read when the page's ``is_price`` input has ``0`` as its
    first digit (price is public). It is taken from the ``storeid=`` query
    value of the link inside ``<span class="container_title_span">``.

    Args:
        url: product page URL.

    Returns:
        int: the merchant id; 0 when the page has no ``is_price`` input, the
        price must be requested from the merchant, or no id is found.

    Raises:
        requests.RequestException: if the page cannot be fetched within the timeout.
    """
    res_text = requests.get(url, timeout=30).text
    sel = Selector(text=res_text)

    Is_price = sel.xpath("//input[contains(@id,'is_price')]").extract()
    if len(Is_price) < 1:
        print("页面数据为空")
        return 0
    is_value = re.search(r'\d', Is_price[0])
    if is_value is None or is_value.group() != '0':
        return 0

    link_html = ''.join(sel.xpath('//span[@class="container_title_span"]/a[@href]').extract())
    id_match = re.search(r'storeid=\d*"', link_html)
    if id_match is None:
        return 0
    # Keep what follows "storeid=" and drop the closing quote.
    digits = id_match.group().split('storeid=')[1].replace('"', "")
    # "\d*" also matches an empty id; treat that as "not found".
    return int(digits) if digits else 0
def parse_store_data(url):
    """Scrape a merchant page and upload the merchant's details.

    Responses of 10 characters or fewer are ignored. The merchant name and id
    come from the link inside ``container_title_span``; level and address from
    the third and fourth ``gy_info_list`` text entries (when more than three
    exist); the description / supply / service ratings from the ``evaluate``
    block when it yields exactly six text nodes. Fields that are not found stay
    empty strings. The record is uploaded through
    ``Spider_91JIAFAN.up_store_to_odps``.

    Args:
        url: merchant page URL.

    Raises:
        requests.RequestException: if the page cannot be fetched within the timeout.
    """
    res_text = requests.get(url, timeout=30).text
    if len(res_text) <= 10:
        return
    sel = Selector(text=res_text)

    store_name = ''.join(
        sel.xpath('//span[contains(@class,"container_title_span")]/a[@href]/text()').extract()
    )  # merchant name
    link_html = ''.join(sel.xpath('//span[@class="container_title_span"]/a[@href]').extract())
    id_match = re.search(r'storeid=\d*"', link_html)
    # Keep what follows "storeid=" and drop the closing quote.
    digits = id_match.group().split('storeid=')[1].replace('"', "") if id_match else ""
    if not digits:
        # Without an id the record cannot be keyed; skip the page.
        print("skip merchant page without storeid: %s" % url)
        return
    store_id = int(digits)  # merchant id

    store_level = ''
    store_place = ''
    store_describe = ''
    store_supply = ''
    store_service = ''
    store_data = sel.xpath('//ul[contains(@class,"gy_info_list")]/li/text()').extract()
    if len(store_data) > 3:
        store_level = store_data[2].replace(" ", "").replace("\n", "")  # merchant level
        store_place = store_data[3].replace(" ", "")                    # merchant address
    store_aptitude = stor_url_aptitude + str(store_id)  # URL of the qualification page

    temp_datas = sel.xpath('//li[contains(@class,"evaluate")]//div[@style]//text()').extract()
    if len(temp_datas) == 6:
        # The six text nodes are three label/value pairs.
        store_describe = temp_datas[0] + ':' + temp_datas[1]  # description rating
        store_supply = temp_datas[2] + ':' + temp_datas[3]    # supply rating
        store_service = temp_datas[4] + ':' + temp_datas[5]   # service rating

    store = Store()
    store.store_id = store_id
    store.store_name = store_name
    store.store_level = store_level
    store.store_place = store_place
    store.store_aptitude = store_aptitude
    store.store_describe = store_describe
    store.store_supply = store_supply
    store.store_service = store_service
    store.create_time = datetime.now()
    Spider_91JIAFAN().up_store_to_odps([store])
class ParseproductThread(Thread):
    """Worker that consumes queued product URLs and queues the matching merchant URLs.

    NOTE(review): ``r`` is not defined in the visible part of this file; it is
    presumably a Redis-style client provided by ``from models import *`` -- confirm.
    """

    def run(self):
        """Loop forever: pop a product URL, upload its attributes, queue its merchant URL.

        An empty queue and any processing error both cause a 120-second pause
        before the next attempt; errors are printed with their cause instead of
        being reported as an empty queue.
        """
        while True:
            data = None
            try:
                data = r.lpop('91jiafan:catalogue_url')
                if data is None:
                    # Queue is empty: wait for the listing crawler to refill it.
                    time.sleep(120)
                    print("data is null")
                    continue
                parse_product_attributes(data)
                store_id = parse_store_id(data)
                # 0 is parse_store_id's "no merchant found" value; do not queue it.
                if store_id:
                    store_id_url = store_url_domain + str(store_id)
                    r.lpush('91jiafan:store_id_url', store_id_url)
            except Exception as exc:
                # Keep the worker alive, but say what actually failed.
                print("failed to process product %r: %r" % (data, exc))
                time.sleep(120)
class Parse_storedata_Thread(Thread):
    """Worker that consumes queued merchant URLs and uploads each merchant's details.

    NOTE(review): ``r`` is not defined in the visible part of this file; it is
    presumably a Redis-style client provided by ``from models import *`` -- confirm.
    """

    def run(self):
        """Loop forever: pop a merchant URL and pass it to parse_store_data().

        An empty queue and any processing error both cause a 120-second pause
        before the next attempt; errors are printed with their cause instead of
        being reported as an empty queue.
        """
        while True:
            data = None
            try:
                data = r.lpop('91jiafan:store_id_url')
                if data is None:
                    # Queue is empty: wait for the product workers to refill it.
                    time.sleep(120)
                    print("data is null")
                    continue
                parse_store_data(data)
            except Exception as exc:
                # Keep the worker alive, but say what actually failed.
                print("failed to process merchant %r: %r" % (data, exc))
                time.sleep(120)
class parse_91_url_Thread(Thread):
    """Worker that walks every category listing to queue product URLs."""

    def run(self):
        """Pass each URL from get_catalogue_url() to parse_data_last(), in order."""
        for listing_url in get_catalogue_url():
            parse_data_last(listing_url)
class parse_91_productdata_Thread(Thread):
    """Worker that extracts the product rows shown on every category listing."""

    def run(self):
        """Pass each URL from get_catalogue_url() to parse_product_data(), in order."""
        for listing_url in get_catalogue_url():
            parse_product_data(listing_url)
class Spider_91JIAFAN():
    """Uploader that writes scraped records into three ODPS tables.

    Records go into the partition ``partition_date=<YYYYMMDD>`` for the current
    day, which is created on demand. A failed upload is retried until it
    succeeds, pausing ``retry_delay`` seconds between attempts.
    """

    # Seconds to wait before retrying a failed upload, so that a persistent
    # failure does not turn into a tight loop against the service.
    retry_delay = 1

    def __init__(self):
        """Connect to ODPS and look up the three target tables."""
        self.o = ODPS(
            access_id=ODPS_ID,
            secret_access_key=ODPS_KEY,
            project=ODPS_PROJECT,
            endpoint=ODPS_ENDPOINT
        )
        self.table_name_0 = '91_product'
        self.t0 = self.o.get_table(self.table_name_0)  # product listing table
        self.table_name_1 = '91_product_attributes'
        self.t1 = self.o.get_table(self.table_name_1)  # product attribute table
        self.table_name_2 = '91_store'
        self.t2 = self.o.get_table(self.table_name_2)  # merchant table

    def _upload(self, table_attr, table_name, record):
        """Write *record* (a list of rows) to today's partition of one table.

        The table is read from ``self.<table_attr>`` only when there is
        something to write; an empty *record* is a no-op. Failed attempts are
        reported and retried without limit.
        """
        if not record:
            return
        table = getattr(self, table_attr)
        part_col = datetime.strftime(datetime.now(), '%Y%m%d')
        while True:
            try:
                with table.open_writer(partition=f'partition_date={part_col}', create_partition=True) as writer:
                    writer.write(record)
            except Exception as e:
                print(e.args)
                print(f'{table_name} - 上传ODPS失败,正在重试...')
                time.sleep(self.retry_delay)
                continue
            # Reported only after the writer has been closed without error.
            print(f'{table_name} - 成功上传{len(record)}条数据', str(datetime.now()))
            break

    def up_product_to_odps(self, goods_list):
        """Upload product listing rows to the ``91_product`` table.

        :param goods_list: objects exposing product_id, name, price, sales_num,
            store_id and create_time.
        :return: None
        """
        record = [
            [
                goods.product_id,
                goods.name,
                goods.price,
                goods.sales_num,
                goods.store_id,
                goods.create_time,
            ]
            for goods in goods_list
        ]
        self._upload('t0', self.table_name_0, record)

    def up_product_attributes_to_odps(self, goods_list):
        """Upload product attribute rows to the ``91_product_attributes`` table.

        :param goods_list: objects exposing product_id, price_base, attributes,
            buyer_num, sale_num, buyer_rate and create_time.
        :return: None
        """
        record = [
            [
                goods.product_id,
                goods.price_base,
                goods.attributes,
                goods.buyer_num,
                goods.sale_num,
                goods.buyer_rate,
                goods.create_time,
            ]
            for goods in goods_list
        ]
        self._upload('t1', self.table_name_1, record)

    def up_store_to_odps(self, stores_list):
        """Upload merchant rows to the ``91_store`` table.

        :param stores_list: objects exposing store_id, store_name, store_level,
            store_place, store_aptitude, store_describe, store_supply,
            store_service and create_time.
        :return: None
        """
        record = [
            [
                stores.store_id,
                stores.store_name,
                stores.store_level,
                stores.store_place,
                stores.store_aptitude,
                stores.store_describe,
                stores.store_supply,
                stores.store_service,
                stores.create_time,
            ]
            for stores in stores_list
        ]
        self._upload('t2', self.table_name_2, record)
if __name__ == "__main__":
    # Refresh the catalogue table before any crawler reads it.
    process_nodes_list(domain)

    # Listing crawlers: the first queues product URLs, the second uploads listing rows.
    listing_workers = [parse_91_url_Thread(), parse_91_productdata_Thread()]
    for worker in listing_workers:
        worker.start()

    # Ten workers consume the product-URL queue.
    for _ in range(10):
        ParseproductThread().start()

    # Eight workers consume the merchant-URL queue.
    for _ in range(8):
        Parse_storedata_Thread().start()