单线程构建爬虫代理IP池
#!/usr/bin/python3.5
# -*- coding:utf-8 -*-
"""Single-threaded builder for a crawler proxy pool.

Downloads proxy listing pages, extracts ``ip:port`` candidates into a
temporary file, then probes each candidate against a test URL and appends
the working ones to ``proxy_info.txt``.
"""
import http.client
import tempfile
import time
from urllib import request

from lxml import etree

user_agent = (
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/49.0.2623.22 Safari/537.36 '
    'SE 2.X MetaSr 1.0'
)


def get_content(url, timeout=10):
    """Fetch ``url`` and return the response body decoded as UTF-8.

    Args:
        url: Address of the page to download.
        timeout: Seconds to wait on the socket before giving up, so a dead
            server cannot block the script forever.

    Raises:
        urllib.error.URLError: If the request fails.
        UnicodeDecodeError: If the body is not valid UTF-8.
    """
    req = request.Request(url=url, headers={'User-Agent': user_agent})
    # The context manager closes the connection even if read/decode fails.
    with request.urlopen(req, timeout=timeout) as res:
        return res.read().decode('utf-8')


def get_info(tmp, content):
    """Extract ``ip:port`` pairs from a listing page and append them to ``tmp``.

    Args:
        tmp: Binary file object opened for writing. Each entry is written
            as UTF-8 ``ip:port`` followed by a single space.
        content: HTML text of the listing page.
    """
    tree = etree.HTML(content)
    if tree is None:
        # Nothing parseable (e.g. an empty page): there is nothing to extract.
        return
    # Parse once and query twice; column 2 holds the IP, column 3 the port.
    ip_list = tree.xpath('//table[contains(@id,"ip_list")]/tr/td[2]/text()')
    port_list = tree.xpath('//table[contains(@id,"ip_list")]/tr/td[3]/text()')
    # zip() stops at the shorter list, so a ragged table cannot raise
    # IndexError the way index-based pairing would.
    for ip, port in zip(ip_list, port_list):
        entry = ip.strip() + u":" + port.strip()
        tmp.write((entry + u" ").encode('utf-8'))


def _save_proxy(ip, port):
    """Append a verified ``ip:port`` entry to ``proxy_info.txt``."""
    with open("proxy_info.txt", "a") as fd:
        fd.write(ip + u":" + port + " ")


def verify_ip(ip, port, test_url, timeout=5):
    """Check a proxy with ``urllib`` and record it when it returns a body.

    Args:
        ip: Proxy host.
        port: Proxy port, as a string.
        test_url: URL requested through the proxy.
        timeout: Seconds to wait for the proxy before treating it as dead.
    """
    # The Host header is left to urllib, which derives it from test_url; a
    # hard-coded Host would be wrong for any other test URL.
    headers = {'User-Agent': user_agent, 'Referer': 'http://www.12306.cn/'}
    proxy = {'http': 'http://%s:%s' % (ip, port)}
    print(proxy)
    # Use a private opener instead of install_opener(): installing it
    # globally would route every later urlopen() call (including
    # get_content) through the proxy that was tested last.
    opener = request.build_opener(request.ProxyHandler(proxy))
    req = request.Request(url=test_url, headers=headers)
    time.sleep(1)
    try:
        with opener.open(req, timeout=timeout) as res:
            time.sleep(2)
            content = res.read()
    except (OSError, http.client.HTTPException) as e:
        # URLError and socket timeouts are OSError subclasses; broken
        # proxies may also answer with malformed HTTP (HTTPException).
        print(getattr(e, 'reason', e))
        return
    if content:
        print('{0}:{1} is ok'.format(ip, port))
        _save_proxy(ip, port)
    else:
        print('{0}:{1} is unavailable'.format(ip, port))


def verify_ip2(ip, port, test_url, timeout=2):
    """Check a proxy with ``requests`` and record it when it answers 200.

    Args:
        ip: Proxy host.
        port: Proxy port, as a string.
        test_url: URL requested through the proxy.
        timeout: Seconds to wait for the proxy before treating it as dead.
    """
    import requests

    proxies = {'http': 'http://{0}:{1}'.format(ip, port)}
    try:
        response = requests.get(test_url, proxies=proxies, timeout=timeout)
    except Exception as e:
        # Per-candidate boundary: whatever goes wrong, this proxy is simply
        # unusable and the remaining candidates must still be checked.
        print("{0}:{1} failed".format(ip, port), e)
        return
    if response.status_code != 200:
        # An error page served by the proxy is not a working proxy.
        print("{0}:{1} failed".format(ip, port),
              "HTTP {0}".format(response.status_code))
        return
    print("{0}:{1} is ok".format(ip, port))
    _save_proxy(ip, port)


def main():
    """Scrape the listing pages, then verify every candidate in turn."""
    base_url = 'http://www.xicidaili.com/nn/'
    test_url = "http://httpbin.org/"
    url_list = [base_url + str(i) for i in range(1, 2)]

    # The context manager removes the temporary file even on failure.
    with tempfile.TemporaryFile() as tmp:
        for page_url in url_list:
            content = get_content(page_url)
            time.sleep(2)  # be polite to the listing site between pages
            get_info(tmp, content)
        tmp.seek(0)
        # Entries are whitespace-separated, so split on any whitespace
        # instead of relying on line breaks.
        entries = tmp.read().decode('utf-8').split()

    for entry in entries:
        ip, sep, port = entry.partition(u":")
        if not (ip and sep and port):
            continue  # skip malformed entries
        # verify_ip(ip, port, test_url) is the urllib-based alternative.
        verify_ip2(ip, port, test_url)


if __name__ == '__main__':
    main()
使用线程池加快验证代理的速度
concurrent.futures.ThreadPoolExecutor
#!/usr/bin/python3.5
# -*- coding:utf-8 -*-
"""Proxy pool builder that verifies candidates with a thread pool.

Downloads proxy listing pages, extracts ``ip:port`` candidates into a
temporary file, then probes all candidates concurrently through
``concurrent.futures.ThreadPoolExecutor`` and collects the working ones in
the module-level ``ip2port`` list.
"""
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from urllib import request

from lxml import etree

user_agent = (
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/49.0.2623.22 Safari/537.36 '
    'SE 2.X MetaSr 1.0'
)
# Proxies that passed verification; filled by callback().
ip2port = []


def get_content(url, timeout=10):
    """Fetch ``url`` and return the response body decoded as UTF-8.

    Args:
        url: Address of the page to download.
        timeout: Seconds to wait on the socket before giving up, so a dead
            server cannot block the script forever.

    Raises:
        urllib.error.URLError: If the request fails.
        UnicodeDecodeError: If the body is not valid UTF-8.
    """
    req = request.Request(url=url, headers={'User-Agent': user_agent})
    # The context manager closes the connection even if read/decode fails.
    with request.urlopen(req, timeout=timeout) as res:
        return res.read().decode('utf-8')


def get_info(tmp, content):
    """Extract ``ip:port`` pairs from a listing page and append them to ``tmp``.

    Args:
        tmp: Binary file object opened for writing. Each entry is written
            as UTF-8 ``ip:port`` followed by a single space.
        content: HTML text of the listing page.
    """
    tree = etree.HTML(content)
    if tree is None:
        # Nothing parseable (e.g. an empty page): there is nothing to extract.
        return
    # Parse once and query twice; column 2 holds the IP, column 3 the port.
    ip_list = tree.xpath('//table[contains(@id,"ip_list")]/tr/td[2]/text()')
    port_list = tree.xpath('//table[contains(@id,"ip_list")]/tr/td[3]/text()')
    # zip() stops at the shorter list, so a ragged table cannot raise
    # IndexError the way index-based pairing would.
    for ip, port in zip(ip_list, port_list):
        entry = ip.strip() + u":" + port.strip()
        tmp.write((entry + u" ").encode('utf-8'))


def verify_ip(ip, port, url):
    """Probe one proxy and report the outcome as a result dict.

    Args:
        ip: Proxy host.
        port: Proxy port, as a string.
        url: URL requested through the proxy.

    Returns:
        ``{'code': 0, 'ipport': 'ip:port'}`` when the proxy answered with
        HTTP 200, otherwise ``{'code': -1, 'ipport': None}``.
    """
    import requests

    ret = {'code': -1, 'ipport': None}
    proxies = {'http': 'http://{0}:{1}'.format(ip, port)}
    try:
        response = requests.get(url, proxies=proxies, timeout=3)
    except Exception:
        # Worker boundary: a dead proxy can fail in many ways and all of
        # them mean "unusable". Unlike `return` inside `finally`, this still
        # lets KeyboardInterrupt and SystemExit propagate.
        return ret
    print('{}:{} --> {}'.format(ip, port, response.status_code))
    if response.status_code == 200:
        ret['code'] = 0
        ret['ipport'] = '{0}:{1}'.format(ip, port)
    return ret


def callback(future):
    """Record the proxy carried by a finished verification future.

    Args:
        future: Completed future whose result is a ``verify_ip`` dict.
    """
    # result() re-raises on a cancelled or failed future; such a future
    # simply contributes no proxy.
    if future.cancelled() or future.exception() is not None:
        return
    ret = future.result()
    if ret['code'] == 0:
        ip2port.append(ret['ipport'])


def main():
    """Scrape the listing pages, then verify all candidates concurrently."""
    base_url = 'http://www.xicidaili.com/nn/'
    verify_url = "http://httpbin.org/"
    url_list = [base_url + str(i) for i in range(1, 2)]

    # The context manager removes the temporary file even on failure.
    with tempfile.TemporaryFile() as tmp:
        for page_url in url_list:
            content = get_content(page_url)
            time.sleep(2)  # be polite to the listing site between pages
            get_info(tmp, content)
        print('原始数据下载完毕,开始构建代理池...')
        tmp.seek(0)
        # Entries are whitespace-separated, so split on any whitespace
        # instead of relying on line breaks.
        entries = tmp.read().decode('utf-8').split()

    # Leaving the with-block waits for every submitted task to finish.
    with ThreadPoolExecutor(20) as pool:
        for entry in entries:
            ip, sep, port = entry.partition(':')
            if not (ip and sep and port):
                continue  # skip malformed entries
            pool.submit(verify_ip, ip, port, verify_url).add_done_callback(
                callback)

    print('代理池构建完毕,共获得可用代理 {} 个'.format(len(ip2port)))
    print(ip2port)


if __name__ == '__main__':
    main()
multiprocessing.dummy.Pool
"""Proxy pool builder that verifies candidates with ``multiprocessing.dummy``.

Crawls the kuaidaili free proxy listing, de-duplicates the ``ip:port``
candidates, verifies them concurrently in a thread pool and appends the
working ones to ``ip2port.txt``.
"""
import time
from multiprocessing.dummy import Pool as ThreadPool

import requests
from lxml import etree
from requests.exceptions import RequestException

# Proxies that passed verification; filled by sub_verify().
available_proxies = []


def get_one_page(url, timeout=10):
    """Download ``url`` and return its text, or None on any failure.

    Args:
        url: Address of the page to download.
        timeout: Seconds to wait for the server, so a stalled connection
            cannot hang the crawl.

    Returns:
        The response text for an HTTP 200 answer, otherwise None.
    """
    headers = {
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36'
        )
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except RequestException:
        return None
    if response.status_code == 200:
        return response.text
    return None


def get_one_parse(url):
    """Download one listing page and return its unique ``ip:port`` strings.

    Args:
        url: Address of the listing page.

    Returns:
        A de-duplicated list of ``ip:port`` strings, or None when the page
        could not be downloaded.
    """
    print('url: {}'.format(url))
    html = get_one_page(url)
    if not html:
        return None
    tree = etree.HTML(html)
    ips = tree.xpath('.//*[@id="list"]/table/tbody//td[1]/text()')
    ports = tree.xpath('.//*[@id="list"]/table/tbody//td[2]/text()')
    # Strip before de-duplicating so entries differing only in surrounding
    # whitespace collapse into one.
    ipports = list({
        '{}:{}'.format(ip.strip(), port.strip())
        for ip, port in zip(ips, ports)
    })
    print('res: {}'.format(ipports))
    return ipports


def fetch(all_proxies, pages=60):
    """Crawl the first ``pages`` listing pages into ``all_proxies``.

    Args:
        all_proxies: List extended in place with the scraped proxies and
            then de-duplicated in place.
        pages: Number of listing pages to crawl, starting from page 1.

    Returns:
        The same ``all_proxies`` list, for convenience.
    """
    url = 'https://www.kuaidaili.com/free/intr/{}/'
    for i in range(1, pages + 1):
        ret = get_one_parse(url.format(i))
        if ret:
            all_proxies.extend(ret)
        time.sleep(1)  # be polite to the listing site between pages
    # Slice-assign so the caller's list is de-duplicated; rebinding the
    # local name would leave the caller's duplicates untouched.
    all_proxies[:] = list(set(all_proxies))
    print('爬取了前{}页,去重后共获得{}个代理'.format(pages, len(all_proxies)))
    return all_proxies


def save():
    """Append every verified proxy to ``ip2port.txt``."""
    with open('ip2port.txt', 'a+') as wf:
        for item in available_proxies:
            wf.write(item + ' ')
    print('{}个可用代理保存完毕'.format(len(available_proxies)))


def sub_verify(item):
    """Probe one ``ip:port`` proxy and record it when it answers HTTP 200.

    Args:
        item: Proxy address in ``ip:port`` form.
    """
    proxy = {'http': 'http://{0}'.format(item)}
    try:
        response = requests.get(
            "http://httpbin.org/", proxies=proxy, timeout=3)
    except Exception:
        # Worker boundary: whatever goes wrong, the proxy is unusable and
        # the other candidates must still be checked.
        print("{} failed".format(item))
        return
    if response.status_code == 200:
        print("{} is ok".format(item))
        available_proxies.append(item)
    else:
        # An error page served by the proxy is not a working proxy.
        print("{} failed".format(item))


def verify(ipports):
    """Verify all candidates in a 20-thread pool and save the working ones.

    Args:
        ipports: Iterable of ``ip:port`` strings to probe.
    """
    print('开始验证可用代理...')
    pool = ThreadPool(20)
    try:
        pool.map(sub_verify, ipports)
    finally:
        # Release the worker threads instead of leaking the pool.
        pool.close()
        pool.join()
    print('验证完毕,共获取可用代理 {} 个'.format(len(available_proxies)))
    save()


def main():
    """Crawl the listing, then verify and save the working proxies."""
    all_proxies = []
    fetch(all_proxies)
    print(all_proxies, len(all_proxies))
    ipports = [item.strip() for item in all_proxies]
    verify(ipports)


if __name__ == "__main__":
    main()