zoukankan      html  css  js  c++  java
  • 并发下载

    一、串行爬虫

      我们之前使用的爬虫方式,都是一个页面接着一个页面下载,也就是使用串行的方式进行爬虫。但是显然这种方式下载的速度是非常的慢的,特别是当我们需要下载大量页面的时候这个问题就会变得更加的突出。所以本节内,就学习如何进行多线程和多进程的并行爬虫。

    二、多线程爬虫

      我们在使用多线程进行爬虫的时候因为同时下载的页面过多,可能出现服务器奔溃的情况,甚至会可能出现ip被封的情况。因此需要为每个ip设置一个爬取同一个域名下的不同页面最小的执行间隔时间。

      为了获取实验所需的数据,首先我们要到指定的网站下载zip文件,将其解压,然后获取保存在其中的域名。以下是具体代码:

    from zipfile import ZipFile
    from StringIO import StringIO
    import csv
    import sys
    from crawler import crawler
    from Chapter3 import LinkCrawler
    from Chapter3 import MongoDb
    
    class AlexaCallback:
        def __init__(self, max_length=1000):
            '''
            init the seed_url and max_length
            :param max_length: we can get the max_length website at most
            '''
            self.seed_url = "http://s3.amazonaws.com/alexa-static/top-1m.csv.zip"
            self.max_length = max_length
    
        def __call__(self, url, html):
            '''
            get at most max_length website, and return their urls
            :param url:
            :param html:
            :return: urls which we want
            '''
            if url == self.seed_url:
                urls = []
                with ZipFile(StringIO(html)) as zf:
                    csv_name = zf.namelist()[0]
                    for _, website in csv.reader(zf.open(csv_name)):
                        urls.append("http://" + website)
                        if len(urls) == self.max_length:
                            break
                return urls

    有了获取域名的函数之后,我们就可以对一个进程开启多个线程,实现并发。我们需要创建一个线程池,在没有达到允许的最大线程或者需要下载的连接不为空的时候,我们就需要创建一个新的线程,在这个过程中需要去除线程池中死去的线程。所有的线程都执行相同的代码。

    from Chapter3 import download
    import threading
    import time
    import urlparse
    import AlexaCallback
    from Chapter3 import MongoDb
    
    Sleep_Time = 1
    def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                    cache=None, scrape_callback=None, timeout=60, max_threads=10):
        '''
        create max_threads threads to download html to realize parallel
        :param url:
        :param delay:
        :param user_agent:
        :param proxies:
        :param num_tries:
        :param cache:
        :param scrape_callback:
        :param timeout:
        :param max_threads:
        :return:
        '''
        crawl_queue = [url]
        seen = set(crawl_queue)
        d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                                num_tries=num_tries, timeout=timeout)
    
        def process_queue():
            '''
            every thread exceed this code to create a download operation
            :return:
            '''
            while True:
                try:
                    current_url = crawl_queue.pop()
                except IndexError:
                    break
                else:
                    html = d(current_url)
                    if scrape_callback:
                        try:
                            links = scrape_callback(current_url, html) or []
                        except Exception as e:
                            print "error in callback for: {}: {}".format(current_url, e)
                        else:
                            for link in links:
                                link = normalize(url, link)
                                if link not in seen:
                                    seen.add(link)
                                    crawl_queue.append(link)
        # the thread pool
        threads = []
        while threads or crawl_queue:
            # remove the dead thread
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
            # start a new thread
            while len(threads) < max_threads and crawl_queue:
                thread = threading.Thread(target=process_queue)
                thread.setDaemon(True)
                thread.start()
                threads.append(thread)
            # all threads have been processed
            # sleep temporarily so cpu can focus execution elsewhere
            time.sleep(Sleep_Time)
    
    def normalize(url, link):
        link, _ = urlparse.urldefrag(link)
        return urlparse.urljoin(url, link)
    if __name__ =="__main__":
        scape_callback = AlexaCallback.AlexaCallback()
        cache = MongoDb.MongoDb()
        threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)

    三、多进程爬虫

      多线程爬虫只能由一个进程进行处理,如果我们的电脑有多个cpu的时候,为了使下载的速度更快,我们可以使用多进程爬虫。要使用多进程爬虫,爬虫队列就不能在内存中,因为这样的话,其他进程就没有办法访问了。我们需要将爬虫队列放到Mongdb中。这样不同进程就能实现同步。

      首先我们先要创建Mongdbqueue。这个队列能够实现不同进程之间url下载的协调,同步。

    from pymongo import errors, MongoClient
    from datetime import datetime, timedelta
    class MongdbQueue:
        # init the three state
        Outstanding, Proceeding, Complete = range(3)
    
        def __init__(self, client=None, timeout=300):
            self.client = MongoClient() if client is None else client
            self.db = self.client.cache
            self.timeout = timeout
    
        def __nonzero__(self):
            '''
            if there are more objects return true
            :return:
            '''
            record = self.db.crawl_queue.find_one({'status': {'$ne': self.Complete}})
            return True if record else False
    
        def push(self, url):
            '''
            insert url if it not exist
            :param url:
            :return:
            '''
            try:
                self.db.crawl_queue.insert({'_id': url, 'status': self.Outstanding})
            except errors.DuplicateKeyError as e:
                pass
    
        def pop(self):
            '''
            change the process which status is outstanding to proceeding, if not find a record raise
            key error
            :return:
            '''
            record = self.db.crawl_queue.find_and_modify(query={'status': self.Outstanding},
                                                         update={'$set': {'status': self.Proceeding,
                                                                          'timestamp': datetime.now()}})
            if record:
                return record['_id']
            else:
                self.repair()
                raise KeyError()
    
        def repair(self):
            '''
            release stalled jobs
            :return: the url of the stalled jobs
            '''
            record = self.db.crawl_queue.find_and_modify(query={'timestamp': {'$lt': datetime.now()-timedelta(self.timeout)
                                                                              }, 'status': self.Complete},
                                                         update={'$set': {'$ne': self.Outstanding}})
            if record:
                print "release: ", record['_id']
    
        def complete(self, url):
            '''
            change the status to complete if the process has finished
            :return:
            '''
            self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.Complete}})
    
        def clear(self):
            self.db.crawl_queue.drop()
    
        def peek(self):
            record = self.db.crawl_queue.find_one({'status': self.Outstanding})
            if record:
                return record['_id']

    然后就要用新建的MongodbQueue来重写threadcrawler

    from Chapter3 import download
    import threading
    import time
    import urlparse
    import AlexaCallback
    from Chapter3 import MongoDb
    from MongdbQueue import MongdbQueue
    
    Sleep_Time = 1
    def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                    cache=None, scrape_callback=None, timeout=60, max_threads=10):
        '''
        create max_threads threads to download html to realize parallel
        :param url:
        :param delay:
        :param user_agent:
        :param proxies:
        :param num_tries:
        :param cache:
        :param scrape_callback:
        :param timeout:
        :param max_threads:
        :return:
        '''
        crawl_queue = MongdbQueue()
        crawl_queue.clear()
        crawl_queue.push(url)
        d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                                num_tries=num_tries, timeout=timeout)
    
        def process_queue():
            '''
            every thread exceed this code to create a download operation
            :return:
            '''
            while True:
                try:
                    current_url = crawl_queue.pop()
                except IndexError:
                    break
                else:
                    html = d(current_url)
                    if scrape_callback:
                        try:
                            links = scrape_callback(current_url, html) or []
                        except Exception as e:
                            print "error in callback for: {}: {}".format(current_url, e)
                        else:
                            for link in links:
                                link = normalize(url, link)
                                crawl_queue.push(link)
                    crawl_queue.complete(current_url)
        # the thread pool
        threads = []
        while threads or crawl_queue:
            # remove the dead thread
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
            # start a new thread
            while len(threads) < max_threads and crawl_queue.peek():
                thread = threading.Thread(target=process_queue)
                thread.setDaemon(True)
                thread.start()
                threads.append(thread)
            # all threads have been processed
            # sleep temporarily so cpu can focus execution elsewhere
            time.sleep(Sleep_Time)
    
    def normalize(url, link):
        link, _ = urlparse.urldefrag(link)
        return urlparse.urljoin(url, link)
    # if __name__ == "__main__":
    #     scape_callback = AlexaCallback.AlexaCallback()
    #     cache = MongoDb.MongoDb()
    #     threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)
    import multiprocessing
    from ThreadsCrawler import threadscrawler
    import AlexaCallback
    from Chapter3 import MongoDb
    
    def processcrawler(arg, **kwargs):
        num_cups = multiprocessing.cpu_count()
        print "start process num is ", num_cups
        process = []
        for i in range(num_cups):
            p = multiprocessing.Process(target=threadscrawler, args=(arg, ), kwargs=kwargs)
            p.start()
            process.append(p)
        for p in process:
            p.join()
    
    if __name__ == "__main__":
        scape_callback = AlexaCallback.AlexaCallback()
        cache = MongoDb.MongoDb()
        processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)

    四、总结

      为了能够提高下载大量页面的速度,我们采用了多线程和多进程的方式。在一定的范围内,提高线程和进程数,能够明显的提高我们的下载速度,但是一旦超过某一个度的时候,就不会提升反而下降,因为线程之间的切换,会带来大量的能量损耗。

  • 相关阅读:
    mybatis由浅入深day01_6SqlMapConfig.xml(6.2settings全局参数配置_6.3typeAliases(类型别名)_6.4typeHandlers(类型处理器)_6.5mappers(映射配置))
    mybatis由浅入深day01_5.3 Mapper动态代理方法
    mybatis由浅入深day01_5mybatis开发dao的方法(5.1SqlSession使用范围_5.2原始dao开发方法)
    mybatis由浅入深day01_ 4.11总结(parameterType_resultType_#{}和${}_selectOne和selectList_mybatis和hibernate本质区别和应用场景)
    mybatis由浅入深day01_4.9删除用户_4.10更新用户
    mybatis由浅入深day01_4.7根据用户名称模糊查询用户信息_4.8添加用户((非)自增主键返回)
    mybatis由浅入深day01_4入门程序_4.6根据用户id(主键)查询用户信息
    mybatis由浅入深day01_3mybatis框架介绍
    mybatis由浅入深day01_1课程安排_2对原生态jdbc程序中问题总结
    工作流JBPM_day02:3-预定义的活动1_4-预定义的活动2+在图片上高亮显示正在执行的上活动
  • 原文地址:https://www.cnblogs.com/navysummer/p/9939755.html
Copyright © 2011-2022 走看看