Crawling cnblogs articles with Tornado and coroutines

Since Python 3.5, Tornado has officially recommended writing asynchronous code with async and await. This post is an experiment in crawling cnblogs articles with Tornado and coroutines, writing the results into MySQL asynchronously via peewee_async.

1. A scraping test on a cnblogs article:

As a test URL I use the detail page of one of my own articles, https://www.cnblogs.com/FG123/p/9934244.html, and scrape the article title, the content, and the author information:

The title, the content, and the author's username can all be extracted from the detail page above, but the author profile has to be fetched separately from http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123, where FG123 is the username of this article's author. Here is the Beautiful Soup test code and its result:

    import requests
    from bs4 import BeautifulSoup

    detail_article_html = requests.get("https://www.cnblogs.com/FG123/p/9934244.html").content
    author_profile_html = requests.get("http://www.cnblogs.com/mvc/blog/news.aspx?blogApp=FG123").content
    # Pass an explicit parser; bs4 warns when one is not specified
    detail_soup = BeautifulSoup(detail_article_html, "html.parser")
    title = detail_soup.find(id="cb_post_title_url").get_text()
    info = detail_soup.find(id="cnblogs_post_body")
    author_soup = BeautifulSoup(author_profile_html, "html.parser")
    author = author_soup.select('div > a')
    author_name = author[0].get_text()
    blog_age = author[1].get_text()
    fans_num = author[2].get_text()
    follow_num = author[3].get_text()

    print("Title: {}".format(title))
    print("Author: {}".format(author_name))
    print("Blog age: {}".format(blog_age))
    print("Followers: {}".format(fans_num))
    print("Following: {}".format(follow_num))
    print("Content: {}".format(info))

Result: (screenshot of the printed output omitted)

2. Asynchronous crawl logic with Tornado and coroutines:

The crawl logic here follows the spider example in the official Tornado documentation: Tornado's Queue gives us an asynchronous producer/consumer pattern, and when the queue is full an awaited put suspends the current coroutine so another can run (a standalone sketch of this hand-off follows the next code block). First, define a coroutine that fetches a URL, extracts all the links in the response, and strips useless fragments from them:

    import time
    from datetime import timedelta
    from urllib.parse import urljoin, urldefrag

    from bs4 import BeautifulSoup
    from tornado import gen, httpclient, ioloop, queues


    async def get_links_from_url(url):
        """
        Fetch the url asynchronously with AsyncHTTPClient and
        extract every link in the page with BeautifulSoup.
        :param url:
        :return:
        """
        response = await httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        html = response.body.decode("utf8", errors='ignore')
        soup = BeautifulSoup(html, "html.parser")
        return {urljoin(url, remove_fragment(a.get("href")))
                for a in soup.find_all("a", href=True)}


    def remove_fragment(url):
        """
        Strip the #fragment part so links that differ only by
        fragment collapse to a single URL.
        :param url:
        :return:
        """
        pure_url, frag = urldefrag(url)
        return pure_url
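As an aside, this standalone sketch (not from the original post; the queue size and item count are arbitrary) shows the hand-off described above: once the queue is full, an awaited put suspends the producer until the consumer takes an item off:

    from tornado import gen, ioloop, queues


    async def producer(q):
        for i in range(4):
            await q.put(i)      # suspends here once the queue is full
            print("produced %s" % i)
        await q.put(None)       # sentinel telling the consumer to stop

    async def consumer(q):
        async for item in q:
            if item is None:
                return
            print("consumed %s" % item)

    async def demo():
        q = queues.Queue(maxsize=2)  # small queue so put() actually suspends
        await gen.multi([producer(q), consumer(q)])

    if __name__ == "__main__":
        ioloop.IOLoop.current().run_sync(demo)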

fetch_url takes the current URL, calls the coroutine above to collect the links it contains, and puts every link that is not external (i.e. that stays under base_url) onto the Tornado queue:

    async def fetch_url(current_url):
        """
        fetching is the set of URLs already picked up by a worker
        (module-level, like fetched, q and base_url).
        Call the coroutine get_links_from_url to collect every link
        on current_url, then enqueue the ones that are not external.
        :param current_url:
        :return:
        """
        if current_url in fetching:
            return

        print('fetching %s' % current_url)
        fetching.add(current_url)
        urls = await get_links_from_url(current_url)
        fetched.add(current_url)

        for new_url in urls:
            # Keep only internal article links
            if new_url.startswith(base_url) and new_url.endswith(".html"):
                await q.put(new_url)

     

The worker coroutine pulls URLs off the queue with async for, calling fetch_url to enqueue the links each page contains and get_info_data to scrape the page's detail data:

    async def worker():
        """
        Pull URLs off q with async for; for each one, call the
        coroutine fetch_url to enqueue the links it contains and
        the coroutine get_info_data to scrape its detail data.
        :return:
        """
        async for url in q:
            if url is None:
                return
            try:
                await fetch_url(url)
                await get_info_data(url)
            except Exception as e:
                print('Exception: %s %s' % (e, url))
            finally:
                q.task_done()

Define the main coroutine: Tornado's gen.multi starts concurrency worker coroutines at once and gathers them into a single future for the event loop to wait on. When the queue has fully drained (or the join times out), put one None per worker onto the queue; a worker that pulls a None returns, ending its async for loop:

    async def main():
        """
        Main coroutine: start concurrency workers at once with
        tornado's gen.multi and wait until the queue is fully
        drained or the join times out.
        :return:
        """
        start = time.time()

        # Seed the queue with the start URL
        await q.put(base_url)

        workers = gen.multi([worker() for _ in range(concurrency)])
        await q.join(timeout=timedelta(seconds=300))
        assert fetching == fetched
        print('Done in %d seconds, fetched %s URLs.' % (
            time.time() - start, len(fetched)))

        # Put concurrency Nones onto the queue; each worker() that
        # pulls a None returns, ending its loop
        for _ in range(concurrency):
            await q.put(None)

        await workers
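The listings above reference a few module-level names that are not shown explicitly. Here is a minimal sketch of that setup and the entry point, assuming the crawl starts at the cnblogs front page and uses the 10 coroutines mentioned in the closing remarks:

    # Assumed module-level setup; the values are illustrative
    base_url = "https://www.cnblogs.com"   # only links under this host are followed
    concurrency = 10                       # number of worker coroutines
    q = queues.Queue()                     # shared by main(), worker() and fetch_url()
    fetching, fetched = set(), set()       # URLs picked up / fully crawled

    if __name__ == "__main__":
        ioloop.IOLoop.current().run_sync(main)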

3. Writing the scraped data to MySQL asynchronously with peewee_async and aiomysql

Create the model with peewee and generate the table:

    # coding:utf-8
    from peewee import *
    import peewee_async

    database = peewee_async.MySQLDatabase(
        'xxx', host="192.168.xx.xx",
        port=3306, user="root", password="xxxxxx"
    )

    objects = peewee_async.Manager(database)

    # Allow synchronous queries too (needed for create_tables below)
    database.set_allow_sync(True)


    class Blogger(Model):
        article_id = CharField(max_length=50, verbose_name="article ID")
        title = CharField(max_length=150, verbose_name="title")
        content = TextField(null=True, verbose_name="content")
        author_name = CharField(max_length=50, verbose_name="author nickname")
        blog_age = CharField(max_length=50, verbose_name="blog age")
        fans_num = IntegerField(null=True, verbose_name="followers")
        follow_num = IntegerField(null=True, verbose_name="following")

        class Meta:
            database = database
            table_name = "blogger"


    def init_table():
        database.create_tables([Blogger])


    if __name__ == "__main__":
        init_table()
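For completeness, reading rows back goes through the same Manager; a small hypothetical sketch (list_bloggers is not part of the original post):

    async def list_bloggers():
        # Manager.execute runs a peewee query through aiomysql
        # without blocking the event loop
        rows = await objects.execute(Blogger.select().limit(10))
        for b in rows:
            print(b.title, b.author_name)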

get_info_data scrapes the detail information for a blog post and writes it to MySQL asynchronously:

    async def get_info_data(url):
        """
        Scrape the detail info and write it to MySQL asynchronously.
        :param url:
        :return:
        """
        response = await httpclient.AsyncHTTPClient().fetch(url)
        html = response.body.decode("utf8")
        soup = BeautifulSoup(html, "html.parser")
        title = soup.find(id="cb_post_title_url").get_text()
        content = soup.find(id="cnblogs_post_body")
        name = url.split("/")[3]                       # the author's username
        article_id = url.split("/")[-1].split(".")[0]  # numeric part of the file name
        author_url = "http://www.cnblogs.com/mvc/blog/news.aspx?blogApp={}".format(name)
        author_response = await httpclient.AsyncHTTPClient().fetch(author_url)
        author_html = author_response.body.decode("utf8")
        author_soup = BeautifulSoup(author_html, "html.parser")
        author = author_soup.select('div > a')
        author_name = author[0].get_text()
        blog_age = author[1].get_text()
        fans_num = author[2].get_text()
        follow_num = author[3].get_text()
        await objects.create(
            Blogger, title=title,
            article_id=article_id,
            content=str(content),  # store the post body as HTML text
            author_name=author_name,
            blog_age=blog_age,
            fans_num=fans_num,
            follow_num=follow_num
        )
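To make the two split() calls concrete, here is what they extract from the sample URL used in section 1:

    url = "https://www.cnblogs.com/FG123/p/9934244.html"
    print(url.split("/"))
    # ['https:', '', 'www.cnblogs.com', 'FG123', 'p', '9934244.html']
    print(url.split("/")[3])                  # FG123 (the author's username)
    print(url.split("/")[-1].split(".")[0])   # 9934244 (the article id)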

Crawl results: (screenshot of the populated blogger table omitted)

This was a quick taste of crawling cnblogs with Tornado and coroutines. With 10 coroutines the crawl already feels very fast: switching between coroutines costs very little, and a single thread or process can host many of them. In my tests this version was indeed faster than an equivalent multithreaded crawler.

Original post: https://www.cnblogs.com/FG123/p/9934279.html