zoukankan      html  css  js  c++  java
  • aiohttp/asyncio测试代理是否可用

    #!/usr/bin/env python  
    # encoding: utf-8
    from proxyPool.db import RedisClient
    import asyncio
    import aiohttp
    import time
    try:
        from aiohttp import ClientError,client_exceptions
    except:
        from aiohttp import ClientProxyConnectionError as ProxyConnectionError
    VALID_STATUS_CODES=[200]
    TEST_URL='http://www.baidu.com'
    BATCH_TEST_SIZE=100
    class Tester(object):
        def __init__(self):
            self.redis=RedisClient()
        async def test_single_proxy(self,proxy):
            """
            测试单个代理
            :param proxy:单个代理
            :return: None
            """
            conn=aiohttp.TCPConnector(verify_ssl=False)
            async with aiohttp.ClientSession(connector=conn) as session:
                try:
                    if isinstance(proxy,bytes):
                        proxy=proxy.decode('utf-8')
                    real_proxy='http://'+proxy
                    print("正在测试",proxy)
                    async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response:
                        if response.status in VALID_STATUS_CODES:
                            self.redis.max(proxy)
                            print("代理可用",proxy)
                        else:
                            self.redis.decrease(proxy)
                            print("请求响应码不合法",proxy)
                except(ClientError,client_exceptions.ClientConnectorError,TimeoutError,AttributeError):
                    self.redis.decrease(proxy)
                    print("代理请求失败",proxy)
    
        def run(self):
            """
            测试函数
            :return:None
            """
            print("测试器开始运行")
            try:
                proxies=self.redis.all()
                loop=asyncio.get_event_loop()
                #批量测试
                for i in range(0,len(proxies),BATCH_TEST_SIZE):
                    test_proxies=proxies[i:i+BATCH_TEST_SIZE]
                    tasks=[self.test_single_proxy(proxy) for proxy in test_proxies]
                    loop.run_until_complete(asyncio.wait(tasks))
                    time.sleep(5)
            except Exception as e:
                print("测试器发生错误",e.args)
    

      

  • 相关阅读:
    【7】用Laravel5.1开发一个简单的博客系统
    【6】Laravel5.1的migration数据库迁移
    【5】说说Laravel5的blade模板
    【4】优化一下【3】的例子,顺便说说细节
    【3】创建一个简单的Laravel例子
    【2】最简单的Laravel5.1程序分析
    【1】Laravel5.1 安装
    【0】Laravel 5.1 简介
    MySQL常用命令
    Windows8.1使用博客客户端写博客
  • 原文地址:https://www.cnblogs.com/c-x-a/p/9003594.html
Copyright © 2011-2022 走看看