1. Serial Crawling
The crawlers we have written so far download one page after another, that is, they crawl serially. This is obviously slow, and the problem becomes much more pronounced when we need to download a large number of pages. In this section we will learn how to crawl in parallel with multiple threads and multiple processes.
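For contrast, a serial crawler is in essence a single blocking loop. A minimal sketch (using urllib2 directly instead of the downloader from the earlier chapters) might look like this:

import urllib2


def serial_crawler(urls):
    '''Download the given urls one after another; the total time is roughly
    the sum of every individual download.'''
    results = {}
    for url in urls:
        results[url] = urllib2.urlopen(url).read()  # blocks until this page finishes
    return results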
2. Multithreaded Crawler
When we crawl with multiple threads, downloading too many pages at once can overwhelm the server, and our IP may even get banned. We therefore need to enforce a minimum interval between requests from the same IP to pages under the same domain.
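As a rough illustration of such throttling, the sketch below records the last access time per domain and sleeps when requests to the same domain arrive too quickly (the Throttle class and its interface are assumptions made here for illustration, not part of the downloader used later):

import time
import urlparse


class Throttle:
    '''Sleep between consecutive downloads to the same domain (a minimal sketch).'''
    def __init__(self, delay):
        self.delay = delay          # minimum seconds between requests to one domain
        self.last_accessed = {}     # domain -> time of the last request

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc
        last = self.last_accessed.get(domain)
        if self.delay > 0 and last is not None:
            sleep_secs = self.delay - (time.time() - last)
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.last_accessed[domain] = time.time()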
To get the data for this experiment, we first download a zip file from the given site, unzip it, and extract the domain names stored inside. Here is the code:
from zipfile import ZipFile
from StringIO import StringIO
import csv
import sys
from crawler import crawler
from Chapter3 import LinkCrawler
from Chapter3 import MongoDb


class AlexaCallback:
    def __init__(self, max_length=1000):
        '''
        init the seed_url and max_length
        :param max_length: we can get the max_length website at most
        '''
        self.seed_url = "http://s3.amazonaws.com/alexa-static/top-1m.csv.zip"
        self.max_length = max_length

    def __call__(self, url, html):
        '''
        get at most max_length website, and return their urls
        :param url:
        :param html:
        :return: urls which we want
        '''
        if url == self.seed_url:
            urls = []
            with ZipFile(StringIO(html)) as zf:
                csv_name = zf.namelist()[0]
                for _, website in csv.reader(zf.open(csv_name)):
                    urls.append("http://" + website)
                    if len(urls) == self.max_length:
                        break
            return urls
With this callback for collecting the domains in place, we can open multiple threads within a single process to download concurrently. We maintain a thread pool: as long as we have not reached the maximum number of threads and there are still links waiting to be downloaded, we create a new thread, removing dead threads from the pool along the way. Every thread runs the same code.
from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb

Sleep_Time = 1


def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                   cache=None, scrape_callback=None, timeout=60, max_threads=10):
    '''
    create max_threads threads to download html in parallel
    :param url:
    :param delay:
    :param user_agent:
    :param proxies:
    :param num_tries:
    :param cache:
    :param scrape_callback:
    :param timeout:
    :param max_threads:
    :return:
    '''
    crawl_queue = [url]
    seen = set(crawl_queue)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                            num_tries=num_tries, timeout=timeout)

    def process_queue():
        '''
        every thread executes this code to pop a url and download it
        :return:
        '''
        while True:
            try:
                current_url = crawl_queue.pop()
            except IndexError:
                break
            else:
                html = d(current_url)
                if scrape_callback:
                    try:
                        links = scrape_callback(current_url, html) or []
                    except Exception as e:
                        print "error in callback for: {}: {}".format(current_url, e)
                    else:
                        for link in links:
                            link = normalize(url, link)
                            if link not in seen:
                                seen.add(link)
                                crawl_queue.append(link)

    # the thread pool
    threads = []
    while threads or crawl_queue:
        # remove the dead threads
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        # start a new thread
        while len(threads) < max_threads and crawl_queue:
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so cpu can focus execution elsewhere
        time.sleep(Sleep_Time)


def normalize(url, link):
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)


if __name__ == "__main__":
    scape_callback = AlexaCallback.AlexaCallback()
    cache = MongoDb.MongoDb()
    threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache,
                   max_threads=5, timeout=10)
3. Multiprocess Crawler
A multithreaded crawler runs inside a single process. If our machine has multiple CPUs, we can download even faster with multiple processes. To do that, the crawl queue can no longer live in memory, because other processes would have no way to access it. Instead we store the crawl queue in MongoDB, so that the different processes can stay synchronized.
First we create the MongdbQueue class. This queue coordinates and synchronizes the url downloads across processes.
from pymongo import errors, MongoClient
from datetime import datetime, timedelta


class MongdbQueue:
    # the three states a url can be in
    Outstanding, Proceeding, Complete = range(3)

    def __init__(self, client=None, timeout=300):
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        '''
        return True if there are still urls that are not complete
        :return:
        '''
        record = self.db.crawl_queue.find_one({'status': {'$ne': self.Complete}})
        return True if record else False

    def push(self, url):
        '''
        insert the url if it does not exist yet
        :param url:
        :return:
        '''
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.Outstanding})
        except errors.DuplicateKeyError as e:
            pass

    def pop(self):
        '''
        change a record whose status is outstanding to proceeding;
        if no record is found, repair stalled jobs and raise KeyError
        :return:
        '''
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.Outstanding},
            update={'$set': {'status': self.Proceeding, 'timestamp': datetime.now()}})
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def repair(self):
        '''
        release stalled jobs whose timestamp has expired
        :return: the url of the stalled job
        '''
        record = self.db.crawl_queue.find_and_modify(
            query={'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                   'status': {'$ne': self.Complete}},
            update={'$set': {'status': self.Outstanding}})
        if record:
            print "release: ", record['_id']

    def complete(self, url):
        '''
        change the status to complete once the url has been processed
        :return:
        '''
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.Complete}})

    def clear(self):
        self.db.crawl_queue.drop()

    def peek(self):
        record = self.db.crawl_queue.find_one({'status': self.Outstanding})
        if record:
            return record['_id']
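To make the queue's state transitions concrete, here is a minimal usage sketch (it assumes a MongoDB instance is running locally, which is also what the class above expects):

from MongdbQueue import MongdbQueue

queue = MongdbQueue(timeout=300)
queue.clear()                        # start from an empty crawl_queue collection
queue.push("http://example.com")     # the url is stored as Outstanding
url = queue.pop()                    # marked Proceeding and stamped with the current time
queue.complete(url)                  # marked Complete once the download is done
print bool(queue)                    # False: __nonzero__ finds nothing left to crawl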
Then we rewrite threadscrawler on top of the new MongdbQueue:
from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb
from MongdbQueue import MongdbQueue

Sleep_Time = 1


def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                   cache=None, scrape_callback=None, timeout=60, max_threads=10):
    '''
    create max_threads threads to download html in parallel,
    sharing the crawl queue through MongoDB
    :param url:
    :param delay:
    :param user_agent:
    :param proxies:
    :param num_tries:
    :param cache:
    :param scrape_callback:
    :param timeout:
    :param max_threads:
    :return:
    '''
    crawl_queue = MongdbQueue()
    crawl_queue.clear()
    crawl_queue.push(url)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                            num_tries=num_tries, timeout=timeout)

    def process_queue():
        '''
        every thread executes this code to pop a url from the shared queue and download it
        :return:
        '''
        while True:
            try:
                current_url = crawl_queue.pop()
            except KeyError:
                break
            else:
                html = d(current_url)
                if scrape_callback:
                    try:
                        links = scrape_callback(current_url, html) or []
                    except Exception as e:
                        print "error in callback for: {}: {}".format(current_url, e)
                    else:
                        for link in links:
                            link = normalize(url, link)
                            crawl_queue.push(link)
                crawl_queue.complete(current_url)

    # the thread pool
    threads = []
    while threads or crawl_queue:
        # remove the dead threads
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        # start a new thread
        while len(threads) < max_threads and crawl_queue.peek():
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so cpu can focus execution elsewhere
        time.sleep(Sleep_Time)


def normalize(url, link):
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)


# if __name__ == "__main__":
#     scape_callback = AlexaCallback.AlexaCallback()
#     cache = MongoDb.MongoDb()
#     threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache,
#                    max_threads=5, timeout=10)
Finally, we start one threadscrawler per CPU, so that every core takes part in the download:

import multiprocessing
from ThreadsCrawler import threadscrawler
import AlexaCallback
from Chapter3 import MongoDb


def processcrawler(arg, **kwargs):
    num_cpus = multiprocessing.cpu_count()
    print "start process num is ", num_cpus
    processes = []
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threadscrawler, args=(arg, ), kwargs=kwargs)
        p.start()
        processes.append(p)
    # wait for all the crawler processes to finish
    for p in processes:
        p.join()


if __name__ == "__main__":
    scape_callback = AlexaCallback.AlexaCallback()
    cache = MongoDb.MongoDb()
    processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache,
                   max_threads=5, timeout=10)
4. Summary
To speed up downloading a large number of pages, we used multithreading and multiprocessing. Within a certain range, increasing the number of threads and processes clearly improves download speed, but beyond a certain point the speed stops improving and even drops, because switching between threads carries a significant overhead.