zoukankan      html  css  js  c++  java
  • 小白爬虫第四弹之爬虫快跑(多进程 + 多线程)

    PS:使用多线程时好像在目录切换的问题上存在问题,可以给线程加个锁试试 Hello 大家好!我又来了。QQ图片20161102215153你是不是发现下载图片速度特别慢、难以忍受啊!对于这种问题 一般解决办法就是多进程了!一个进程速度慢!我就用十个进程,相当于十个人一起干。速度就会快很多啦!(为什么不说多线程?懂点 Python 的小伙伴都知道、GIL 的存在 导致 Python 的多线程有点坑啊!)今天就教大家来做一个多进程的爬虫(其实吧、可以用来做一个超简化版的分布式爬虫) 其实吧!还有一种加速的方法叫做 “异步”!不过这玩意儿我没怎么整明白就不出来误人子弟了!(因为爬虫大部分时间都是在等待 response 中!‘异步’则能让程序在等待 response 的时间去做的其他事情。)QQ图片20161022193315学过 Python 基础的同学都知道、在多进程中,进程之间是不能相互通信的,这就有一个很坑爹的问题的出现了!多个进程怎么知道那那些需要爬取、哪些已经被爬取了! 这就涉及到一个东西!这玩意儿叫做队列!!队列!!队列!!其实吧正常来说应该给大家用队列来完成这个教程的, 比如 Tornado 的 queue 模块。(如果需要更为稳定健壮的队列,则请考虑使用 Celery 这一类的专用消息传递工具) 不过为了简化技术种类啊!(才不会告诉你们是我懒,嫌麻烦呢!)这次我们继续使用 MongoDB。 好了!先来理一下思路: 每个进程需要知道那些 URL 爬取过了、哪些 URL 需要爬取!我们来给每个 URL 设置两种状态: outstanding: 等待爬取的 URL complete: 爬取完成的 URL 诶!等等我们好像忘了啥? 失败的 URL 的怎么办啊?我们在增加一种状态: processing: 正在进行的 URL。 嗯!当一个所有初始的 URL 状态都为 outstanding;当开始爬取的时候状态改为:processing;爬取完成状态改为:complete;失败的 URL 重置状态为:outstanding。为了能够处理 URL 进程被终止的情况、我们设置一个计时参数,当超过这个值时;我们则将状态重置为 outstanding。 下面开整 Go Go Go! 首先我们需要一个模块:datetime (这个模块比内置 time 模块要好使一点) 不会装??不是吧! pip install datetime 还有上一篇博文我们已经使用过的 pymongo 下面是队列的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    from datetime import datetime, timedelta
    from pymongo import MongoClient, errors

    class MogoQueue():

    OUTSTANDING = 1 ##初始状态
    PROCESSING = 2 ##正在下载状态
    COMPLETE = 3 ##下载完成状态

    def __init__(self, db, collection, timeout=300):##初始mongodb连接
    self.client = MongoClient()
    self.Client = self.client[db]
    self.db = self.Client[collection]
    self.timeout = timeout

    def __bool__(self):
    """
    这个函数,我的理解是如果下面的表达为真,则整个类为真
    至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手)
    $ne的意思是不匹配
    """
    record = self.db.find_one(
    {'status': {'$ne': self.COMPLETE}}
    )
    return True if record else False

    def push(self, url, title): ##这个函数用来添加新的URL进队列
    try:
    self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主题': title})
    print(url, '插入队列成功')
    except errors.DuplicateKeyError as e: ##报错则代表已经存在于队列之中了
    print(url, '已经存在于队列中了')
    pass
    def push_imgurl(self, title, url):
    try:
    self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})
    print('图片地址插入成功')
    except errors.DuplicateKeyError as e:
    print('地址已经存在了')
    pass

    def pop(self):
    """
    这个函数会查询队列中的所有状态为OUTSTANDING的值,
    更改状态,(query后面是查询)(update后面是更新)
    并返回_id(就是我们的URL),MongDB好使吧,^_^
    如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING,
    $set是设置的意思,和MySQL的set语法一个意思
    """
    record = self.db.find_and_modify(
    query={'status': self.OUTSTANDING},
    update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
    )
    if record:
    return record['_id']
    else:
    self.repair()
    raise KeyError

    def pop_title(self, url):
    record = self.db.find_one({'_id': url})
    return record['主题']

    def peek(self):
    """这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)"""
    record = self.db.find_one({'status': self.OUTSTANDING})
    if record:
    return record['_id']

    def complete(self, url):
    """这个函数是更新已完成的URL完成"""
    self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
    """这个函数是重置状态$lt是比较"""
    record = self.db.find_and_modify(
    query={
    'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
    'status': {'$ne': self.COMPLETE}
    },
    update={'$set': {'status': self.OUTSTANDING}}
    )
    if record:
    print('重置URL状态', record['_id'])

    def clear(self):
    """这个函数只有第一次才调用、后续不要调用、因为这是删库啊!"""
    self.db.drop()
     

    好了,队列我们做好了,下面是获取所有页面的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    from Download import request
    from mongodb_queue import MogoQueue
    from bs4 import BeautifulSoup


    spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')
    def start(url):
    response = request.get(url, 3)
    Soup = BeautifulSoup(response.text, 'lxml')
    all_a = Soup.find('div', class_='all').find_all('a')
    for a in all_a:
    title = a.get_text()
    url = a['href']
    spider_queue.push(url, title)
    """上面这个调用就是把URL写入MongoDB的队列了"""

    if __name__ == "__main__":
    start('http://www.mzitu.com/all')

    """这一段儿就不解释了哦!超级简单的"""
     

    下面就是多进程 + 多线程的下载代码了:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    import os
    import time
    import threading
    import multiprocessing
    from mongodb_queue import MogoQueue
    from Download import request
    from bs4 import BeautifulSoup

    SLEEP_TIME = 1

    def mzitu_crawler(max_threads=10):
    crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##这个是我们获取URL的队列
    ##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')
    def pageurl_crawler():
    while True:
    try:
    url = crawl_queue.pop()
    print(url)
    except KeyError:
    print('队列没有数据')
    break
    else:
    img_urls = []
    req = request.get(url, 3).text
    title = crawl_queue.pop_title(url)
    mkdir(title)
    os.chdir('D:mzitu\' + title)
    max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()
    for page in range(1, int(max_span) + 1):
    page_url = url + '/' + str(page)
    img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']
    img_urls.append(img_url)
    save(img_url)
    crawl_queue.complete(url) ##设置为完成状态
    ##img_queue.push_imgurl(title, img_urls)
    ##print('插入数据库成功')

    def save(img_url):
    name = img_url[-9:-4]
    print(u'开始保存:', img_url)
    img = request.get(img_url, 3)
    f = open(name + '.jpg', 'ab')
    f.write(img.content)
    f.close()

    def mkdir(path):
    path = path.strip()
    isExists = os.path.exists(os.path.join("D:mzitu", path))
    if not isExists:
    print(u'建了一个名字叫做', path, u'的文件夹!')
    os.makedirs(os.path.join("D:mzitu", path))
    return True
    else:
    print(u'名字叫做', path, u'的文件夹已经存在了!')
    return False

    threads = []
    while threads or crawl_queue:
    """
    这儿crawl_queue用上了,就是我们__bool__函数的作用,为真则代表我们MongoDB队列里面还有数据
    threads 或者 crawl_queue为真都代表我们还没下载完成,程序就会继续执行
    """
    for thread in threads:
    if not thread.is_alive(): ##is_alive是判断是否为空,不是空则在队列中删掉
    threads.remove(thread)
    while len(threads) < max_threads or crawl_queue.peek(): ##线程池中的线程少于max_threads 或者 crawl_qeue时
    thread = threading.Thread(target=pageurl_crawler) ##创建线程
    thread.setDaemon(True) ##设置守护线程
    thread.start() ##启动线程
    threads.append(thread) ##添加进线程队列
    time.sleep(SLEEP_TIME)

    def process_crawler():
    process = []
    num_cpus = multiprocessing.cpu_count()
    print('将会启动进程数为:', num_cpus)
    for i in range(num_cpus):
    p = multiprocessing.Process(target=mzitu_crawler) ##创建进程
    p.start() ##启动进程
    process.append(p) ##添加进进程队列
    for p in process:
    p.join() ##等待进程队列里面的进程结束

    if __name__ == "__main__":
    process_crawler()
     

    好啦!一个多进程多线的爬虫就完成了,(其实你可以设置一下 MongoDB,然后调整一下连接配置,在多台机器上跑哦!!嗯,就是超级简化版的分布式爬虫了,虽然很是简陋。) 本来还想下载图片那一块儿加上异步(毕竟下载图片是I\O等待最久的时间了,),可惜异步我也没怎么整明白,就不拿出来贻笑大方了。 另外,各位小哥儿可以参考上面代码,单独处理图片地址试试(就是多个进程直接下载图片)? 我测试了一下八分钟下载 100 套图 PS:请务必使用 第二篇博文中的下载模块,或者自己写一个自动更换代理的下载模块!!!不然寸步难行,分分钟被服务器 BAN 掉!QQ图片20161102215153小白教程就到此结束了,后面我教大家玩玩 Scrapy;目标 顶点小说网, 爬完全站的小说。 再后面带大家玩玩 抓新浪 汤不热、模拟登录 之类的。或许维护一个公共代理 IP 池之类的。 这个所有代码我放在这个位置了:https://github.com/thsheep/mzitu/

  • 相关阅读:
    Entity Framework 学习中级篇3—存储过程(中)
    Entity Framework 学习高级篇1—改善EF代码的方法(上)
    Entity Framework 学习中级篇2—存储过程(上)
    Entity Framework 学习初级篇7基本操作:增加、更新、删除、事务
    Entity Framework 学习中级篇4—存储过程(下)
    Entity Framework 学习初级篇5ObjectQuery查询及方法
    LINQ之路 9:LINQ to SQL 和 Entity Framework(上)
    在List创建的时候自动加上Rating功能
    利用Feature receiver自动注册/卸载Http Module
    使用SharePoint Web Service(1):创建子站点
  • 原文地址:https://www.cnblogs.com/ExMan/p/14871236.html
Copyright © 2011-2022 走看看