ORM model
_models.py
from peewee import *

db = MySQLDatabase("spider", host="127.0.0.1", port=3306, user="root", password="root")

class BaseModel(Model):
    class Meta:
        database = db

# A few things to keep in mind when designing the tables:
'''
CharField defaults to a max length of 255; for fields whose maximum
length cannot be determined, use TextField instead.
Format/clean the scraped data as early as possible, before saving it.
Use default values and null=True where a field may be missing.
'''

# topic table
class Topic(BaseModel):
    title = CharField()
    content = TextField(default='')
    tid = IntegerField(unique=True)
    # the id from the user's URL, which is also the username
    uid = CharField()
    # a zero default is rejected by strict-mode MySQL, so allow NULL instead
    create_time = DateTimeField(null=True)
    answer_nums = IntegerField(default=0)

# reply table
class Answer(BaseModel):
    tid = IntegerField()
    uid = CharField()
    content = TextField(default='')
    create_time = DateTimeField()

# user table
class Author(BaseModel):
    uid = CharField(unique=True)
    # username
    name = CharField()
    # screen name
    desc = TextField(null=True)

if __name__ == '__main__':
    db.create_tables([Topic, Answer, Author])
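A quick way to sanity-check the models before wiring up the spider (a minimal sketch; it assumes the MySQL instance configured above is reachable, a driver such as pymysql is installed, and create_tables has already been run):

from _models import db, Topic, Author

db.connect()

# content and answer_nums fall back to their defaults when omitted
Topic.create(title="hello", tid=1001, uid="alice")

# tid is unique, so a lookup by tid returns exactly one row
print(Topic.get(Topic.tid == 1001).title)

# desc is nullable, so it can be left out entirely
Author.create(uid="alice", name="Alice")

db.close()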
Main program:
# fetch
# parse
# store
import re
import ast

import requests
from scrapy import Selector

from _models import Topic

domain = "https://xxx.xxxx.xxx"
userAgent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36'

# parse the JS file into a Python list
def get_nodes_json():
    # raw JS source
    left_menu_text = requests.get('https://xxx.xxxx.xxx/xxxxxx_js/xxxx_xxxx.js?xxxx').text
    nodes_str_match = re.search("forumNodes: (.*]),", left_menu_text)
    if nodes_str_match:
        nodes_str = nodes_str_match.group(1).replace('null', 'None')
        return ast.literal_eval(nodes_str)
    return []

url_list = []

# recursively walk the JS structure and collect the leaf urls into url_list
def process_nodes_list(nodes_list):
    for item in nodes_list:
        if 'url' in item:
            if 'children' in item:
                process_nodes_list(item['children'])
            else:
                url_list.append(item['url'])

# build the final url list
def get_list_urls():
    nodes_list = get_nodes_json()
    process_nodes_list(nodes_list)
    all_urls = []
    for url in url_list:
        if not url.startswith('http'):
            url = domain + url
        all_urls.append(url)
        all_urls.append(url + "/closed")
        all_urls.append(url + "/recommend")
    return all_urls

def parse_list(url):
    headers = {'User-Agent': userAgent}
    res_text = requests.get(url, headers=headers).text
    sel = Selector(text=res_text)
    all_trs = sel.css(".forums_tab_table tbody tr")
    for tr in all_trs:
        hrefs = tr.css(".forums_title::attr(href)").extract()
        if not hrefs:
            continue
        topic_url = domain + hrefs[0]
        topic = Topic()
        titles = tr.css(".forums_title::text").extract()
        if titles:
            topic.title = titles[0]
        author_hrefs = tr.css(".forums_author a::attr(href)").extract()
        if author_hrefs:
            topic.uid = author_hrefs[0].split('/')[-1]
        topic.tid = topic_url.split('/')[-1]
        try:
            topic.save()
            parse_topic(topic)
        except Exception:
            print('failed to save topic')
    # recurse into the next page, if there is one
    next_links = sel.css(".page_nav a.next_page")
    if next_links:
        last_menu = next_links.css("::text").extract()[-1]
        if last_menu == '下一页':  # the "next page" link text on the site
            next_path = domain + next_links.css("::attr(href)").extract()[-1]
            parse_list(next_path)

# fetch the topic's detail page and reply info
def parse_topic(topic):
    url = domain + '/topics/' + topic.tid
    res_text = requests.get(url).text
    sel = Selector(text=res_text)
    contents = sel.css(".floorOwner .post_body.post_body_min_h::text").extract()
    if contents:
        topic.content = contents[0]
    create_times = sel.css(".control_l.fl label label::text").extract()
    if create_times:
        topic.create_time = create_times[0]
    reply_nums = sel.css(".reply_intro .reply_num::text").extract()
    if reply_nums:
        topic.answer_nums = reply_nums[0]
    try:
        topic.save()
    except Exception:
        pass  # failed to save the topic details

# fetch the author's profile (not implemented yet)
def parse_author(url):
    pass

if __name__ == "__main__":
    # links of all the forum categories
    urls = get_list_urls()
    for url in urls:
        parse_list(url)
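One caveat: Topic.tid is declared unique, so re-running the spider makes topic.save() raise an IntegrityError for every topic that already exists, and the except clause reports it only as 'failed to save topic'. A sketch of an alternative (not part of the original code) that upserts with peewee's get_or_create, so re-runs refresh existing rows instead of failing:

from _models import Topic

def save_topic(tid, title, uid):
    # get_or_create returns (instance, created); created is False
    # when a row with this tid already exists
    topic, created = Topic.get_or_create(
        tid=tid,
        defaults={'title': title, 'uid': uid},
    )
    if not created:
        # refresh the mutable fields on a re-run
        topic.title = title
        topic.uid = uid
        topic.save()
    return topic

parse_list would then call save_topic() instead of building the Topic instance by hand; get_or_create issues a SELECT before the INSERT, so duplicates never reach the database as errors.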