zoukankan      html  css  js  c++  java
  • python网络爬虫(10)分布式爬虫爬取静态数据

    目的意义

    爬虫应该能够快速高效的完成数据爬取和分析任务。使用多个进程协同完成一个任务,提高了数据爬取的效率。

    以百度百科的一条为起点,抓取百度百科2000左右词条数据。

    说明

    参阅模仿了:https://book.douban.com/subject/27061630/。

    作者说是简单的分布式爬虫(hh),在书中有详细的说明和注解。

    这里只是补漏和梳理。

    因为进程传递参数的问题,搞了几天还是放弃了在WIndows上跑,换用了Linux。

    又因为各种各样的问题,弃用CentOS(它确实是安全可靠的,但是...我不会装QQ,输入法等),换用了软件容易安装的Ubuntu。然后才装了Eclipse等各种软件后,才开始多进程的调试。

    构造

    主节点和从节点的方案实现信息爬取。结构应该让各个节点高效工作。

    从节点:

    爬虫爬取速度受到网络延时的影响和网页信息解析的影响比较严重,所以使用多个从节点用来专门负责下载网页信息,解析网页信息。

    则分为三个文件,爬取文件,下载网页文件,解析网页文件。

    爬取文件接收来自主节点发送来的网页地址。然后调用下载网页文件并完成解析,将处理好的数据发送给主节点。

    主节点:

    主节点负责发送给从节点网页地址,并接收来自从节点的解析后的网页信息,将网页信息存储下来。

    主节点任务分为分发网址,接收从节点的信息,存储网页三部分。在代码里,他建立了三个进程,来分别实现。

    主节点任务中,存储信息,定义一套存储信息的方法。分发网址,定义一套分发网址过程中可能用到的方法。主文件中,设立三个函数,建立三个进程。

    主节点设计

    主节点的三个任务,分成三个进程,三个进程(分发网址,数据接收,数据存储),做一个类。

    数据接收与分发网址,需要分布式进程。分布式进程需要使用队列Queue。这里一定是multiprocessing中的导入的队列。网址分发、数据接收分别使用一个队列。

    注册,设定地址,秘钥,完成初始化过程,将url_q,result_q分别注册到网络中。

    然后设立分发任务,传递队列给分发任务函数。分发任务使用url_q队列完成数据的发送。使用conn_q接收了新的网址,并进行存储,再次分发到url_q上。

    数据接收任务,完成了数据的接收过程,接收以后需要及时将数据存储,在这里使用了两个队列conn_q,放置接收数据中的地址信息,store_q,放置接收数据中的网页信息。

    数据存储任务,接收数据接收任务中的store_q队列信息,及时写入到磁盘中。

    所有涉及到的文件如下:

    NodeManager.py

    import time
    #import sys
    #sys.path.append('/home')#if needed ,add path as package
    from UrlManager import UrlManager
    from multiprocessing import Process,Queue
    from multiprocessing.managers import BaseManager
    from DataOutput import DataOutput
    
    class NodeManager():
        def start_manager(self,url_q,result_q):
            BaseManager.register('get_task_queue', callable=lambda:url_q)
            BaseManager.register('get_result_queue',callable=lambda:result_q)
            manager=BaseManager(address=('127.0.0.1',8001),authkey='baike'.encode('utf-8'))
            return manager
        
        def url_manager_proc(self,url_q,conn_q,root_url):
            #send url to queue and receive new urls for storing to object
            url_manager=UrlManager()
            url_manager.add_new_url(root_url)
            while True:
                while(url_manager.has_new_url()):
                    new_url=url_manager.get_new_url()
                    url_q.put(new_url)
                    print('old url size:'+str(url_manager.old_url_size()))
                    if(url_manager.old_url_size()>2000):
                        url_q.put('end')
                        url_manager.save_process('new_urls.txt',url_manager.new_urls)
                        url_manager.save_process('old_urls.txt',url_manager.old_urls)
                        print('finish url_manager_proc')
                        return
                try:
                    urls=conn_q.get()
                    url_manager.add_new_urls(urls)
                    print('get:'+urls)
                except Exception:
                    time.sleep(0.1)
            
        
        def result_solve_proc(self,result_q,conn_q,store_q):
            while True:
                if not result_q.empty():
                    content=result_q.get(True)
                    if content['new_urls']=='end':
                        print('finish result_solve_proc')
                        store_q.put('end')
                        return
                    conn_q.put(content["new_urls"])
                    store_q.put(content["data"])
                else:
                    time.sleep(0.1)
        
        def store_proc(self,store_q):
            output=DataOutput()
            while True:
                if not store_q.empty():
                    data=store_q.get()
                    if data =='end':
                        print('finish store_proc')
                        output.output_end(output.path)
                        return
                    output.store_data(data)
    
    
    
    if __name__=='__main__':
        url_q=Queue()#send url to workers
        result_q=Queue()#receive url's analytical data from works
        store_q=Queue()#analytical data which is fresh is used for storing to disk for further extract
        conn_q=Queue()#urls which is fresh are used for storing to object for further extract
        nodeObject=NodeManager()
        manager=nodeObject.start_manager(url_q,result_q)
        
        root_url='https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin'
        url_manager=Process(target=nodeObject.url_manager_proc,args=(url_q,conn_q,root_url,))
        result_solve=Process(target=nodeObject.result_solve_proc,args=(result_q,conn_q,store_q,))
        store=Process(target=nodeObject.store_proc,args=(store_q,))
        url_manager.start()
        result_solve.start()
        store.start()
        manager.get_server().serve_forever()
    

     UrlManager.py

    import hashlib
    import pickle
    class UrlManager():
        def __init__(self):
            self.old_urls=self.load_process('new_urls.txt')
            self.new_urls=self.load_process('old_urls.txt')
            pass
        
        def has_new_url(self):
            return self.new_url_size()!=0
        
        def new_url_size(self):
            return len(self.new_urls)
        
        def old_url_size(self):
            return len(self.old_urls)
        
        def get_new_url(self):
            new_url=self.new_urls.pop()
            m=hashlib.md5()
            m.update(new_url.encode("utf8"))
            self.old_urls.add(m.hexdigest()[8:-8])
            return new_url
        
        def add_new_url(self,url):
            if url is None:
                return
            m=hashlib.md5()
            m.update(url.encode('utf-8'))       
            url_md5=m.hexdigest()[8:-8]
            if url not in self.new_urls and url_md5 not in self.old_urls:
                self.new_urls.add(url)
        
        def add_new_urls(self,urls):
            if urls is None or len(urls) == 0:
                return
            for url in urls:
                self.add_new_url(url)
            pass
        
        def save_process(self,path,data):
            with open(path,'wb') as f:
                pickle.dump(data,f)
        
        def load_process(self,path):
            print('loading..')
            try:
                with open(path,'rb') as f:
                    tmp=pickle.load(f)
                    return tmp
            except:
                print('loading error maybe loading file not exist and will create it:'+path)
            newSet=set()
            self.save_process(path, newSet)
            return newSet
    

     DataOutput.py

    import codecs
    from os.path import os
    class DataOutput(object):
        def __init__(self):
            self.path='baike.html'
            self.output_head(self.path)
            self.datas=[]
        
        def store_data(self,data):
            if data is None:
                return
            self.datas.append(data)
            self.output_html(self.path,data)
        
        def output_head(self,path):
            if os.path.exists(path):
                return
            fout=codecs.open('baike.html', 'w', encoding='utf-8')
            fout.write("<html>")
            fout.write("<head><meta charset='urf-8'></head>")
            fout.write("<body>")
            fout.write("<table border='1' width=1800  style='word-break:break-all;word-wrap:break-word;'>")
            fout.write("<tr>")
            fout.write("<td width='20'>序号</td>")
            fout.write("<td width='300'>URL</td>")
            fout.write("<td width='100'>标题</td>")
            fout.write("<td width='1200'>释义</td>")
            fout.write("</tr>")   
            fout.close()
            
        def output_end(self,path):
            fout=codecs.open(path, 'a', encoding='utf-8')
            fout.write("</table>")  
            fout.write("</body>")      
            fout.write("</html>")
            fout.close()       
            
        def output_html(self,path,data):
            fout=codecs.open(path, 'a', encoding='utf-8')    
            fout.write("<tr>")
            fout.write("<td>%s</td>"%str(len(self.datas)))
            fout.write("<td><a href=%s>%s</a></td>"%(data['url'],data['url']))
            fout.write("<td>%s</td>"%data['title'])
            fout.write("<td>%s</td>"%data['summary'])
            fout.write("</tr>")
            fout.close()
    

    从节点设计

    从节点首先是连接到指定地址并验证秘钥。连接后获取url_q、result_q。

    从url_q中获取发来的地址,调用HTML下载器下载数据,调动HTML解析器解析数据,然后把结果放到result_q队列上。

    代码如下

    SpiderWork.py

    from multiprocessing.managers import BaseManager
    from HtmlDownloader import HtmlDownloader
    from HtmlParser import HtmlParser
    class SpiderWork():
        def __init__(self):
            BaseManager.register('get_task_queue')
            BaseManager.register('get_result_queue')
            server_addr='127.0.0.1'
            print('connect'+server_addr)
            self.m=BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8'))
            self.m.connect()
            self.task=self.m.get_task_queue()
            self.result=self.m.get_result_queue()
            print(self.task)
            self.downloader=HtmlDownloader()
            self.parser=HtmlParser()
            print('initial finish')
        
        def crawl(self):
            while (True):
                try:
                    if not self.task.empty():
                        url=self.task.get()
                        if url == 'end':
                            print('stop spider1')
                            self.result.put({'new_urls':'end','data':'end'})
                            return
                        print('working:'+url)#url
                        content=self.downloader.download(url)
                        new_urls,data=self.parser.parser(url,content)
                        self.result.put({"new_urls":new_urls,"data":data})
                except Exception as e:
                    print(e,url)
                    
    if __name__=="__main__":
        spider=SpiderWork()
        spider.crawl()
    

     HtmlDownloader.py

    import requests
    import chardet
    class HtmlDownloader(object):
        def download(self,url):
            if url is None:
                return None
            user_agent='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0'
            headers={'User-Agent':user_agent}
            r=requests.get(url,headers=headers)
            if r.status_code is 200:
                r.encoding=chardet.detect(r.content)['encoding']
                return r.text
            return None
    

     HtmlParser.py

    import re
    from urllib import parse
    from bs4 import BeautifulSoup
    class HtmlParser(object):
        def parser(self,page_url,html_cont):
            if page_url is None or html_cont is None:
                return
            
            soup=BeautifulSoup(html_cont,'lxml')
            
            new_urls=self.getNewUrls(page_url,soup)
            
            new_data=self.getNewData(page_url,soup)
            return new_urls,new_data
        
        def getNewUrls(self,page_url,soup):
            new_urls=set()
            links=soup.find_all('a',href=re.compile(r'/item/.*'))
            for link in links:
                new_url=link['href']
                new_full_url=parse.urljoin(page_url,new_url)
                new_urls.add(new_full_url)
            return new_urls
    
        def getNewData(self,page_url,soup):
            data={}
            data['url']=page_url
            title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
            data['title']=title.get_text()
            summary = soup.find('div',class_='lemma-summary')
            #获取到tag中包含的所有文版内容包括子孙tag中的内容,并将结果作为Unicode字符串返回
            data['summary']=summary.get_text()
            return data
    

    结果

    建立.sh文件如下:

    #!/bin/bash
    rm -rf log/*
    rm -rf baike.html
    rm -rf new_urls.txt
    rm -rf old_urls.txt
    python3 control/NodeManager.py &> log/control.log & for ((i=1;i<=10;i++)) do python3 spider/SpiderWork.py &>log/spider$i.log & done

    启动主节点,然后启动10个从节点。将它们所产生的日志信息记录到log/下,并都是在后台运行的进程。

    两分钟左右,完成约1900条的数据获取

    可能用到的命令:

    kill -9 $(ps aux | grep python | awk '{print $2}')

    !kill

    可能用到的软件:

    Eclipse的pydev进程调试。

    最后

    这代码里面真的有好多的细节文件,序列化操作与存储,md5的压缩方案等,都是值得思考的。

  • 相关阅读:
    Java实现 蓝桥杯VIP 基础练习 完美的代价
    Java实现 蓝桥杯VIP基础练习 矩形面积交
    Java实现 蓝桥杯VIP 基础练习 完美的代价
    Java实现 蓝桥杯 蓝桥杯VIP 基础练习 数的读法
    Java实现 蓝桥杯 蓝桥杯VIP 基础练习 数的读法
    Java实现 蓝桥杯 蓝桥杯VIP 基础练习 数的读法
    Java实现 蓝桥杯 蓝桥杯VIP 基础练习 数的读法
    Java实现 蓝桥杯 蓝桥杯VIP 基础练习 数的读法
    核心思想:想清楚自己创业的目的(如果你没有自信提供一种更好的产品或服务,那就别做了,比如IM 电商 搜索)
    在Linux中如何利用backtrace信息解决问题
  • 原文地址:https://www.cnblogs.com/bai2018/p/11098451.html
Copyright © 2011-2022 走看看