多线程爬虫实例,以下是源码
import requests from lxml import etree import json from queue import Queue import threading ''' 1. 创建 URL队列, 响应队列, 数据队列 在init方法中 2. 在生成URL列表中方法中,把URL添加URL队列中 3. 在请求页面的方法中,从URL队列中取出URL执行,把获取到的响应数据添加响应队列中 4. 在处理数据的方法中,从响应队列中取出页面内容进行解析, 把解析结果存储数据队列中 5. 在保存数据的方法中, 从数据队列中取出数据,进行保存 6. 开启几个线程来执行上面的方法 ''' def run_forever(func): def wrapper(obj): while True: func(obj) return wrapper class QiubaiSpider(object): def __init__(self): self.headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36' } self.url_pattern = 'https://www.qiushibaike.com/8hr/page/{}/' # url 队列 self.url_queue = Queue() # 响应队列 self.page_queue = Queue() # 数据队列 self.data_queue = Queue() def add_url_to_queue(self): # 把URL添加url队列中 for i in range(1, 14): self.url_queue.put(self.url_pattern.format(i)) @run_forever def add_page_to_queue(self): ''' 发送请求获取数据 ''' url = self.url_queue.get() # print(url) response = requests.get(url, headers=self.headers) print(response.status_code) if response.status_code != 200: self.url_queue.put(url) else: self.page_queue.put(response.content) # 完成当前URL任务 self.url_queue.task_done() @run_forever def add_dz_to_queue(self): '''根据页面内容使用lxml解析数据, 获取段子列表''' page = self.page_queue.get() # print(page) element = etree.HTML(page) # 先分组,获取分组列表 div_s = element.xpath('//*[@id="content-left"]/div') # 遍历分组列表, 再使用xpath获取内容 dz_list = [] for div in div_s: item = {} # 每个段子包含发送人头像URL, 昵称, 性别, 段子内容, 好笑数,评论数 # 头像URL item['head_url'] = self.get_first_element(div.xpath('./div[1]/a[1]/img/@src')) if item['head_url'] is not None: item['head_url'] = 'http:' + item['head_url'] # 昵称 item['author_name'] = self.get_first_element(div.xpath('./div[1]/a[2]/h2/text()')) # 性别 gender_class = self.get_first_element(div.xpath('./div[1]/div/@class')) if gender_class is not None: item['author_gender'] = 'man' if gender_class.find('man') != -1 else 'women' # 段子内容 item['dz_content'] = self.get_first_element(div.xpath('./a/div/span[1]/text()')) # 好笑数 item['dz_funny'] = self.get_first_element(div.xpath('./div[2]/span[1]/i/text()')) # 评论数 item['dz_comments'] = self.get_first_element(div.xpath('./div[2]/span[2]/a/i/text()')) # print(item dz_list.append(item) # print(dz_list) self.data_queue.put(dz_list) self.page_queue.task_done() def get_first_element(self, list): '''获取列表中第一个元素,如果是空列表就返回None''' return list[0] if len(list) != 0 else None @run_forever def save_dz_list(self): '''把段子信息保存到文件中''' dz_list = self.data_queue.get() # print(dz_list) with open('qiushi_thread.txt', 'a', encoding='utf8') as f: for dz in dz_list: json.dump(dz, f, ensure_ascii=False) f.write(' ') self.data_queue.task_done() def run_use_more_task(self, func, count=1): '''把func放到线程中执行, count:开启多少线程执行''' for i in range(0, count): t = threading.Thread(target=func) t.setDaemon(True) t.start() def run(self): # 开启线程执行上面的几个方法 url_t = threading.Thread(target=self.add_url_to_queue) # url_t.setDaemon(True) url_t.start() self.run_use_more_task(self.add_page_to_queue, 3) self.run_use_more_task(self.add_dz_to_queue, 2) self.run_use_more_task(self.save_dz_list, 2) # 使用队列join方法,等待队列任务都完成了才结束 self.url_queue.join() self.page_queue.join() self.data_queue.join() if __name__ == '__main__': qbs = QiubaiSpider() qbs.run()