This crawler takes a local batch of queries (text to be translated) and translates them through the Bing Translate endpoint, using asyncio coroutines for concurrent fetching.
Problem: proxy failures cause fetch tasks to fail. The previous approach was to write the failed queries to a separate file and then re-fetch them manually. That works, but it is neither smart nor convenient.
Requirement: automatically re-fetch failures until every query has been translated (and fetch queries concurrently in batches).
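Before the full crawler, the core retry idea can be sketched in isolation: workers pull items from a queue, re-queue failures, and use `queue.Queue.unfinished_tasks` (queued plus in-flight items) as the stop condition, so no worker exits while a failed item might still be re-queued by another worker. This is a minimal, self-contained sketch with simulated one-off failures standing in for proxy errors; the names (`worker`, `run`, `fail_first`) are illustrative, not from the crawler itself:

```python
import asyncio
from queue import Queue

async def worker(q: Queue, results: list, fail_first: set):
    """Pull items until every item has succeeded; re-queue failures."""
    # unfinished_tasks counts queued plus in-flight items, so the loop does
    # not stop while another worker may still re-queue a failed item
    while q.unfinished_tasks:
        if q.empty():
            # another worker may still re-queue a failure; back off briefly
            await asyncio.sleep(0.01)
            continue
        item = q.get()  # no await between empty() and get(), so this is safe
        if item in fail_first:
            fail_first.discard(item)  # simulate a one-off proxy failure
            q.put(item)               # re-queue for automatic retry
        else:
            results.append(item)
        q.task_done()

async def run(items, fail_first, workers=3):
    q = Queue()
    for it in items:
        q.put(it)
    results = []
    fail = set(fail_first)  # shared across workers
    await asyncio.gather(*(worker(q, results, fail) for _ in range(workers)))
    return results
```

Because a failure does `put` before `task_done`, `unfinished_tasks` never drops to zero while a retry is pending, which is exactly the invariant the crawler needs.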
# coding=utf-8
import asyncio
import json
import sys
import traceback
import aiohttp
from queue import Queue
import redis
class BingSpider:
def __init__(self, _file_path, _file_name, _ori_lang='auto-detect', _lang='zh-Hans'):
self.url = 'https://cn.bing.com/ttranslatev3?isVertical=1&&IG=22AB846A476149628EFA6E9FA7863C7B&IID=translator.5025.1'
self.timeout = 20
self.f_que = Queue()
self.file_name = _file_name
self.file_path = _file_path
self.ori_lt = _ori_lang
self.lt = _lang
self.pool = redis.ConnectionPool(host='', port=6380, password='xxx')
self.client = redis.Redis(connection_pool=self.pool)
self.f_ok = None
self.init_file_obj()
def init_file_obj(self):
self.f_ok = open(self.file_name + '_ok', 'w', encoding='utf-8')
    async def get_exception(self):
        """Return the current traceback as a single comma-joined line."""
        msg = traceback.format_exc()
        return ",".join(msg.split('\n'))
async def gen_headers(self):
headers = {
'User-Agent': 'xxx'
}
return headers
    def read_file(self):
        with open(self.file_path, encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                # skip blank lines (a raw line always contains '\n', so strip first)
                if not line:
                    continue
                yield line
    async def fetch(self):
        # the empty check and get() run with no await in between, so no other
        # coroutine can take the item after the check
        if self.f_que.empty():
            return 0, None
        line = self.f_que.get()  # take one pending query from the queue
        payload = {
            'fromLang': self.ori_lt,
            'to': self.lt,
            'text': line
        }
        headers = await self.gen_headers()
        proxy = await self.get_available_proxy()  # get a verified proxy
        status, data = 0, line
        try:
            async with aiohttp.ClientSession(trust_env=True) as session:
                async with session.post(url=self.url, data=payload, timeout=self.timeout, headers=headers,
                                        proxy=proxy) as res:
                    status = res.status
                    content = await res.json()
                    if status == 200 and isinstance(content, list):
                        data = {
                            'origin_lt': self.ori_lt,
                            'lt': self.lt,
                            'origin_lang': line,
                            'translate_str': content[0]['translation'][0]
                        }
        except Exception as e:
            print(e)
            error_msg = await self.get_exception()
            # log.error(f"fetch===>{error_msg}")
        return status, data
    async def check_proxy_post(self, proxy):
        """Verify that a proxy can complete a POST request."""
        url = 'http://httpbin.org/post'
        status, data = 0, ""
        try:
            payload = {'a': '1', 'b': 2}
            headers = await self.gen_headers()  # gen_headers is a coroutine and must be awaited
            async with aiohttp.ClientSession(trust_env=True) as session:
                async with session.post(url=url, data=payload, timeout=self.timeout, headers=headers,
                                        proxy=proxy) as res:
                    status = res.status
                    data = await res.text()
        except Exception as e:
            print(e)
        return status, data
    async def get_available_proxy(self):
        while True:
            # pop a proxy IP from redis
            ip = self.client.lpop('xxxx')
            if not ip:
                # yield to the event loop instead of busy-spinning when the pool is empty
                await asyncio.sleep(1)
                continue
            ip = ip.decode('utf-8')
            proxy = f'http://{ip}'
            # verify that the proxy works before handing it out
            status, data = await self.check_proxy_post(proxy)
            if status == 200 and data:
                break
        return proxy
    async def save(self, data):
        """Append one translated record as a JSON line."""
        self.f_ok.write(json.dumps(data, ensure_ascii=False))
        self.f_ok.write('\n')
    async def schedule(self):
        """Worker loop: keep fetching until every query has been translated."""
        # unfinished_tasks counts queued plus in-flight items, so a worker does
        # not exit while another worker may still re-queue a failed query
        while self.f_que.unfinished_tasks:
            code, data = await self.fetch()
            if data is None:
                # queue momentarily empty while other workers retry; back off
                await asyncio.sleep(0.5)
                continue
            if code == 200 and isinstance(data, dict):
                await self.save(data)
            else:
                # re-queue the failed query for automatic retry
                self.f_que.put(data)
            self.f_que.task_done()
    def main(self):
        loop = asyncio.get_event_loop()
        work_list = []
        work_num = 300
        # read the source file and enqueue every query
        for line in self.read_file():
            self.f_que.put(line)
        # start 300 worker coroutines (ensure_future's loop= argument was
        # removed in Python 3.10; create_task is the supported form)
        for _ in range(work_num):
            work_list.append(loop.create_task(self.schedule()))
        loop.run_until_complete(asyncio.gather(*work_list))
        self.f_ok.close()
        loop.close()
if __name__ == '__main__':
    if len(sys.argv) != 5:
        sys.exit('usage: <output_name> <source_lang> <target_lang> <input_path>')
    arg_list = sys.argv
    file_name = arg_list[1].strip()  # name of the output file
    ori_lang = arg_list[2].strip()   # source language
    lang = arg_list[3].strip()       # target language
    file_path = arg_list[4].strip()  # path to the local file of queries
    bs = BingSpider(file_path, file_name, ori_lang, lang)
    bs.main()