zoukankan      html  css  js  c++  java
  • day_7:代理使用

    一、代理池

    代理池分为4块:存储模块、获取模块、检测模块、接口模块,其中多进程运行获取、检测、接口

    存储模块:负责存储抓取下来的代理。保证代理不重复,标识代理的可用情况,动态实时处理每个代理(使用Redis的Sorted-Set有序集合)

    获取模块:需要定时在各大代理网站抓取代理。

    检测模块:需要定时检测数据库中的代理。设置一个检测链接,标识每个代理的状态(100分可用,分越低越不可用)

    接口模块:需要用API来提供对外服务的接口。Web API接口,随机取均衡负载。

    程序流程:  ---获取--(测试)-->存储<---->定时检测

                | 

              外部接口         

    项目目录

    |-proxypool

    |--crawler.py  获取模块

    |--db.py         存储模块

    |--tester.py    检测模块

    |--conf.py      配置文件

    |--api.py        接口模块

    |--run.py        程序入口

    2、代理池的实现

    获取模块crawler.py

    import re
    import requests
    from db import RedisClient
    from pyquery import PyQuery as pq
    from conf import *
    from requests.exceptions import ConnectionError
    
    class ProxyMetaclass(type):
        """Metaclass that records the names of a class's ``crawl_*`` methods.

        Every class created through it gains two attributes:

        * ``__CrawlFunc__`` -- list of attribute names that start with
          ``crawl_``, in the order they appear in the class namespace;
        * ``__CrawlFuncCount__`` -- the length of that list.
        """

        def __new__(cls, name, bases, attrs):
            """Build the class after registering its crawl methods.

            :param name: name of the class being created
            :param bases: tuple of base classes
            :param attrs: class namespace; keys are the attribute/method names
            :return: the newly created class
            """
            # Match on the prefix only: a plain substring test would also
            # register unrelated names such as ``not_crawl_x``.
            crawl_funcs = [key for key in attrs if key.startswith('crawl_')]
            attrs['__CrawlFunc__'] = crawl_funcs
            attrs['__CrawlFuncCount__'] = len(crawl_funcs)
            return type.__new__(cls, name, bases, attrs)
    
    class Crawler(object, metaclass=ProxyMetaclass):
        """Scrapers for several free-proxy listing sites.

        Each ``crawl_*`` method is a generator that yields proxies as
        ``'ip:port'`` strings.  ``ProxyMetaclass`` collects the names of those
        methods into ``__CrawlFunc__`` so they can be run by name through
        ``get_proxies``.  Helper methods must therefore not contain ``crawl_``
        in their names.
        """

        # Default request headers; per-call options are merged over these.
        base_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
            'Accept-Encoding': 'gzip, deflate, sdch',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
        }

        def get_page(self, url, options=None, timeout=10):
            """Download a page and return its text.

            :param url: URL to request
            :param options: extra request headers merged over ``base_headers``
            :param timeout: seconds to wait before giving up on the request
            :return: response body as text, or None when the request fails or
                     the status code is not 200
            """
            headers = dict(Crawler.base_headers, **(options or {}))
            print('正在抓取', url)
            try:
                response = requests.get(url, headers=headers, timeout=timeout)
            except requests.exceptions.RequestException:
                # Covers connection errors as well as timeouts, so one dead
                # site cannot abort or hang the whole crawl.
                print('抓取失败', url)
                return None
            if response.status_code == 200:
                print('抓取成功', url, response.status_code)
                return response.text
            print('抓取失败', url, response.status_code)
            return None

        def get_proxies(self, callback):
            """Run one crawl method by name and collect what it yields.

            :param callback: name of a ``crawl_*`` method of this class
            :return: list of ``'ip:port'`` strings
            """
            # getattr instead of eval: same dispatch, no code evaluation.
            crawl = getattr(self, callback)
            proxies = []
            for proxy in crawl():
                print('成功获取到代理', proxy)
                proxies.append(proxy)
            return proxies

        def _rows_to_proxies(self, rows, ip_pattern, port_pattern):
            """Yield ``'ip:port'`` for every IP/port pair found in each HTML row."""
            for row in rows:
                pairs = zip(ip_pattern.findall(row), port_pattern.findall(row))
                for address, port in pairs:
                    yield (address + ':' + port).replace(' ', '')

        def crawl_daili66(self, page_count=4):
            """Scrape www.66ip.cn.

            :param page_count: number of listing pages to fetch, starting at 1
            :return: generator of ``'ip:port'`` strings
            """
            start_url = 'http://www.66ip.cn/{}.html'
            for page in range(1, page_count + 1):
                url = start_url.format(page)
                print('Crawling', url)
                html = self.get_page(url)
                if not html:
                    continue
                # tr:gt(0) skips the table's header row.
                rows = pq(html)('.containerbox table tr:gt(0)').items()
                for row in rows:
                    ip = row.find('td:nth-child(1)').text()
                    port = row.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

        def crawl_ip3366_free(self):
            """Scrape the ``/free/`` listing of www.ip3366.net (pages 1-3).

            This method used to be a first ``crawl_ip3366`` definition that was
            silently overridden by the later method of the same name; it has a
            distinct name so that both listings are crawled.
            """
            # \s* lets the two cells be separated by whitespace/newlines.
            cell_pair = re.compile(r'<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            for page in range(1, 4):
                start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page)
                html = self.get_page(start_url)
                if not html:
                    continue
                for address, port in cell_pair.findall(html):
                    result = address + ':' + port
                    yield result.replace(' ', '')

        def crawl_kuaidaili(self):
            """Scrape www.kuaidaili.com high-anonymity listing (pages 1-3)."""
            ip_cell = re.compile('<td data-title="IP">(.*?)</td>')
            port_cell = re.compile('<td data-title="PORT">(.*?)</td>')
            for page in range(1, 4):
                start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(page)
                html = self.get_page(start_url)
                if not html:
                    continue
                pairs = zip(ip_cell.findall(html), port_cell.findall(html))
                for address, port in pairs:
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

        def crawl_xicidaili(self):
            """Scrape www.xicidaili.com ``/nn/`` listing (pages 1-2)."""
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
            find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
            find_port = re.compile(r'<td>(\d+)</td>')
            for page in range(1, 3):
                start_url = 'http://www.xicidaili.com/nn/{}'.format(page)
                html = self.get_page(start_url, options=headers)
                if not html:
                    continue
                rows = find_trs.findall(html)
                for proxy in self._rows_to_proxies(rows, find_ip, find_port):
                    yield proxy

        def crawl_ip3366(self):
            """Scrape the front listing of www.ip3366.net (pages 1-3)."""
            find_tr = re.compile('<tr>(.*?)</tr>', re.S)
            find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
            find_port = re.compile(r'<td>(\d+)</td>')
            for page in range(1, 4):
                start_url = 'http://www.ip3366.net/?stype=1&page={}'.format(page)
                html = self.get_page(start_url)
                if not html:
                    continue
                # The first <tr> is skipped, as in the original index loop.
                rows = find_tr.findall(html)[1:]
                for proxy in self._rows_to_proxies(rows, find_ip, find_port):
                    yield proxy

        def crawl_iphai(self):
            """Scrape the front page of www.iphai.com."""
            start_url = 'http://www.iphai.com/'
            html = self.get_page(start_url)
            if not html:
                return
            find_tr = re.compile('<tr>(.*?)</tr>', re.S)
            # Cells on this site pad their content with whitespace/newlines.
            find_ip = re.compile(r'<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
            find_port = re.compile(r'<td>\s+(\d+)\s+</td>', re.S)
            # The first <tr> is skipped, as in the original index loop.
            rows = find_tr.findall(html)[1:]
            for proxy in self._rows_to_proxies(rows, find_ip, find_port):
                yield proxy

        def crawl_data5u(self):
            """Scrape the ``gngn`` listing of www.data5u.com."""
            start_url = 'http://www.data5u.com/free/gngn/index.shtml'
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Accept-Encoding': 'gzip, deflate',
                'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
                'Cache-Control': 'max-age=0',
                'Connection': 'keep-alive',
                'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86',
                'Host': 'www.data5u.com',
                'Referer': 'http://www.data5u.com/free/index.shtml',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36',
            }
            html = self.get_page(start_url, options=headers)
            if not html:
                return
            ip_address = re.compile(
                r'<span><li>(\d+\.\d+\.\d+\.\d+)</li>.*?<li class="port.*?>(\d+)</li>',
                re.S,
            )
            for address, port in ip_address.findall(html):
                result = address + ':' + port
                yield result.replace(' ', '')
    
    class GetProxy():
        """Getter: runs every registered crawl method and stores the results."""

        def __init__(self):
            """Create the Redis-backed store and the crawler it is fed from."""
            self.redis = RedisClient()
            self.crawler = Crawler()

        def is_over_threshold(self):
            """Tell whether the pool has reached POOL_UPPER_THRESHOLD.

            :return: True when the stored proxy count is at or above the limit
            """
            pool_size = self.redis.count()
            return pool_size >= POOL_UPPER_THRESHOLD

        def run(self):
            """Crawl all sources once, unless the pool is already full."""
            print('获取器开始执行')
            if self.is_over_threshold():
                return
            for index in range(self.crawler.__CrawlFuncCount__):
                func_name = self.crawler.__CrawlFunc__[index]
                # Fetch the proxies of this source, then store each of them.
                for proxy in self.crawler.get_proxies(func_name):
                    self.redis.add(proxy)

    三个类:

    • ProxyMetaclass(元类):筛选出Crawler类中以crawl_开头的方法
    • Crawler:爬虫类
    1. get_page:获取页面
    2. get_proxies:调用以crawl_开头的方法,返回代理list
    3. 其他都是对应网站的解析方法
    • GetProxy
    1. 初始化redis和crawler
    2. is_over_threshold:判断代理池是否达到阈值
    3. run:运行crawler里的所有以crawl_开头的方法

    存储模块db.py

    • Redis ZSet存储:字段是IP:PORT+分数(整数)
    • 分数:最高100,(新获取代理)初始10,最低0,每次测试不可用减1,0分删除
    import redis
    from conf import *
    from random import choice
    import re
    
    
    class PoolEmptyError(Exception):
        """Error raised when the proxy pool has no proxy left to hand out."""

        def __init__(self):
            """Initialise the exception without any arguments."""
            super().__init__()

        def __str__(self):
            """Return the quoted, fixed error message."""
            message = '代理池已经枯竭'
            return repr(message)
    
    
    class RedisClient(object):
        """Proxy store backed by one Redis sorted set.

        Members of the set (key ``REDIS_KEY``) are ``'ip:port'`` strings and
        their score expresses how usable the proxy is, from ``MIN_SCORE`` up
        to ``MAX_SCORE``.
        """

        def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
            """Create the Redis client.

            :param host: Redis host
            :param port: Redis port
            :param password: Redis password (None when not required)
            """
            self.db = redis.Redis(host=host, port=port, password=password, decode_responses=True)

        def add(self, proxy, score=INITIAL_SCORE):
            """Add a proxy with the given (by default the initial) score.

            Proxies that do not look like ``ip:port`` are discarded and proxies
            that are already stored keep their current score.

            :param proxy: proxy as ``'ip:port'``
            :param score: score for a newly added proxy
            :return: result of ZADD, or None when nothing was added
            """
            if not re.match(r'\d+\.\d+\.\d+\.\d+:\d+', proxy):
                print('代理不符合规范', proxy, '丢弃')
                return None
            # Compare with None: a stored score of 0 is falsy but still means
            # "already present" and must not be reset to the initial score.
            if self.db.zscore(REDIS_KEY, proxy) is None:
                return self.db.zadd(REDIS_KEY, {proxy: score})
            return None

        def random(self):
            """Return a random usable proxy.

            Proxies holding ``MAX_SCORE`` are preferred; when there are none, a
            random one among the highest-ranked members is returned.

            :return: proxy as ``'ip:port'``
            :raises PoolEmptyError: when the pool holds no proxy at all
            """
            best = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
            if best:
                return choice(best)
            # ZREVRANGE bounds are inclusive: ranks 0..100, highest score first.
            ranked = self.db.zrevrange(REDIS_KEY, 0, 100)
            if ranked:
                return choice(ranked)
            raise PoolEmptyError()

        def decrease(self, proxy):
            """Lower a proxy's score by one, removing it at the minimum score.

            :param proxy: proxy as ``'ip:port'``
            :return: new score after the decrement, or the result of ZREM
                     when the proxy was removed
            """
            score = self.db.zscore(REDIS_KEY, proxy)
            if score is not None and score > MIN_SCORE:
                print('代理', proxy, '当前分数', score, '减1')
                return self.db.zincrby(REDIS_KEY, -1, proxy)
            print('代理', proxy, '当前分数', score, '移除')
            return self.db.zrem(REDIS_KEY, proxy)

        def exists(self, proxy):
            """Tell whether the proxy is stored.

            :param proxy: proxy as ``'ip:port'``
            :return: True when the proxy is a member of the set
            """
            return self.db.zscore(REDIS_KEY, proxy) is not None

        def max(self, proxy):
            """Set a proxy's score to ``MAX_SCORE``.

            :param proxy: proxy as ``'ip:port'``
            :return: result of ZADD
            """
            print('代理', proxy, '可用,设置为', MAX_SCORE)
            # Same mapping form as add(); the positional (score, member) form
            # is not accepted by redis-py 3.x.
            return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

        def count(self):
            """Return the number of stored proxies.

            :return: cardinality of the sorted set
            """
            return self.db.zcard(REDIS_KEY)

        def all(self):
            """Return every proxy scored between MIN_SCORE and MAX_SCORE.

            :return: list of proxies
            """
            return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

        def batch(self, start, stop):
            """Return the proxies ranked ``start`` to ``stop - 1``, best first.

            :param start: first rank (inclusive)
            :param stop: end rank (exclusive)
            :return: list of proxies
            """
            # ZREVRANGE's end index is inclusive, hence stop - 1.
            return self.db.zrevrange(REDIS_KEY, start, stop - 1)
    • PoolEmptyError:自定义代理池报错类
    • RedisClient:redis服务器
    1. __init__:初始化Redis连接
    2. add添加到redis
    3. random随机获取有效代理
    4. decrease处理测试过的代理
    5. exists:代理是否存在
    6. max:代理设置为最高分
    7. count:获取代理数量
    8. all:获取所有代理
    9. batch:批量获取代理

    检测模块tester.py

    import asyncio
    import aiohttp
    import time
    
    try:
        from aiohttp import ClientError
    except:
        from aiohttp import ClientProxyConnectionError as ProxyConnectionError
    from db import RedisClient
    from conf import *
    
    
    class Tester(object):
        """Checks the stored proxies against TEST_URL and updates their scores."""

        def __init__(self):
            """Create the Redis-backed store whose proxies are tested."""
            self.redis = RedisClient()

        async def test_single_proxy(self, proxy):
            """Request TEST_URL through one proxy and score the outcome.

            A response whose status is in VALID_STATUS_CODES sets the proxy to
            the maximum score; any other status or a request failure lowers it.

            :param proxy: proxy as ``'ip:port'`` (bytes are decoded as UTF-8)
            :return: None
            """
            # NOTE(review): verify_ssl is deprecated in aiohttp 3.x in favour
            # of ssl=False -- confirm the installed version before changing.
            conn = aiohttp.TCPConnector(verify_ssl=False)
            async with aiohttp.ClientSession(connector=conn) as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode('utf-8')
                    real_proxy = 'http://' + proxy
                    print('正在测试', proxy)
                    async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
                        if response.status in VALID_STATUS_CODES:
                            self.redis.max(proxy)
                            print('代理可用', proxy)
                        else:
                            self.redis.decrease(proxy)
                            print('请求响应码不合法 ', response.status, 'IP', proxy)
                except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError, aiohttp.client_exceptions.ServerDisconnectedError):
                    self.redis.decrease(proxy)
                    print('代理请求失败', proxy)

        async def _test_batch(self, proxies):
            """Test the given proxies concurrently and wait for all of them."""
            checks = [self.test_single_proxy(proxy) for proxy in proxies]
            # return_exceptions keeps one unexpected failure from cancelling
            # the rest of the batch; such failures are reported below.
            results = await asyncio.gather(*checks, return_exceptions=True)
            for proxy, result in zip(proxies, results):
                if isinstance(result, Exception):
                    print('测试器发生错误', proxy, repr(result))

        def run(self):
            """Test every stored proxy once, BATCH_TEST_SIZE proxies at a time.

            :return: None
            """
            print('测试器开始运行')
            # A dedicated loop avoids relying on an implicit current loop, and
            # the batch is awaited through a coroutine because asyncio.wait()
            # no longer accepts bare coroutines (Python 3.11+).
            loop = asyncio.new_event_loop()
            try:
                count = self.redis.count()
                print('当前剩余', count, '个代理')
                for start in range(0, count, BATCH_TEST_SIZE):
                    stop = min(start + BATCH_TEST_SIZE, count)
                    print('正在测试第', start + 1, '-', stop, '个代理')
                    test_proxies = self.redis.batch(start, stop)
                    loop.run_until_complete(self._test_batch(test_proxies))
                    time.sleep(1)
            except Exception as e:
                print('测试器发生错误', e.args)
            finally:
                loop.close()

    接口模块api.py

    from flask import Flask, g
    
    from db import RedisClient
    
    # Only the Flask application object is exported from this module.
    __all__ = ['app']
    
    # Flask application on which the routes below are registered.
    app = Flask(__name__)
    
    
    def get_conn():
        """Return the RedisClient cached on ``flask.g``, creating it on first use."""
        try:
            return g.redis
        except AttributeError:
            # Nothing cached yet in this application context.
            g.redis = RedisClient()
            return g.redis
    
    
    @app.route('/')
    def index():
        """Serve the landing page: a static welcome heading."""
        welcome_html = '<h2>Welcome to Proxy Pool System</h2>'
        return welcome_html
    
    
    @app.route('/random')
    def get_proxy():
        """Serve one randomly chosen proxy from the pool.

        :return: proxy as ``'ip:port'``, as returned by RedisClient.random()
        """
        return get_conn().random()
    
    
    @app.route('/count')
    def get_counts():
        """Serve the number of proxies currently in the pool.

        :return: the pool size rendered as a string
        """
        total = get_conn().count()
        return str(total)
    
    
    # Start the Flask development server with its default host and port when
    # this module is executed directly.
    if __name__ == '__main__':
        app.run()

    配置文件conf.py

    # Redis server address
    REDIS_HOST = '127.0.0.1'
    
    # Redis port
    REDIS_PORT = 6379
    
    # Redis password; use None when no password is set
    REDIS_PASSWORD = None
    
    # Key of the sorted set in which db.py stores the proxies
    REDIS_KEY = 'proxies'
    
    # Proxy scores
    MAX_SCORE = 100
    MIN_SCORE = 0
    INITIAL_SCORE = 10
    
    # Response status codes the tester accepts as a working proxy
    VALID_STATUS_CODES = [200, 302]
    
    # Upper bound on the number of proxies kept in the pool
    POOL_UPPER_THRESHOLD = 50000
    
    # Test cycle (passed to time.sleep between tester runs)
    TESTER_CYCLE = 20
    # Fetch cycle (passed to time.sleep between getter runs)
    GETTER_CYCLE = 300
    
    # Test URL; testing against the site you intend to crawl is recommended
    TEST_URL = 'http://www.baidu.com'
    
    # API settings
    API_HOST = '0.0.0.0'
    API_PORT = 5555
    
    # Switches
    TESTER_ENABLED = True
    GETTER_ENABLED = True
    API_ENABLED = True
    
    # Maximum number of proxies tested per batch
    BATCH_TEST_SIZE = 100

    程序入口run.py  多进程运行获取、检测、接口模块

    import sys
    import io
    import time
    from multiprocessing import Process
    from api import app
    from crawler import GetProxy
    from tester import Tester
    from conf import *
    
    # Re-wrap stdout with an explicit UTF-8 encoding -- presumably so the
    # Chinese log messages print regardless of the console's default encoding.
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    
    class Scheduler():
        """Runs the tester, the getter and the web API, each in its own process."""

        def schedule_tester(self, cycle=TESTER_CYCLE):
            """Test the stored proxies forever, pausing between rounds.

            :param cycle: seconds to sleep after each test round
            """
            proxy_tester = Tester()
            while True:
                print('测试器开始运行')
                proxy_tester.run()
                time.sleep(cycle)

        def schedule_getter(self, cycle=GETTER_CYCLE):
            """Crawl for new proxies forever, pausing between rounds.

            :param cycle: seconds to sleep after each crawl round
            """
            getter = GetProxy()
            while True:
                print('开始抓取代理')
                getter.run()
                time.sleep(cycle)

        def schedule_api(self):
            """Serve the web API on API_HOST:API_PORT."""
            app.run(host=API_HOST, port=API_PORT)

        def run(self):
            """Start one process for every component that is switched on."""
            print('代理池开始运行')

            # (switch, entry point) pairs, started in this order.
            components = (
                (TESTER_ENABLED, self.schedule_tester),
                (GETTER_ENABLED, self.schedule_getter),
                (API_ENABLED, self.schedule_api),
            )
            for enabled, entry_point in components:
                if enabled:
                    worker = Process(target=entry_point)
                    worker.start()
    
    def main():
        """Start the scheduler, retrying when start-up raises an error.

        Only ``Exception`` is caught, so Ctrl-C and ``SystemExit`` stop the
        program instead of triggering another restart, and the retry is a loop
        rather than recursion so repeated failures cannot end in a
        ``RecursionError``.
        """
        while True:
            try:
                Scheduler().run()
            except Exception as error:
                print('代理池运行出错,重新启动', error)
                # Brief pause so a persistent failure does not spin the CPU.
                time.sleep(1)
            else:
                return
    
    # Script entry point: launch the proxy pool when run directly.
    if __name__ == '__main__':
        main()
  • 相关阅读:
    java接口请求超时处理方法
    Spring Cloud GateWay 服务网关
    Mysql中on条件和where条件的使用总结
    Elasticsearch之javaAqi
    Elasticsearch—CRUD
    ElasticSearch的版本控制和Mapping创建
    cmd定时任务计划
    CSS简介及使用
    html简介及应用
    Python基础(十五)-IO模型
  • 原文地址:https://www.cnblogs.com/jp-mao/p/10106085.html
Copyright © 2011-2022 走看看