zoukankan      html  css  js  c++  java
  • 一个python爬虫工具类

    写了一个爬虫工具类,封装了代理获取、带重试的 GET/POST 请求,以及 HTML、JSON、XPath 解析等常用操作。

    # -*- coding: utf-8 -*-
    # @Time    : 2018/8/7 16:29
    # @Author  : cxa
    # @File    : utils.py
    # @Software: PyCharm
    from retrying import retry
    from decorators.decorators import decorator, parse_decorator
    from glom import glom
    from config import headers
    import datetime
    import hashlib
    from tomorrow import threads
    from requests_html import HTMLSession
    
    try:
        import simplejson as json
    except ImportError:
        import json
    
    
    class MetaSingleton(type):
        '''
        Metaclass that keeps one shared instance per class.

        The first call to a class using this metaclass builds the instance and
        stores it in ``_inst``; later calls return the stored object and their
        arguments are ignored.
        '''
        # Maps each class to the single instance created for it.
        _inst = {}

        def __call__(cls, *args, **kwargs):
            registry = cls._inst
            if cls in registry:
                return registry[cls]
            instance = super().__call__(*args, **kwargs)
            registry[cls] = instance
            return instance
    
    
    class Get_Proxies(metaclass=MetaSingleton):
        '''
        Shared cache of the proxy settings used for outgoing requests.

        ``MetaSingleton`` makes every ``Get_Proxies()`` call return the same
        object, so a proxy fetched once is reused until a change is requested.
        '''
        # Proxy URL currently in use; stays None until get_ip returns a value.
        ip = None

        def getproxy(self, change_proxy):
            '''
            Return the proxies mapping, fetching a proxy first when needed.

            :param change_proxy: when true, replace the cached proxy with a newly fetched one
            :return: dict whose 'http' and 'https' entries both hold the proxy URL
            '''
            # A single fetch covers both "nothing cached yet" and "change requested";
            # previously a first call with change_proxy=True fetched twice.
            if change_proxy or self.ip is None:
                self.ip = self.get_ip(HTMLSession())
                self.proxies = {
                    'http': self.ip,
                    'https': self.ip
                }
            return self.proxies

        def get_ip(self, session):
            '''
            Request a proxy from the proxy service and build its URL.

            :param session: object whose ``get(url)`` returns a response exposing ``status_code`` and ``json()``
            :return: string of the form ``http://username:password@key``, or None when the
                     status code is not 200 or ``resCode`` is not "0000"
            '''
            # NOTE(review): 'ip' looks like a placeholder for the real service URL - confirm.
            url = 'ip'
            req = session.get(url)
            if req.status_code != 200:
                return None
            jsonstr = req.json()
            if glom(jsonstr, "resCode") != "0000":
                return None
            # Each spec collects the field from every entry of reData; the first entry is used.
            key = glom(jsonstr, ('reData', ['key']))[0]
            uname = glom(jsonstr, ('reData', ['username']))[0]
            passwd = glom(jsonstr, ('reData', ['password']))[0]
            return f"http://{uname}:{passwd}@{key}"
    
    
    @retry(stop_max_attempt_number=5, wait_random_min=3000, wait_random_max=7000)
    @decorator
    def post_html(session, post_url: str, post_data: dict, headers=headers, timeout=30):
        '''
        Send a POST request through the current proxy.

        :param session: session object used to send the request
        :param post_url: URL to post to
        :param post_data: data to post, as a dict
        :param headers: request headers; defaults to ``headers`` imported from the config module
        :param timeout: timeout value passed to ``session.post``
        :return: the response when its status code is 200, otherwise None
                 (as returned by this function body, before the wrappers are applied)
        '''
        post_req = session.post(url=post_url, headers=headers, data=post_data, timeout=timeout, proxies=get_proxies())
        if post_req.status_code != 200:
            return None
        # Decode the body with the encoding detected from its content.
        post_req.encoding = post_req.apparent_encoding
        return post_req
    
    
    @retry(stop_max_attempt_number=5, wait_random_min=3000, wait_random_max=7000)
    @decorator
    def get_response(session, url: str, params=None, headers=headers, timeout=10):
        '''
        Send a GET request through the current proxy, switching proxy once on failure.

        :param session: session object used to send the request
        :param url: URL to fetch
        :param params: optional query parameters passed to ``session.get``
        :param headers: request headers; defaults to ``headers`` imported from the config module
        :param timeout: timeout value passed to ``session.get``
        :return: the response when its status code is 200, otherwise None
                 (as returned by this function body, before the wrappers are applied)
        '''
        try:
            req = session.get(url=url, headers=headers, params=params, timeout=timeout, proxies=get_proxies())
        except Exception:
            # Catch Exception rather than everything, so KeyboardInterrupt and
            # SystemExit are not swallowed; the second attempt asks for a new proxy.
            req = session.get(url=url, headers=headers, params=params, timeout=timeout, proxies=get_proxies(True))
        if req.status_code != 200:
            return None
        # Decode the body with the encoding detected from its content.
        req.encoding = req.apparent_encoding
        return req
    
    
    @decorator
    def get_html(req):
        '''
        Return the body of a response as text.

        :param req: response object
        :return: value of ``req.text``
        '''
        return req.text
    
    
    @decorator
    def get_json(req):
        '''
        Decode the body of a response as JSON.

        ``req.json()`` is tried first. When it fails, the raw text is parsed
        instead, after removing surrounding whitespace and any trailing ';'.

        :param req: response object
        :return: the decoded JSON data
        '''
        try:
            return req.json()
        except Exception:
            source = get_html(req)
        # Only the trailing terminator is dropped: semicolons inside the payload
        # are part of the data. Text without a trailing ';' is still parsed, so a
        # malformed body surfaces as a JSON decoding error instead of an unbound name.
        return json.loads(source.strip().rstrip(';'))
    
    
    @parse_decorator(None)
    def get_xpath(req, xpathstr: str):
        '''
        Run an XPath query against the parsed HTML of a response.

        :param req: response object exposing ``html``
        :param xpathstr: XPath expression
        :return: whatever ``req.html.xpath`` returns for the expression
        '''
        return req.html.xpath(xpathstr)
    
    
    @decorator
    def get_link(node):
        '''
        Return one link from the node's absolute links.

        :param node: node exposing ``absolute_links``
        :return: first item of ``absolute_links`` after converting it to a list
        '''
        links = list(node.absolute_links)
        return links[0]
    
    
    @parse_decorator(None)
    def get_text(node):
        '''
        Return the text of a node.

        :param node: node exposing ``text``
        :return: value of ``node.text``
        '''
        text = node.text
        return text
    
    
    @parse_decorator(None)
    def get_all_text(node):
        '''
        Return the full text of a node.

        :param node: a node exposing ``full_text``, or a list whose first item is such a node
        :return: ``full_text`` of the node (of the first item when a list is given)
        '''
        target = node[0] if isinstance(node, list) else node
        return target.full_text
    
    
    @decorator
    def get_json_data(jsonstr: str, pat: str):
        '''
        Extract data with the glom module.

        :param jsonstr: data to query, passed to ``glom`` as the target
        :param pat: glom spec describing what to extract
        :return: result of ``glom(jsonstr, pat)``
        '''
        return glom(jsonstr, pat)
    
    
    @decorator
    def get_hash_code(key):
        '''
        Return the MD5 hex digest of a string.

        :param key: string to hash; it is encoded as UTF-8 first
        :return: hexadecimal MD5 digest
        '''
        digest = hashlib.md5()
        digest.update(key.encode('utf-8'))
        return digest.hexdigest()
    
    
    @parse_decorator(None)
    def get_next_node(node, xpathstr):
        '''
        Run an XPath query below the first node of a sequence.

        :param node: sequence of nodes; only ``node[0]`` is queried
        :param xpathstr: XPath expression
        :return: the query result when it is truthy, otherwise None
        '''
        children = node[0].xpath(xpathstr)
        return children if children else None
    
    
    @decorator
    def get_datetime_from_unix(unix_time):
        '''
        Convert a Unix timestamp to a datetime.

        :param unix_time: timestamp; values that are not ``int`` are converted with ``int()``
        :return: result of ``datetime.datetime.fromtimestamp`` for the timestamp
        '''
        seconds = unix_time if isinstance(unix_time, int) else int(unix_time)
        return datetime.datetime.fromtimestamp(seconds)
    
    
    def get_proxies(change_proxy=False):
        '''
        Return the proxies mapping held by ``Get_Proxies``.

        :param change_proxy: forwarded to ``Get_Proxies.getproxy``
        :return: value returned by ``getproxy``
        '''
        return Get_Proxies().getproxy(change_proxy)
    
    
    @decorator
    @threads(20)
    @retry(stop_max_attempt_number=5)
    def async_get_response(session, url: str, headers=headers, timeout=10):
        '''
        Send a GET request through the current proxy, switching proxy once on failure.

        Unlike ``get_response``, the status code is not checked and the response
        encoding is left untouched.

        :param session: session object used to send the request
        :param url: URL to fetch
        :param headers: request headers; defaults to ``headers`` imported from the config module
        :param timeout: timeout value passed to ``session.get``
        :return: the response returned by ``session.get``
                 (as returned by this function body, before the wrappers are applied)
        '''
        try:
            req = session.get(url=url, headers=headers, timeout=timeout, proxies=get_proxies())
        except Exception:
            # Catch Exception rather than everything, so KeyboardInterrupt and
            # SystemExit are not swallowed; the second attempt asks for a new proxy.
            req = session.get(url=url, headers=headers, timeout=timeout, proxies=get_proxies(True))
        return req
    
    
    # When the module is run directly, print the proxies mapping returned by get_proxies().
    if __name__ == '__main__':
        print(get_proxies())
    
    

    以下是headers文件的内容

    import random
    
    # Random parts of the fake Chrome version string, drawn once at module level.
    # They fill the first, third and fourth fields of 'Chrome/{}.0.{}.{}'.
    first_num = random.randint(55, 62)
    third_num = random.randint(0, 3200)
    fourth_num = random.randint(0, 140)
    
    
    class FakeChromeUA:
        '''Builds Chrome-style User-Agent strings.'''

        # Platform tokens; get_ua picks one at random on every call.
        os_type = [
            '(Windows NT 6.1; WOW64)',
            '(Windows NT 10.0; WOW64)',
            '(X11; Linux x86_64)',
            '(Macintosh; Intel Mac OS X 10_12_6)',
        ]

        # Built once in the class body from the module-level random numbers.
        chrome_version = 'Chrome/{major}.0.{build}.{patch}'.format(
            major=first_num, build=third_num, patch=fourth_num
        )

        @classmethod
        def get_ua(cls):
            '''
            Return a User-Agent string with a randomly chosen platform token.

            :return: the User-Agent parts joined by single spaces
            '''
            platform_token = random.choice(cls.os_type)
            parts = (
                'Mozilla/5.0',
                platform_token,
                'AppleWebKit/537.36',
                '(KHTML, like Gecko)',
                cls.chrome_version,
                'Safari/537.36',
            )
            return ' '.join(parts)
    
    
    # Default request headers. FakeChromeUA.get_ua() is called once, while this
    # dict is being built, so the User-Agent value is fixed from then on.
    headers = {
        'User-Agent': FakeChromeUA.get_ua(),
        'Accept-Encoding': 'gzip, deflate, sdch',
        'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Connection': 'keep-alive'
    }
    
    

    以下是logger文件的内容

    # -*- coding: utf-8 -*-
    import os
    import time
    import logging
    import sys
    # Log file location: <parent of this file's directory>/logs/<YYYYMMDD>/t.log
    log_dir1 = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logs")
    # With no time tuple, strftime formats the current local time.
    today = time.strftime('%Y%m%d')
    full_path = os.path.join(log_dir1, today)
    # exist_ok=True avoids failing when the directory appears between a
    # separate existence check and the creation call.
    os.makedirs(full_path, exist_ok=True)
    log_path = os.path.join(full_path, "t.log")
    def get_logger():
        '''
        Return the logger named "t", configuring it the first time it is requested.

        A logger that already has handlers is returned unchanged. Otherwise a
        UTF-8 file handler writing to ``log_path`` and a handler writing to
        ``sys.stdout`` are attached, both using the same format, and the logger
        level is set to INFO.

        :return: the configured ``logging.Logger``
        '''
        logger = logging.getLogger("t")
        if logger.handlers:
            # Already configured; adding handlers again would duplicate every record.
            return logger

        formatter = logging.Formatter('%(asctime)s %(levelname)-8s: %(message)s')
        file_handler = logging.FileHandler(log_path, encoding="utf8")
        console_handler = logging.StreamHandler(sys.stdout)

        # Same format for both outputs; the file handler is attached first.
        for handler in (file_handler, console_handler):
            handler.setFormatter(formatter)
            logger.addHandler(handler)

        logger.setLevel(logging.INFO)
        return logger
    
  • 相关阅读:
    BackupPC备份
    H5日常使用
    无互联网环境安装docker
    docker 部署zabbix
    docker: error pulling image configuration:
    java web开发入门六(spring mvc)基于intellig idea
    java web开发入门七(mybatis)基于intellig idea
    java web开发入门九(Maven使用&idea创建maven项目)基于intellig idea
    Intellij IDEA使用一 创建javaweb项目并配置tomcat
    java web开发入门四(spring)基于intellig idea
  • 原文地址:https://www.cnblogs.com/c-x-a/p/9438587.html
Copyright © 2011-2022 走看看