# -*- coding: utf-8 -*-
import requests
from lxml import html
from concurrent.futures import ThreadPoolExecutor
import threading
from loguru import logger
import time
# Guards every update of ``rate_dic`` made by the worker threads.
lock = threading.Lock()

# Request headers installed on the HTTP session (desktop Chrome User-Agent).
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/83.0.4103.97 Safari/537.36"
    ),
}

# Running tally of probe results; "msg" holds a short excerpt of the most
# recent response body (empty string when the probe failed).
rate_dic = {"total": 0, "success": 0, "fail": 0, "msg": ""}
class ProxyTest(object):
    """Download a list of proxy addresses and measure how many of them work.

    Each address is probed from a thread pool. Results are accumulated in the
    module-level ``rate_dic`` counters, whose updates are serialised with the
    module-level ``lock``.
    """

    def __init__(self):
        self.session = requests.Session()
        # Copy so that later changes to the session's headers never leak back
        # into the shared module-level ``headers`` dict.
        self.session.headers = dict(headers)

    def get_response(self, url, proxies=None, timeout=5):
        """GET *url* through the session and return the body on HTTP 200.

        Args:
            url: Address to fetch.
            proxies: Optional ``requests``-style proxy mapping; defaults to
                an empty mapping.
            timeout: Timeout in seconds handed to ``requests``.

        Returns:
            The response text when the status code is 200, ``None`` for any
            other status code, and ``False`` when the request raised. All
            failure values are falsy, so callers can simply test truthiness.
        """
        if proxies is None:
            proxies = {}
        try:
            resp = self.session.get(url, proxies=proxies, timeout=timeout)
            if resp.status_code == 200:
                return resp.text
        except Exception as exc:
            # Best-effort probe: a failing request is reported as a falsy
            # result instead of an error, but it is no longer dropped without
            # a trace.
            logger.debug(f"request to {url} failed: {exc!r}")
            return False
        return None

    @staticmethod
    def _summarize(source):
        """Return the first 100 characters of *source* with spaces and line breaks removed."""
        if not source:
            return ""
        return source[:100].replace("\n", "").replace(" ", "").replace("\r", "").strip()

    def test_ip(self, ip):
        """Probe one proxy address and record the outcome in ``rate_dic``.

        Args:
            ip: Proxy address that is appended to ``http://`` / ``https://``
                to build the proxy URLs.
        """
        url = "http://myip.ipip.net/"
        # NOTE(review): only the "http" entry applies to this http:// probe
        # URL; confirm the "https" proxy scheme before probing https targets.
        proxy = {
            "http": "http://{}".format(ip),
            "https": "https://{}".format(ip),
        }
        source = self.get_response(url, proxies=proxy)
        # Several pool workers run this method concurrently, so the shared
        # counters are only touched while holding the lock.
        with lock:
            if source:
                rate_dic["success"] += 1
            else:
                rate_dic["fail"] += 1
            rate_dic["msg"] = self._summarize(source)
            rate_dic["total"] += 1
            logger.info(f"rate_dic:{rate_dic}")

    def parse_ip(self, source, limit=100, max_workers=10):
        """Extract proxy addresses from *source* and probe them concurrently.

        Addresses are read from the text that follows each ``<br>`` element.

        Args:
            source: HTML text of the proxy list page. A falsy value (failed
                download) is skipped instead of being handed to the parser.
            limit: Maximum number of ``<br>`` elements to inspect.
            max_workers: Size of the thread pool used for probing.
        """
        if not source:
            logger.warning("no proxy list to parse; skipping")
            return
        root = html.fromstring(source)
        ip_datas = []
        for node in root.xpath("//br")[:limit]:
            # ``tail`` is None when no text follows the element.
            ip = (node.tail or "").strip()
            if ip:
                ip_datas.append(ip)
        logger.info(f"获取ip,{len(ip_datas)}条")
        if not ip_datas:
            return
        # Leaving the ``with`` block waits for every submitted probe.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            pool.map(self.test_ip, ip_datas)

    def run(self):
        """Fetch the proxy list, probe the addresses and log the success rate.

        The reported duration covers parsing and probing only; the timer is
        started after the list has been downloaded.
        """
        ip_url = "http://www.xxx.cn/nmtq.php?getnum=1000&isp=0&anonymoustype=0&start=&ports=&export=&ipaddress=&area=1&proxytype=2&api=66ip"
        source = self.get_response(ip_url)
        start = time.time()
        self.parse_ip(source)
        total = rate_dic["total"]
        # Nothing probed (download failed or empty list): report 0 instead of
        # dividing by zero.
        rate = rate_dic["success"] / total if total else 0.0
        end = time.time()
        logger.info(f"成功率:{rate:.2f},总用时:{(end - start):.2f}s")
# Script entry point: build the tester and run one full probing pass.
if __name__ == "__main__":
    tester = ProxyTest()
    tester.run()