案例:多线程爬虫
目标:爬取糗事百科段子,待爬取页面URL:http://www.qiushibaike.com/8hr/page/1
要求:
- 使用requests获取页面信息,用XPATH/re 做数据提取
- 获取每个帖子里的 用户头像链接、用户主页、用户名、用户性别、用户年龄、段子内容、点赞次数、评论次数
- 保存到本地json文件内
- 采用多线程
queue(队列对象)
queue是python中的标准库,可以直接import queue引用,队列是线程间最常用的交换数据的形式
python下多线程:
对于资源,加锁是个重要的环节。因为python原生的list, dict等,都是not thread safe的。而queue,是thread safe(线程案例)的,因此在满足使用条件下,建议使用队列
- 初始化:class queue.Queue(maxsize) FIFO(先进先出)
- 常用方法:
- queue.Queue.qsize() 返回队列的大小
- queue.Queue.empty() 如果队列为空,返回True,反之返回False
- queue.Queue.full() 如果队列满了,返回True,反之返回False
- queue.Queue.get([block[, timeout]]) 从队列中取出一个值,timeout为等待时间
- 创建一个“队列”对象
- import queue
- myqueue = queue.Queue(maxsize = 10)
- 将一个值放入队列中
- myqueue.put(10)
- 将一个值从队列中取出
- myqueue.get()
#!/usr/bin/python3 # -*- coding:utf-8 -*- __author__ = 'mayi' """ 案例:多线程爬虫 目标:爬取糗事百科段子,待爬取页面首页URL:http://www.qiushibaike.com/8hr/page/1 要求: 1.使用requests获取页面信息,用XPATH/re 做数据提取 2.获取每个帖子里的 用户头像链接、用户主页、用户名、用户性别、用户年龄、段子内容、点赞次数、评论次数 3.保存到json文件内 4.采用多线程 """ import requests from lxml import etree from queue import Queue import threading import time import json # 数据队列 data_queue = Queue() exitFlag_Parser = False # 锁 lock = threading.Lock() class ThreadCrawl(threading.Thread): """ 爬取线程类 """ def __init__(self, thread_name, page_queue): threading.Thread.__init__(self) self.thread_name = thread_name self.page_queue = page_queue self.url = "http://www.qiushibaike.com/8hr/page/" self.header = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36'} def run(self): print(self.thread_name + " Starting...") self.qiushi_spider() print(self.thread_name + " Exiting...") def qiushi_spider(self): global data_queue while True: # page队列为空时,循环结束 if self.page_queue.empty(): break else: page = self.page_queue.get() full_url = self.url + str(page) + "/" print(full_url) # 多次尝试发送请求失败后结束、防止死循环 timeout = 5 while timeout: try: # 防止访问太快 time.sleep(1) content = requests.get(full_url, headers = self.header) data_queue.put(content.text) break except Exception as e: print(e) timeout -= 1 time.sleep(1) class ThreadParser(threading.Thread): """ 页面解析类 """ def __init__(self, thread_name, file_name): threading.Thread.__init__(self) self.thread_name = thread_name self.file_name = file_name def run(self): # 开始 print(self.thread_name + " Starting...") global data_queue, exitFlag_Parser while not exitFlag_Parser: try: item = data_queue.get(block = False) if item: self.parse_data(item) data_queue.task_done() except: pass # 结束 print(self.thread_name + " Exiting...") def parse_data(self, item): """ 解析网页函数 :param item: 网页内容 """ global lock try: html = etree.HTML(item) # id = qiushi_tag_119336220:id均包含:qiushi_tag_ result = html.xpath('//div[contains(@id,"qiushi_tag_")]') for res in result: try: # 用户头像链接、用户主页、用户名、用户性别、用户年龄、段子内容、点赞次数、评论次数 # 用户头像链接 head_url = res.xpath('.//img/@src')[0] # 用户主页 home_url = "http://www.qiushibaike.com" + res.xpath('.//a/@href')[0] # 用户名 user_name = res.xpath('.//h2')[0].text # 用户性别:匿名用户,匹配不到性别 article_gender = res.xpath('./div/div/@class') if article_gender: gender = article_gender[0].split()[-1].replace("Icon", "") else: gender = "" # 用户年龄:匿名用户,匹配不到年龄 article_age = res.xpath('./div/div') if article_age: age = article_age[0].text else: age = 0 # 段子内容 content = res.xpath('.//div[@class="content"]/span')[0].text.strip() # 点赞次数 stats_vote = res.xpath('.//span[@class="stats-vote"]//i[@class="number"]') if stats_vote: stats_vote = stats_vote[0].text.strip() else: stats_vote = "0" # 评论次数 stats_comments = res.xpath('.//span[@class="stats-comments"]//i[@class="number"]') if stats_comments: stats_comments = stats_comments[0].text.strip() else: stats_comments = "0" record = { "head_url": head_url, "home_url": home_url, "user_name": user_name, "gender": gender, "age": age, "content": content, "stats_vote": stats_vote, "stats_comments": stats_comments } with lock: self.file_name.write(json.dumps(record, ensure_ascii = False) + ",") except Exception as e: print(e) except Exception as e: print(e) def main(): """ 主函数 :return: """ # 采集的数据存储在本地磁盘的文件名 file_name = open("糗事百科.json", "a", encoding = "utf-8") # 待采集的起始页码 start_page = int(input("请输入起始页码:")) # 待采集的终止页码 end_page = int(input("请输入终止页码:")) # 定义一个page队列 pageQueue = Queue() for page in range(start_page, end_page + 1): # 页码入队列 pageQueue.put(page) # 初始化采集线程 crawl_threads = [] crawl_list = ["采集线程1", "采集线程2", "采集线程3"] for thread_name in crawl_list: thread = ThreadCrawl(thread_name, pageQueue) thread.start() crawl_threads.append(thread) # 初始化解析线程 parser_threads = [] parser_list = ["解析线程1", "解析线程2", "解析线程3"] for thread_name in parser_list: thread = ThreadParser(thread_name, file_name) thread.start() parser_threads.append(thread) # 等待列队被清空 while not pageQueue.empty(): pass # 等待所有线程处理完成 for thread in crawl_threads: thread.join() # 等待队列被清空 while not data_queue.empty(): pass # 通知线程退出 global exitFlag_Parser exitFlag_Parser = True for thread in parser_threads: thread.join() with lock: file_name.close() if __name__ == '__main__': # 运行主函数 main()