When you run a crawler you will inevitably run into sites that ban your IP in one way or another, so you need to find proxies and send your requests through a proxy IP, hiding your real one.
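For context, this is all "going through a proxy" amounts to in requests: pass a proxies mapping and the target server sees the proxy's address instead of yours. A minimal sketch (the address below is one from the test log further down and is almost certainly dead by now; substitute a live one):

import requests

# Example proxy address (taken from the log below; treat it as a placeholder).
proxies = {'http': 'http://59.57.148.10:9999'}

# requests routes the call through the proxy whose key matches the URL scheme,
# so the target site sees the proxy's IP rather than our real one.
resp = requests.get('http://www.baidu.com', proxies=proxies, timeout=10)
print(resp.status_code)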
import requests
import pymongo
from lxml.html import etree


class SelfIpProxy():
    def __init__(self):  # settings area
        self.depth = 1
        self.timeout = 10
        self.collection = pymongo.MongoClient()['Proxies']['free2']
        self.url = {'http': "http://19ncc.medmeeting.org/cn",
                    'https': "https://www.baidu.com"}
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
            'Referer': 'https://www.xicidaili.com/nn/2'}

    def get_ip(self):  # scrape proxy IPs from the site
        urls = ['https://www.xicidaili.com/nn/{}'.format(i)
                for i in range(1, self.depth + 1)]
        for url in urls:
            html = requests.get(url, headers=self.headers, timeout=30)
            html.encoding = 'utf-8'
            e_html = etree.HTML(html.text)
            ips = e_html.xpath('//table[@id="ip_list"]/tr/td[2]/text()')
            ports = e_html.xpath('//table[@id="ip_list"]/tr/td[3]/text()')
            modes = e_html.xpath('//table[@id="ip_list"]/tr/td[6]/text()')
            for ip, port, mode in zip(ips, ports, modes):
                item = dict()
                item[mode.lower()] = '{}://{}:{}'.format(mode.lower(), ip, port)
                yield item

    def store_ip(self):
        for i in self.get_ip():
            self.collection.insert_one(i)

    def check_ip(self):
        count = 0
        # open the cursor manually (no_cursor_timeout=True) so a long run
        # doesn't raise pymongo.errors.CursorNotFound
        demo = self.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
        for ip in demo:
            count += 1
            print('Testing IP #{}'.format(count))
            for key, value in ip.items():
                try:
                    html = requests.get(self.url[key], headers=self.headers,
                                        proxies={key: value}, timeout=self.timeout)
                    html.encoding = 'utf-8'
                    html.raise_for_status()
                    print('************ IP passed: {} ************'.format(value))
                except Exception:
                    print('IP failed: {}'.format(value))
                    self.collection.delete_one(ip)
        demo.close()  # close the cursor manually

    def anti_duplicate(self):  # deduplicate
        demo = self.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
        l = []
        for i in demo:
            if i not in l:
                l.append(i)
        demo.close()
        self.collection.drop()
        for i in l:
            self.collection.insert_one(i)


if __name__ == '__main__':  # settings live inside the class's __init__() method
    my_ip = SelfIpProxy()
    my_ip.store_ip()  # scrape and store IPs into MongoDB; quick enough that no multithreading is needed
    my_ip.check_ip()  # check whether each IP works
    # my_ip.anti_duplicate()  # deduplicate

Result: the vast majority are unusable; only a few make the cut:

Testing IP #318
IP failed: https://114.239.255.179:9999
Testing IP #319
IP failed: https://222.189.246.79:9999
Testing IP #320
IP failed: https://163.204.240.117:9999
Testing IP #321
IP failed: http://120.83.99.253:9999
Testing IP #322
************ IP passed: http://59.57.148.10:9999 ************
Testing IP #323
IP failed: http://182.35.81.209:9999
Testing IP #324
IP failed: http://112.87.69.236:9999
Testing IP #325
IP failed: http://120.83.108.41:9999
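An aside on anti_duplicate above: testing `if i not in l` against a growing list is O(n²) across the whole collection. A sketch of an O(n) variant using a set; anti_duplicate_fast is a name I made up, not part of the original class:

import pymongo

def anti_duplicate_fast(collection):
    # Track documents already seen in a set, so membership checks are O(1)
    # and the whole pass is O(n) instead of O(n^2).
    seen = set()
    unique_docs = []
    for doc in collection.find({}, {'_id': 0}):
        key = tuple(sorted(doc.items()))  # dicts aren't hashable; tuples are
        if key not in seen:
            seen.add(key)
            unique_docs.append(doc)
    collection.drop()
    if unique_docs:
        collection.insert_many(unique_docs)

# Usage: anti_duplicate_fast(pymongo.MongoClient()['Proxies']['free2'])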
Next I switched to multiprocessing. I originally wanted to put the multiprocessing worker inside the class as well, but for some reason calling it from main produced no reaction at all, so I gave up and rewrote check_ip as a standalone function outside the class (a likely explanation follows the listing). Full code:
import requests
import pymongo
from lxml.html import etree
from multiprocessing import Pool


class SelfIpProxy():
    def __init__(self):  # settings area
        self.depth = 10
        self.collection = pymongo.MongoClient()['Proxies']['free2']
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
            'Referer': 'https://www.xicidaili.com/nn/2'}

    def get_ip(self):  # scrape proxy IPs from the site
        urls = ['https://www.xicidaili.com/nn/{}'.format(i)
                for i in range(1, self.depth + 1)]
        for url in urls:
            html = requests.get(url, headers=self.headers, timeout=30)
            html.encoding = 'utf-8'
            e_html = etree.HTML(html.text)
            ips = e_html.xpath('//table[@id="ip_list"]/tr/td[2]/text()')
            ports = e_html.xpath('//table[@id="ip_list"]/tr/td[3]/text()')
            modes = e_html.xpath('//table[@id="ip_list"]/tr/td[6]/text()')
            for ip, port, mode in zip(ips, ports, modes):
                item = dict()
                item[mode.lower()] = '{}://{}:{}'.format(mode.lower(), ip, port)
                yield item

    def store_ip(self):
        for i in self.get_ip():
            self.collection.insert_one(i)

    def anti_duplicate(self):  # deduplicate
        demo = self.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
        l = []
        for i in demo:
            if i not in l:
                l.append(i)
        demo.close()
        self.collection.drop()
        for i in l:
            self.collection.insert_one(i)


def check_ip(proxy):  # module-level so the Pool can pickle it
    url = {'http': "http://www.baidu.com", 'https': "https://www.baidu.com"}
    for key, value in proxy.items():
        try:
            html = requests.get(url[key], proxies={key: value}, timeout=10)
            html.encoding = 'utf-8'
            html.raise_for_status()
            print('*************************** IP passed: {} ***************************'.format(value))
            pymongo.MongoClient()['Proxies']['checked'].insert_one(proxy)
        except Exception:
            print('IP failed: {}'.format(value))


if __name__ == '__main__':  # settings live inside the class's __init__() method
    my_ip = SelfIpProxy()
    my_ip.store_ip()  # scrape and store IPs into MongoDB; quick enough that no multithreading is needed
    proxies = []  # collect the stored IPs into a list so they can be dispatched to worker processes
    # open the cursor manually (no_cursor_timeout=True): the collection is long,
    # and a slow pass can otherwise trigger a cursor-timeout error
    demo = my_ip.collection.find({}, {'_id': 0}, no_cursor_timeout=True)
    for i in demo:
        proxies.append(i)
    my_ip.collection.drop()
    demo.close()  # close the cursor manually
    pool = Pool(8)  # start the multiprocessing pool
    for proxy in proxies:
        pool.apply_async(check_ip, args=(proxy,))
    pool.close()
    pool.join()
    # my_ip.anti_duplicate()  # deduplicate
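As for why the in-class version "did nothing": my best guess (an assumption, not verified against the original run) is that pool.apply_async(self.check_ip, ...) has to pickle the bound method, and self drags along the MongoClient, which cannot be pickled. apply_async never raises in the parent process on its own; the failure only surfaces when you call .get() on the returned AsyncResult. A small sketch of that debugging pattern, with a stand-in worker:

from multiprocessing import Pool

def double(x):
    # Stand-in worker; imagine check_ip here.
    return x * 2

if __name__ == '__main__':
    pool = Pool(4)
    results = [pool.apply_async(double, args=(i,)) for i in range(4)]
    pool.close()
    pool.join()
    for r in results:
        # .get() re-raises whatever the task hit, including pickling errors;
        # without it, a failed task dies silently and main "does nothing".
        print(r.get())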
The multiprocessing payoff: validating roughly 1,000 proxies used to take about 4 hours; now it finishes in about half an hour, and 42 working proxies came out the other end.