020 Crawling cnblogs Articles with Tornado and Coroutines

Since Python 3.5, Tornado has officially recommended writing asynchronous programs with async and await. This post is an experiment in crawling cnblogs articles with Tornado and coroutines, writing the results to a MySQL database asynchronously with peewee_async.

1. A scraping test against a cnblogs article:

As a test URL I use the detail page of one of my own articles, https://www.cnblogs.com/FG123/p/9934244.html, and scrape the article title, body, and author information:

The title, body, and author username are all available on the detail page above, but the author's profile details have to come from http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123, where FG123 is the article author's username. The BeautifulSoup test code and its result:

    import requests
    from bs4 import BeautifulSoup

    detail_article_html = requests.get("https://www.cnblogs.com/FG123/p/9934244.html").content
    author_profile_html = requests.get("http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123").content
    detail_soup = BeautifulSoup(detail_article_html, "html.parser")
    title = detail_soup.find(id="cb_post_title_url").get_text()
    info = detail_soup.find(id="cnblogs_post_body")
    author_soup = BeautifulSoup(author_profile_html, "html.parser")
    # the profile snippet is a flat list of <a> tags:
    # nickname, blog age, fans count, follow count
    author = author_soup.select('div > a')
    author_name = author[0].get_text()
    blog_age = author[1].get_text()
    fans_num = author[2].get_text()
    follow_num = author[3].get_text()

    print("Title: {}".format(title))
    print("Author nickname: {}".format(author_name))
    print("Blog age: {}".format(blog_age))
    print("Fans: {}".format(fans_num))
    print("Following: {}".format(follow_num))
    print("Content: {}".format(info))

Result: (output screenshot omitted)

2. Asynchronous crawl logic with Tornado and coroutines:

The crawl logic follows the web-spider example in the official Tornado documentation: a Tornado Queue drives an asynchronous producer/consumer pattern, and awaiting the queue suspends the current coroutine so others can run. First, a coroutine that fetches a URL, extracts the links it contains, and strips useless fragment parts:
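The fragments below all refer to a few module-level names (q, fetching, fetched, base_url, concurrency) that the post itself never shows. A minimal sketch of that shared setup; base_url is my assumption, while concurrency=10 matches the 10 coroutines mentioned at the end of the post:

    import time
    from datetime import timedelta
    from urllib.parse import urljoin, urldefrag

    from bs4 import BeautifulSoup
    from tornado import gen, httpclient, ioloop, queues

    base_url = "https://www.cnblogs.com/"  # assumed start URL; the crawl never leaves this prefix
    concurrency = 10                       # number of worker coroutines

    q = queues.Queue()                # frontier: URLs waiting to be processed
    fetching, fetched = set(), set()  # URLs in flight / already completed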

    async def get_links_from_url(url):
        """
        Fetch the url with AsyncHTTPClient and extract every link
        from the response body with BeautifulSoup.
        :param url:
        :return: set of absolute, fragment-free URLs
        """
        response = await httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        html = response.body.decode("utf8", errors='ignore')
        soup = BeautifulSoup(html, "html.parser")
        return set([urljoin(url, remove_fragment(a.get("href")))
                    for a in soup.find_all("a", href=True)])


    def remove_fragment(url):
        """
        Strip the #fragment part so URLs that differ only by
        anchor count as the same page.
        :param url:
        :return:
        """
        pure_url, frag = urldefrag(url)
        return pure_url
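For example, remove_fragment("https://www.cnblogs.com/FG123/p/9934244.html#top") returns the same URL without "#top", so anchor variants of a page are only fetched once.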

The current URL is handed to the coroutine above to collect the links it contains, and every internal .html link is put onto the Tornado queue:

    async def fetch_url(current_url):
        """
        `fetching` is the set of URLs already picked up.
        Call get_links_from_url to collect every link on current_url
        and put the internal (non-external) ones onto the queue.
        :param current_url:
        :return:
        """
        if current_url in fetching:
            return

        print('fetching %s' % current_url)
        fetching.add(current_url)
        urls = await get_links_from_url(current_url)
        fetched.add(current_url)

        for new_url in urls:
            # internal article links only
            if new_url.startswith(base_url) and new_url.endswith(".html"):
                await q.put(new_url)

Each worker pulls URLs off the queue with async for, calls the fetch_url coroutine to enqueue the links the page contains, and calls the get_info_data coroutine (defined in section 3) to scrape and store the page details:

    async def worker():
        """
        Pull URLs off the queue with `async for`;
        fetch_url enqueues the links each page contains,
        get_info_data scrapes and stores the page details.
        :return:
        """
        async for url in q:
            if url is None:
                return
            try:
                await fetch_url(url)
                await get_info_data(url)
            except Exception as e:
                print('Exception: %s %s' % (e, url))
            finally:
                q.task_done()
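Note that q.task_done() must be called exactly once for every item pulled off the queue; q.join() in main() below only unblocks once every enqueued URL has been marked done (or the timeout expires).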

The main coroutine starts `concurrency` workers at once with Tornado's gen.multi and waits for the queue to drain (or for the timeout). It then puts one None per worker onto the queue, so each worker's async for loop picks one up and returns, ending the crawl.

    async def main():
        """
        Main coroutine: start `concurrency` workers at once with
        Tornado's gen.multi, then wait for the queue to drain or
        for the 300-second timeout.
        :return:
        """
        start = time.time()

        # q, fetching and fetched are module-level (see the setup
        # sketch above); creating them here as locals would hide
        # them from the worker coroutines

        # seed the queue with the start URL
        await q.put(base_url)

        workers = gen.multi([worker() for _ in range(concurrency)])
        await q.join(timeout=timedelta(seconds=300))
        assert fetching == fetched
        print('Done in %d seconds, fetched %s URLs.' % (
            time.time() - start, len(fetched)))

        # put `concurrency` Nones on the queue; each worker() returns
        # when it pulls one off
        for _ in range(concurrency):
            await q.put(None)

        await workers
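The post does not show how the event loop is started; a minimal entry point, consistent with the fragments above:

    if __name__ == "__main__":
        # run_sync starts the IOLoop, runs main() to completion, then stops
        ioloop.IOLoop.current().run_sync(main)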

3. Writing the scraped data to MySQL asynchronously with peewee_async and aiomysql

Define the model with peewee and create the table:

    # coding:utf-8
    from peewee import *
    import peewee_async

    database = peewee_async.MySQLDatabase(
        'xxx', host="192.168.xx.xx",
        port=3306, user="root", password="xxxxxx"
    )

    objects = peewee_async.Manager(database)

    # allow synchronous queries (needed for create_tables below)
    database.set_allow_sync(True)


    class Blogger(Model):
        article_id = CharField(max_length=50, verbose_name="article ID")
        title = CharField(max_length=150, verbose_name="title")
        content = TextField(null=True, verbose_name="content")
        author_name = CharField(max_length=50, verbose_name="author nickname")
        blog_age = CharField(max_length=50, verbose_name="blog age")
        fans_num = IntegerField(null=True, verbose_name="fans count")
        follow_num = IntegerField(null=True, verbose_name="follow count")

        class Meta:
            database = database
            table_name = "blogger"


    def init_table():
        database.create_tables([Blogger])


    if __name__ == "__main__":
        init_table()
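Not from the original post, but a quick sketch for sanity-checking the async side: peewee_async's Manager.execute runs a peewee query without blocking the event loop. It assumes the module above has been imported and the table already contains rows:

    import asyncio

    async def count_bloggers():
        # run the SELECT through the async Manager
        rows = await objects.execute(Blogger.select())
        print("stored articles:", len(list(rows)))

    asyncio.get_event_loop().run_until_complete(count_bloggers())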

Scrape the detail information for each article and write it to the MySQL database asynchronously:

    async def get_info_data(url):
        """
        Scrape the detail page and write it to MySQL asynchronously;
        `objects` and `Blogger` come from the model module above.
        :param url:
        :return:
        """
        response = await httpclient.AsyncHTTPClient().fetch(url)
        html = response.body.decode("utf8")
        soup = BeautifulSoup(html, "html.parser")
        title = soup.find(id="cb_post_title_url").get_text()
        content = soup.find(id="cnblogs_post_body")
        # e.g. https://www.cnblogs.com/FG123/p/9934244.html
        # -> name "FG123", article_id "9934244"
        name = url.split("/")[3]
        article_id = url.split("/")[-1].split(".")[0]
        author_url = "http://www.cnblogs.com/mvc/blog/news.aspx?blogApp={}".format(name)
        author_response = await httpclient.AsyncHTTPClient().fetch(author_url)
        author_html = author_response.body.decode("utf8")
        author_soup = BeautifulSoup(author_html, "html.parser")
        author = author_soup.select('div > a')
        author_name = author[0].get_text()
        blog_age = author[1].get_text()
        fans_num = author[2].get_text()
        follow_num = author[3].get_text()
        await objects.create(
            Blogger, title=title,
            article_id=article_id,
            content=str(content),  # content is a bs4 Tag; store its HTML
            author_name=author_name,
            blog_age=blog_age,
            fans_num=fans_num,
            follow_num=follow_num
        )

Crawl result: (screenshot omitted)

This was a quick taste of crawling cnblogs with Tornado and coroutines. With 10 coroutines running it already feels fast: switching between coroutines is very cheap, and a single thread or process can host many of them. In my tests it was indeed faster than a multithreaded crawler.
