zoukankan      html  css  js  c++  java
  • (五)爬虫之并发下载

      当有大量url需要下载时,串行爬取速度较慢,需要使用多线程、多进程进行爬取,以及部署分布式爬虫等

    1.多线程爬虫 

        下面代码中三个线程时,爬取61个url,花费16-25s;五个线程时,花费41-55s。(线程间的切换也消耗时间)

    #coding:utf-8
    import threading
    import multiprocessing
    import requests
    import csv
    import time
    from datetime import datetime
    import urlparse
    
    # Per-domain download throttle: enforces a minimum delay between
    # successive requests to the same domain.
    class Throttle(object):
        def __init__(self,delay):
            # delay: minimum number of seconds between two requests to one domain
            self.delay = delay
            # domain -> datetime of the last access to that domain
            self.domains={}

        def wait(self,url):
            """Sleep just long enough so that the gap since the last request to
            this URL's domain is at least self.delay seconds, then record now as
            the domain's latest access time."""
            domain = urlparse.urlparse(url).netloc  # extract the domain (netloc) part of the URL
            last_accessed = self.domains.get(domain)
            if self.delay>0 and last_accessed!=None:
                # .seconds is only the seconds component of the timedelta; it is
                # adequate here because crawl gaps are far below one day
                sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
                if sleep_secs>0:
                    time.sleep(sleep_secs)
            self.domains[domain]=datetime.now()
    
    # Page download helper shared by all crawler variants.
    def download(url,user_agent=None,proxies=None,num_retries=3):
        """Fetch *url* with requests and return the Response object.

        A response with a 5xx (server-side) status is retried, for at most
        num_retries extra attempts beyond the first request.
        """
        attempts_left = num_retries
        while True:
            response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
            server_error = bool(response.status_code) and 500 <= response.status_code < 600
            if server_error and attempts_left > 0:
                attempts_left -= 1
                continue
            return response
    

    #读取需要爬取的网址文件
    def get_urls(): url_lists=[] with open('urls.csv','r') as f: reader = csv.reader(f) for row in reader: url_lists.append(row[0].replace(' ','')) return url_lists def thread_crawler(max_threads=5): urls = get_urls() def process_url(): while True: try: url = urls.pop() except IndexError as e: print e break else: throttle = Throttle(3) throttle.wait(url) response=download(url) print url, response.status_code threads=[] while urls or threads: for thread in threads: if not thread.is_alive(): threads.remove(thread) while len(threads)<max_threads and urls: thread = threading.Thread(target=process_url()) thread.setDaemon(True) thread.start() threads.append(thread) #time.sleep(2)
    if __name__=='__main__':
    thread_crawler()

    2.多进程爬虫

      2.1 多进程

         下面代码中两个进程,爬取61个url,平均花费20-30s

    #coding:utf-8
    
    import threading
    import multiprocessing
    import requests
    import csv
    import time
    from datetime import datetime
    import urlparse
    
    # Per-domain download throttle: enforces a minimum delay between
    # successive requests to the same domain.
    class Throttle(object):
        def __init__(self,delay):
            # delay: minimum number of seconds between two requests to one domain
            self.delay = delay
            # domain -> datetime of the last access to that domain
            self.domains={}

        def wait(self,url):
            """Sleep just long enough so that the gap since the last request to
            this URL's domain is at least self.delay seconds, then record now as
            the domain's latest access time."""
            domain = urlparse.urlparse(url).netloc  # extract the domain (netloc) part of the URL
            last_accessed = self.domains.get(domain)
            if self.delay>0 and last_accessed!=None:
                # .seconds is only the seconds component of the timedelta; it is
                # adequate here because crawl gaps are far below one day
                sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
                if sleep_secs>0:
                    time.sleep(sleep_secs)
            self.domains[domain]=datetime.now()
    
    # Download a web page with requests, retrying on server errors.
    def download(url,user_agent=None,proxies=None,num_retries=3):
        """Fetch url and return the requests Response.

        Retries (recursively) up to num_retries more times when the server
        answers with a 5xx status. Note: requests omits a header whose value
        is None, so user_agent=None sends no User-Agent header at all.
        """
        response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
        if response.status_code and 500<=response.status_code<600:  # retry on server-side (5xx) errors
            if num_retries > 0:
                response = download(url,user_agent,proxies,num_retries-1)
        return response
    
    def get_urls():
        """Return the URLs listed in urls.csv: the first column of every row,
        with all spaces removed."""
        with open('urls.csv', 'r') as f:
            return [row[0].replace(' ', '') for row in csv.reader(f)]
    
    def process_url(url_queue):
        while url_queue.qsize()>0:
            try:
                url = url_queue.get()
            except Exception as e:
                print e
            else:
                throttle = Throttle(3)
                throttle.wait(url)
                response=download(url)
                print url, response.status_code
    
    def process_crawler():
        num_cpus = multiprocessing.cpu_count()
        q = multiprocessing.Queue()
        urls = get_urls()
        for url in urls:
            q.put(url)
        start = time.time()
        print "开始时间:%s"%start
        processes=[]
        for i in range(num_cpus):
            process = multiprocessing.Process(target=process_url,args=(q,))
            process.start()
            processes.append(process)
        for p in processes:
            p.join()
        end = time.time()
        print "结束时间:%s" %end
        print "爬取%s个url,消耗时间:%s"%(len(urls),int(end-start))
    if __name__ == '__main__':  # windows系统下运行必须加__name__=="__main__",否则多进程报错
        process_crawler()

      2.2 多进程加多线程

        下面代码中两个进程,每个进程中又有三个线程,下载61个url,平均花费10-15s

    #coding:utf-8
    
    import threading
    import multiprocessing
    import requests
    import csv
    import time
    from datetime import datetime
    import urlparse
    
    # Minimum-delay gate between requests that hit the same domain.
    class Throttle(object):
        """Rate limiter: wait() sleeps so that two requests to one domain are
        at least `delay` seconds apart; last-access times are kept per domain."""

        def __init__(self, delay):
            self.delay = delay   # minimum gap in seconds between hits on one domain
            self.domains = {}    # domain -> datetime of the previous request

        def wait(self, url):
            host = urlparse.urlparse(url).netloc
            previous = self.domains.get(host)
            if self.delay > 0 and previous is not None:
                remaining = self.delay - (datetime.now() - previous).seconds
                if remaining > 0:
                    time.sleep(remaining)
            self.domains[host] = datetime.now()
    
    # Download a web page with requests, retrying on server errors.
    def download(url,user_agent=None,proxies=None,num_retries=3):
        """Fetch url and return the requests Response.

        Retries (recursively) up to num_retries more times when the server
        answers with a 5xx status. Note: requests omits a header whose value
        is None, so user_agent=None sends no User-Agent header at all.
        """
        response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
        if response.status_code and 500<=response.status_code<600:  # retry on server-side (5xx) errors
            if num_retries > 0:
                response = download(url,user_agent,proxies,num_retries-1)
        return response
    
    def get_urls():
        """Read urls.csv and return the first column of every row with all
        spaces stripped -- the list of URLs to crawl."""
        url_lists=[]
        with open('urls.csv','r') as f:
            reader = csv.reader(f)
            for row in reader:
                url_lists.append(row[0].replace(' ',''))
        return url_lists
    
    def thread_crawler(url_queue,max_threads=5):
    
        def process_url():
            while url_queue.qsize()>0:
                try:
                    url = url_queue.get_nowait() #当queue为空时,不等待
                    #print url_queue.qsize()
                except Exception as e:
                    print e
                    break
                else:
                    throttle = Throttle(3)
                    throttle.wait(url)
                    response=download(url)
                    print url, response.status_code
        threads=[]
        while url_queue.qsize()>0 or threads:
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
            while len(threads)<max_threads and url_queue.qsize()>0:
                thread = threading.Thread(target=process_url)
                thread.setDaemon(True)
                thread.start()
                threads.append(thread)
            #time.sleep(2)
    
    def process_crawler():
        """Start one process per CPU core; each runs thread_crawler with a
        3-thread pool over the shared multiprocessing queue of URLs."""
        num_cpus = multiprocessing.cpu_count()
        q = multiprocessing.Queue()
        for url in get_urls():
            q.put(url)
        start = time.time()
        print "开始时间:%s" % start
        processes=[]
        for i in range(num_cpus):
            # args=(q, 3): every process gets the shared queue and runs 3 threads
            process = multiprocessing.Process(target=thread_crawler,args=(q,3))
            process.start()
            processes.append(process)
        for p in processes:
            p.join()
        end = time.time()
        print "结束时间:%s" % end
        print "爬取61个url,消耗时间:%s" % (int(end - start))
    if __name__ == '__main__':  # the __main__ guard is required on Windows, otherwise multiprocessing errors out
        process_crawler()

      2.3 基于MongoDB的url队列(分布式?)

        可以将需要爬取的url队列部署在MongoDB上,多台电脑可以从中获取url进行爬取,增大并发数,加速爬取。

        采用多线程,MongoDB储存url队列,代码如下:

    #coding:utf-8
    
    import threading
    import multiprocessing
    import requests
    import csv
    import time
    import urlparse
    from datetime import datetime, timedelta
    from pymongo import MongoClient,errors
    
    class MongQueue(object):
        """MongoDB-backed URL queue that several crawler machines can share.

        Each URL is one document in urls_db.crawl_queue with the URL itself
        as _id and a numeric status field; pop() atomically claims an
        OUTSTANDING URL for the caller.
        NOTE(review): insert/update/find_and_modify are legacy pymongo APIs
        (pre-3.0); modern drivers use insert_one/update_one/find_one_and_update.
        """
        # URL states: waiting to download / currently downloading / finished
        OUTSTANDING, PROCESSING, COMPLETE=(1,2,3)

        def __init__(self,client=None,timeout=300):
            # timeout: seconds after which a PROCESSING claim is considered
            # abandoned and may be released back to OUTSTANDING by repair()
            self.client = MongoClient('127.0.0.1',27017) if client is None else client
            self.db = self.client.urls_db
            self.timeout = timeout

        def isEmpty(self):
            """Return True when no URL remains in a non-COMPLETE state."""
            record = self.db.crawl_queue.find_one({'status':{'$ne':self.COMPLETE}})  #$ne: not equal
            if not record:
                return True
            else:
                return False

        def push(self,url):
            """Insert url as OUTSTANDING; a duplicate (same _id) is reported
            and ignored."""
            try:
                self.db.crawl_queue.insert({'_id':url,'status':self.OUTSTANDING})
            except errors.DuplicateKeyError as e:
                print 'the url is already in the queue!'
                pass

        def pop(self):
            """Atomically claim one OUTSTANDING URL: mark it PROCESSING with a
            timestamp and return its _id. When none is available, attempt to
            repair stale PROCESSING entries and raise KeyError."""

            record = self.db.crawl_queue.find_and_modify(
                query={'status':self.OUTSTANDING},
                update={'$set':{'status':self.PROCESSING,'timestamp':datetime.now()}}
            )
            if record:
                # print record['_id'],record['status']
                return record['_id']
            else:
                self.repair()
                raise KeyError()

        def complete(self,url):
            """Mark url as COMPLETE (downloaded)."""
            self.db.crawl_queue.update({'_id':url},
                                       {'$set':{'status':self.COMPLETE}})

        def repair(self):
            """Release an unfinished entry whose claim timestamp is older than
            self.timeout seconds back to OUTSTANDING (e.g. a crashed worker)."""
            record = self.db.crawl_queue.find_and_modify(
                query={'timestamp':{'$lt':datetime.now()-timedelta(seconds=self.timeout)},
                       'status':{'$ne':self.COMPLETE}},
                update={'$set':{'status':self.OUTSTANDING}}
            )
            if record:
                print 'Released:%s'%record['_id']
        def clear(self):
            """Drop the whole crawl_queue collection."""
            self.db.crawl_queue.drop()
    
    # Per-domain download throttle: enforces a minimum delay between
    # successive requests to the same domain.
    class Throttle(object):
        def __init__(self,delay):
            # delay: minimum number of seconds between two requests to one domain
            self.delay = delay
            # domain -> datetime of the last access to that domain
            self.domains={}

        def wait(self,url):
            """Sleep just long enough so that the gap since the last request to
            this URL's domain is at least self.delay seconds, then record now as
            the domain's latest access time."""
            domain = urlparse.urlparse(url).netloc  # extract the domain (netloc) part of the URL
            last_accessed = self.domains.get(domain)
            if self.delay>0 and last_accessed!=None:
                # .seconds is only the seconds component of the timedelta; it is
                # adequate here because crawl gaps are far below one day
                sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
                if sleep_secs>0:
                    time.sleep(sleep_secs)
            self.domains[domain]=datetime.now()
    
    # Download a web page with requests, retrying on server errors.
    def download(url,user_agent=None,proxies=None,num_retries=3):
        """Fetch url and return the requests Response.

        Retries (recursively) up to num_retries more times when the server
        answers with a 5xx status. Note: requests omits a header whose value
        is None, so user_agent=None sends no User-Agent header at all.
        """
        response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
        if response.status_code and 500<=response.status_code<600:  # retry on server-side (5xx) errors
            if num_retries > 0:
                response = download(url,user_agent,proxies,num_retries-1)
        return response
    
    def get_urls():
        """Read urls.csv and return the first column of every row with all
        spaces stripped -- the list of URLs to crawl."""
        url_lists=[]
        with open('urls.csv','r') as f:
            reader = csv.reader(f)
            for row in reader:
                url_lists.append(row[0].replace(' ',''))
        return url_lists
    
    
    def thread_crawler(url_queue, max_threads=5):
    
        def process_url():
            while not url_queue.isEmpty():
                try:
                    url = url_queue.pop()
                except KeyError as e:
                    print e
                    break
                else:
                    throttle = Throttle(3)
                    throttle.wait(url)
                    response=download(url)
                    url_queue.complete(url)
                    print url, response.status_code
        threads=[]
        start = time.time()
        print "开始时间:%s" % start
        while (not url_queue.isEmpty()) or threads:
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
            while len(threads)<max_threads and (not url_queue.isEmpty()):
                thread = threading.Thread(target=process_url)
                thread.setDaemon(True)
                thread.start()
                threads.append(thread)
            #time.sleep(2)
        end = time.time()
        print "结束时间:%s" % end
        print "爬取61个url,消耗时间:%s" % (int(end - start))
    
    if __name__ == '__main__':  # the __main__ guard is required on Windows, otherwise multiprocessing errors out
        url_queue = MongQueue()
        url_queue.clear()  # start every run from an empty collection
        for url in get_urls():
            url_queue.push(url)
        thread_crawler(url_queue)
    View Code

        采用多进程和多线程,MongoDB储存url队列,代码如下:

    #coding:utf-8
    
    import threading
    import multiprocessing
    import requests
    import csv
    import time
    import urlparse
    from datetime import datetime, timedelta
    from pymongo import MongoClient,errors
    
    class MongQueue(object):
        """MongoDB-backed URL queue that several crawler machines can share.

        Each URL is one document in urls_db.crawl_queue with the URL itself
        as _id and a numeric status field; pop() atomically claims an
        OUTSTANDING URL.
        NOTE(review): insert/update/find_and_modify are legacy pymongo APIs
        (pre-3.0); modern drivers use insert_one/update_one/find_one_and_update.
        """
        # URL states: waiting to download / currently downloading / finished
        OUTSTANDING, PROCESSING, COMPLETE=(1,2,3)

        def __init__(self,client=None,timeout=300):
            # timeout: seconds after which a PROCESSING claim is considered
            # abandoned and may be released back to OUTSTANDING by repair()
            self.client = MongoClient('127.0.0.1',27017) if client is None else client
            self.db = self.client.urls_db
            self.timeout = timeout

        def isEmpty(self):
            """Return True when no URL remains in a non-COMPLETE state."""
            record = self.db.crawl_queue.find_one({'status':{'$ne':self.COMPLETE}})  #$ne: not equal
            if not record:
                return True
            else:
                return False

        def push(self,url):
            """Insert url as OUTSTANDING; a duplicate (same _id) is reported
            and ignored."""
            try:
                self.db.crawl_queue.insert({'_id':url,'status':self.OUTSTANDING})
            except errors.DuplicateKeyError as e:
                print 'the url is already in the queue!'
                pass

        def pop(self):
            """Atomically claim one OUTSTANDING URL: mark it PROCESSING with a
            timestamp and return its _id. When none is available, attempt to
            repair stale PROCESSING entries and raise KeyError."""

            record = self.db.crawl_queue.find_and_modify(
                query={'status':self.OUTSTANDING},
                update={'$set':{'status':self.PROCESSING,'timestamp':datetime.now()}}
            )
            if record:
                # print record['_id'],record['status']
                return record['_id']
            else:
                self.repair()
                raise KeyError()

        def complete(self,url):
            """Mark url as COMPLETE (downloaded)."""
            self.db.crawl_queue.update({'_id':url},
                                       {'$set':{'status':self.COMPLETE}})

        def repair(self):
            """Release an unfinished entry whose claim timestamp is older than
            self.timeout seconds back to OUTSTANDING (e.g. a crashed worker)."""
            record = self.db.crawl_queue.find_and_modify(
                query={'timestamp':{'$lt':datetime.now()-timedelta(seconds=self.timeout)},
                       'status':{'$ne':self.COMPLETE}},
                update={'$set':{'status':self.OUTSTANDING}}
            )
            if record:
                print 'Released:%s'%record['_id']
        def clear(self):
            """Drop the whole crawl_queue collection."""
            self.db.crawl_queue.drop()
    
    # Gate that keeps repeated hits on one domain at least `delay` seconds apart.
    class Throttle(object):
        """Per-domain rate limiter; wait() sleeps until the minimum delay since
        the previous request to the URL's domain has elapsed."""

        def __init__(self, delay):
            # Seconds of minimum spacing; per-domain last-access timestamps.
            self.delay = delay
            self.domains = {}

        def wait(self, url):
            host = urlparse.urlparse(url).netloc
            seen_at = self.domains.get(host)
            if seen_at is not None and self.delay > 0:
                pause = self.delay - (datetime.now() - seen_at).seconds
                if pause > 0:
                    time.sleep(pause)
            self.domains[host] = datetime.now()
    
    # Download one page; used by the worker threads below.
    def download(url,user_agent=None,proxies=None,num_retries=3):
        """GET *url* and return the requests Response, retrying up to
        num_retries additional times while the server answers with 5xx."""
        for _ in range(num_retries + 1):
            response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
            if not (response.status_code and 500 <= response.status_code < 600):
                break  # success or a non-server error: stop retrying
        return response
    
    def get_urls():
        """Load the crawl list from urls.csv: the first field of every row,
        with all spaces removed."""
        with open('urls.csv', 'r') as csv_file:
            rows = list(csv.reader(csv_file))
        return [fields[0].replace(' ', '') for fields in rows]
    
    
    def thread_crawler( max_threads=5):
        url_queue = MongQueue()
        def process_url():
            while not url_queue.isEmpty():
                try:
                    url = url_queue.pop()
                except KeyError as e:
                    print e
                    break
                else:
                    throttle = Throttle(3)
                    throttle.wait(url)
                    response=download(url)
                    url_queue.complete(url)
                    print url, response.status_code
        threads=[]
     
        while (not url_queue.isEmpty()) or threads:
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
            while len(threads)<max_threads and (not url_queue.isEmpty()):
                thread = threading.Thread(target=process_url)
                thread.setDaemon(True)
                thread.start()
                threads.append(thread)
            #time.sleep(2)
    
    def process_crawler():
        """Start one process per CPU core; each creates its own MongQueue
        client and drains the shared MongoDB-backed URL queue with threads."""
        num_cpus = multiprocessing.cpu_count()
        start = time.time()
        print "开始时间:%s" % start
        processes=[]
        for i in range(num_cpus):
            # No args: thread_crawler builds its own MongQueue connection.
            process = multiprocessing.Process(target=thread_crawler)
            process.start()
            processes.append(process)
        for p in processes:
            p.join()
        end = time.time()
        print "结束时间:%s" % end
        print "爬取61个url,消耗时间:%s" % (int(end - start))
    if __name__ == '__main__':  # the __main__ guard is required on Windows, otherwise multiprocessing errors out
          process_crawler()
    View Code
  • 相关阅读:
    RAP开发入门-主题更换
    RAP开发入门-开发笔记-bug记录
    RAP开发入门-运行过程简析(三)
    Redis入门笔记-redis内部数据结构(01)
    JAVA基础-子类继承父类实例化对象过程
    RAP开发入门-开发笔记
    RAP开发入门-运行第一个HelloWorld(二)
    android maven eclipse
    字符与编码(摘录)
    Python 学习lesson 1
  • 原文地址:https://www.cnblogs.com/silence-cho/p/10176748.html
Copyright © 2011-2022 走看看