  • Python thread pool

    # coding: utf-8
    from threading import Thread, Lock
    from Queue import Queue, Empty

    class Worker(Thread):
        def __init__(self, threadPool):
            Thread.__init__(self)
            self.threadPool = threadPool
            self.daemon = True
            self.state = None
            self.start()  # start the thread as soon as it is created

        def stop(self):
            self.state = 'STOP'

        def run(self):
            while True:
                if self.state == 'STOP':
                    break
                # fetch a task from the queue; the timeout lets a stopped worker exit
                try:
                    func, args, kargs = self.threadPool.getTask(timeout=1)
                except Empty:
                    continue
                # run the task and keep its result
                try:
                    result = func(*args, **kargs)
                    if result:
                        self.threadPool.putTaskResult(result)  # store in the result queue
                except Exception, e:
                    print 'worker error:', e
                finally:
                    self.threadPool.taskDone()  # always mark the task done so taskJoin() can return

    class ThreadPool(object):
        def __init__(self, threadNum):
            self.pool = []                 # worker threads
            self.threadNum = threadNum     # number of threads
            self.lock = Lock()             # thread lock (currently unused)
            self.taskQueue = Queue()       # task queue
            self.resultQueue = Queue()     # result queue

        def startThreads(self):
            '''Start threadNum worker threads.'''
            for i in range(self.threadNum):
                self.pool.append(Worker(self))

        def stopThreads(self):
            for thread in self.pool:
                thread.stop()
                thread.join()
            del self.pool[:]

        def putTask(self, func, *args, **kargs):
            '''Put a task into the task queue.'''
            self.taskQueue.put((func, args, kargs))

        def getTask(self, *args, **kargs):
            return self.taskQueue.get(*args, **kargs)

        def taskJoin(self, *args, **kargs):
            self.taskQueue.join()

        def taskDone(self, *args, **kargs):
            self.taskQueue.task_done()

        def putTaskResult(self, *args):
            self.resultQueue.put(args)

        def getTaskResult(self, *args, **kargs):
            return self.resultQueue.get(*args, **kargs)

    This thread pool is adapted from someone else's code: https://github.com/lvyaojia/crawler/blob/master/threadPool.py
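
    The pool is driven by startThreads / putTask / taskJoin / stopThreads, and any non-None return value of a task ends up in resultQueue. Below is a minimal usage sketch built only from the methods shown above; the square task and the numbers are made up purely for illustration:

    # Minimal usage sketch for the ThreadPool above (Python 2).
    # The task function `square` and the loop bounds are illustrative only.
    from threadPool import ThreadPool

    def square(x):
        return x * x               # any truthy return value lands in the result queue

    pool = ThreadPool(4)           # the workers are created later by startThreads()
    pool.startThreads()            # create and start 4 Worker threads
    for i in range(10):
        pool.putTask(square, i)    # enqueue (func, args, kargs)
    pool.taskJoin()                # block until every task has called taskDone()
    pool.stopThreads()             # ask the workers to stop and join them
    while not pool.resultQueue.empty():
        print pool.getTaskResult() # each result comes back wrapped in a tuple, e.g. (81,)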

    My crawler from before was then modified as follows:

    import urllib2
    import re
    from pyquery import PyQuery as pq
    from lxml import etree
    import urlparse
    import time
    from threadPool import ThreadPool

    allUrls = set()
    allMails = set()
    urlsDownlist = []

    class mailCrawler:
        def __init__(self, mailExpression, start_url, maxcount, threadNum):
            '''mailExpression: regular expression for e-mail addresses;
            start_url: the URL to start crawling from;
            maxcount: maximum number of pages to fetch;
            threadNum: number of worker threads'''
            self.mailpattern = re.compile(mailExpression)
            self.maxcount = maxcount
            self.htmlcount = 0
            self.UrlsQlist = []  # URL queue used for the breadth-first crawl
            self.url = start_url
            self.threadPool = ThreadPool(threadNum)

        def url_normal(self, url):
            '''Normalize a URL.'''
            scheme, netloc, path, query = urlparse.urlsplit(url)[:4]
            netloc = netloc.lower()

            url.encode("utf-8")

            if path:
                path = re.sub(r'/{2,}', '/', path)  # collapse repeated slashes
                path = re.sub(r'\.$', '', path)     # drop a trailing dot
                path = re.sub(r'/$', '', path)      # drop a trailing slash
                path = re.sub(r'\s', '', path)      # strip whitespace
            if query:
                return '%s://%s%s?%s' % (scheme, netloc, path or '/', query)
            else:
                return '%s://%s%s' % (scheme, netloc, path)

        def geturls(self, data):
            '''Extract the URLs from an HTML page.'''
            urls = set()
            if data:
                d = pq(data)
                label_a = d.find('a')  # find the <a> tags with pyquery
                if label_a:
                    label_a_href = d('a').map(lambda i, e: pq(e)('a').attr('href'))
                    for u in label_a_href:
                        if u[0:10] != "javascript" and u[0:6] != "mailto":
                            if u[0:4] == "http":
                                normal_url = self.url_normal(u)
                                urls.add(normal_url)
                            else:
                                normal_url = self.url_normal(self.url + u)
                                urls.add(normal_url)
                return urls
            else:
                return None

        def gethtml(self, url):
            '''Download a page with a 5 s timeout.'''
            try:
                fp = urllib2.urlopen(url, None, 5)
            except:
                print "urllib2.urlopen error or timeout"
                return None
            else:
                try:
                    mybytes = fp.read()
                except:
                    print "fp.read() error"
                    return None
                else:
                    fp.close()
                    return mybytes

        def savehtmlAndmails(self, pagecontent, htmlcount, url):
            '''Save the HTML file and collect the e-mail addresses it contains.'''
            global allMails
            if pagecontent != None:
                f = open("E:/py/crawler/html/" + str(htmlcount) + ".html", "w")
                f.write(pagecontent)
                f.close()
                # add the addresses found on this page; the set removes duplicates
                mailResult = self.mailpattern.findall(pagecontent)
                mailResultset = set(mailResult)
                if mailResultset:
                    allMails.update(mailResultset)
            else:
                f = open("E:/py/crawler/html/" + str(htmlcount) + "error" + ".html", "w")
                try:
                    f.write(url)
                except:
                    f.write("encode error")
                f.close()

        def BFS(self):
            '''Breadth-first crawl driven by a URL queue.'''
            global allUrls
            global urlsDownlist
            self.threadPool.startThreads()  # start the worker threads
            allUrls.add(self.url)
            self.UrlsQlist = list(allUrls)
            while self.htmlcount < self.maxcount:  # stop after maxcount pages
                tempUrl = self.UrlsQlist.pop(0)  # the queue
                print tempUrl
                urlsDownlist.append(tempUrl)
                myWebStr = self.gethtml(tempUrl)
                # hand the file writing and mail extraction off to the thread pool
                self.threadPool.putTask(self.savehtmlAndmails, myWebStr, self.htmlcount, tempUrl)
                firstUrls_set = self.geturls(myWebStr)  # parse the page just fetched
                if firstUrls_set != None:
                    for u in firstUrls_set:
                        if u not in allUrls:
                            allUrls.add(u)
                            self.UrlsQlist.append(u)
                self.htmlcount = self.htmlcount + 1
            self.threadPool.stopThreads()  # shut the workers down

    def main():
        reg = r'[A-Za-z0-9_]+@(?:[A-Za-z0-9]+\.)+[A-Za-z]+'
        url = "http://www.163.com"
        count = 100
        threadNum = 10
        fmails = open("E:/py/crawler/mailresult.txt", "a")
        furls = open("E:/py/crawler/urlresult.txt", "a")
        fdownUrls = open("E:/py/crawler/urlDownresult.txt", "a")
        newcrawler = mailCrawler(reg, url, count, threadNum)
        newcrawler.BFS()
        for u in allMails:
            try:
                fmails.write(u)
                fmails.write('\n')
            except:
                continue
        for u in allUrls:
            try:
                furls.write(u)
                furls.write('\n')
            except:
                continue
        for u in urlsDownlist:
            try:
                fdownUrls.write(u)
                fdownUrls.write('\n')
            except:
                continue
        fmails.close()
        furls.close()
        fdownUrls.close()

    if __name__ == '__main__':
        main()

    The main change is that writing the HTML files (and extracting the addresses) is now handed off to the thread pool.

    After testing, I found the speed did not change at all... embarrassing...

    It still feels as if there is no real multithreading going on, and it is even less stable than the original version...

    I am not sure where to go from here...
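
    A likely reason the timing did not change: the slow, blocking part (the urllib2 download in gethtml) still runs one page at a time inside BFS, while the worker threads only get the comparatively cheap file writes. A rough, untested sketch of one possible direction is shown below, reusing the same ThreadPool interface; the fetch helper and the seed list are my own illustration, not the original code:

    # Rough, untested sketch (an assumption, not the author's code): put the
    # downloads themselves into the pool so several pages are fetched at once,
    # then drain the result queue on the main thread.
    import urllib2
    from threadPool import ThreadPool

    def fetch(url):
        '''Download one page; the (url, html) pair ends up in the result queue.'''
        try:
            return (url, urllib2.urlopen(url, None, 5).read())
        except:
            return (url, None)

    seeds = ["http://www.163.com"]         # hypothetical seed list
    pool = ThreadPool(10)
    pool.startThreads()
    for u in seeds:
        pool.putTask(fetch, u)
    pool.taskJoin()                        # the downloads overlap inside the workers
    pool.stopThreads()
    while not pool.resultQueue.empty():
        url, html = pool.getTaskResult()[0]   # putTaskResult wraps the value in a tuple
        # parse `html` here for links and addresses, then enqueue the next batch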

  • Original article: https://www.cnblogs.com/xibaohe/p/3075846.html