zoukankan      html  css  js  c++  java
  • python代理池搭建

    熟悉爬虫的,必定会熟悉各种反爬机制。今天就讲一下自己如何建立ip代理池的。

    一个合格的代理池必须拥有一个爬取代理IP的爬取器、一个验证IP可否使用的校验器、一个存储IP的数据库、调用这些的调度器以及可以供获取IP的接口(这里推荐flask,比较简单)。

    先来说说爬取器,首先要爬取的代理IP网站尽量是无需登录的,其次是对代理IP更新较快的,前者加快代理池的效率,后者增加代理池的质量。这里我对市面上部分代理网站进行爬取,当然一些常用的代理IP网站提供IP质量不高,比如西刺无忧66这些经常被爬取(西刺偶尔还会崩溃,估计是爬取的人有些多,网站保护性503)

        def crawl_xici():
            """
            西刺代理:http://www.xicidaili.com
            """
            url = "http://www.xicidaili.com/{}"
    
            items = []
            for page in range(1, 21):
                items.append(("wt/{}".format(page), "http://{}:{}"))
                items.append(("wn/{}".format(page), "https://{}:{}"))
    
            for item in items:
                proxy_type, host = item
                html = requests(url.format(proxy_type))
                if html:
                    doc = pyquery.PyQuery(html)
                    for proxy in doc("table tr").items():
                        ip = proxy("td:nth-child(2)").text()
                        port = proxy("td:nth-child(3)").text()
                        if ip and port:
                            yield host.format(ip, port)
    
    
        def crawl_zhandaye():
            """
            站大爷代理:http://ip.zdaye.com/dayProxy.html
            """
            url = 'http://ip.zdaye.com/dayProxy.html'
            html = requests(url)
            sttrs = re.findall('<H3 class="title"><a href="(.*?)">', html, re.S)
            for sttr in sttrs:
                new_url = url[:28] + sttr[9:]
                new_html = requests_other(new_url)
                get_div = re.search("<div class="cont">(.*?)</div>", new_html, re.S).group(1)
                print(get_div)
                results = re.findall("<br>(.*?)@(.*?)#[(.*?)]", get_div, re.S)
                for result in results:
                    yield "{}://{}".format(result[1].lower(), result[0])
    
    
        def crawl_66ip():
            """
            66ip 代理:http://www.66ip.cn
            19-04-30可用
            """
            url = (
                "http://www.66ip.cn/nmtq.php?getnum=100&isp=0"
                "&anonymoustype=0&area=0&proxytype={}&api=66ip"
            )
            pattern = "d+.d+.d+.d+:d+"
    
            items = [(0, "http://{}"), (1, "https://{}")]
            for item in items:
                proxy_type, host = item
                html = requests(url.format(proxy_type))
                if html:
                    for proxy in re.findall(pattern, html):
                        yield host.format(proxy)
    
    
        def crawl_kuaidaili():
            """
            快代理:https://www.kuaidaili.com
            每次30个
            19-04-13可用
            """
            url = "https://www.kuaidaili.com/free/inha/{}/"
    
            items = [p for p in range(1, 3)]
            for page in items:
                html = requests(url.format(page))
                if html:
                    doc = pyquery.PyQuery(html)
                    for proxy in doc(".table-bordered tr").items():
                        ip = proxy("[data-title=IP]").text()
                        port = proxy("[data-title=PORT]").text()
                        if ip and port:
                            yield "http://{}:{}".format(ip, port)
    
        def crawl_ip3366():
            """
            云代理:http://www.ip3366.net
            每页10个,验证较快
            19-04-30可用
            """
            url = "http://www.ip3366.net/?stype=1&page={}"
    
            items = [p for p in range(1, 8)]
            for page in items:
                html = requests(url.format(page))
                if html:
                    doc = pyquery.PyQuery(html)
                    for proxy in doc(".table-bordered tr").items():
                        ip = proxy("td:nth-child(1)").text()
                        port = proxy("td:nth-child(2)").text()
                        schema = proxy("td:nth-child(4)").text()
                        if ip and port and schema:
                            yield "{}://{}:{}".format(schema.lower(), ip, port)
    
        def crawl_data5u():
            """
            无忧代理:http://www.data5u.com/
            每次14个,验证时间比较新
            19-04-30可用
            """
            url = "http://www.data5u.com/free/index.html"
    
            html = requests(url)
            if html:
                doc = pyquery.PyQuery(html)
                for index, item in enumerate(doc(".wlist li .l2").items()):
                    if index > 0:
                        ip = item("span:nth-child(1)").text()
                        port = item("span:nth-child(2)").text()
                        schema = item("span:nth-child(4)").text()
                        if ip and port and schema:
                            yield "{}://{}:{}".format(schema, ip, port)
    
        def crawl_iphai():
            """
            ip 海代理:http://www.iphai.com
            爬取国内高匿、国外高匿、国外普通各10个
            19-04-30可用
            """
            url = "http://www.iphai.com/free/{}"
    
            items = ["ng", "np", "wg", "wp"]
            for proxy_type in items:
                html = requests(url.format(proxy_type))
                if html:
                    doc = pyquery.PyQuery(html)
                    for item in doc(".table-bordered tr").items():
                        ip = item("td:nth-child(1)").text()
                        port = item("td:nth-child(2)").text()
                        schema = item("td:nth-child(4)").text().split(",")[0]
                        if ip and port and schema:
                            yield "{}://{}:{}".format(schema.lower(), ip, port)
    

    解释一下代码,返回的代理一般都是(http/https)://ip:port格式的代理。这里的requests是使用 asyncio、aiohttp做了个方法,来实现异步爬取。对于asyncio的介绍可以查看廖雪峰的教程。这个是自己写的异步爬取,替换了之前的request.get方法,加快爬虫效率。

    import asyncio
    import aiohttp
    
    from settings import HEADERS, REQUEST_TIMEOUT, REQUEST_DELAY
    
    
    LOOP = asyncio.get_event_loop()
    
    async def _get_page(url, sleep):
        """
        获取并返回网页内容
        """
        async with aiohttp.ClientSession() as session:
            try:
                await asyncio.sleep(sleep)
                async with session.get(
                    url, headers=HEADERS, timeout=REQUEST_TIMEOUT
                ) as resp:
                    return await resp.text()
            except:
                return ""
    
    def requests(url, sleep=REQUEST_DELAY):
        """
        请求方法,用于获取网页内容
    
        :param url: 请求链接
        :param sleep: 延迟时间(秒)
        """
        html = LOOP.run_until_complete(asyncio.gather(_get_page(url, sleep)))
        if html:
            return "".join(html)

    做好异步爬取工作就应该完成代理池搭建的30%了,之后我们就要尝试保存进数据库了,这里推荐使用redis数据库,毕竟其中的有序集合类型(sorted set)非常适合代理池cookies池的搭建,因为其中是有score的,也就是我们存入一个代理的同时也要给它一个分数,这方便我们之后对其校验以及取代理IP的优先级。redis有序集合类型

    # redis 地址
    REDIS_HOST = "localhost"
    # redis 端口
    REDIS_PORT = 6379
    # redis 密码
    REDIS_PASSWORD = None
    # redis set key
    REDIS_KEY = "myproxies"
    # redis 连接池最大连接量
    REDIS_MAX_CONNECTION = 20
    # REDIS SCORE 最大分数
    MAX_SCORE = 10
    # REDIS SCORE 最小分数
    MIN_SCORE = 0
    # REDIS SCORE 初始分数
    INIT_SCORE = 5
    
    class RedisClient:
        """
        代理池依赖了 Redis 数据库,使用了其`有序集合`的数据结构
        (可按分数排序,key 值不能重复)
        """
        def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
            conn_pool = redis.ConnectionPool(
                host=host,
                port=port,
                password=password,
                max_connections=REDIS_MAX_CONNECTION,
            )
            self.redis = redis.Redis(connection_pool=conn_pool)
    
        def add_proxy(self, proxy, score=INIT_SCORE):
            """
            新增一个代理,初始化分数 INIT_SCORE < MAX_SCORE,确保在
            运行完收集器后还没运行校验器就获取代理,导致获取到分数虽为 MAX_SCORE,
            但实际上确是未经验证,不可用的代理
    
            :param proxy: 新增代理
            :param score: 初始化分数
            """
            if not self.redis.zscore(REDIS_KEY, proxy):
                self.redis.zadd(REDIS_KEY, proxy, score)
    
        def reduce_proxy_score(self, proxy):
            """
            验证未通过,分数减一
            :param proxy: 验证代理
            """
            score = self.redis.zscore(REDIS_KEY, proxy)
            if score and score > MIN_SCORE:
                self.redis.zincrby(REDIS_KEY, proxy, -1)
            else:
                self.redis.zrem(REDIS_KEY, proxy)
    
        def increase_proxy_score(self, proxy):
            """
            验证通过,分数加一
            :param proxy: 验证代理
            """
            score = self.redis.zscore(REDIS_KEY, proxy)
            if score and score < MAX_SCORE:
                self.redis.zincrby(REDIS_KEY, proxy, 1)
    
        def pop_proxy(self):
            """
            返回一个代理
            """
            # 第一次尝试取分数最高,也就是最新可用的代理
            first_chance = self.redis.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
            if first_chance:
                return random.choice(first_chance)
    
            else:
                # 第二次尝试取 7-10 分数的任意一个代理
                second_chance = self.redis.zrangebyscore(
                    REDIS_KEY, MAX_SCORE - 3, MAX_SCORE
                )
                if second_chance:
                    return random.choice(second_chance)
                # 最后一次就随便取咯
                else:
                    last_chance = self.redis.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
                    if last_chance:
                        return random.choice(last_chance)
    
        def get_proxies(self, count=1):
            """
            返回指定数量代理,分数由高到低排序
            :param count: 代理数量
            """
            proxies = self.redis.zrevrange(REDIS_KEY, 0, count - 1)
            for proxy in proxies:
                yield proxy.decode("utf-8")
    
        def count_all_proxies(self):
            """
            返回所有代理总数
            """
            return self.redis.zcard(REDIS_KEY)
    
        def count_score_proxies(self, score):
            """
            返回指定分数代理总数
            :param score: 代理分数
            """
            if 0 <= score <= 10:
                proxies = self.redis.zrangebyscore(REDIS_KEY, score, score)
                return len(proxies)
            return -1
    
        def clear_proxies(self, score):
            """
            删除分数小于等于 score 的代理
            """
            if 0 <= score <= 10:
                proxies = self.redis.zrangebyscore(REDIS_KEY, 0, score)
                for proxy in proxies:
                    self.redis.zrem(REDIS_KEY, proxy)
                return True
            return False
    
        def all_proxies(self):
            """
            返回全部代理
            """
            return self.redis.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    这就是写的对redis数据库的操作,至于我们如何把爬取的IP放入,就需要改一下我们的爬取器了,封装成一个类会比我写这样的方法好得多。

    redis_conn = RedisClient()
    all_funcs = []
    
    
    def collect_funcs(func):
        """
        装饰器,用于收集爬虫函数
        """
        all_funcs.append(func)
        return func
    
    
    class Crawler:
        """
            返回格式: http://host:port
        """
        @staticmethod
        def run():
            """
            启动收集器
            """
            for func in all_funcs:
                for proxy in func():
                    redis_conn.add_proxy(proxy)
    
        #添加之前写的爬取ip方法,在每个方法之前声明装饰器
        @collect_funcs
    
    #然后实例个对象
    crawl = Crawl()

    完成了数据库和爬取器的搭建,就差不多完成七七八八了,现在就加验证器,验证器是为了验证代理IP是否可用,如何可以使用的话就给它加一分,如果不可以使用的话就减一分

    import os
    import asyncio
    
    import aiohttp
    
    from db import RedisClient
    
    #验证url
    VALIDATOR_BASE_URL = "http://baidu.com"
    #批量测试数量
    VALIDATOR_BATCH_COUNT = 250
    
    class Validator:
        def __init__(self):
            self.redis = RedisClient()
    
        async def test_proxy(self, proxy):
            """
            测试代理
            :param proxy: 指定代理
            """
            async with aiohttp.ClientSession() as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode("utf8")
                    async with session.get(
                        VALIDATOR_BASE_URL, proxy=proxy, timeout=REQUEST_TIMEOUT
                    ) as resp:
                        if resp.status == 200:
                            self.redis.increase_proxy_score(proxy)
                        else:
                            self.redis.reduce_proxy_score(proxy)
                except:
                    self.redis.reduce_proxy_score(proxy)
    
        def run(self):
            """
            启动校验器
            """
            proxies = self.redis.all_proxies()
            loop = asyncio.get_event_loop()
            for i in range(0, len(proxies), VALIDATOR_BATCH_COUNT):
                _proxies = proxies[i : i + VALIDATOR_BATCH_COUNT]
                tasks = [self.test_proxy(proxy) for proxy in _proxies]
                if tasks:
                    loop.run_until_complete(asyncio.wait(tasks))
    
    validator = Validator()

    网上有许多检验IP的方法,诸如requests.get  telnet之类的,这里利用的是aiohttp的session,其实这个检验都差不多的,只是在爬取IP那边利用的异步,干脆检验这里也用异步吧。

    之后写个调度器,调度器就是运行项目后,在每隔一个时间段后运行某个方法,这里我们设置循环时间,爬取是30分钟,检验是15分钟,然后启动爬取器和校验器的run方法

    import time
    
    import schedule
    
    from crawler import crawler
    from validator import validator
    
    #爬取ip检查时间(分)
    CRAWLER_RUN_CYCLE = 30
    
    #验证ip检查时间(分)
    VALIDATOR_RUN_CYCLE = 15
    
    def run_schedule():
        """
        启动客户端
        """
        # 启动收集器
        schedule.every(CRAWLER_RUN_CYCLE).minutes.do(crawler.run).run()
        # 启动验证器
        schedule.every(VALIDATOR_RUN_CYCLE).minutes.do(validator.run).run()
    
        while True:
            try:
                schedule.run_pending()
                time.sleep(1)
            except KeyboardInterrupt:
                return

    最后就是整个项目如何运行了,我们单纯跑调度器肯定是不合适的,因为这样redis中有代理了,但是我们要在爬虫项目中连接redis来获取代理,这一点是比较麻烦的,而且每次运行都要在本地跑一次代理池,这样肯定不符合程序员偷懒的初衷。正确的做法是做一个api接口,然后部署到云端,让云服务器爬取代理IP,这样我们就每次访问api接口就能得到我们的数据了,下面来写写接口,这里用的是Flask,因为简单

    from flask import Flask, jsonify
    from db import RedisClient
    from scheduler import run_schedule
    
    
    myapp = Flask(__name__)
    redis_conn = RedisClient()
    
    @myapp.route("/")
    def index():
        return jsonify({"Welcome": "This is a proxy pool system."},
                       {"if there has problem": "Please communicate with QQ:976264593"})
    
    @myapp.route("/pop")
    def pop_proxy():
        proxy = redis_conn.pop_proxy().decode("utf8")
        if proxy[:5] == "https":
            return jsonify({"https": proxy})
        else:
            return jsonify({"http": proxy})
    
    @myapp.route("/get/<int:count>")
    def get_proxy(count):
        res = []
        for proxy in redis_conn.get_proxies(count):
            if proxy[:5] == "https":
                res.append({"https": proxy})
            else:
                res.append({"http": proxy})
        return jsonify(res)
    
    @myapp.route("/count")
    def count_all_proxies():
        count = redis_conn.count_all_proxies()
        return jsonify({"count": str(count)})
    
    @myapp.route("/count/<int:score>")
    def count_score_proxies(score):
        count = redis_conn.count_score_proxies(score)
        return jsonify({"count": str(count)})
    
    @myapp.route("/clear/<int:score>")
    def clear_proxies(score):
        if redis_conn.clear_proxies(score):
            return jsonify({"Clear": "Successful"})
        return jsonify({"Clear": "Score should >= 0 and <= 10"})
    
    if __name__ == "__main__":
        # 启动服务端 Flask app
        myapp.run(host='localhost', port=5000, debug=True)
        run_schedule()

    这个接口比较简单,实现了查看代理池数量,获取代理IP(推荐pop因为使用后就可以将代理IP删除)当然有许多低分的代理我们也可以通过接口调用将其删除。出于安全性,其实这个api应该加个校验的,不然别人一直用你的代理池就白费功夫了。

    好了,这就是python代理池搭建,如果有不懂可以提出来,博主也是小白一个

    源码链接

  • 相关阅读:
    R语言中的logical(0)和numeric(0)以及赋值问题
    创建hadoop用户
    虚拟机安装
    spark1-MapReduce
    arcgis1-shp转成mdb
    Actor-配置Maven
    scala6-单词计数(map(),flatMap())
    scala5-数组
    scala4-函数
    scala3-for循环
  • 原文地址:https://www.cnblogs.com/triangle959/p/12024359.html
Copyright © 2011-2022 走看看