  • Example: automatically crawling and validating proxy IPs

    The script below first scrapes free proxy lists from four sites into proxy.json, then spins up 30 worker threads that test each proxy against a what-is-my-IP endpoint; proxies that respond in time and hide the real client IP are appended to proxy_ok_ip.json.

    import time
    import json
    import datetime
    import threading
    import requests
    from lxml import etree
    from queue import Queue
    
    # Crawl free proxy IPs from several free-proxy sites
    # (superfastip.com, xicidaili.com, kuaidaili.com, 89ip.cn),
    # then verify with multiple threads which proxies actually work
    class ProxyTest:
        def __init__(self):
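            # this endpoint returns "var returnCitySN = {...};", where "cip"
            # is the public IP the server sees for the caller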
            self.test_url = "http://pv.sohu.com/cityjson?ie=utf-8"
            self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",}
            self.request_queue = Queue()
    
        def parse_url(self, url, proxies, timeout=3):
            return requests.get(url, headers=self.headers, proxies=proxies, timeout=timeout).content.decode()
    
        # worker: pull proxy addresses off the queue and test each one
        def request(self):
            while True:
                # take a proxy URL from the queue
                ip = self.request_queue.get()
    
                # send a test request through the proxy
                try:
                    starttime = datetime.datetime.now()
                    html_str = self.parse_url(self.test_url, proxies={"http": ip}, timeout=5)
                    endtime = datetime.datetime.now()
                    use_time = endtime - starttime
                except Exception:
                    # the request failed or timed out
                    print("timeout %s" % ip)
                    self.request_queue.task_done()
                    continue
    
                # parse the returned body
                try:
                    # strip the "var returnCitySN = " prefix (19 chars) and the
                    # trailing ";" to get plain JSON
                    json_dict = json.loads(html_str[19:-1])
                except ValueError:
                    print("fail %s, use time %d" % (ip, use_time.seconds))
                    self.request_queue.task_done()
                    continue
    
                if ip.startswith("http://" + json_dict["cip"]):
                    # the proxy works and is highly anonymous: the exit IP the
                    # server reports matches the proxy IP
                    print("success %s, use time %d, %s" % (ip, use_time.seconds, html_str))
                    self.request_queue.task_done()
                    # append the working proxy to a file
                    with open("proxy_ok_ip.json", "a", encoding="utf-8") as f:
                        f.write(ip)
                        f.write("\n")
                else:
                    # not a high-anonymity proxy: the real client IP leaked through
                    print("%s invalid, use time %d" % (ip, use_time.seconds))
                    self.request_queue.task_done()
    
        def run(self):
            # read the proxy file and put each address on the queue
            with open("proxy.json", "r", encoding="utf-8") as f:
                for line in f:
                    self.request_queue.put(line.strip())

            # start 30 worker threads to send requests and check responses
            for i in range(30):
                # daemon=True: the workers are daemon threads, so they are
                # killed automatically when the main thread exits
                threading.Thread(target=self.request, daemon=True).start()

            self.request_queue.join()  # block until every queued task is done

            print("main thread finished")
    
    
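    # Crawl ip/port pairs from four free-proxy sites and write them to
    # proxy.json, one "http://ip:port" entry per line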
    class Proxy:
        def __init__(self):
            self.headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
            }
    
        def start_urls_superfastip(self):
            return ["http://www.superfastip.com/welcome/freeip/%d" % i for i in range(1,11)]
    
        def get_content_list_superfastip(self, html_str):
            content_list = []
            html = etree.HTML(html_str)
            tr_list = html.xpath('/html/body/div[3]/div/div/div[2]/div/table/tbody/tr')
            for tr in tr_list:
                if tr.xpath('./td[4]/text()')[0].strip() == 'HTTP':
                    item = {}
                    item["ip"] = tr.xpath('./td[1]/text()')[0].strip()
                    item["port"] = tr.xpath('./td[2]/text()')[0].strip()
                    content_list.append(item)
    
            return content_list
    
        def start_urls_xici(self):
            return ["http://www.xicidaili.com/nn/%d" % i for i in range(1,6)]
    
        def get_content_list_xici(self, html_str):
            content_list = []
            html = etree.HTML(html_str)
            tr_list = html.xpath('//table[@id="ip_list"]/tr')[1:]
            for tr in tr_list:
                item = {}
                item["ip"] = tr.xpath('./td[2]/text()')[0].strip()
                item["port"] = tr.xpath('./td[3]/text()')[0].strip()
                content_list.append(item)
            return content_list
    
        def start_urls_kuaidaili(self):
            return ["https://www.kuaidaili.com/free/inha/%d/" % i for i in range(1, 11)]
    
        def get_content_list_kuaidaili(self, html_str):
            content_list = []
            html = etree.HTML(html_str)
            tr_list = html.xpath('//div[@id="list"]/table/tbody/tr')
            for tr in tr_list:
                item = {}
                item["ip"] = tr.xpath('./td[1]/text()')[0].strip()
                item["port"] = tr.xpath('./td[2]/text()')[0].strip()
                content_list.append(item)
            return content_list
    
        def start_urls_89ip(self):
            return ["http://www.89ip.cn/index_%d.html" % i for i in range(1, 11)]
    
        def get_content_list_89ip(self, html_str):
            content_list = []
            html = etree.HTML(html_str)
            tr_list = html.xpath('//div[@class="layui-form"]/table/tbody/tr')
            for tr in tr_list:
                item = {}
                item["ip"] = tr.xpath('./td[1]/text()')[0].strip()
                item["port"] = tr.xpath('./td[2]/text()')[0].strip()
                content_list.append(item)
            return content_list
    
        def parse_url(self, url):
            return requests.get(url, headers=self.headers).content.decode()
    
        def save_content_list(self, content_list):
            with open("proxy.json", "a", encoding="utf-8") as f:
                for ip in content_list:
                    f.write("http://%s:%s" % (ip["ip"], ip["port"]))
                    f.write("
    ")
    
        def run(self):
            # build the lists of start URLs
            start_urls_xici = self.start_urls_xici()
            start_urls_89ip = self.start_urls_89ip()
            start_urls_kuaidaili = self.start_urls_kuaidaili()
            start_urls_superfastip = self.start_urls_superfastip()
    
            all_content_list = []  # holds every crawled proxy
    
            for url in start_urls_superfastip:
                html_str = self.parse_url(url)  # fetch the page
                content_list = self.get_content_list_superfastip(html_str)  # parse out ip/port pairs
                all_content_list.extend(content_list)  # collect the results
                time.sleep(0.2)
    
            for url in start_urls_xici:
                html_str = self.parse_url(url)  # fetch the page
                content_list = self.get_content_list_xici(html_str)  # parse out ip/port pairs
                all_content_list.extend(content_list)  # collect the results
                time.sleep(0.2)
    
            for url in start_urls_kuaidaili:
                html_str = self.parse_url(url)
                content_list = self.get_content_list_kuaidaili(html_str)
                all_content_list.extend(content_list)
                time.sleep(0.2)
    
            for url in start_urls_89ip:
                html_str = self.parse_url(url)
                content_list = self.get_content_list_89ip(html_str)
                all_content_list.extend(content_list)
                time.sleep(0.2)
            print("抓取完成")
            self.save_content_list(all_content_list)
    
    
    if __name__ == '__main__':
        # stage 1: crawl the proxy lists
        spider = Proxy()
        spider.run()
    
        # stage 2: test which proxies are usable
        proxy = ProxyTest()
        proxy.run()
        print("最后可以用的代理IP在proxy_ok_ip.json")
  • Original article: https://www.cnblogs.com/blog-rui/p/11031144.html