zoukankan      html  css  js  c++  java
  • python网络爬虫(10)分布式爬虫爬取静态数据

    目的意义

    爬虫应该能够快速高效的完成数据爬取和分析任务。使用多个进程协同完成一个任务,提高了数据爬取的效率。

    以百度百科的一条为起点,抓取百度百科2000左右词条数据。

    说明

    参阅模仿了:https://book.douban.com/subject/27061630/。

    作者说是简单的分布式爬虫(hh),在书中有详细的说明和注解。

    这里只是补漏和梳理。

    因为进程传递参数的问题,搞了几天还是放弃了在WIndows上跑,换用了Linux。

    又因为各种各样的问题,弃用CentOS(它确实是安全可靠的,但是...我不会装QQ,输入法等),换用了软件容易安装的Ubuntu。然后才装了Eclipse等各种软件后,才开始多进程的调试。

    构造

    主节点和从节点的方案实现信息爬取。结构应该让各个节点高效工作。

    从节点:

    爬虫爬取速度受到网络延时的影响和网页信息解析的影响比较严重,所以使用多个从节点用来专门负责下载网页信息,解析网页信息。

    则分为三个文件,爬取文件,下载网页文件,解析网页文件。

    爬取文件接收来自主节点发送来的网页地址。然后调用下载网页文件并完成解析,将处理好的数据发送给主节点。

    主节点:

    主节点负责发送给从节点网页地址,并接收来自从节点的解析后的网页信息,将网页信息存储下来。

    主节点任务分为分发网址,接收从节点的信息,存储网页三部分。在代码里,他建立了三个进程,来分别实现。

    主节点任务中,存储信息,定义一套存储信息的方法。分发网址,定义一套分发网址过程中可能用到的方法。主文件中,设立三个函数,建立三个进程。

    主节点设计

    主节点的三个任务,分成三个进程,三个进程(分发网址,数据接收,数据存储),做一个类。

    数据接收与分发网址,需要分布式进程。分布式进程需要使用队列Queue。这里一定是multiprocessing中的导入的队列。网址分发、数据接收分别使用一个队列。

    注册,设定地址,秘钥,完成初始化过程,将url_q,result_q分别注册到网络中。

    然后设立分发任务,传递队列给分发任务函数。分发任务使用url_q队列完成数据的发送。使用conn_q接收了新的网址,并进行存储,再次分发到url_q上。

    数据接收任务,完成了数据的接收过程,接收以后需要及时将数据存储,在这里使用了两个队列conn_q,放置接收数据中的地址信息,store_q,放置接收数据中的网页信息。

    数据存储任务,接收数据接收任务中的store_q队列信息,及时写入到磁盘中。

    所有涉及到的文件如下:

    NodeManager.py

    import time
    #import sys
    #sys.path.append('/home')#if needed ,add path as package
    from UrlManager import UrlManager
    from multiprocessing import Process,Queue
    from multiprocessing.managers import BaseManager
    from DataOutput import DataOutput
    
    class NodeManager():
        def start_manager(self,url_q,result_q):
            BaseManager.register('get_task_queue', callable=lambda:url_q)
            BaseManager.register('get_result_queue',callable=lambda:result_q)
            manager=BaseManager(address=('127.0.0.1',8001),authkey='baike'.encode('utf-8'))
            return manager
        
        def url_manager_proc(self,url_q,conn_q,root_url):
            #send url to queue and receive new urls for storing to object
            url_manager=UrlManager()
            url_manager.add_new_url(root_url)
            while True:
                while(url_manager.has_new_url()):
                    new_url=url_manager.get_new_url()
                    url_q.put(new_url)
                    print('old url size:'+str(url_manager.old_url_size()))
                    if(url_manager.old_url_size()>2000):
                        url_q.put('end')
                        url_manager.save_process('new_urls.txt',url_manager.new_urls)
                        url_manager.save_process('old_urls.txt',url_manager.old_urls)
                        print('finish url_manager_proc')
                        return
                try:
                    urls=conn_q.get()
                    url_manager.add_new_urls(urls)
                    print('get:'+urls)
                except Exception:
                    time.sleep(0.1)
            
        
        def result_solve_proc(self,result_q,conn_q,store_q):
            while True:
                if not result_q.empty():
                    content=result_q.get(True)
                    if content['new_urls']=='end':
                        print('finish result_solve_proc')
                        store_q.put('end')
                        return
                    conn_q.put(content["new_urls"])
                    store_q.put(content["data"])
                else:
                    time.sleep(0.1)
        
        def store_proc(self,store_q):
            output=DataOutput()
            while True:
                if not store_q.empty():
                    data=store_q.get()
                    if data =='end':
                        print('finish store_proc')
                        output.output_end(output.path)
                        return
                    output.store_data(data)
    
    
    
    if __name__=='__main__':
        url_q=Queue()#send url to workers
        result_q=Queue()#receive url's analytical data from works
        store_q=Queue()#analytical data which is fresh is used for storing to disk for further extract
        conn_q=Queue()#urls which is fresh are used for storing to object for further extract
        nodeObject=NodeManager()
        manager=nodeObject.start_manager(url_q,result_q)
        
        root_url='https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin'
        url_manager=Process(target=nodeObject.url_manager_proc,args=(url_q,conn_q,root_url,))
        result_solve=Process(target=nodeObject.result_solve_proc,args=(result_q,conn_q,store_q,))
        store=Process(target=nodeObject.store_proc,args=(store_q,))
        url_manager.start()
        result_solve.start()
        store.start()
        manager.get_server().serve_forever()
    

     UrlManager.py

    import hashlib
    import pickle
    class UrlManager():
        def __init__(self):
            self.old_urls=self.load_process('new_urls.txt')
            self.new_urls=self.load_process('old_urls.txt')
            pass
        
        def has_new_url(self):
            return self.new_url_size()!=0
        
        def new_url_size(self):
            return len(self.new_urls)
        
        def old_url_size(self):
            return len(self.old_urls)
        
        def get_new_url(self):
            new_url=self.new_urls.pop()
            m=hashlib.md5()
            m.update(new_url.encode("utf8"))
            self.old_urls.add(m.hexdigest()[8:-8])
            return new_url
        
        def add_new_url(self,url):
            if url is None:
                return
            m=hashlib.md5()
            m.update(url.encode('utf-8'))       
            url_md5=m.hexdigest()[8:-8]
            if url not in self.new_urls and url_md5 not in self.old_urls:
                self.new_urls.add(url)
        
        def add_new_urls(self,urls):
            if urls is None or len(urls) == 0:
                return
            for url in urls:
                self.add_new_url(url)
            pass
        
        def save_process(self,path,data):
            with open(path,'wb') as f:
                pickle.dump(data,f)
        
        def load_process(self,path):
            print('loading..')
            try:
                with open(path,'rb') as f:
                    tmp=pickle.load(f)
                    return tmp
            except:
                print('loading error maybe loading file not exist and will create it:'+path)
            newSet=set()
            self.save_process(path, newSet)
            return newSet
    

     DataOutput.py

    import codecs
    from os.path import os
    class DataOutput(object):
        def __init__(self):
            self.path='baike.html'
            self.output_head(self.path)
            self.datas=[]
        
        def store_data(self,data):
            if data is None:
                return
            self.datas.append(data)
            self.output_html(self.path,data)
        
        def output_head(self,path):
            if os.path.exists(path):
                return
            fout=codecs.open('baike.html', 'w', encoding='utf-8')
            fout.write("<html>")
            fout.write("<head><meta charset='urf-8'></head>")
            fout.write("<body>")
            fout.write("<table border='1' width=1800  style='word-break:break-all;word-wrap:break-word;'>")
            fout.write("<tr>")
            fout.write("<td width='20'>序号</td>")
            fout.write("<td width='300'>URL</td>")
            fout.write("<td width='100'>标题</td>")
            fout.write("<td width='1200'>释义</td>")
            fout.write("</tr>")   
            fout.close()
            
        def output_end(self,path):
            fout=codecs.open(path, 'a', encoding='utf-8')
            fout.write("</table>")  
            fout.write("</body>")      
            fout.write("</html>")
            fout.close()       
            
        def output_html(self,path,data):
            fout=codecs.open(path, 'a', encoding='utf-8')    
            fout.write("<tr>")
            fout.write("<td>%s</td>"%str(len(self.datas)))
            fout.write("<td><a href=%s>%s</a></td>"%(data['url'],data['url']))
            fout.write("<td>%s</td>"%data['title'])
            fout.write("<td>%s</td>"%data['summary'])
            fout.write("</tr>")
            fout.close()
    

    从节点设计

    从节点首先是连接到指定地址并验证秘钥。连接后获取url_q、result_q。

    从url_q中获取发来的地址,调用HTML下载器下载数据,调动HTML解析器解析数据,然后把结果放到result_q队列上。

    代码如下

    SpiderWork.py

    from multiprocessing.managers import BaseManager
    from HtmlDownloader import HtmlDownloader
    from HtmlParser import HtmlParser
    class SpiderWork():
        def __init__(self):
            BaseManager.register('get_task_queue')
            BaseManager.register('get_result_queue')
            server_addr='127.0.0.1'
            print('connect'+server_addr)
            self.m=BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8'))
            self.m.connect()
            self.task=self.m.get_task_queue()
            self.result=self.m.get_result_queue()
            print(self.task)
            self.downloader=HtmlDownloader()
            self.parser=HtmlParser()
            print('initial finish')
        
        def crawl(self):
            while (True):
                try:
                    if not self.task.empty():
                        url=self.task.get()
                        if url == 'end':
                            print('stop spider1')
                            self.result.put({'new_urls':'end','data':'end'})
                            return
                        print('working:'+url)#url
                        content=self.downloader.download(url)
                        new_urls,data=self.parser.parser(url,content)
                        self.result.put({"new_urls":new_urls,"data":data})
                except Exception as e:
                    print(e,url)
                    
    if __name__=="__main__":
        spider=SpiderWork()
        spider.crawl()
    

     HtmlDownloader.py

    import requests
    import chardet
    class HtmlDownloader(object):
        def download(self,url):
            if url is None:
                return None
            user_agent='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0'
            headers={'User-Agent':user_agent}
            r=requests.get(url,headers=headers)
            if r.status_code is 200:
                r.encoding=chardet.detect(r.content)['encoding']
                return r.text
            return None
    

     HtmlParser.py

    import re
    from urllib import parse
    from bs4 import BeautifulSoup
    class HtmlParser(object):
        def parser(self,page_url,html_cont):
            if page_url is None or html_cont is None:
                return
            
            soup=BeautifulSoup(html_cont,'lxml')
            
            new_urls=self.getNewUrls(page_url,soup)
            
            new_data=self.getNewData(page_url,soup)
            return new_urls,new_data
        
        def getNewUrls(self,page_url,soup):
            new_urls=set()
            links=soup.find_all('a',href=re.compile(r'/item/.*'))
            for link in links:
                new_url=link['href']
                new_full_url=parse.urljoin(page_url,new_url)
                new_urls.add(new_full_url)
            return new_urls
    
        def getNewData(self,page_url,soup):
            data={}
            data['url']=page_url
            title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
            data['title']=title.get_text()
            summary = soup.find('div',class_='lemma-summary')
            #获取到tag中包含的所有文版内容包括子孙tag中的内容,并将结果作为Unicode字符串返回
            data['summary']=summary.get_text()
            return data
    

    结果

    建立.sh文件如下:

    #!/bin/bash
    rm -rf log/*
    rm -rf baike.html
    rm -rf new_urls.txt
    rm -rf old_urls.txt
    python3 control/NodeManager.py &> log/control.log & for ((i=1;i<=10;i++)) do python3 spider/SpiderWork.py &>log/spider$i.log & done

    启动主节点,然后启动10个从节点。将它们所产生的日志信息记录到log/下,并都是在后台运行的进程。

    两分钟左右,完成约1900条的数据获取

    可能用到的命令:

    kill -9 $(ps aux | grep python | awk '{print $2}')

    !kill

    可能用到的软件:

    Eclipse的pydev进程调试。

    最后

    这代码里面真的有好多的细节文件,序列化操作与存储,md5的压缩方案等,都是值得思考的。

  • 相关阅读:
    通过点击切换文本框内容的脚本示例
    使用脚本改变树控件的行为
    javascript动态创建radio button元素支持IE/Firefox
    轻量级的向导控件MultiView
    客户端脚本简单实现Repeater的无刷新分页
    在非web site项目中引用Membership
    逐步认识C#四种判断相等的方法
    C#获取csv文件内容对逗号和引号分隔的处理
    JavaScript之 值类型 和 引用类型 Better
    JS call apply bind 方法的区别 Better
  • 原文地址:https://www.cnblogs.com/bai2018/p/11098451.html
Copyright © 2011-2022 走看看