zoukankan      html  css  js  c++  java
  • bing 翻译之 asyncio+queue解决失败重抓问题(2)

    该爬虫是将本地一批 query(待翻译的文本),通过 bing 翻译的接口进行翻译,采用的是 asyncio 协程进行并发抓取。

    问题:该爬虫因代理问题导致抓取任务失败,之前的做法是将失败的 query 写入另一个文件中,后续手动重新抓取,虽然也能解决但是不够智能也过于繁琐。

    需求:实现失败自动重抓,直至将所有 query 全部翻译完毕(另外需要实现批量并发抓取)

    # coding=utf-8
    
    import asyncio
    import json
    import sys
    import traceback
    import aiohttp
    from queue import Queue
    import redis
    
    
    class BingSpider:
        """Translate a local batch of queries through the Bing translation endpoint.

        Queries are read from a file into an in-process queue; N asyncio workers
        drain the queue concurrently. A query whose request fails is pushed back
        onto the queue, so failures are retried automatically until every query
        has been translated.
        """

        def __init__(self, _file_path, _file_name, _ori_lang='auto-detect', _lang='zh-Hans'):
            """Set up the work queue, the redis-backed proxy pool and the output file.

            :param _file_path: path of the local file holding the queries to translate
            :param _file_name: stem of the output file ('<stem>_ok' is created)
            :param _ori_lang: source language code (default: auto detection)
            :param _lang: target language code (default: simplified Chinese)
            """
            self.url = 'https://cn.bing.com/ttranslatev3?isVertical=1&&IG=22AB846A476149628EFA6E9FA7863C7B&IID=translator.5025.1'
            self.timeout = 20
            self.f_que = Queue()  # pending + failed queries awaiting (re-)translation
            self.file_name = _file_name
            self.file_path = _file_path
            self.ori_lt = _ori_lang
            self.lt = _lang
            # NOTE(review): host is empty and the password is a placeholder —
            # real redis connection details must be filled in before running.
            self.pool = redis.ConnectionPool(host='', port=6380, password='xxx')
            self.client = redis.Redis(connection_pool=self.pool)
            self.f_ok = None
            self.init_file_obj()

        def init_file_obj(self):
            """Open the result file '<file_name>_ok' for writing."""
            self.f_ok = open(self.file_name + '_ok', 'w', encoding='utf-8')

        async def get_exception(self):
            """Return the current traceback flattened to one comma-joined line."""
            # BUGFIX: the '\n' literal was split across two physical lines
            # (a copy/paste artifact), which is a syntax error.
            msg = traceback.format_exc()
            return ",".join(msg.split('\n'))

        async def gen_headers(self):
            """Build the request headers (User-Agent is a placeholder)."""
            headers = {
                'User-Agent': 'xxx'
            }
            return headers

        def read_file(self):
            """Yield stripped, non-empty lines from the input file."""
            with open(self.file_path, encoding='utf-8') as f:
                for line in f:
                    # BUGFIX: file iteration never yields '' (each line keeps its
                    # trailing newline), so the old `if not line` check was dead
                    # and blank lines were yielded as empty queries. Skip them.
                    if not line.strip():
                        continue

                    yield line.strip()

        async def fetch(self):
            """Translate one query pulled from the queue.

            :return: (status, data). On success status == 200 and data is a dict
                     holding the translation; otherwise data is the original
                     query string so the caller can re-queue it.
            """
            # Safe blocking get(): the caller checks empty() and there is no
            # await between that check and this line, so no interleaving occurs.
            line = self.f_que.get()
            payload = {
                'fromLang': self.ori_lt,
                'to': self.lt,
                'text': line
            }
            headers = await self.gen_headers()
            proxy = await self.get_available_proxy()  # obtain a verified proxy
            status, data = 0, line
            try:
                async with aiohttp.ClientSession(trust_env=True) as session:
                    async with session.post(url=self.url, data=payload, timeout=self.timeout, headers=headers,
                                            proxy=proxy) as res:
                        status = res.status
                        content = await res.json()
                        if status == 200 and isinstance(content, list):
                            data = {
                                'origin_lt': self.ori_lt,
                                'lt': self.lt,
                                'origin_lang': line,
                                'translate_str': content[0]['translation'][0]
                            }
            except Exception as e:
                print(e)
                error_msg = await self.get_exception()
                # log.error(f"fetch===>{error_msg}")

            return status, data

        async def check_proxy_post(self, proxy):
            """Probe the proxy with a POST to httpbin; return (status, body)."""
            url = 'http://httpbin.org/post'
            status, data = 0, ""
            try:
                payload = {'a': '1', 'b': 2}
                # BUGFIX: gen_headers() is a coroutine and was not awaited,
                # so a coroutine object was passed as the headers mapping.
                headers = await self.gen_headers()
                async with aiohttp.ClientSession(trust_env=True) as session:
                    async with session.post(url=url, data=payload, timeout=self.timeout, headers=headers,
                                            proxy=proxy) as res:
                        status = res.status
                        data = await res.text()
            except Exception as e:
                print(e)

            return status, data

        async def get_available_proxy(self):
            """Pop proxy IPs from redis until one passes the health check."""
            while True:
                ip = self.client.lpop('xxxx')
                if not ip:
                    # BUGFIX: a bare `continue` busy-waited and starved the event
                    # loop when the redis list was empty; yield control instead.
                    await asyncio.sleep(0.5)
                    continue

                ip = ip.decode('utf-8')
                proxy = f'http://{ip}'

                # keep the proxy only if the probe round-trips successfully
                status, data = await self.check_proxy_post(proxy)
                if status == 200 and data:
                    break

            return proxy

        async def save(self, data):
            """Append one translation result as a JSON line to the output file."""
            self.f_ok.write(json.dumps(data, ensure_ascii=False))
            # BUGFIX: the '\n' literal was split across two physical lines.
            self.f_ok.write('\n')

        async def schedule(self):
            """Worker loop: drain the queue, saving successes, re-queuing failures."""
            while not self.f_que.empty():
                code, data = await self.fetch()
                if code == 200 and isinstance(data, dict):
                    await self.save(data)
                else:
                    # failed query goes back on the queue for automatic retry
                    self.f_que.put(data)

        def main(self):
            """Load the queries, run the worker coroutines, then clean up."""
            loop = asyncio.get_event_loop()
            work_list = []
            work_num = 300  # degree of concurrency

            # read the input file into the work queue
            for line in self.read_file():
                self.f_que.put(line)

            # spawn the workers
            for _ in range(work_num):
                work_list.append(
                    asyncio.ensure_future(self.schedule(), loop=loop)
                )

            loop.run_until_complete(asyncio.gather(*work_list))

            self.f_ok.close()
            loop.close()
    
    
    if __name__ == '__main__':
        # BUGFIX: exiting silently with status 0 on a wrong argument count hid
        # misuse from shell callers; print a usage line and exit non-zero.
        if len(sys.argv) != 5:
            print('usage: python bing_spider.py <file_name> <ori_lang> <lang> <file_path>',
                  file=sys.stderr)
            sys.exit(1)

        arg_list = sys.argv
        file_name = arg_list[1].strip()  # stem of the generated output file
        ori_lang = arg_list[2].strip()  # source language code
        lang = arg_list[3].strip()  # target language code
        file_path = arg_list[4].strip()  # path of the local file with queries

        bs = BingSpider(file_path, file_name, ori_lang, lang)
        bs.main()
    
  • 相关阅读:
    安卓系统浏览器中select下拉按钮无法弹出选择面板奇怪问题解决
    Webkit浏览器点击控件时出现的边框消除
    UML序列图总结
    UML序列图总结
    UML类图几种关系的总结
    UML类图几种关系的总结
    UML用例图总结
    UML用例图总结
    类与类之间的关系
    java核心技术----Object类
  • 原文地址:https://www.cnblogs.com/midworld/p/14778912.html
Copyright © 2011-2022 走看看