zoukankan      html  css  js  c++  java
  • 简单分布式爬虫

    # url管理器

    # url管理器
    import pickle
    import hashlib
    
    
    class UrlManager():
        def __init__(self):
            self.new_urls = self.load_progress('new_urls.txt')  # 未爬取url集合
            self.old_urls = self.load_progress('old_urls.txt')  # 已爬取集合
    
        def has_new_url(self):
            '''
            判断是否有未爬取的URL
            :return:
            '''
            return self.new_url_size() != 0
    
        def get_new_url(self):
            '''
            获取一个未爬取的URL
            :return:
            '''
            new_url = self.new_urls.pop()
            m = hashlib.md5()
            m.update(new_url)
            self.old_urls.add(m.hexdigest()[8:-8])
            return new_url
    
        def add_new_url(self, url):
            '''
            将新的URL添加到未爬取的集合中
            :param url: 单个URL
            :return:
            '''
            if url is None:
                return
            m = hashlib.md5()
            m.update(url)
            url_md5 = m.hexdigest()[8:-8]
            if url not in self.new_urls and url_md5 not in self.old_urls:
                self.new_urls.add(url)
    
        def add_new_urls(self, urls):
            '''
            将新的URL添加到未爬取的URL集合中
            :param urls: URL集合
            :return:
            '''
            if urls is None or len(urls) == 0:
                return
            for url in urls:
                self.add_new_url(url)
    
        def new_url_size(self):
            '''
            获取未爬取URL集合的大小
            :return:
            '''
            return len(self.new_urls)
    
        def old_url_size(self):
            '''
            获取已经爬取的URL集合的大小
            :return:
            '''
            return len(self.old_urls)
    
        def save_progress(self, path, data):
            '''
            保存进度
            :param path: 文件路径
            :param data: 数据
            :return:
            '''
            with open(path, 'wb') as f:
                pickle.dump(data, f)
    
        def load_progress(self, path):
            '''
            从本地文件加载进度
            :param path: 文件路径
            :return: 返回set集合
            '''
            print('[+] 从文件加载进度:%s' % path)
            try:
                with open(path, 'rb') as f:
                    tmp = pickle.load(f)
                    return tmp
            except:
                print('[!] 文件无效,创建:%s' % path)
            return set()
    #爬虫管理器
    from multiprocessing.managers import BaseManager
    from .HTML_downloader import HtmlDownloader
    from .HTML_parser import HtmlParser
    class Spiderwork():
        def __init__(self):
            BaseManager.register('get_task_queue')
            BaseManager.register('get_result_queue')
            server_addr = '127.0.0.1'
            print('connect to %s ....'% server_addr)
            self.m = BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8'))
            self.m.connect()
            self.task = self.m.get_task_queue()
            self.result = self.m.get_result_queue()
            self.downloader = HtmlDownloader()
            self.parser = HtmlParser()
            print('init finshed..')
    
        def crawl(self):
            while True:
                try:
                    if not self.task.empty():
                        url = self.task.get()
                        if url =='end':
                            print('控制节点通知爬虫节点停止工作。')
                            self.result.put({'new_urls':'end','data':'end'})
                            return
                        print('爬虫节点正在解析:%s' % url.encode('utf-8'))
                        content=self.downloader.download(url)
                        new_urls,data=self.parser.parser(url,content)
                        self.result.put({'new_urls':url,'data':data})
                except EOFError as e :
                    print('链接工作节点失败')
                    return
                except Exception as e :
                    print(e)
                    print('crawl fial')
    
    if __name__ =='__main__':
        spider = Spiderwork()
        spider.crawl()
    # HTML解析器
    import re
    from urllib import parse
    from bs4 import BeautifulSoup
    
    class HtmlParser():
    
        def parser(self,page_url,html_cont):
            '''
            用于解析网页内容,抽取URL和数据
            :param page_url: 下载页面的URL
            :param html_cont: 下载的网页内容
            :return:
            '''
            if page_url is None or html_cont is None:
                return
            soup = BeautifulSoup(html_cont,'html.parser')
            new_urls = self._get_new_urls(page_url,soup)
            new_data = self._get_new_data(page_url,soup)
    
            return new_urls,new_data
    
        def _get_new_urls(self,page_url,soup):
            '''
            抽取新的URL集合
            :param page_url: 下载页面的URL
            :param soup: soup
            :return:
            '''
            new_urls = set()
            #抽取符合要求的a标签
            links = soup.find_all('a',href = re.compile(r'/item/.'))
            for link in links:
                # 提取href属性
                new_url = link['href']
                # 拼接成完整的网址
                new_full_url = parse.urljoin(page_url,new_url)
                new_urls.add(new_full_url)
    
            return new_urls
    
        def _get_new_data(self,page_url,soup):
            '''
            抽取有效数据
            :param page_url: 下载页面URL
            :param soup:
            :return: 返回有效数据
            '''
            data = {}
            data['url'] = page_url
            title = soup.find('dd',class_ = 'lemmaWgt-lemmaTitle-title').find('h1')
            data['title'] = title.text
            summary = soup.find('div',class_ = 'lemma-summary')
            #获取tag中包含的所有文本内容
            data['summary'] = summary.text
            return data
    # HTML下载器
    import requests
    
    class HtmlDownloader():
    
        def download(self,url):
            if url is None:
                return None
            headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel …) Gecko/20100101 Firefox/57.0'.encode('utf-8')}
            r = requests.get(url,headers=headers)
            if r.status_code ==200:
                r.encoding = 'utf-8'
                return r.text
            return None
    # 数据存储器
    import codecs
    import time
    
    
    class DataOutput():
        def __init__(self):
            self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime()))
            self.output_head(self.filepath)
            self.datas = []
    
        def store_data(self, data):
            if data is None:
                return
            self.datas.append(data)
            if len(self.datas) > 10:
                self.output_html(self.filepath)
    
        def output_head(self, path):
            '''
            将HTML头写进去
            :param path:
            :return:
            '''
            fout = codecs.open(path, 'w', encoding='utf-8')
            fout.write('<html>')
            fout.write('<table>')
            fout.write('<table>')
            fout.close()
    
        def output_html(self, path):
            '''
            将数据写入HTML文件中
            :return:
            '''
            fout = codecs.open(path, 'a', encoding='utf-8')
            for data in self.datas:
                fout.write('<tr>')
                fout.write('<td>%s</td>' % data['url'])
                fout.write('<td>%s</td>' % data['title'])
                fout.write('<td>%s</td>' % data['summary'])
                fout.write('</tr>')
                self.datas.remove(data)
            fout.write('</table>')
            fout.write('</table>')
            fout.write('</html>')
            fout.close()
    
        def output_end(self, path):
            '''
            将HTML尾写进去
            :param path:
            :return:
            '''
            fout = codecs.open(path, 'a', encoding='utf-8')
            fout.write('</table>')
            fout.write('</table>')
            fout.write('</html>')
            fout.close()
    #控制调度器
    import random,time,queue
    from multiprocessing.managers import BaseManager
    from multiprocessing import Process
    from .URLManager import UrlManager
    from .Data_store import DataOutput
    
    
    class NodeManager():
    
        def start_Manager(self,url_q,result_q):
            '''
            创建一个分布式管理器
            :param url_q: url队列
            :param result_q: 结果队列
            :return:
            '''
            BaseManager.register('get_task_queue', callable=lambda: url_q)
    
            BaseManager.register('get_result_queue', callable=lambda: result_q)
    
            manager = BaseManager(address=('', 8001), authkey='baike'.encode('utf-8'))
    
            return manager
    
        def url_manager_proc(self,url_q,conn_q,root_url):
            url_manager = UrlManager()
            url_manager.add_new_urls(root_url)
            while True:
                while (url_manager.has_new_url()):
                    #从URL管理器获取新的URL
                    new_url = url_manager.get_new_url()
                    # 将新URL发送给工作节点
                    url_q.put(new_url)
                    print('old_url=',url_manager.old_url_size())
                    # 判断,当爬取2000个链接后关闭并保存
                    if (url_manager.old_url_size()>2000):
                        url_q.put('end')
                        print('控制节点发起结束通知')
                        # 关闭管理节点。同时存储set状态
                        url_manager.save_progress('new_urls.txt',url_manager.new_urls)
                        url_manager.save_progress('old_urls.txt',url_manager.old_urls)
                        return
                try:
                    if not conn_q.empty():
                        urls = conn_q.get()
                        url_manager.add_new_urls(urls)
                except BaseException as e:
                    time.sleep(0.1)
    
        def result_solve_proc(self,result_q,conn_q,store_q):
            while True:
                try:
                    if not result_q.empty():
                        content= result_q.get(True)
                        if content['new_urls'] =='end':
                            #结果分析进程接受通知然后结束
                            print('结果分析进程接收通知然后结束')
                            store_q.put('end')
                            return
                        conn_q.put(content['new_urls']) # url为set类型
                        store_q.put(content['data'])#解析出来的数据为dict类型
                    else:
                        time.sleep(0.1)
                except BaseException as e:
                    time.sleep(0.1)
    
        def store_proc(self,store_q):
            output = DataOutput()
            while True:
                if not store_q.empty():
                    data = store_q.get()
                    if data =='end':
                        print('存储进程接受通知然后结束')
                        output.output_end(output.filepath)
                        return
                    output.store_data(data)
                else:
                    time.sleep(0.1)
    if __name__ =='__main__':
        # 初始化4个队列
        url_q = queue.Queue()
        result_q = queue.Queue()
        store_q = queue.Queue()
        conn_q = queue.Queue()
        # 创建分布式管理器
        node = NodeManager()
        manager = node.start_Manager(url_q,result_q)
        # 创建URL管理进程,数据提取进程和数据存储进程
        url_manager_proc = Process(target=node.url_manager_proc,args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm'))
        result_solve_proc = Process(target=node.result_solve_proc,args=(result_q,conn_q,store_q))
        store_proc = Process(target=node.store_proc,args=(store_q,))
        # 启动3个进程和分布式管理器
        url_manager_proc.start()
        result_solve_proc.start()
        store_proc.start()
        manager.get_server().serve_forever()
  • 相关阅读:
    PBOC规范研究之九---静态数据认证(转)
    PBOC规范研究之五、安全相关的PKI基础知识(转)
    PBOC规范研究之三、TypeB协议(转)
    PBOC规范研究之二、PBOC规范中,对于通讯速率的约定(转)
    PBOC规范研究之一、ISO14443协议和PBOC关于CID的约定(转)
    qml js
    qml 信号与信号 信号与方法链接使用 带参数会报错
    调试bug的几种方法
    CDN方式使用iview
    iView--3
  • 原文地址:https://www.cnblogs.com/Erick-L/p/7719349.html
Copyright © 2011-2022 走看看