  • Process pools, tornado, and custom fonts

    Coroutines:
     
    import grequests
    from fake_useragent import UserAgent
     
    urls=[f'http://bir删d.so/search?page={page}&q=Python' for page in range(1,9)]
     
    tasks=[grequests.get(url,headers={'User-Agent':UserAgent().random}) for url in urls]
    grequests.map(tasks)
     
    # import time,requests
    # st=time.time()
    # tasks=[requests.get(url,headers={'User-Agent':UserAgent().random}) for url in urls]
    # print(time.time()-st)
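     
    For a rough speed comparison with the commented-out synchronous version, the coroutine run can be timed the same way. A minimal sketch, reusing the urls/tasks defined above (grequests.map returns a list of Response objects, with None for failed requests):
     
    # import time
    # st=time.time()
    # responses=grequests.map(tasks)
    # print(len([r for r in responses if r is not None]),'pages in',time.time()-st,'seconds')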
    ********************** Divider **********************
    Multithreading: run several different functions at the same time, or run the same function once for each of the page arguments
     
    import time
    from threading import Thread
     
    def func1(m,n):
        print(m);time.sleep(4)
        print(m+n);time.sleep(1)
     
    def func2(x=666):
        print(x);time.sleep(2)
        print('end of func2')
     
    if __name__ == '__main__':
        Thread(target=func1,args=('中','外')).start()
        Thread(target=func2,args=('story',)).start()
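     
    The two threads above are only started; to block the main thread until both have finished (for example before timing the run or using their results), keep references and call join(). A minimal sketch of the same __main__ block, reusing the functions above:
     
    if __name__ == '__main__':
        t1=Thread(target=func1,args=('中','外'))
        t2=Thread(target=func2,args=('story',))
        t1.start();t2.start()
        t1.join();t2.join()    # returns only after both threads are done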
    ********************** Divider **********************
    Thread pools:
     
    import threadpool
    import requests,time
    start_time=time.time()
     
    def func(url):
        r=requests.get(url=url)
     
    # threadpool's makeRequests is analogous to a process pool's map
    reqs=threadpool.makeRequests(func,['http://www.qq.com' for x in range(50)])
    p=threadpool.ThreadPool(9)
    [p.putRequest(x) for x in reqs]
    p.wait()
    print(f'elapsed: {time.time() - start_time}s')
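     
    The third-party threadpool package is quite old; the standard library's concurrent.futures covers the same use case. A minimal equivalent sketch of the 50-request example above:
     
    from concurrent.futures import ThreadPoolExecutor
    import requests,time
    start_time=time.time()
     
    def func(url):
        r=requests.get(url=url)
     
    with ThreadPoolExecutor(max_workers=9) as executor:
        executor.map(func,['http://www.qq.com' for x in range(50)])    # leaving the with-block waits for all tasks
    print(f'elapsed: {time.time()-start_time}s')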
    ********************** Divider **********************
    Process pools:
     
    from itertools import repeat
    import requests, time
    start_time = time.time()
     
    def func(url, w='qwert'):
        r=requests.get(url=url)
        print(w)
     
    # Method 1: pathos, a fork of multiprocessing; its map accepts a lambda as the function and takes multiple argument sequences
    from pathos.multiprocessing import ProcessingPool
    def main():
        p = ProcessingPool(nodes=8)
        p.map(func, ['http://www.example.com/']*9,['wo']*9)    # one sequence per parameter; keep them the same length
        #result=p.amap(pow, [4, 2, 3], [2, 5, 6]).get()    #list(p.imap(pow, [4, 2, 3], [2, 5, 6]))
     
    # Method 2: the standard multiprocessing library's starmap
    #from multiprocessing import Pool
    #def main():
    #     with Pool(processes=8) as p:    # the arguments still go in starmap's second parameter, but are no longer limited to a single sequence
    #         #p.starmap(func,[('http://www.example.com/','wo')]*9)
    #         p.starmap(func,zip(['http://www.example.com/']*9,repeat('wo')))
     
    if __name__ == '__main__':  # multiprocessing needs this guard; threads and coroutines do not
        main()
        print(f'elapsed: {time.time() - start_time}s')
     
    map()'s second parameter can only be a single sequence. Also, the function handed to a process/thread/coroutine pool should keep every variable it uses inside the function body (for example scrapy's item, or testUrl in the example below); if they are module-level globals, the run may stop halfway or results may get attached to the wrong task.
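     
    One standard-library workaround for map()'s single-sequence limit is functools.partial, which freezes the extra parameter so Pool.map only has to supply the URLs; everything the worker needs is passed in as arguments rather than read from globals. A minimal sketch, reusing the same func as above:
     
    from functools import partial
    from multiprocessing import Pool
    import requests
     
    def func(url, w='qwert'):
        r=requests.get(url=url)
        print(w)
     
    if __name__ == '__main__':
        with Pool(processes=8) as p:
            p.map(partial(func, w='wo'), ['http://www.example.com/']*9)    # w is fixed, map supplies only the URLs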
    **************************************** Divider ****************************************
    Egの process pool: harvest proxy IPs from Xici (xicidaili) and test which ones currently work for scraping the target site:
     
    import os,re,time,random,requests,pandas
    from pandas import DataFrame as DF
    from fake_useragent import UserAgent
    from multiprocessing import Pool
     
    def csvAddTitle():
        title='国家,IP,端口,归属地,匿名,类型,速度,连接时间,存活时间,验证时间'.split(',')
        DF(columns=title).to_csv(collectedIP,mode='a+',index=False,encoding='gbk')
        DF(columns=title).to_csv('D:/usableIP.csv',mode='a+',index=False,encoding='gbk')
     
    def collectingIP():
        # capture groups match Xici's table columns (the regex's < had been swapped for full-width < to get past the blog engine; plain < restored here)
        pattern=re.compile('alt="(Cn)".*?<td>(.*?)<.*?<td>(.*?)<.*?">(.*?)<.*?">(.*?)<.*?<td>(.*?)<.*?title="(.*?)".*?title="(.*?)".*?<td>(.*?)<.*?<td>(.*?)<',re.S)
        for page in range(1,9):
            h={'User-Agent':UserAgent().random}
            response=requests.get(f'http://www.xicidaili.com/wt/{page}',headers=h).text
            rows=pattern.findall(response)
            DF(rows).to_csv(collectedIP,mode='a+',index=False,header=False,encoding='gbk')
     
    def testIP(series):
        time.sleep(random.random()/5)
        testUrl='http://www.example.com/'
        h={'User-Agent':UserAgent().random}
        ip=f"{series['类型'].lower()}://{series['IP']}:{series['端口']}"
        proxies={ip.split(':')[0]:ip}   # e.g. {'http':'http://59.110.46.8:4000'}
        try:
            if requests.get(testUrl,headers=h,proxies=proxies,timeout=2).status_code==200:
                print(f'{ip}: currently usable for this site')
                return series.to_frame().T
        except:
            print(f'{ip}: not usable for this site')
     
    def back(df):
        if df is not None:    # "if df:" would raise: truth value of a DataFrame is ambiguous
            df.to_csv('D:/usableIP.csv',mode='a+',index=False,header=False,encoding='gbk')
     
    if __name__ == '__main__':
        collectedIP='D:/collectedIP.csv'
        if not os.path.isfile(collectedIP):
            csvAddTitle()
            collectingIP()
        # above: collect candidate IPs; below: verify which of them work on the target site
        dfのCollect=pandas.read_csv(collectedIP,encoding='gbk')
        p=Pool(4)
        [p.apply_async(testIP,(row[1],),callback=back) for row in dfのCollect.iterrows()]
        p.close()
        p.join()
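     
    A minimal sketch of how the rows written to D:/usableIP.csv might be reused later; it assumes the CSV already holds validated records and rebuilds the same proxies dict shape as in testIP (column names are the ones written by csvAddTitle):
     
    import random,requests,pandas
    df=pandas.read_csv('D:/usableIP.csv',encoding='gbk')
    row=df.sample(1).iloc[0]    # pick one validated record at random
    ip=f"{row['类型'].lower()}://{row['IP']}:{row['端口']}"
    proxies={ip.split(':')[0]:ip}    # e.g. {'http':'http://59.110.46.8:4000'}
    print(requests.get('http://www.example.com/',proxies=proxies,timeout=2).status_code)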
    **************************************** Divider ****************************************
    Egの process pool: scraping the meizitu site:
     
    import requests,os,re
    from multiprocessing import Pool
    from bs4 import BeautifulSoup
    from fake_useragent import UserAgent
     
    noName=r'[/:*?"<>|]'
     
    def bs(response):
        return BeautifulSoup(response,'lxml')
     
    def girlsUrl(indexUrl):
        html=requests.get(indexUrl).text
        soup=bs(html)
        girlsUrl=[x['href'] for x in soup.select('#pins li > a')]
        return girlsUrl
     
    def imgsUrl(girlUrl):
        soup=bs(requests.get(girlUrl).text)
        title=soup.select('.main-title')[0].get_text(strip=True)
        title=re.sub(noName,' ',title).strip()
        path=os.path.join('E:/pictures/',title)
        if not os.path.isdir(path):os.makedirs(path)
        os.chdir(path)  # so the downloads below can be saved without spelling out the folder each time
        num=int(soup.select('.pagenavi span')[-2].get_text(strip=True))
        imgsUrl=[girlUrl+f'/{page}' for page in range(1,num+1)]
        print(title,f': {num} images in total',sep='')
        return imgsUrl
     
    def downLoad(jpgUrl):
        soup=bs(requests.get(jpgUrl).text)
        realUrl=soup.select('.main-image img')[0]['src']
        imgName=realUrl.split('/')[-1]
        h={'User-Agent':UserAgent().random,'Referer':jpgUrl}
        imgContent=requests.get(realUrl,headers=h).content
        with open(imgName,'wb') as jpg:
            jpg.write(imgContent)
     
    if __name__ == '__main__':
        indexUrl='http://www.mzitu.com/'
        for girlUrl in girlsUrl(indexUrl):
        #girlUrl='http://www.mzitu.com/89089'   # to download just one gallery, comment out the two lines above and uncomment this
        jpgsUrl=imgsUrl(girlUrl)  # the function imgsUrl is called inside the loop; don't give a variable the same name or it gets shadowed
        p = Pool(8)
        p.map(downLoad,jpgsUrl)
            p.close()
            p.join()
    **************************************** Divider ****************************************
    An efficient asynchronous spider from the tornado examples:
     
    import time
    from datetime import timedelta
    from tornado import httpclient,gen,ioloop,queues
    import traceback
     
    class AsySpider(object):
        """A simple class of asynchronous spider."""
        def __init__(self,urls,concurrency=10,results=None,**kwargs):
            urls.reverse()
            self.urls=urls
            self.concurrency=concurrency
            self._q=queues.Queue()
            self._fetching=set()
            self._fetched=set()
            self.results=results if results is not None else []
     
        def fetch(self,url,**kwargs):
            fetch=getattr(httpclient.AsyncHTTPClient(),'fetch')
            return fetch(url,raise_error=False,**kwargs)
     
        def handle_html(self,url,html):
            """handle html page"""
            print(url)
     
        def handle_response(self,url,response):
            """inherit and rewrite this method if necessary"""
            if response.code==200:
                self.handle_html(url,response.body)
            elif response.code==599:   # retry
                self._fetching.remove(url)
                self._q.put(url)
     
        @gen.coroutine
        def get_page(self,url):
            try:
                response=yield self.fetch(url)
                #print('######fetched %s' % url)
            except Exception as e:
                print('Exception:%s %s' % (e,url))
                raise gen.Return(e)
            raise gen.Return(response)
     
        @gen.coroutine
        def _run(self):
     
            @gen.coroutine
            def fetch_url():
                current_url=yield self._q.get()
                try:
                    if current_url in self._fetching:
                        return
                    #print('fetching******%s' % current_url)
                    self._fetching.add(current_url)
                    response=yield self.get_page(current_url)
                    self.handle_response(current_url,response)    # handle response
                    self._fetched.add(current_url)
                    for i in range(self.concurrency):
                        if self.urls:
                            yield self._q.put(self.urls.pop())
                finally:
                    self._q.task_done()
     
            @gen.coroutine
            def worker():
                while True:
                    yield fetch_url()
            self._q.put(self.urls.pop())    # add first url
            # Start workers, then wait for the work queue to be empty.
            for _ in range(self.concurrency):
                worker()
            yield self._q.join(timeout=timedelta(seconds=300000))
            try:
                assert self._fetching==self._fetched
            except AssertionError:
                print(self._fetching-self._fetched)
                print(self._fetched-self._fetching)
     
        def run(self):
            io_loop=ioloop.IOLoop.current()
            io_loop.run_sync(self._run)
     
    class MySpider(AsySpider):
     
        def fetch(self,url,**kwargs):
            """重写父类fetch方法可以添加cookies,headers等信息"""
            cookie="PHPSESSID=nms56ppb70jri4;think_language=zh-cn"
            headers={
                'User-Agent':'mozilla/5.0 (compatible; baiduspider/2.0; +http://www.baidu.com/search/spider.html)',
                'Cookie':cookie
            }
            return super(MySpider,self).fetch(url,headers=headers)
     
        def handle_html(self,url,html):
            #print(url,html)
            print(url)
     
    def main():
        urls=[f'http://www.baidu.com/?page={page}' for page in range(1,10000)]
        s=MySpider(urls)
        s.run()
     
    if __name__=='__main__':
        main()
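     
    The base class keeps a self.results list but nothing in the example fills it; one way to collect parsed data is to append inside handle_html and read the list back after run(). A sketch reusing the classes above (ResultSpider is just an illustrative subclass name):
     
    class ResultSpider(AsySpider):
        def handle_html(self,url,html):
            # html is the raw response body (bytes); store whatever is needed
            self.results.append((url,len(html)))
     
    # after the run, the collected items sit on the instance:
    # spider=ResultSpider(urls);spider.run();print(len(spider.results))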
    **************************************** Divider ****************************************
    Asynchronously scraping JD.com for Python books:
     
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    from openpyxl import Workbook
    import requests,random,time
    from fake_useragent import UserAgent
    from lxml.html import fromstring    #from bs4 import BeautifulSoup
     
    urls=[f'https://search.jd.com/Search?keyword=Python&page={2*x+1}' for x in range(10)]
    rule='.gl-item'
    #proxyList=['122.72.18.34:80', '175.5.44.79:808', '221.9.12.4:9000',]
     
    def spiderAndParse(url,rule):
        time.sleep(random.random()/4)
        #p=random.choice(proxyList)    #proxies={'http':'http://'+p,'https':'https://'+p}
        h={'User-Agent':UserAgent().random}
        html=requests.get(url,headers=h).content.decode()
     
        lx=fromstring(html)    #soup=BeautifulSoup(html,'lxml');items=soup.select(rule)
        items=lx.cssselect(rule)
     
        for x in items:  # .text gives the element's own text; .text_content() gives the descendants' text as an lxml smart string, [:] turns it into a plain str
            href=f'https://item.jd.com/{x.get("data-sku")}.html'  # attributes are read dict-style with .get()
            price=x.cssselect('strong> i')[0].text    # the combinators > + ~ need a space on their right; a space on the left is optional
            comments=x.cssselect('strong> a')[0].text
            name=x.cssselect('a em')[0].text_content()[:].strip()
            try:    # JD self-operated listings have an author and date; many third-party ones don't
                author=x.cssselect('.p-bookdetails> span')[0].text_content()[:].replace(' 著','')
                date=x.cssselect('.p-bookdetails> span')[-1].text
            except:
                author,date=None,None
            ws.append([href,price,comments,date,name,author])
     
    def main():
        p=Pool(size=8)
        [p.spawn(spiderAndParse,url,rule) for url in urls]   # first argument is the function, the rest are its positional arguments
        p.join()
     
    if __name__ == '__main__':
        wb=Workbook();ws=wb.active;ws.append(['网址','价格','评论数','出版日期','书名','作者'])
        main()
        wb.save('F:/京东搜索书籍.xlsx')
    **************************************** Divider ****************************************
    Anti-SpiderのCustomFont:
    Custom fonts (.woff): some are tied to dynamic URLs that change every second (e.g. 猫睛 movies); others are cached locally once their URL has been opened in IE and rotate every few days (e.g. 天睛查). For the cached kind, if the captured traffic contains no font file (F12, Network, Font tab; in Fiddler it is the "A" icon on a blue background), search the local machine for .woff files: usually the most recent one, or the one whose name contains the class attribute value of the scrambled characters.
    Open that .woff with the online FontEditor tool (http://fontstore.baidu.com/static/editor/): the blue text is the key (hex, i.e. the obfuscated character used in the page source) and the black text is the value; use that key-to-value mapping to batch-replace the obfuscated characters in the scraped data. If chr(int(hex_token_below_with_the_'uni'_prefix_stripped, 16)) does not equal the single character shown above it, the fontTools library alone cannot handle it.
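     
    The same key/value inspection can be done with fontTools instead of the online editor. A minimal sketch (the .woff path is a placeholder) that also applies the chr(int(...,16)) check described above:
     
    from fontTools.ttLib import TTFont
     
    font=TTFont('D:/sample.woff')    # placeholder path to a saved custom font
    for code,glyph in font.getBestCmap().items():
        # code: the obfuscated code point used in the page source; glyph: its glyph name, often 'uniXXXX'
        guess=chr(int(glyph[3:],16)) if glyph.startswith('uni') else glyph
        print(hex(code),glyph,guess)    # if guess is not the character the page really shows, build the mapping by hand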
     
    CustomFontの天睛查: a company's registered capital, registration date, and approval date
     
    import requests
    from fake_useragent import UserAgent
    from bs4 import BeautifulSoup
    from fontTools.ttLib import TTFont
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    def fontCorrect(font=''):
        fontFile='D:/tyc-num.woff'
        keys='0'+''.join(TTFont(fontFile).getGlyphOrder()[3:])
        values='.8052431697'  # keys can be pulled out with the library; the values change every few days and so far have to be typed in by hand
        table=str.maketrans(keys,values)    # one-to-one mapping; if any key or value were longer than one character, pass a dict instead
        return font.translate(table)
     
    def spiderParse(url):
        h={'User-Agent':UserAgent().random,'Referer':'http://www.baidu.com/'}
        soup=BeautifulSoup(requests.get(url,headers=h).text,'lxml')
        d={'公司名称':soup.select('#company_web_top .vertival-middle')[0].text}
        newFonts=[fontCorrect(sp.text) for sp in soup.select('.tyc-num')]
        d.update(dict(zip(['注册资本','注册时间','核准日期'],newFonts)))
        print(d)
     
    def main():
        urls=['https://www.tianya删除ncha.com/company/24416401',]
        p=Pool(size=4)
        [p.spawn(spiderParse,url) for url in urls]
        p.join()
     
    if __name__=='__main__':
        main()
    ********************** Divider **********************
    CustomFontの实習僧:
     
    import requests,re,pandas
    from fake_useragent import UserAgent
    from io import BytesIO
    from base64 import b64decode
    from fontTools.ttLib import TTFont
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    def mapping(url):
        html=requests.get(url,headers={'User-Agent':UserAgent().random}).text
        fontText=re.findall('base64,(.+?)"',html)[0]
        fontFile=BytesIO(b64decode(fontText.encode()))
        zd={}
        for k,v in TTFont(fontFile).getBestCmap().items():
            zd[f'{k:x}']=chr(int(v[3:],16)) if v.startswith('uni') else v
        return zd
     
    def spiderParse(url,zd):
        html=requests.get(url,headers={'User-Agent':UserAgent().random}).text
        html=re.sub(r'&#x(\w{4});',lambda r:zd[r[1]],html.split('<ul>')[0])
        info=re.findall('职位名称">(.+?)<.+?公司名称">(.+?)</a>(.+?)<.+?span>(.+?)<.+?/i>(.+?天).+?font">(.+?)<.+?font">(.+?)<',html,re.S)
        pandas.DataFrame(info).to_csv('sxs.csv',header=False,index=False,encoding='gbk',mode='a+')
     
    def main():
        urls=[f'https://www.shix删除iseng.com/interns?k=爬虫&p={p}' for p in range(1,10)]
        zd=mapping(urls[0])
        p=Pool(size=4)
        [p.spawn(spiderParse,url,zd) for url in urls]
        p.join()
     
    if __name__=='__main__':
        main()
    ********************** Divider **********************
    CustomFontの猫睛: want-to-see count, user rating, number of raters, cumulative box office:
    Searching the page source for the obfuscated characters' class value turns up a font file //vfile.*.woff inside a style tag's js, and it changes constantly. Open it with a tool such as the online FontEditor: the blue keys are unicode strings in the Private Use Area E000 to F8FF, which can be extracted directly with fontTools and then converted to the same format as the obfuscated characters in the source; the black values above them follow no pattern, however, so run TTFont(*.woff).saveXML('*.xml') and compare the .xml of several .woff files: under the glyf tag, glyphs with the same value also have identical TTGlyph blocks.
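     
    The saveXML step mentioned above is plain fontTools; a minimal sketch (paths are placeholders) for dumping two fonts so their glyf sections can be compared:
     
    from fontTools.ttLib import TTFont
     
    # the TTGlyph outlines under <glyf> stay the same for the same character
    # across downloads, even though the uniXXXX glyph names keep changing
    TTFont('D:/model.woff').saveXML('D:/model.xml')
    TTFont('D:/current.woff').saveXML('D:/current.xml')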
     
    import requests,re
    from fake_useragent import UserAgent
    from io import BytesIO
    from fontTools.ttLib import TTFont
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    baseFontFile='D:/model.woff'    # save one copy locally ahead of time as the reference model to compare each current .woff against
    baseKeys=tuple(TTFont(baseFontFile)['glyf'].glyphs.values())[2:]    # Glyph objects aren't hashable, so they can't be dict keys; look them up by position instead
    baseValues='9427635801' # the characters, in order, for every glyph except the first two; if it stops matching one day, switch to a new model.woff
     
    def fontMapping(customFontUrl):
        fonts=TTFont(BytesIO(requests.get(customFontUrl).content))['glyf']
        keys=[font[3:].lower() for font in fonts.glyphOrder[2:]]
        values=[baseValues[baseKeys.index(k)] for k in tuple(fonts.glyphs.values())[2:]]
        maps=dict(zip(keys,values))
        return maps
     
    def spiderParse(url):
        html=requests.get(url,headers={'User-Agent':UserAgent().random}).text
        customFontUrl='http:'+re.findall('//vfile.+?woff',html)[0]
        maps=fontMapping(customFontUrl)
        html=re.sub(r'&#x(\w{4});',lambda r:maps[r[1]],html) # bs4 would force-decode the obfuscated characters into mojibake, hence plain re here
        #print('page source after restoration: '+html);exit()
        result={'电影名称':re.findall('h3 class="name">(.+?)<',html)[0]}
        data=re.findall('stonefont">(.+?)<',html)
        if len(data)==3:    # data is taken from each film's own page; the site home page and ranking pages that show many films per page aren't parsed here
            result.update(dict(zip(['用户评分','评分人数','累计票房'],data)))
        elif len(data)==2:
            result.update(dict(zip(['想看数','累计票房'],data)))
        elif len(data)==1:
            result.update({'想看数':data[0]})
        if result.get('累计票房'):
            result['累计票房']+=re.findall('unit">(.+?)<',html)[0]
        print(result)
     
    def main():
        urls=['http://maoy删除an.com/films/342068',
            'http://maoy删an.com/films/338463','http://maoy删an.com/films/346272']
        p=Pool(size=4)
        [p.spawn(spiderParse,url) for url in urls]
        p.join()
     
    if __name__=='__main__':
        main()
  • Original post: https://www.cnblogs.com/scrooge/p/7693825.html