  • Concurrent Downloads in a Web Crawler

    The following are my reading notes on 《用Python写网络爬虫》 (Web Scraping with Python):

    1. Serial Crawling

      All of the crawlers we have written so far download one page after another, that is, they crawl serially. This is obviously slow, and the problem gets much worse once a large number of pages has to be downloaded. In this section we therefore look at how to crawl in parallel with multiple threads and multiple processes.
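
      For comparison, the serial approach boils down to a loop like the following minimal sketch (the serialcrawler name and signature are mine; it only assumes the Chapter 3 Downloader that the code later in this post also uses):

    from Chapter3 import download

    def serialcrawler(seed_url, scrape_callback=None):
        # download one page at a time: the next download only starts
        # after the previous one has completely finished
        crawl_queue = [seed_url]
        seen = set(crawl_queue)
        d = download.Downloader(delay=5, user_agent="wuyanjing", proxies=None,
                                num_tries=2, cache=None, timeout=60)
        while crawl_queue:
            url = crawl_queue.pop()
            html = d(url)
            if scrape_callback:
                for link in scrape_callback(url, html) or []:
                    if link not in seen:
                        seen.add(link)
                        crawl_queue.append(link)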

    2. Multithreaded Crawler

      When we crawl with many threads we download many pages at once, which can overwhelm the target server and may even get our IP banned. We therefore need to enforce a minimum interval between successive requests to different pages of the same domain.
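
      One simple way to enforce such an interval (this is, roughly, what the delay argument of the Chapter 3 Downloader used below is meant to achieve; the sketch itself is mine, not the book's code) is to remember when each domain was last requested and sleep if we come back to it too soon:

    import time
    import urlparse
    from datetime import datetime

    class Throttle:
        '''sleep between consecutive requests to the same domain'''
        def __init__(self, delay):
            self.delay = delay      # minimum number of seconds between requests to one domain
            self.domains = {}       # domain -> datetime of the last request

        def wait(self, url):
            domain = urlparse.urlparse(url).netloc
            last_accessed = self.domains.get(domain)
            if self.delay > 0 and last_accessed is not None:
                sleep_secs = self.delay - (datetime.now() - last_accessed).total_seconds()
                if sleep_secs > 0:
                    time.sleep(sleep_secs)
            self.domains[domain] = datetime.now()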

      To obtain the data needed for the experiment, we first download a zip file from the specified website (the Alexa top-1-million list used as seed_url below), extract it, and read the domains stored inside it. Here is the code:

    from zipfile import ZipFile
    from StringIO import StringIO
    import csv
    import sys
    from crawler import crawler
    from Chapter3 import LinkCrawler
    from Chapter3 import MongoDb
    
    class AlexaCallback:
        def __init__(self, max_length=1000):
            '''
            init the seed_url and max_length
            :param max_length: the maximum number of website urls to return
            '''
            self.seed_url = "http://s3.amazonaws.com/alexa-static/top-1m.csv.zip"
            self.max_length = max_length
    
        def __call__(self, url, html):
            '''
            collect at most max_length website urls and return them
            :param url:
            :param html:
            :return: urls which we want
            '''
            if url == self.seed_url:
                urls = []
                with ZipFile(StringIO(html)) as zf:
                    csv_name = zf.namelist()[0]
                    for _, website in csv.reader(zf.open(csv_name)):
                        urls.append("http://" + website)
                        if len(urls) == self.max_length:
                            break
                return urls
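
      A quick usage sketch of the callback (mine, and it assumes the Chapter 3 Downloader with the same keyword arguments that appear later in this post): download the seed zip and hand the raw bytes to the callback to get the initial url list.

    from Chapter3 import download
    import AlexaCallback

    scrape_callback = AlexaCallback.AlexaCallback(max_length=10)
    d = download.Downloader(delay=5, user_agent="wuyanjing", proxies=None,
                            num_tries=2, cache=None, timeout=60)
    zip_data = d(scrape_callback.seed_url)                     # raw bytes of the zip archive
    urls = scrape_callback(scrape_callback.seed_url, zip_data) # at most 10 urls from the list
    print len(urls)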

      Once we have the function that yields the domains, we can start several threads inside a single process to download concurrently. We maintain a thread pool: as long as the crawl queue is not empty and the number of live threads is below the allowed maximum, we start a new thread, removing dead threads from the pool as we go. All threads run the same code.

    from Chapter3 import download
    import threading
    import time
    import urlparse
    import AlexaCallback
    from Chapter3 import MongoDb
    
    Sleep_Time = 1
    def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                    cache=None, scrape_callback=None, timeout=60, max_threads=10):
        '''
        download pages in parallel using at most max_threads threads
        :param url:
        :param delay:
        :param user_agent:
        :param proxies:
        :param num_tries:
        :param cache:
        :param scrape_callback:
        :param timeout:
        :param max_threads:
        :return:
        '''
        crawl_queue = [url]
        seen = set(crawl_queue)
        d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                                num_tries=num_tries, timeout=timeout)
    
        def process_queue():
            '''
            every thread runs this code: repeatedly pop a url from the queue and download it
            :return:
            '''
            while True:
                try:
                    current_url = crawl_queue.pop()
                except IndexError:
                    break
                else:
                    html = d(current_url)
                    if scrape_callback:
                        try:
                            links = scrape_callback(current_url, html) or []
                        except Exception as e:
                            print "error in callback for: {}: {}".format(current_url, e)
                        else:
                            for link in links:
                                link = normalize(url, link)
                                if link not in seen:
                                    seen.add(link)
                                    crawl_queue.append(link)
        # the thread pool
        threads = []
        while threads or crawl_queue:
            # remove the dead thread
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
            # start a new thread
            while len(threads) < max_threads and crawl_queue:
                thread = threading.Thread(target=process_queue)
                thread.setDaemon(True)
                thread.start()
                threads.append(thread)
            # all threads have been processed
            # sleep temporarily so cpu can focus execution elsewhere
            time.sleep(Sleep_Time)
    
    def normalize(url, link):
        link, _ = urlparse.urldefrag(link)
        return urlparse.urljoin(url, link)
    if __name__ =="__main__":
        scape_callback = AlexaCallback.AlexaCallback()
        cache = MongoDb.MongoDb()
        threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)
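
      Note that the shared crawl_queue here is a plain Python list. Within a single process that is acceptable, because in CPython individual operations such as list.append and list.pop are made atomic by the GIL, but such a queue obviously cannot be shared across processes, which is exactly the limitation the next section removes.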

    3. Multiprocess Crawler

      A multithreaded crawler is still handled by a single process. If our machine has multiple CPUs, we can make downloading even faster by using a multiprocess crawler. To do that, the crawl queue can no longer live in memory, because other processes would have no way to access it; instead we put the crawl queue into MongoDB, so that the different processes can stay synchronized.

      First we create the MongdbQueue class. This queue coordinates and synchronizes the url downloads of the different processes.

    from pymongo import errors, MongoClient
    from datetime import datetime, timedelta
    class MongdbQueue:
        # the three states a queued url can be in
        Outstanding, Proceeding, Complete = range(3)
    
        def __init__(self, client=None, timeout=300):
            self.client = MongoClient() if client is None else client
            self.db = self.client.cache
            self.timeout = timeout
    
        def __nonzero__(self):
            '''
            return True while any url is not yet Complete
            :return:
            '''
            record = self.db.crawl_queue.find_one({'status': {'$ne': self.Complete}})
            return True if record else False
    
        def push(self, url):
            '''
            insert the url if it does not already exist
            :param url:
            :return:
            '''
            try:
                self.db.crawl_queue.insert({'_id': url, 'status': self.Outstanding})
            except errors.DuplicateKeyError as e:
                pass
    
        def pop(self):
            '''
            claim an Outstanding url and mark it Proceeding; if no such record
            is found, try to repair stalled jobs and raise KeyError
            :return:
            '''
            record = self.db.crawl_queue.find_and_modify(query={'status': self.Outstanding},
                                                         update={'$set': {'status': self.Proceeding,
                                                                          'timestamp': datetime.now()}})
            if record:
                return record['_id']
            else:
                self.repair()
                raise KeyError()
    
        def repair(self):
            '''
            release stalled jobs: reset urls that have stayed in a non-Complete
            state for longer than timeout seconds back to Outstanding
            :return:
            '''
            record = self.db.crawl_queue.find_and_modify(
                query={'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                       'status': {'$ne': self.Complete}},
                update={'$set': {'status': self.Outstanding}})
            if record:
                print "release: ", record['_id']
    
        def complete(self, url):
            '''
            change the status to complete if the process has finished
            :return:
            '''
            self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.Complete}})
    
        def clear(self):
            self.db.crawl_queue.drop()
    
        def peek(self):
            record = self.db.crawl_queue.find_one({'status': self.Outstanding})
            if record:
                return record['_id']
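
      Before wiring the queue into the crawler, here is a small usage sketch of it on its own (it assumes a MongoDB server is running locally):

    from MongdbQueue import MongdbQueue

    queue = MongdbQueue()
    queue.clear()                        # start from an empty collection
    queue.push("http://example.com")     # enqueue; duplicate urls are silently ignored
    url = queue.pop()                    # claim it: Outstanding -> Proceeding
    queue.complete(url)                  # mark it done: Proceeding -> Complete
    print bool(queue)                    # False, nothing left that is not Complete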

      Next we rewrite threadscrawler on top of the new MongdbQueue:

    from Chapter3 import download
    import threading
    import time
    import urlparse
    import AlexaCallback
    from Chapter3 import MongoDb
    from MongdbQueue import MongdbQueue
    
    Sleep_Time = 1
    def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                    cache=None, scrape_callback=None, timeout=60, max_threads=10):
        '''
        download pages in parallel using at most max_threads threads
        :param url:
        :param delay:
        :param user_agent:
        :param proxies:
        :param num_tries:
        :param cache:
        :param scrape_callback:
        :param timeout:
        :param max_threads:
        :return:
        '''
        crawl_queue = MongdbQueue()
        crawl_queue.clear()
        crawl_queue.push(url)
        d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                                num_tries=num_tries, timeout=timeout)
    
        def process_queue():
            '''
            every thread runs this code: repeatedly pop a url from the queue and download it
            :return:
            '''
            while True:
                try:
                    current_url = crawl_queue.pop()
                except KeyError:
                    break
                else:
                    html = d(current_url)
                    if scrape_callback:
                        try:
                            links = scrape_callback(current_url, html) or []
                        except Exception as e:
                            print "error in callback for: {}: {}".format(current_url, e)
                        else:
                            for link in links:
                                link = normalize(url, link)
                                crawl_queue.push(link)
                    crawl_queue.complete(current_url)
        # the thread pool
        threads = []
        while threads or crawl_queue:
            # remove the dead thread
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
            # start a new thread
            while len(threads) < max_threads and crawl_queue.peek():
                thread = threading.Thread(target=process_queue)
                thread.setDaemon(True)
                thread.start()
                threads.append(thread)
            # all threads have been processed
            # sleep temporarily so cpu can focus execution elsewhere
            time.sleep(Sleep_Time)
    
    def normalize(url, link):
        link, _ = urlparse.urldefrag(link)
        return urlparse.urljoin(url, link)
    # if __name__ == "__main__":
    #     scape_callback = AlexaCallback.AlexaCallback()
    #     cache = MongoDb.MongoDb()
    #     threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)

      Finally, the multiprocess crawler. The function I wrote raises an error, and I have not yet figured out where the problem is; this still needs to be investigated.

    import multiprocessing
    from ThreadsCrawler import threadscrawler
    import AlexaCallback
    from Chapter3 import MongoDb
    
    def processcrawler(arg, **kwargs):
        num_cpus = multiprocessing.cpu_count()
        print "start process num is ", num_cpus
        process = []
        for i in range(num_cpus):
            p = multiprocessing.Process(target=threadscrawler, args=(arg, ), kwargs=kwargs)
            p.start()
            process.append(p)
        for p in process:
            p.join()
    
    if __name__ == "__main__":
        scape_callback = AlexaCallback.AlexaCallback()
        cache = MongoDb.MongoDb()
        processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)

      Here is the error log:

    E:\python27\python.exe E:/pycharm/crawling/Chapter4/ProcessCrawler.py
    start process num is  4
    Traceback (most recent call last):
      File "E:/pycharm/crawling/Chapter4/ProcessCrawler.py", line 20, in <module>
        processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)
      File "E:/pycharm/crawling/Chapter4/ProcessCrawler.py", line 12, in processcrawler
        p.start()
      File "E:\python27\lib\multiprocessing\process.py", line 130, in start
        self._popen = Popen(self)
      File "E:\python27\lib\multiprocessing\forking.py", line 277, in __init__
        dump(process_obj, to_child, HIGHEST_PROTOCOL)
      File "E:\python27\lib\multiprocessing\forking.py", line 199, in dump
        ForkingPickler(file, protocol).dump(obj)
      File "E:\python27\lib\pickle.py", line 224, in dump
        self.save(obj)
      File "E:\python27\lib\pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "E:\python27\lib\pickle.py", line 425, in save_reduce
        save(state)
      File "E:\python27\lib\pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "E:\python27\lib\pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
        save(v)
      File "E:\python27\lib\pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "E:\python27\lib\pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
        save(v)
      File "E:\python27\lib\pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "E:\python27\lib\pickle.py", line 731, in save_inst
        save(stuff)
      File "E:\python27\lib\pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "E:\python27\lib\pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
        save(v)
      File "E:\python27\lib\pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "E:\python27\lib\pickle.py", line 425, in save_reduce
        save(state)
      File "E:\python27\lib\pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "E:\python27\lib\pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
        save(v)
      File "E:\python27\lib\pickle.py", line 306, in save
        rv = reduce(self.proto)
    TypeError: can't pickle thread.lock objects
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "E:\python27\lib\multiprocessing\forking.py", line 381, in main
        self = load(from_parent)
      File "E:\python27\lib\pickle.py", line 1384, in load
        return Unpickler(file).load()
      File "E:\python27\lib\pickle.py", line 864, in load
        dispatch[key](self)
      File "E:\python27\lib\pickle.py", line 886, in load_eof
        raise EOFError
    EOFError

    Process finished with exit code 1
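
      The decisive line is "TypeError: can't pickle thread.lock objects". On Windows, multiprocessing has to pickle the arguments of every Process in order to send them to the child process, and the cache object passed through kwargs cannot be pickled (the Chapter 3 MongoDb cache presumably wraps a MongoClient, just like the MongdbQueue above, and MongoClient holds internal thread locks). A possible workaround, shown here only as an untested sketch reusing the same project modules, is to create the cache inside each child process instead of passing it in:

    import multiprocessing
    from ThreadsCrawler import threadscrawler
    import AlexaCallback
    from Chapter3 import MongoDb

    def worker(seed_url, **kwargs):
        # build the unpicklable MongoDB-backed cache inside the child process
        kwargs['cache'] = MongoDb.MongoDb()
        threadscrawler(seed_url, **kwargs)

    def processcrawler(arg, **kwargs):
        # everything passed to Process() must now be picklable
        num_cpus = multiprocessing.cpu_count()
        print "start process num is ", num_cpus
        process = []
        for i in range(num_cpus):
            p = multiprocessing.Process(target=worker, args=(arg, ), kwargs=kwargs)
            p.start()
            process.append(p)
        for p in process:
            p.join()

    if __name__ == "__main__":
        scape_callback = AlexaCallback.AlexaCallback()
        processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, max_threads=5, timeout=10)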

    4. Summary

      To speed up the download of a large number of pages we used multithreading and multiprocessing. Within a certain range, increasing the number of threads and processes clearly improves download speed, but past a certain point the speed stops improving and even drops, because switching between threads incurs a large amount of overhead.

      

      
