from zipfile import ZipFile
from StringIO import StringIO
import csv
import sys
from crawler import crawler
from Chapter3 import LinkCrawler
from Chapter3 import MongoDb


class AlexaCallback:
    '''
    Scrape callback that turns the zipped Alexa top-sites CSV into a list of
    site urls.
    '''

    def __init__(self, max_length=1000):
        '''
        remember the seed url and the cap on how many sites to return
        :param max_length: stop reading the csv once this many urls are collected
        '''
        self.max_length = max_length
        self.seed_url = "http://s3.amazonaws.com/alexa-static/top-1m.csv.zip"

    def __call__(self, url, html):
        '''
        unpack the downloaded zip and collect the listed websites
        :param url: the url that was just downloaded
        :param html: the downloaded body; for the seed url this is the zip content
        :return: list of "http://<website>" urls when url is the seed url,
                 otherwise None
        '''
        # anything other than the seed download is ignored
        if url != self.seed_url:
            return None

        collected = []
        with ZipFile(StringIO(html)) as archive:
            # the csv is taken to be the first member of the archive
            first_member = archive.namelist()[0]
            rows = csv.reader(archive.open(first_member))
            # each row must unpack into exactly two fields; the second is the site
            for count, (_, website) in enumerate(rows, 1):
                collected.append("http://" + website)
                if count == self.max_length:
                    break
        return collected
from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb

# seconds the supervising loop sleeps between checks of the thread pool
Sleep_Time = 1


def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                   cache=None, scrape_callback=None, timeout=60, max_threads=10):
    '''
    crawl from url using up to max_threads worker threads that share one
    in-memory queue
    :param url: seed url; also used as the base when normalizing discovered links
    :param delay: forwarded to download.Downloader
    :param user_agent: forwarded to download.Downloader
    :param proxies: forwarded to download.Downloader
    :param num_tries: forwarded to download.Downloader
    :param cache: forwarded to download.Downloader
    :param scrape_callback: optional callable(url, html) returning an iterable of
                            links to enqueue (a falsy result means no links)
    :param timeout: forwarded to download.Downloader
    :param max_threads: upper bound on worker threads alive at the same time
    :return: None
    '''
    crawl_queue = [url]
    # urls that have already been queued, so each is queued only once
    seen = set(crawl_queue)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                            num_tries=num_tries, timeout=timeout)

    def process_queue():
        '''
        worker body: keep popping urls and downloading them until the queue is empty
        :return: None
        '''
        while True:
            try:
                current_url = crawl_queue.pop()
            except IndexError:
                # queue is empty, this worker is done
                break
            else:
                html = d(current_url)
                if scrape_callback:
                    try:
                        links = scrape_callback(current_url, html) or []
                    except Exception as e:
                        # a failing callback must not kill the worker thread
                        print("error in callback for: {}: {}".format(current_url, e))
                    else:
                        for link in links:
                            link = normalize(url, link)
                            if link not in seen:
                                seen.add(link)
                                crawl_queue.append(link)

    # the thread pool
    threads = []
    while threads or crawl_queue:
        # drop the dead threads; build a new list instead of calling remove()
        # while iterating, which would skip the element after each removal
        threads = [thread for thread in threads if thread.is_alive()]
        # start new threads while there is work and room in the pool
        while len(threads) < max_threads and crawl_queue:
            thread = threading.Thread(target=process_queue)
            # daemon threads do not block interpreter exit
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # sleep temporarily so cpu can focus execution elsewhere
        time.sleep(Sleep_Time)


def normalize(url, link):
    '''
    strip the fragment from link and resolve it against url
    :param url: base url
    :param link: absolute or relative link
    :return: absolute url without fragment
    '''
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)


if __name__ == "__main__":
    scape_callback = AlexaCallback.AlexaCallback()
    cache = MongoDb.MongoDb()
    threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache,
                   max_threads=5, timeout=10)
from pymongo import errors, MongoClient from datetime import datetime, timedelta class MongdbQueue: # init the three state Outstanding, Proceeding, Complete = range(3) def __init__(self, client=None, timeout=300): self.client = MongoClient() if client is None else client self.db = self.client.cache self.timeout = timeout def __nonzero__(self): ''' if there are more objects return true :return: ''' record = self.db.crawl_queue.find_one({'status': {'$ne': self.Complete}}) return True if record else False def push(self, url): ''' insert url if it not exist :param url: :return: ''' try: self.db.crawl_queue.insert({'_id': url, 'status': self.Outstanding}) except errors.DuplicateKeyError as e: pass def pop(self): ''' change the process which status is outstanding to proceeding, if not find a record raise key error :return: ''' record = self.db.crawl_queue.find_and_modify(query={'status': self.Outstanding}, update={'$set': {'status': self.Proceeding, 'timestamp': datetime.now()}}) if record: return record['_id'] else: self.repair() raise KeyError() def repair(self): ''' release stalled jobs :return: the url of the stalled jobs ''' record = self.db.crawl_queue.find_and_modify(query={'timestamp': {'$lt': datetime.now()-timedelta(self.timeout) }, 'status': self.Complete}, update={'$set': {'$ne': self.Outstanding}}) if record: print "release: ", record['_id'] def complete(self, url): ''' change the status to complete if the process has finished :return: ''' self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.Complete}}) def clear(self): self.db.crawl_queue.drop() def peek(self): record = self.db.crawl_queue.find_one({'status': self.Outstanding}) if record: return record['_id']
from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb
from MongdbQueue import MongdbQueue

# seconds the supervising loop sleeps between checks of the thread pool
Sleep_Time = 1


def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                   cache=None, scrape_callback=None, timeout=60, max_threads=10):
    '''
    crawl from url using up to max_threads worker threads that share a
    MongdbQueue
    :param url: seed url; also used as the base when normalizing discovered links
    :param delay: forwarded to download.Downloader
    :param user_agent: forwarded to download.Downloader
    :param proxies: forwarded to download.Downloader
    :param num_tries: forwarded to download.Downloader
    :param cache: forwarded to download.Downloader
    :param scrape_callback: optional callable(url, html) returning an iterable of
                            links to enqueue (a falsy result means no links)
    :param timeout: forwarded to download.Downloader
    :param max_threads: upper bound on worker threads alive at the same time
    :return: None
    '''
    crawl_queue = MongdbQueue()
    # NOTE(review): clear() drops the queue on every call; if several processes
    # call this function against the same database they wipe each other's
    # entries -- confirm this is intended
    crawl_queue.clear()
    crawl_queue.push(url)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                            num_tries=num_tries, timeout=timeout)

    def process_queue():
        '''
        worker body: keep claiming urls and downloading them until none is outstanding
        :return: None
        '''
        while True:
            try:
                current_url = crawl_queue.pop()
            except KeyError:
                # MongdbQueue.pop raises KeyError (not IndexError) when no url
                # is outstanding; this worker is done
                break
            else:
                html = d(current_url)
                if scrape_callback:
                    try:
                        links = scrape_callback(current_url, html) or []
                    except Exception as e:
                        # a failing callback must not kill the worker thread
                        print("error in callback for: {}: {}".format(current_url, e))
                    else:
                        for link in links:
                            link = normalize(url, link)
                            crawl_queue.push(link)
                crawl_queue.complete(current_url)

    # the thread pool
    threads = []
    while threads or crawl_queue:
        # drop the dead threads; build a new list instead of calling remove()
        # while iterating, which would skip the element after each removal
        threads = [thread for thread in threads if thread.is_alive()]
        # start new threads while there is outstanding work and room in the pool
        while len(threads) < max_threads and crawl_queue.peek():
            thread = threading.Thread(target=process_queue)
            # daemon threads do not block interpreter exit
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # sleep temporarily so cpu can focus execution elsewhere
        time.sleep(Sleep_Time)


def normalize(url, link):
    '''
    strip the fragment from link and resolve it against url
    :param url: base url
    :param link: absolute or relative link
    :return: absolute url without fragment
    '''
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)

# if __name__ == "__main__":
#     scape_callback = AlexaCallback.AlexaCallback()
#     cache = MongoDb.MongoDb()
#     threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)
import multiprocessing from ThreadsCrawler import threadscrawler import AlexaCallback from Chapter3 import MongoDb def processcrawler(arg, **kwargs): num_cups = multiprocessing.cpu_count() print "start process num is ", num_cups process = [] for i in range(num_cups): p = multiprocessing.Process(target=threadscrawler, args=(arg, ), kwargs=kwargs) p.start() process.append(p) for p in process: p.join() if __name__ == "__main__": scape_callback = AlexaCallback.AlexaCallback() cache = MongoDb.MongoDb() processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)