在上一节我们回顾了python 多线程的知识。
queue这个线程安全的序列正是python用来实现线程池的关键。我们可以把爬虫需要解析的URL放入这个序列中,供其它空闲的线程获取并使用。
线程池的实现:
import ThreadPool.dlthread class threadpool: def __init__( self,queue,handlers,maxdepth,num_of_threads): self.queue = queue self.handlers=handlers self.maxdepth=maxdepth self.threads = [] self.__createThreadPool(num_of_threads) def __createThreadPool( self, num_of_threads): for i in range(num_of_threads): thread = ThreadPool.dlthread.dlthread(self.queue,self.handlers,self.maxdepth) self.threads.append(thread) def wait_for_complete(self): while len(self.threads): thread = self.threads.pop() thread.setDaemon(True) thread.start() if thread.isAlive(): thread.join()
可以看到python 的线程池还挺简单的。下面的代码是工作线程的实现:
import threading import Crawler.propertybag import copy class dlthread(threading.Thread): def __init__(self, queue,handlers,maxdepth): super(dlthread,self).__init__() self.maxdepth=maxdepth self.queue=queue self.handlers=copy.deepcopy(handlers)#深表复制 self.handlers_num=len(handlers) def run(self): while True: urltuple=self.queue.get() if self.handlers_num > 0: ps=Crawler.propertybag.propertybag(urltuple[1],self.queue,self.maxdepth) for i in range(0,self.handlers_num): self.handlers[i].Handle(urltuple[0],ps) self.queue.task_done()
其中propertybag是用来在管道(Handler)间传递数据用的。为了线程间不互相干扰,所有的Handler都是进行深表复制后才赋予线程操作的。 所有的管道(在NCrawler中叫pipeline,我这里叫Handler)实现的基类是:
class BaseHandler: def __init__(self): return def Handle(self,url,pbags): pass
最后是Crawler类:
import queue import time from ThreadPool.threadpool import threadpool import Handler.HTMLHandler from Crawler.cyclethread import cyclethread class Crawler: def __init__(self,starturl, handles, threads_num=5, maxdepth=14): if handles is None or len(handles) == 0: raise Exception('No handlers is given') self.queue=queue.Queue() self.starturl=starturl self.queue.put((starturl,0)) self.maxdepth=maxdepth if type(handles) is list and len(handles)>0: self.handlers=handles else: raise Exception('no handlers is given') self.threads_num=threads_num def crawl(self): tp=threadpool(maxdepth=self.maxdepth,queue=self.queue,handlers=self.handlers,num_of_threads=self.threads_num) tp.wait_for_complete() def cyclecrawl(self,period): ct=cyclethread(self.queue,(self.starturl,0),period) ct.setDaemon(True) ct.start() self.crawl()
它负责开启线程池并按照用户指定的方式运行爬虫。