  • Process pools, tornado, custom fonts

    Coroutines:
     
    import grequests
    from fake_useragent import UserAgent
     
    urls=[f'http://bir删d.so/search?page={page}&q=Python' for page in range(1,9)]
     
    tasks=[grequests.get(url,headers={'User-Agent':UserAgent().random}) for url in urls]
    grequests.map(tasks)
     
    # import time,requests
    # st=time.time()
    # tasks=[requests.get(url,headers={'User-Agent':UserAgent().random}) for url in urls]
    # print(time.time()-st)
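     
    The bare grequests.map(tasks) call above discards its return value; a minimal sketch of keeping the responses instead (the size argument is only illustrative, and failed requests come back as None unless an exception_handler is passed):
     
    responses=grequests.map(tasks,size=8)    # size caps the number of concurrent requests
    for url,r in zip(urls,responses):
        if r is not None and r.status_code==200:
            print(url,len(r.text))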
    **********************divider**********************
    Multithreading: run several different worker functions at the same time, or run one worker function once per page argument (a sketch of that second pattern follows the example below)
     
    import time
    from threading import Thread
     
    def func1(m,n):
        print(m);time.sleep(4)
        print(m+n);time.sleep(1)
     
    def func2(x=666):
        print(x);time.sleep(2)
        print('end of func2')
     
    if __name__ == '__main__':
        Thread(target=func1,args=('中','外')).start()
        Thread(target=func2,args=('story',)).start()
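     
    A rough sketch of the per-page pattern mentioned above, one thread per page of the same worker (crawl and the page range are made up for illustration):
     
    from threading import Thread
     
    def crawl(page):
        print(f'crawling page {page}')    # fetching and parsing would go here
     
    if __name__ == '__main__':
        threads=[Thread(target=crawl,args=(page,)) for page in range(1,9)]
        for t in threads: t.start()
        for t in threads: t.join()    # wait for every page to finish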
    **********************divider**********************
    Thread pool:
     
    import threadpool
    import requests,time
    start_time=time.time()
     
    def func(url):
        r=requests.get(url=url)
     
    #threadpool's makeRequests is similar to a process pool's map
    reqs=threadpool.makeRequests(func,['http://www.qq.com' for x in range(50)])
    p=threadpool.ThreadPool(9)
    [p.putRequest(x) for x in reqs]
    p.wait()
    print(f'Elapsed: {time.time() - start_time}s')
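     
    The third-party threadpool package is fairly dated; as a hedged aside, roughly the same job (50 requests, 9 workers) can be done with the standard library's concurrent.futures:
     
    import time
    import requests
    from concurrent.futures import ThreadPoolExecutor
     
    def fetch(url):
        return requests.get(url).status_code
     
    if __name__ == '__main__':
        start=time.time()
        with ThreadPoolExecutor(max_workers=9) as pool:
            codes=list(pool.map(fetch,['http://www.qq.com']*50))    # map blocks until every request finishes
        print(f'Elapsed: {time.time() - start}s')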
    **********************divider**********************
    Process pool:
     
    from itertools import repeat
    import requests, time
    start_time = time.time()
     
    def func(url, w='qwert'):
        r=requests.get(url=url)
        print(w)
     
    #Option 1: pathos, a fork of the multiprocessing library; its map accepts a lambda as the first argument and takes multiple argument sequences
    from pathos.multiprocessing import ProcessingPool
    def main():
        p = ProcessingPool(nodes=8)
        p.map(func, ['http://www.example.com/']*5,['wo']*5)    #*args: one sequence per parameter
        #result=p.amap(pow, [4, 2, 3], [2, 5, 6]).get()    #list(p.imap(pow, [4, 2, 3], [2, 5, 6]))
     
    #Option 2: the starmap method of the standard multiprocessing Pool
    #from multiprocessing import Pool
    #def main():
    #     with Pool(processes=8) as p:    #the arguments still go in starmap's second parameter, but are no longer limited to a single sequence
    #         #p.starmap(func,[('http://www.example.com/','wo')]*9)
    #         p.starmap(func,zip(['http://www.example.com/']*9,repeat('wo')))
     
    if __name__ == '__main__':  # required for multiprocessing; threads and coroutines can do without it
        main()
        print(f'Elapsed: {time.time() - start_time}s')
     
    map()'s second parameter can only be a single sequence. Also, any variable used by the worker function that processes, threads, coroutines, or async code execute (for example a scrapy item, or testUrl in the example below) should be defined inside that function; if it is a global, runs can die halfway or tasks can end up with each other's data.
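     
    A minimal sketch of that point (scrape and its arguments are made up): each task builds its state inside the worker and returns it, rather than writing to a module-level global that parallel tasks would clobber:
     
    from multiprocessing import Pool
     
    def scrape(url, keyword):
        item={'url': url, 'keyword': keyword}    # per-task state stays local to the function
        # fetching and parsing would go here
        return item
     
    if __name__ == '__main__':
        args=[(f'http://www.example.com/{n}', 'Python') for n in range(5)]
        with Pool(4) as p:
            results=p.starmap(scrape, args)    # starmap unpacks each tuple into scrape's parameters
        print(results)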
    ****************************************divider****************************************
    Example: a process pool that collects proxies from Xici (xicidaili) and keeps the ones currently usable for scraping the target site:
     
    import os,re,time,random,requests,pandas
    from pandas import DataFrame as DF
    from fake_useragent import UserAgent
    from multiprocessing import Pool
     
    def csvAddTitle():
        title='国家,IP,端口,归属地,匿名,类型,速度,连接时间,存活时间,验证时间'.split(',')    #country, IP, port, location, anonymity, type, speed, connect time, uptime, verified time
        DF(columns=title).to_csv(collectedIP,mode='a+',index=False,encoding='gbk')
        DF(columns=title).to_csv('D:/usableIP.csv',mode='a+',index=False,encoding='gbk')
     
    def collectingIP():
        #fields are in the same order as Xici's table header; the original post replaced every '<' in this regex with a full-width '<' because the blog platform rejected them (restored to ASCII '<' below)
        pattern=re.compile('alt="(Cn)".*?<td>(.*?)<.*?<td>(.*?)<.*?">(.*?)<.*?">(.*?)<.*?<td>(.*?)<.*?title="(.*?)".*?title="(.*?)".*?<td>(.*?)<.*?<td>(.*?)<',re.S)
        for page in range(1,9):
            h={'User-Agent':UserAgent().random}
            response=requests.get(f'http://www.xicidaili.com/wt/{page}',headers=h).text
            rows=pattern.findall(response)
            DF(rows).to_csv(collectedIP,mode='a+',index=False,header=False,encoding='gbk')
     
    def testIP(series):
        time.sleep(random.random()/5)
        testUrl='http://www.example.com/'
        h={'User-Agent':UserAgent().random}
        ip=f"{series['类型'].lower()}://{series['IP']}:{series['端口']}"
        proxies={ip.split(':')[0]:ip}   #e.g. {'http':'http://59.110.46.8:4000'}
        try:
            if requests.get(testUrl,headers=h,proxies=proxies,timeout=2).status_code==200:
                print(f'{ip}: usable on this site right now')
                return series.to_frame().T
        except:
            print(f'{ip}: not usable on this site')
     
    def back(df):
        if df is not None:    #using "if df:" raises "The truth value of a DataFrame is ambiguous"
            df.to_csv('D:/usableIP.csv',mode='a+',index=False,header=False,encoding='gbk')
     
    if __name__ == '__main__':
        collectedIP='D:/collectedIP.csv'
        if not os.path.isfile(collectedIP):
            csvAddTitle()
            collectingIP()
        #above: collect a batch of IPs; below: check which of them work against the target site
        dfのCollect=pandas.read_csv(collectedIP,encoding='gbk')
        p=Pool(4)
        [p.apply_async(testIP,(row[1],),callback=back) for row in dfのCollect.iterrows()]
        p.close()
        p.join()
    ****************************************divider****************************************
    Example: a process pool scraping the mzitu image site:
     
    import requests,os,re
    from multiprocessing import Pool
    from bs4 import BeautifulSoup
    from fake_useragent import UserAgent
     
    noName=r'[/:*?"<>|]'
     
    def bs(response):
        return BeautifulSoup(response,'lxml')
     
    def girlsUrl(indexUrl):
        html=requests.get(indexUrl).text
        soup=bs(html)
        girlsUrl=[x['href'] for x in soup.select('#pins li > a')]
        return girlsUrl
     
    def imgsUrl(girlUrl):
        soup=bs(requests.get(girlUrl).text)
        title=soup.select('.main-title')[0].get_text(strip=True)
        title=re.sub(noName,' ',title).strip()
        path=os.path.join('E:/pictures/',title)
        if not os.path.isdir(path):os.makedirs(path)
        os.chdir(path)  #after chdir, later image saves don't need to spell out the folder
        num=int(soup.select('.pagenavi span')[-2].get_text(strip=True))
        imgsUrl=[girlUrl+f'/{page}' for page in range(1,num+1)]
        print(title,f': {num} images in total',sep='')
        return imgsUrl
     
    def downLoad(jpgUrl):
        soup=bs(requests.get(jpgUrl).text)
        realUrl=soup.select('.main-image img')[0]['src']
        imgName=realUrl.split('/')[-1]
        h={'User-Agent':UserAgent().random,'Referer':jpgUrl}
        imgContent=requests.get(realUrl,headers=h).content
        with open(imgName,'wb') as jpg:
            jpg.write(imgContent)
     
    if __name__ == '__main__':
        indexUrl='http://www.mzitu.com/'
        for girlUrl in girlsUrl(indexUrl):
            #girlUrl='http://www.mzitu.com/89089'   #to download just one gallery, comment out the two lines above and uncomment this
            jpgsUrl=imgsUrl(girlUrl)  #the function imgsUrl is called inside the loop, so don't reuse its name for a variable or it will be shadowed
            p = Pool(8)
            p.map(downLoad,jpgsUrl)
            p.close()
            p.join()
    ****************************************divider****************************************
    An efficient asynchronous crawler adapted from the tornado examples:
     
    import time
    from datetime import timedelta
    from tornado import httpclient,gen,ioloop,queues
    import traceback
     
    class AsySpider(object):
        """A simple class of asynchronous spider."""
        def __init__(self,urls,concurrency=10,results=None,**kwargs):
            urls.reverse()
            self.urls=urls
            self.concurrency=concurrency
            self._q=queues.Queue()
            self._fetching=set()
            self._fetched=set()
            self.results=results if results is not None else []
     
        def fetch(self,url,**kwargs):
            fetch=getattr(httpclient.AsyncHTTPClient(),'fetch')
            return fetch(url,raise_error=False,**kwargs)
     
        def handle_html(self,url,html):
            """handle html page"""
            print(url)
     
        def handle_response(self,url,response):
            """inherit and rewrite this method if necessary"""
            if response.code==200:
                self.handle_html(url,response.body)
            elif response.code==599:   # retry
                self._fetching.remove(url)
                self._q.put(url)
     
        @gen.coroutine
        def get_page(self,url):
            try:
                response=yield self.fetch(url)
                #print('######fetched %s' % url)
            except Exception as e:
                print('Exception:%s %s' % (e,url))
                raise gen.Return(e)
            raise gen.Return(response)
     
        @gen.coroutine
        def _run(self):
     
            @gen.coroutine
            def fetch_url():
                current_url=yield self._q.get()
                try:
                    if current_url in self._fetching:
                        return
                    #print('fetching******%s' % current_url)
                    self._fetching.add(current_url)
                    response=yield self.get_page(current_url)
                    self.handle_response(current_url,response)    # handle response
                    self._fetched.add(current_url)
                    for i in range(self.concurrency):
                        if self.urls:
                            yield self._q.put(self.urls.pop())
                finally:
                    self._q.task_done()
     
            @gen.coroutine
            def worker():
                while True:
                    yield fetch_url()
            self._q.put(self.urls.pop())    # add first url
            # Start workers, then wait for the work queue to be empty.
            for _ in range(self.concurrency):
                worker()
            yield self._q.join(timeout=timedelta(seconds=300000))
            try:
                assert self._fetching==self._fetched
            except AssertionError:
                print(self._fetching-self._fetched)
                print(self._fetched-self._fetching)
     
        def run(self):
            io_loop=ioloop.IOLoop.current()
            io_loop.run_sync(self._run)
     
    class MySpider(AsySpider):
     
        def fetch(self,url,**kwargs):
            """重写父类fetch方法可以添加cookies,headers等信息"""
            cookie="PHPSESSID=nms56ppb70jri4;think_language=zh-cn"
            headers={
                'User-Agent':'mozilla/5.0 (compatible; baiduspider/2.0; +http://www.baidu.com/search/spider.html)',
                'Cookie':cookie
            }
            return super(MySpider,self).fetch(url,headers=headers)
     
        def handle_html(self,url,html):
            #print(url,html)
            print(url)
     
    def main():
        urls=[f'http://www.baidu.com/?page={page}' for page in range(1,10000)]
        s=MySpider(urls)
        s.run()
     
    if __name__=='__main__':
        main()
    ****************************************divider****************************************
    Asynchronously scraping JD.com for Python books:
     
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    from openpyxl import Workbook
    import requests,random,time
    from fake_useragent import UserAgent
    from lxml.html import fromstring    #from bs4 import BeautifulSoup
     
    urls=[f'https://search.jd.com/Search?keyword=Python&page={2*x+1}' for x in range(10)]
    rule='.gl-item'
    #proxyList=['122.72.18.34:80', '175.5.44.79:808', '221.9.12.4:9000',]
     
    def spiderAndParse(url,rule):
        time.sleep(random.random()/4)
        #p=random.choice(proxyList)    #proxies={'http':'http://'+p,'https':'https://'+p}
        h={'User-Agent':UserAgent().random}
        html=requests.get(url,headers=h).content.decode()
     
        lx=fromstring(html)    #soup=BeautifulSoup(html,'lxml');items=soup.select(rule)
        items=lx.cssselect(rule)
     
        for x in items:  #.text returns the element's own text; .text_content() gathers the descendants' text as an lxml smart string, and [:] turns it into a plain str
            href=f'https://item.jd.com/{x.get("data-sku")}.html'  #attributes are read dict-style with .get()
            price=x.cssselect('strong> i')[0].text    #the combinators > + ~ need a space on their right; a space on the left is optional
            comments=x.cssselect('strong> a')[0].text
            name=x.cssselect('a em')[0].text_content()[:].strip()
            try:    #JD self-operated listings carry an author and date; many third-party ones don't
                author=x.cssselect('.p-bookdetails> span')[0].text_content()[:].replace(' 著','')
                date=x.cssselect('.p-bookdetails> span')[-1].text
            except:
                author,date=None,None
            ws.append([href,price,comments,date,name,author])
     
    def main():
        p=Pool(size=8)
        [p.spawn(spiderAndParse,url,rule) for url in urls]   #first argument is the function, the rest are its positional arguments
        p.join()
     
    if __name__ == '__main__':
        wb=Workbook();ws=wb.active;ws.append(['网址','价格','评论数','出版日期','书名','作者'])
        main()
        wb.save('F:/京东搜索书籍.xlsx')
    ****************************************divider****************************************
    Anti-spider technique: custom fonts
    Custom .woff fonts come in two flavors: some are referenced through a dynamic URL that changes every second (e.g. Maoyan Movies); others are saved to the local cache once their URL is opened in IE and rotate every few days (e.g. Tianyancha). For the cached kind, if the captured traffic contains no Font file (F12 -> Network -> Font; in Fiddler it is the 'A' icon on a blue background), search the local machine for .woff files; it is usually the most recent one, or the one whose name contains the class attribute value attached to the garbled characters.
    Open the .woff with the online FontEditor tool (http://fontstore.baidu.com/static/editor/): the blue text is the key (a hex code point, i.e. the obfuscated character used in the page source) and the black text is the value; batch-replace the obfuscated characters in the scraped data according to that mapping. If chr(int(the hex word shown below, with prefixes such as 'uni' stripped, 16)) does not equal the single character shown above it, the fontTools library alone cannot crack the font.
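     
    A minimal sketch of that check (the .woff path is hypothetical): dump the font's cmap with fontTools and see whether each glyph name decodes back to the character it is supposed to stand for; if it does not, the mapping has to be written out by hand, as in the examples below.
     
    from fontTools.ttLib import TTFont
     
    font=TTFont('D:/sample.woff')    # hypothetical local font file
    for code,glyphName in font.getBestCmap().items():
        if glyphName.startswith('uni'):
            implied=chr(int(glyphName[3:],16))    # character implied by the glyph name
            print(hex(code),glyphName,implied)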
     
    Custom fonts, Tianyancha: a company's registered capital, registration date, and approval date
     
    import requests
    from fake_useragent import UserAgent
    from bs4 import BeautifulSoup
    from fontTools.ttLib import TTFont
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    def fontCorrect(font=''):
        fontFile='D:/tyc-num.woff'
        keys='0'+''.join(TTFont(fontFile).getGlyphOrder()[3:])
        values='.8052431697'  #the keys can be extracted with fontTools; the values change every few days and currently have to be typed in by hand
        table=str.maketrans(keys,values)    #one-to-one mapping; if any key or value were longer than one char, pass a dict instead
        return font.translate(table)
     
    def spiderParse(url):
        h={'User-Agent':UserAgent().random,'Referer':'http://www.baidu.com/'}
        soup=BeautifulSoup(requests.get(url,headers=h).text,'lxml')
        d={'公司名称':soup.select('#company_web_top .vertival-middle')[0].text}
        newFonts=[fontCorrect(sp.text) for sp in soup.select('.tyc-num')]
        d.update(dict(zip(['注册资本','注册时间','核准日期'],newFonts)))
        print(d)
     
    def main():
        urls=['https://www.tianya删除ncha.com/company/24416401',]
        p=Pool(size=4)
        [p.spawn(spiderParse,url) for url in urls]
        p.join()
     
    if __name__=='__main__':
        main()
    **********************divider**********************
    Custom fonts, Shixiseng (internship site):
     
    import requests,re,pandas
    from fake_useragent import UserAgent
    from io import BytesIO
    from base64 import b64decode
    from fontTools.ttLib import TTFont
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    def mapping(url):
        html=requests.get(url,headers={'User-Agent':UserAgent().random}).text
        fontText=re.findall('base64,(.+?)"',html)[0]
        fontFile=BytesIO(b64decode(fontText.encode()))
        zd={}
        for k,v in TTFont(fontFile).getBestCmap().items():
            zd[f'{k:x}']=chr(int(v[3:],16)) if v.startswith('uni') else v
        return zd
     
    def spiderParse(url,zd):
        html=requests.get(url,headers={'User-Agent':UserAgent().random}).text
        html=re.sub(r'&#x(\w{4});',lambda r:zd[r[1]],html.split('<ul>')[0])    #swap each obfuscated entity for its real character
        info=re.findall('职位名称">(.+?)<.+?公司名称">(.+?)</a>(.+?)<.+?span>(.+?)<.+?/i>(.+?天).+?font">(.+?)<.+?font">(.+?)<',html,re.S)
        pandas.DataFrame(info).to_csv('sxs.csv',header=False,index=False,encoding='gbk',mode='a+')
     
    def main():
        urls=[f'https://www.shix删除iseng.com/interns?k=爬虫&p={p}' for p in range(1,10)]
        zd=mapping(urls[0])
        p=Pool(size=4)
        [p.spawn(spiderParse,url,zd) for url in urls]
        p.join()
     
    if __name__=='__main__':
        main()
    **********************divider**********************
    Custom fonts, Maoyan: the 'want to see' count, user rating, number of raters, and cumulative box office:
    Searching the page source for the class value attached to the garbled characters turns up, in the js inside the style tag, a font file //vfile.*.woff whose URL changes from moment to moment. Open it with a tool such as the online FontEditor: the blue keys are unicode strings in the Private Use Area (E000 to F8FF); they can be extracted directly with fontTools and then converted to the same format as the obfuscated characters in the page source. The black values above them follow no pattern, though, so dump each font with TTFont(*.woff).saveXML('*.xml') and compare a few of the resulting .xml files: under the glyf tag, glyphs that stand for the same value have identical TTGlyph blocks.
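     
    A minimal sketch of that inspection step (the paths are hypothetical): dump two of the rotating .woff files to XML and compare them; glyphs whose TTGlyph outlines are identical across files stand for the same digit even though their code points differ.
     
    from fontTools.ttLib import TTFont
     
    TTFont('D:/a.woff').saveXML('D:/a.xml')    # hypothetical local copies of two rotating fonts
    TTFont('D:/b.woff').saveXML('D:/b.xml')    # open both XML dumps and compare the TTGlyph blocks under glyf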
     
    import requests,re
    from fake_useragent import UserAgent
    from io import BytesIO
    from fontTools.ttLib import TTFont
    from gevent import monkey;monkey.patch_all()
    from gevent.pool import Pool
     
    baseFontFile='D:/model.woff'    #saved locally beforehand as the reference model to compare each current .woff against
    baseKeys=tuple(TTFont(baseFontFile)['glyf'].glyphs.values())[2:]    #these Glyph objects can't be used as dict keys
    baseValues='9427635801' #the digits, in order, for every glyph after the first two; if it ever stops matching, pick a new model.woff
     
    def fontMapping(customFontUrl):
        fonts=TTFont(BytesIO(requests.get(customFontUrl).content))['glyf']
        keys=[font[3:].lower() for font in fonts.glyphOrder[2:]]
        values=[baseValues[baseKeys.index(k)] for k in tuple(fonts.glyphs.values())[2:]]
        maps=dict(zip(keys,values))
        return maps
     
    def spiderParse(url):
        html=requests.get(url,headers={'User-Agent':UserAgent().random}).text
        customFontUrl='http:'+re.findall('//vfile.+?woff',html)[0]
        maps=fontMapping(customFontUrl)
        html=re.sub(r'&#x(\w{4});',lambda r:maps[r[1]],html) #bs4 would force-decode the obfuscated chars into mojibake, so substitute on the raw html
        #print('page source after restoration: '+html);exit()
        result={'电影名称':re.findall('h3 class="name">(.+?)<',html)[0]}
        data=re.findall('stonefont">(.+?)<',html)
        if len(data)==3:    #data is extracted on each film's own page; index and ranking pages that list many films aren't handled
            result.update(dict(zip(['用户评分','评分人数','累计票房'],data)))
        elif len(data)==2:
            result.update(dict(zip(['想看数','累计票房'],data)))
        elif len(data)==1:
            result.update({'想看数':data[0]})
        if result.get('累计票房'):
            result['累计票房']+=re.findall('unit">(.+?)<',html)[0]
        print(result)
     
    def main():
        urls=['http://maoy删除an.com/films/342068',
            'http://maoy删an.com/films/338463','http://maoy删an.com/films/346272']
        p=Pool(size=4)
        [p.spawn(spiderParse,url) for url in urls]
        p.join()
     
    if __name__=='__main__':
        main()
  • Original post: https://www.cnblogs.com/scrooge/p/7693825.html