zoukankan      html  css  js  c++  java
  • Python简单分布式爬虫

    分布式爬虫采用主从模式。主从模式是指由一台主机作为控制节点,负责管理所有运行网络爬虫的主机(url管理器,数据存储器,控制调度器),爬虫只需要从控制节点哪里接收任务,并把新生成任务提交给控制节点。此次使用三台主机进行分布式爬取,一台主机作为控制节点,另外两台主机作为爬虫节点。

    控制节点主要分为url管理器、数据存储器和控制调度器。控制调度器通过三个进程来协调URL管理器和数据存储器的工作:一个是URL管理进程,负责URL的管理和将URL传递给爬虫节点,一个是数据提取进程,负责读取爬虫节点返回的数据,将返回数据中的URL交给URL管理进程,数据存储进程,负责将数据提取进程中提交的数据进行本地存储。

    url管理器

    # coding:utf-8
    try :
    import cPickle as pickle
    except ImportError:
    import pickle
    #cPickle引用序列化包
    import hashlib


    class UrlManager(object):

    def __init__(self):
    self.new_urls = self.load_progress('new_urls.txt') # 未爬取URL集合
    self.old_urls = self.load_progress('old_urls.txt') # 已爬取URL集合

    def has_new_url(self):
    # 判断是否有未爬取的URL

    return self.new_url_size() != 0

    def get_new_url(self):
    # 获取一个未爬取的URL

    new_url = self.new_urls.pop()
    m = hashlib.md5()#对url进行MD5加密
    m.update(new_url)
    self.old_urls.add(m.hexdigest()[8:-8])#
    return new_url

    def add_new_url(self, url):
    # 将新的URL添加到未爬取的URL结合中
    if url is None:
    return
    m = hashlib.md5()
    m.update(url)
    url_md5 = m.hexdigest()[8:-8]
    if url not in self.new_urls and url_md5 not in self.old_urls:
    self.new_urls.add(url) # 将新的url添加到列表中

    # 批量添加url
    def add_new_urls(self, urls):
    # 将新的URL添加到未爬取的URL集合中

    if urls is None or len(urls) == 0:
    return
    for url in urls:
    self.add_new_url(url)

    # 获取未爬取url集合的大小
    def new_url_size(self):
    return len(self.new_urls)

    # 获取已经爬取URL集合的大小
    def old_url_size(self):
    return len(self.old_urls)

    #保存进度
    #param path:文件路径
    #param data:数据
    # return:
    def save_progress(self, path, data):

    with open(path, 'wb') as f:
    pickle.dump(data, f)

    #从本地文件加载进度
    #param path 文件路径
    #return set集合
    def load_progress(self, path):

    print '[+]从文件加载进度:%s' %path
    try:

    with open(path,'rb') as f:
    tmp = pickle.load(f)
    return tmp
    except:

    print '[!]无进度文件,创建:%s' % path

    return set()

    数据存储器 

    # coding:utf-8
    import codecs
    import sys
    import time
    from urllib import unquote
    class DataOutput(object):

    def __init__(self):
    self.filepath ='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()))
    self.output_head(self.filepath)
    self.datas = []

    def store_data(self, data):

    if data is None:
    return

    self.datas.append(data)
    if len(self.datas)>10:
    self.output_html(self.filepath)
    #将HTML头写进去
    #param path:保存路径
    def output_head(self, path):
    fout = codecs.open(path, 'w', encoding = 'uft-8')
    fout.write("<html>")
    fout.write("<body>")
    fout.write("<table>")
    fout.close()
    #将数据写入HTML文件中
    #param path:文件路径
    def output_html(self,path):

    fout = codecs.open(path, 'w', encoding = 'utf-8')
    for data in self.datas:
    fout.write("<tr>")
    fout.write("<td>%s</td>" % data['url'])
    fout.write("<td>%s</td>" % data['title'])
    fout.write("<td>%s</td>" % data['summary'])
    fout.write("</tr>")
    self.datas.remove(data)
    fout.close()

    #输出HTML结束
    #param path文件存储路径
    def output_end(self,path):

    fout = codecs.open(path, 'a', encoding = 'utf-8')
    fout.write("</table>")
    fout.write("</body>")
    fout.write("</html>")
    fout.close()

    控制调度器

    # coding:utf-8
    import time, sys, Queue
    import multiprocessing
    from multiprocessing.managers import BaseManager
    from UrlManager import UrlManager
    from DataOutput import DataOutput

    class QueueManager(BaseManager):
    pass

    class NodeManager(object):
    # 创建一个分布式管理器
    # param:url_q url队列
    # param result_q 结果队列
    def start_Manager(self, url_q, result_q):
    # 把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象
    # 将Queue对象在网络中暴露
    QueueManager.register('get_task_queue', callable=lambda: url_q)
    QueueManager.register('get_result_queue', callable=lambda: result_q)
    # 绑定端口8001,设置验证口令"baike"
    manger = BaseManager(address=('', 8001), authkey='baike')
    # 返回manager对象
    return manger


    def url_manager_proc(self, url_q, conn_q, root_url):
    url_manager = UrlManager()
    url_manager.add_new_url(root_url)
    while True:
    # 从URL管理器获取新的URL
    while (url_manager.has_new_url()):

    new_url = url_manager.get_new_url()
    # 将新的URL发给工作节点
    url_q.put(new_url)
    print 'old_url=', url_manager.old_url_size()

    if (url_manager.old_url_size() > 2000):
    # 通知爬虫节点工作结束
    url_q.put('end')
    print '控制节点发起结束通知!'
    # 关闭管理节点,同时存储set状态
    url_manager.save_progress('new_urls.txt', url_manager.new_urls)
    url_manager.save_progress('old_urls.txt', url_manager.old_urls)
    return

    try:

    if not conn_q.empty():
    urls = conn_q.get()
    url_manager.add_new_urls(urls)
    except BaseException, e:
    time.sleep(0.1) # 延时休息


    def result_solve_proc(self, result_q, conn_q, store_q):
    while (True):

    try:
    if not result_q.empty():
    content = result_q.get(True)
    if content['new_urls'] == 'end':
    print '结果分析进程接收通知然后结束!'
    store_q.put('end')
    return
    conn_q.put(content['new_urls']) # url为set类型
    store_q.put(content['data']) # 解析出来的数据为dict类型
    else:
    time.sleep(0.1) # 延时休息
    except BaseException, e:
    time.sleep(0.1) # 延时休息


    def store_proc(self, store_q):
    output = DataOutput()
    while True:
    if not store_q.empty():
    data = store_q.get()
    if data == 'end':
    print '存储进程接受通知然后结束'
    output.ouput_end(output.filepath)
    return
    output.store_data(data)

    else:
    time.sleep(0.1)


    if __name__ == '__main__':
    # 初始化4个队列
    url_q = Queue.Queue()
    result_q = Queue.Queue()
    store_q = Queue.Queue()
    conn_q = Queue.Queue()
    # 创建分布式管理器
    node = NodeManager()
    manager = node.start_Manager(url_q, result_q)
    # 创建URL管理进程、数据提取进程和数据存储进程
    url_manager_proc = multiprocessing.Process(target=node.url_manager_proc, args=(url_q, conn_q, 'https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin&fromid=22046949&fromtitle=%E7%88%AC%E8%99%AB'))
    result_solve_proc = multiprocessing.Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q))
    # 启动3个进程和分布式管理器
    url_manager_proc.start()
    result_solve_proc.start()
    manager.get_server().serve_forever()

     

    HTML下载器

    # coding:utf-8
    import requests
    import urllib2
    import sys
    type = sys.getfilesystemencoding()
    class HtmlDownloader(object):

    def download(slef, url):

    if url is None:
    return None

    user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'

    headers = {'User-Agent': user_agent}
    req = urllib2.Request(url, headers=headers)
    response = urllib2.urlopen(req)
    if response.getcode() == 200:
    html = response.read().decode("UTF-8").encode(type)
    return html


    return None

    HTML解析器

    # coding:utf-8
    import re
    import urlparse
    from bs4 import BeautifulSoup


    class HtmlParser(object):

    # page_url下载页面的URL
    # html_cont 下载的网页内容
    # 返回URL和数据
    def parser(self, page_url, html_cont):

    if page_url is None or html_cont is None:
    return

    soup = BeautifulSoup(html_cont, 'html.parser')

    new_urls = self._get_new_urls(page_url, soup)
    new_data = self._get_new_data(page_url, soup)

    return new_urls, new_data

    # page_url下载页面的url
    # soup:soup
    # 返回新的URL集合
    def _get_new_urls(self, page_url, soup):
    new_urls = set()

    # 抽取符合要求的a标记
    links = soup.find_all('a', href=re.compile(r'/item/.*'))
    for link in links:
    # 提取href属性
    new_url = link['href']
    # 拼接成完整网址
    new_full_url = urlparse.urljoin(page_url, new_url)
    new_urls.add(new_full_url)

    return new_urls

    # 下载页面的url
    def _get_new_data(self, page_url, soup):
    data = {}
    data['url'] = page_url
    title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
    data['title'] = title.get_text()

    summary = soup.find('div', class_='lemma-summary')
    # 获取tag中包含的所有文本内容,包括子孙tag中的内容,并将结果作为Unicode字符串返回
    data['summary'] = summary.get_text()

    return data

    爬虫调度器

    # coding:utf-8
    import time, sys, Queue
    from multiprocessing.managers import BaseManager
    from UrlManager import UrlManager
    from DataOutput import DataOutput
    from HtmlDownloader import HtmlDownloader
    from HtmlParser import HtmlParser

    class SpoderWork(object):
    def __init__(self):
    #初始化分布式进程中工作节点的连接工作
    #实现第一步:使用BaseManager注册用于获取Queue的方法名称
    BaseManager.register('get_task_queue')
    BaseManager.register('get_result_queue')
    #实现第二步:连接到服务器
    server_addr = '127.0.0.1'
    print ('Connect to server %s....' % server_addr)

    self.m = BaseManager(address=(server_addr,8001),authkey='baike')
    #从网络连接
    self.m.connect()
    #实现第三步:获取Queue对象
    self.task = self.m.get_task_queue()
    self.result = self.m.get_result_queue()
    #初始化网页下载器和解析器
    self.downloader = HtmlDownloader()
    self.parser = HtmlParser()
    print 'init finish'

    def crawl(self):
    while(True):
    try:
    if not self.task.empty():
    url = self.task.get()

    if url=='end':

    print '控制节点通知爬虫节点停止工作'

    self.result.put({'new_urls':'end','data':'end'})
    return
    print '爬虫节点正在解析:%s' % url.encode('utf-8')
    content = self.downloader.download(url)
    new_urls,data = self.parser.parser(url,content)
    self.result.put({'new_urls': new_urls, 'data': data})

    except EOFError,e:

    print '连接工作节点失败'
    return

    except Exception,e:
    print e
    print 'Crawl fail'


    if __name__=='__main__':

    spider = SpoderWork()
    spider.crawl()

  • 相关阅读:
    centos7搭建ELK开源实时日志分析系统
    基于ELK的简单数据分析
    用ELK打造可视化集中式日志
    elk单台环境搭建
    用logstash,elasticSearch,kibana实现数据收集和统计分析工作
    用Kibana和logstash快速搭建实时日志查询、收集与分析系统
    elasticsearch按照配置时遇到的一些坑 [Failed to load settings from [elasticsearch.yml]]
    分布式搜索elasticsearch几个概念解析
    分布式搜索elasticsearch配置文件详解
    CENTOS安装ElasticSearch
  • 原文地址:https://www.cnblogs.com/paulversion/p/8386240.html
Copyright © 2011-2022 走看看