zoukankan      html  css  js  c++  java
  • (转)Python 如何仅用5000 行代码,实现强大的 logging 模块?

    原文:https://juejin.cn/post/6945682794853072909

    Python 的 logging 模块实现了灵活的日志系统。整个模块仅仅 3 个类,不到 5000 行代码的样子,学习它可以加深对程序日志的了解,本文分下面几个部分:

    • logging 简介

    • logging API 设计

    • 记录器对象 Logger

    • 日志记录对象 LogRecord

    • 处理器对象 Hander

    • 格式器对象 Formatter

    • 滚动日志文件处理器

    • 小结

    • 小技巧

    logging 简介

    本次代码使用的是 python 3.8.5 的版本,官方中文文档 3.8.8 。参考链接中官方中文文档非常详细,建议先看一遍了解日志使用。

    我们主要研究日志如何输出到标准窗口这一主线;日志的配置,日志的线程安全及各种特别的Handler等支线可以先忽略。

    logging API 设计

    先看看日志使用:

    import logging
    
    logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)-10s %(asctime)s %(message)s')
    lang = {"name": "python", "age":20}
    logging.info('This is a info message %s', lang)
    logging.debug('This is a debug message')
    logging.warning('This is a warning message')
    
    logger = logging.getLogger(__name__)
    logger.warning('This is a warning')
    复制代码

    输出内容如下:

    INFO     root       2021-03-04 00:03:53,473 This is a info message {'name': 'python', 'age': 20}
    WARNING  root       2021-03-04 00:03:53,473 This is a warning message
    WARNING  __main__   2021-03-04 00:03:53,473 This is a warning
    复制代码

    可以看到 logging 的使用非常方便,模块直接提供了一组API。

    root = RootLogger(WARNING)  # 默认提供的logger
    Logger.root = root
    Logger.manager = Manager(Logger.root)
    
    def debug(msg, *args, **kwargs): # info,warning等api类似
        if len(root.handlers) == 0:
            basicConfig()  # 默认配置
        root.debug(msg, *args, **kwargs)
    
    def getLogger(name=None):
        if name:
            return Logger.manager.getLogger(name)  # 创建特定的logger
        else:
            return root  # 返回默认的logger
    复制代码

    这种API的提供方式,我们在requests中也有看到。api中很重要的设置config的方式:

    def basicConfig(**kwargs):
        ...
        if handlers is None:
            filename = kwargs.pop("filename", None)
            mode = kwargs.pop("filemode", 'a')
            if filename:
                h = FileHandler(filename, mode)
            else:
                stream = kwargs.pop("stream", None)
                h = StreamHandler(stream)  # 默认的handler
            handlers = [h]
        dfs = kwargs.pop("datefmt", None)
        style = kwargs.pop("style", '%')
        fs = kwargs.pop("format", _STYLES[style][1])
        fmt = Formatter(fs, dfs, style)  # 生成formatter
        for h in handlers:
            if h.formatter is None:
                h.setFormatter(fmt)
            root.addHandler(h)  # 设置root的handler
        level = kwargs.pop("level", None)
        if level is not None:
            root.setLevel(level)  # 设置日志级别
    复制代码

    可以看到,日志的配置主要包括下面几项:

    • level 日志级别
    • format 信息格式化模版
    • filename 输出到文件
    • datefmt %Y-%m-%d %H:%M:%S,uuu 时间的格式模版
    • style [%,{,$] 格式样板

    演示代码输出中,可以看到debug日志没有显示,是因为 debug < info:

    CRITICAL = 50
    FATAL = CRITICAL
    ERROR = 40
    WARNING = 30
    WARN = WARNING
    INFO = 20
    DEBUG = 10
    NOTSET = 0
    复制代码

    记录器对象 Logger

    查看Logger之前,先看logger对象的管理类Manager

    _loggerClass = Logger
    
    class Manager(object):
        def __init__(self, rootnode):
            self.root = rootnode
            self.disable = 0
            self.loggerDict = {}  # 所有日志记录对象的字典
        ...
        def getLogger(self, name):
            rv = None
            if name in self.loggerDict:
                rv = self.loggerDict[name]  # 获取已经创建过的同名logger
                ...
            else:
                rv = (self.loggerClass or _loggerClass)(name)  # 创建新的logger
                rv.manager = self
                self.loggerDict[name] = rv
                ...
            return rv
    复制代码

    日志过滤器

    class Filterer(object):
    
        def __init__(self):
            self.filters = []
    
        def addFilter(self, filter):
            self.filters.append(filter)
    
        def removeFilter(self, filter):
            self.filters.remove(filter)
    
        def filter(self, record):
            rv = True
            for f in self.filters:  # 过滤日志
                if hasattr(f, 'filter'):
                    result = f.filter(record)
                else:
                    result = f(record) # assume callable - will raise if not
                if not result:
                    rv = False
                    break
            return r
    复制代码

    核心的 Logger 实际上只是一个控制中心:

    class Logger(Filterer):  # logger可以过滤日志
        def __init__(self, name, level=NOTSET):
            Filterer.__init__(self)
            self.name = name
            self.level = _checkLevel(level)
            self.parent = None  # 日志可以有层级
            self.propagate = True
            self.handlers = []  # 可以输出到多个handler
            self.disabled = False  # 可以关闭
            self._cache = {}
    
        def debug(self, msg, *args, **kwargs):  # 输出debug日志
            if self.isEnabledFor(DEBUG):
                self._log(DEBUG, msg, args, **kwargs)
    复制代码

    logger可以判断日志级别:

    def isEnabledFor(self, level):
        if self.disabled:
            return False
    
        try:
            return self._cache[level]
        except KeyError:
            try:
                if self.manager.disable >= level:
                    is_enabled = self._cache[level] = False
                else:
                    is_enabled = self._cache[level] = (
                        level >= self.getEffectiveLevel()
                    )
            return is_enabled
    
    def getEffectiveLevel(self):
        logger = self
        while logger:
            if logger.level:
                return logger.level
            logger = logger.parent
        return NOTSET
    复制代码

    日志输出:

    def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False,
             stacklevel=1):
        ...
        fn, lno, func = "(unknown file)", 0, "(unknown function)"
        ...
        # 生成日志记录
        record = self.makeRecord(self.name, level, fn, lno, msg, args,
                                 exc_info, func, extra, sinfo)
        # 使用handler处理日志
        self.handle(record)
    复制代码

    日志记录的生产,就是创建一个LogRecord对象:

    _logRecordFactory = LogRecord
    
    def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
                   func=None, extra=None, sinfo=None):
        ...
        rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
                             sinfo)
        ...
        return rv
    复制代码

    使用logger对象的所有handler处理日志:

    def handle(self, record):
        c = self
        found = 0
        while c:
            for hdlr in c.handlers:  # 使用所有的handler处理日志
                found = found + 1
                if record.levelno >= hdlr.level:
                    hdlr.handle(record)
    复制代码

    root-logger 的handler是在config中配置的:

    def basicConfig(**kwargs):
        ...
        root.addHandler(h)  # 设置root的handler
    复制代码

    日志记录对象 LogRecord

    日志记录对象非常简单:

    class LogRecord(object):
        def __init__(self, name, level, pathname, lineno,
                     msg, args, exc_info, func=None, sinfo=None, **kwargs):
            ct = time.time()
            self.name = name  # logger名称
            self.msg = msg  # 日志标识信息
            ...
            self.args = args  # 变量
            self.levelname = getLevelName(level)
            ...
    
        def getMessage(self):
            msg = str(self.msg)
            if self.args:
                msg = msg % self.args  # 格式化消息
            return msg
    复制代码

    处理器对象 Hander

    顶级Handler定义了Handler的模版方法

    class Handler(Filterer):  # 处理器也可以过滤日志
        def __init__(self, level=NOTSET):
            Filterer.__init__(self)
            self._name = None
            self.level = _checkLevel(level)  # handler也有日志级别
            self.formatter = None
            _addHandlerRef(self)
            self.createLock()
    
        def handle(self, record):  # 处理日志
            rv = self.filter(record)  # 过滤日志
            if rv:
                self.acquire()  # 申请锁
                try:
                    self.emit(record)  # 提交记录,由不同子类实现 
                finally:
                    self.release()  # 释放锁
            return rv
    复制代码

    默认的console流 StreamHandler

    class StreamHandler(Handler):
    
        terminator = '\n'  # 自动换行
    
        def __init__(self, stream=None):
            Handler.__init__(self)
            if stream is None:
                stream = sys.stderr  # 默认使用stderr输出
            self.stream = stream
    
        def emit(self, record):
            try:
                msg = self.format(record)  # 格式化日志记录
                stream = self.stream
                stream.write(msg + self.terminator)  # 写日志
                self.flush()  # 刷新写缓存
            except Exception:
                ...
    
        def format(self, record):
            if self.formatter:
                fmt = self.formatter
            else:
                fmt = _defaultFormatter
            return fmt.format(record)  # 使用格式化器格式化日志记录
    复制代码

    为什么使用stderr,可以看下面的测试中的输出都是到console:

    print("haha")
    print("fatal error", file=sys.stderr)
    sys.stderr.write("fatal error\n")
    复制代码

    格式器对象 Formatter

    格式化器主要使用Formatter和Style实现

    class Formatter(object):
        def __init__(self, fmt=None, datefmt=None, style='%', validate=True):
            self._style = _STYLES[style][0](fmt)
            self._fmt = self._style._fmt
            self.datefmt = datefmt
    
        def format(self, record):
            record.message = record.getMessage()
            s = self.formatMessage(record)
            return s
    
        def formatMessage(self, record):
            return self._style.format(record)  # 格式化
    复制代码

    Style类

    class PercentStyle(object):
    
        default_format = '%(message)s'
        asctime_format = '%(asctime)s'
        asctime_search = '%(asctime)'
        validation_pattern = re.compile(r'%\(\w+\)[#0+ -]*(\*|\d+)?(\.(\*|\d+))?[diouxefgcrsa%]', re.I)
    
        def __init__(self, fmt):
            self._fmt = fmt or self.default_format
    
        def usesTime(self):
            return self._fmt.find(self.asctime_search) >= 0
    
        def validate(self):
            """Validate the input format, ensure it matches the correct style"""
            if not self.validation_pattern.search(self._fmt):
                raise ValueError("Invalid format '%s' for '%s' style" % (self._fmt, self.default_format[0]))
    
        def _format(self, record):
            return self._fmt % record.__dict__  # 格式化日志记录对象
    
        def format(self, record):
            try:
                return self._format(record)
            except KeyError as e:
                raise ValueError('Formatting field not found in record: %s' % e)
    复制代码

    滚动日志文件处理器

    线上的日志持续输出到一个文件的话,会让文件巨大,即有加剧了丢失的风险,也难以处理。通常有按照大小滚动或者按照日期滚动的方法,这个功能非常重要。先看滚动日志处理器模版:

    class BaseRotatingHandler(logging.FileHandler):
        def emit(self, record):
            try:
                if self.shouldRollover(record): # 判断是否需要滚动
                    self.doRollover()  # 滚动日志
                logging.FileHandler.emit(self, record)  # 输出日志
            except Exception:
                self.handleError(record)
    
        def rotate(self, source, dest):
            if not callable(self.rotator):
                if os.path.exists(source):
                    os.rename(source, dest)  # 重命名日志文件
            else:
                self.rotator(source, dest)
    复制代码

    按大小滚动 RotatingFileHandler

    按照文件大小滚动的处理器:

    class RotatingFileHandler(BaseRotatingHandler):
    
        def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):
            if maxBytes > 0:
                mode = 'a'
            BaseRotatingHandler.__init__(self, filename, mode, encoding, delay)
            self.maxBytes = maxBytes  # 单个文件大小上限
            self.backupCount = backupCount  # 日志备份数量
    
        def doRollover(self):  # 执行滚动
            if self.stream:
                self.stream.close()  # 关闭当前的流
                self.stream = None
            if self.backupCount > 0:
                for i in range(self.backupCount - 1, 0, -1):
                    sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
                    dfn = self.rotation_filename("%s.%d" % (self.baseFilename,
                                                            i + 1))
                    if os.path.exists(sfn):
                        if os.path.exists(dfn):
                            os.remove(dfn)
                        os.rename(sfn, dfn)
                dfn = self.rotation_filename(self.baseFilename + ".1")
                if os.path.exists(dfn):
                    os.remove(dfn)
                self.rotate(self.baseFilename, dfn)  # 重命名文件
            if not self.delay:
                self.stream = self._open()  # 如果shouldRollover延迟,可以打开新的流
    
        def shouldRollover(self, record):  # 判断是否需要滚动
            if self.stream is None:  # 立即打开流
                self.stream = self._open()
            if self.maxBytes > 0:   
                msg = "%s\n" % self.format(record)
                self.stream.seek(0, 2)  #due to non-posix-compliant Windows feature
                if self.stream.tell() + len(msg) >= self.maxBytes:  # 判断大小
                    return 1
            return 0
    复制代码

    文件大小滚动就是在记录日志时候判断文档是否超过上限,超过则重命名旧日志,生成新日志。

    按照日期滚动 TimedRotatingFileHandler

    按照日期滚动的处理器:

    class TimedRotatingFileHandler(BaseRotatingHandler):
        def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):
            BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
            self.when = when.upper()
            self.backupCount = backupCount
            self.utc = utc
            self.atTime = atTime
            # 日期设置,支持多种方式
            if self.when == 'S':
                self.interval = 1 # one second
                self.suffix = "%Y-%m-%d_%H-%M-%S"
                self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"
            ...
    
            self.extMatch = re.compile(self.extMatch, re.ASCII)
            self.interval = self.interval * interval # multiply by units requested
            filename = self.baseFilename
            if os.path.exists(filename):
                t = os.stat(filename)[ST_MTIME]  # 最后修改时间
            else:
                t = int(time.time())
            self.rolloverAt = self.computeRollover(t)  # 提前计算终止时间
    
        def computeRollover(self, currentTime):
            # 判断的方法还是很长很复杂的,先pass
    
        def shouldRollover(self, record):
            t = int(time.time())
            if t >= self.rolloverAt:  # 判断是否到期
                return 1
            return 0
    
        def doRollover(self):
            ...
            dfn = self.rotation_filename(self.baseFilename + "." +
                                         time.strftime(self.suffix, timeTuple))
            #  滚动日志文件
            if os.path.exists(dfn):
                os.remove(dfn)
            self.rotate(self.baseFilename, dfn)
            if self.backupCount > 0:
                for s in self.getFilesToDelete():
                    os.remove(s)
            ...
            # 计算下一个时间点
            newRolloverAt = self.computeRollover(currentTime)
            ...
            self.rolloverAt = newRolloverAt
    复制代码

    日期滚动就是计算最后时间点,超过时间点则重新生成新的日志文件。

    小结

    logging的处理逻辑大概是这样的:

    • 创建Logger对象,提供API,用来接收应用程序日志
    • Logger对象包括多个Handler
    • 每个Handler有一个Formatter对象
    • 每条日志都会生成一个LogRecord对象
    • 使用不同的Handler对象将LogRecored对象提交到不同的流
    • 每个日志对象通过Formatter格式化输出
    • 可以使用按日期/文件大小的方式进行日志文件的滚动记录

    小技巧

    覆盖对象的 reduce 方法,让对象支持reduce函数:

    class RootLogger(Logger):
        def __init__(self, level):
            Logger.__init__(self, "root", level)
    
        def __reduce__(self):
            return getLogger, ()
    复制代码

    线程锁的创建和释放:

    _lock = threading.RLock()
    
    def _acquireLock():
        if _lock:
            _lock.acquire()
    
    def _releaseLock():
        if _lock:
            _lock.release()
    复制代码

    线程锁的使用:

    def addHandler(self, hdlr):
        _acquireLock()
        try:
            self.handlers.append(hdlr)
        finally:
            _releaseLock()
    
    def removeHandler(self, hdlr):
        _acquireLock()
        try:
            self.handlers.remove(hdlr)
        finally:
            _releaseLock()
     
    技术链接
  • 相关阅读:
    Centos 5.5+PHP5 添加phpjavabrige
    Google /Baidu Ping服务快速收录php
    基站一些信息
    搜索引擎提交入口收集
    druid简单教程
    http关于application/xwwwformurlencoded等字符编码的解释说明
    对比.NET PetShop和Duwamish来探讨Ado.NET的数据库编程模式。
    WinForm RTF生成器
    在 Visual C# .NET 中跟踪和调试
    c#中构建异常处理
  • 原文地址:https://www.cnblogs.com/liujiacai/p/15624572.html
Copyright © 2011-2022 走看看