Demo源码地址
https://github.com/CHUNL09/tornado/tree/master/demos/webspider
这个Demo的作用是用来获取特定URL的网页中的链接(链接是以特定URL作为开头的,比如设置了base_url="http://www.baidu.com",那么只会获取以"http://www.baidu.com开头的链接")。代码如下:
#!/usr/bin/env python import time from datetime import timedelta try: #python 2.7 适用 from HTMLParser import HTMLParser from urlparse import urljoin, urldefrag except ImportError: from html.parser import HTMLParser from urllib.parse import urljoin, urldefrag from tornado import httpclient, gen, ioloop, queues base_url = 'http://www.tornadoweb.org/en/stable/' concurrency = 10 @gen.coroutine def get_links_from_url(url): """Download the page at `url` and parse it for links. Returned links have had the fragment after `#` removed, and have been made absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes 'http://www.tornadoweb.org/en/stable/gen.html'. """ try: response = yield httpclient.AsyncHTTPClient().fetch(url) print('fetched %s' % url) html = response.body if isinstance(response.body, str) else response.body.decode() urls = [urljoin(url, remove_fragment(new_url)) for new_url in get_links(html)] except Exception as e: print('Exception: %s %s' % (e, url)) raise gen.Return([]) raise gen.Return(urls) def remove_fragment(url): pure_url, frag = urldefrag(url) return pure_url def get_links(html): # get all links in html page class URLSeeker(HTMLParser): def __init__(self): HTMLParser.__init__(self) self.urls = [] def handle_starttag(self, tag, attrs): href = dict(attrs).get('href') if href and tag == 'a': self.urls.append(href) url_seeker = URLSeeker() url_seeker.feed(html) return url_seeker.urls @gen.coroutine def main(): q = queues.Queue() start = time.time() fetching, fetched = set(), set() @gen.coroutine def fetch_url(): current_url = yield q.get() try: if current_url in fetching: return print('fetching %s' % current_url) fetching.add(current_url) urls = yield get_links_from_url(current_url) fetched.add(current_url) for new_url in urls: # Only follow links beneath the base URL if new_url.startswith(base_url): yield q.put(new_url) finally: q.task_done() @gen.coroutine def worker(): while True: yield fetch_url() q.put(base_url) # Start workers, then wait for the work queue to be empty. for _ in range(concurrency): worker() yield q.join(timeout=timedelta(seconds=300)) assert fetching == fetched print('Done in %d seconds, fetched %s URLs.' % ( time.time() - start, len(fetched))) if __name__ == '__main__': import logging logging.basicConfig() io_loop = ioloop.IOLoop.current() io_loop.run_sync(main)
下面开始分析这个代码。
1 从程序的最终执行部分看起:
1 if __name__ == '__main__': 2 import logging 3 logging.basicConfig() 4 io_loop = ioloop.IOLoop.current() 5 io_loop.run_sync(main)
这里logging.basicConfig()貌似没有起作用,这个方法是在logging模块中用来设置日志的基本格式用的。这里显然没有用到。IOLoop.current()用来返回当前线程的IOloop. run_sync方法是用来启动IOLoop,运行,并且结束(Starts the IOLoop
, runs the given function, and stops the loop.)。
run_sync函数和tornado.gen.coroutine配合使用,主要是为了在mian函数中能够异步调用。Tornado官方给出了如下的使用示例:
@gen.coroutine def main(): # do stuff... if __name__ == '__main__': IOLoop.current().run_sync(main)
关于IOLoop.current()和IOLoop.instance()的区别请点击这里。
2 main函数。
首先,main函数前面带了@gen.coroutine装饰器,为了能够在main函数中实现异步调用。
1 @gen.coroutine 2 def main(): 3 q = queues.Queue() 4 start = time.time() 5 fetching, fetched = set(), set() 6 7 @gen.coroutine 8 def fetch_url(): 9 current_url = yield q.get() 10 try: 11 if current_url in fetching: 12 return 13 14 print('fetching %s' % current_url) 15 fetching.add(current_url) 16 urls = yield get_links_from_url(current_url) # 获取current_url页面中的link 17 fetched.add(current_url) 18 19 for new_url in urls: # 对于子链接进行处理,只有符合条件的链接才会放入到queue中 20 # Only follow links beneath the base URL 21 if new_url.startswith(base_url): 22 yield q.put(new_url) 23 24 finally: 25 q.task_done() #Indicate that a formerly enqueued task is complete. 表示get从queue中取出的任务已经完成 26 27 @gen.coroutine 28 def worker(): 29 while True: 30 yield fetch_url() 31 32 q.put(base_url) 33 34 # Start workers, then wait for the work queue to be empty. 35 for _ in range(concurrency): 36 worker() 37 yield q.join(timeout=timedelta(seconds=300)) 38 assert fetching == fetched 39 print('Done in %d seconds, fetched %s URLs.' % ( 40 time.time() - start, len(fetched)))
line3 初始化了一个queue,这里使用的是tornado提供的queue(需要from tornado import queues ).
line5 初始化了两个集合fetching和fetched. fetching中存放正在处理的URL,而fetched中存放处理完成的URL。
line7-25 定义了函数fetch_url()主要是用来从queue中获取URL,并处理。
line27-30 定义了worker()函数,在其中使用了while True, 会不停的去yield fetch_url(). 这里while True是必须的,否则执行过一次的yield fetch_url()会hang住直到timeout.
line35-36 模拟并发效果,这里也可以取消for循环,但是实际结果消耗时间会大大多于并发的情况(可以自行测试实验)。
line37 q.join()的作用是block,直到queue中所有的任务都完成或者timeout.
line38 用断言来判断fetching 和fetched集合,正常情况下,两个集合中的URL数量应该是相等的。否则的话会raise一个断言的error出来。
3 其他定义的函数
代码如下:
1 @gen.coroutine 2 def get_links_from_url(url): 3 """Download the page at `url` and parse it for links. 4 5 Returned links have had the fragment after `#` removed, and have been made 6 absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes 7 'http://www.tornadoweb.org/en/stable/gen.html'. 8 """ 9 try: 10 response = yield httpclient.AsyncHTTPClient().fetch(url) 11 print('fetched %s' % url) 12 13 html = response.body if isinstance(response.body, str) 14 else response.body.decode() 15 urls = [urljoin(url, remove_fragment(new_url)) 16 for new_url in get_links(html)] 17 except Exception as e: 18 print('Exception: %s %s' % (e, url)) 19 raise gen.Return([]) 20 21 raise gen.Return(urls) 22 23 24 def remove_fragment(url): 25 pure_url, frag = urldefrag(url) 26 return pure_url 27 28 29 def get_links(html): # get all links in html page 30 class URLSeeker(HTMLParser): 31 def __init__(self): 32 HTMLParser.__init__(self) 33 self.urls = [] 34 35 def handle_starttag(self, tag, attrs): 36 href = dict(attrs).get('href') 37 if href and tag == 'a': 38 self.urls.append(href) 39 40 url_seeker = URLSeeker() 41 url_seeker.feed(html) 42 return url_seeker.urls
get_links_from_url函数
line 1-21定义的get_links_from_url函数,函数接收一个URL参数,并返回这个URL页面中所有的链接数量。使用URL获取页面内容这里使用的是tornado的httpclient中的方法httpclient.AsyncHTTPClient().fetch(). [也可以使用urllib.request.urlopen来抓取页面内容].
line15-16 分别调用了两个函数get_links和remove_fragment来获取新的URLs.
最终返回的是一个URL的列表。line 21 这里的raise gen.Return(urls) 可以直接替换为return urls,前者是旧版本tornado的用法。
get_links函数
line29-42定义了get_links函数,它接收html页面的内容,并将页面中的a标签的链接返回。实现方式是用HTMLParser。具体实现时要重写handle_starttag方法
remove_fragment 函数
line 24-26定义了remove_fragment函数,函数接收一个URL,并且会把URL中'#'后面的内容截掉,如:
>>> pure_url,frag = urldefrag("http://docs.python.org/2/reference/compound_stmts.html#the-with-statement #h1 #h2") >>> pure_url 'http://docs.python.org/2/reference/compound_stmts.html' >>> frag 'the-with-statement #h1 #h2'
小结
整体代码比较简洁,主要是使用了tornado的异步方式来获取。后续有时间会在这个基础上扩展下实现一个完整的爬虫。