  • Implementing a log interceptor (filter) with Python loguru

    This article is an original work by 希希大队长.

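    Anyone who has used loguru knows how pleasant it is to work with. Here we implement a simple log interceptor (filter) that watches for logs at or above a given level and runs a callback of your choice on them. It works with both asynchronous and synchronous callbacks.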
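
    Before the full listing, here is a minimal sketch of how a single entry point can serve both kinds of callback. It uses only the standard library; `dispatch`, `async_handler`, `sync_handler` and `seen` are hypothetical names chosen for illustration, and the sketch assumes no event loop is already running in the calling thread.

```python
import asyncio


def dispatch(callback, messages):
    """Call `callback(messages)` whether it is a coroutine function or a plain function."""
    if asyncio.iscoroutinefunction(callback):
        # Assumes no event loop is running in this thread, so asyncio.run can create one.
        return asyncio.run(callback(messages))
    return callback(messages)


seen = []


async def async_handler(messages):
    # Hypothetical asynchronous callback: records what it received.
    seen.append(("async", list(messages)))


def sync_handler(messages):
    # Hypothetical synchronous callback: records what it received.
    seen.append(("sync", list(messages)))


dispatch(async_handler, ["first error"])
dispatch(sync_handler, ["second error"])
print(seen)
```

    Running the coroutine with `asyncio.run` keeps the caller synchronous, which is why this kind of dispatch fits naturally inside a worker thread.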

    Here is the full code:

    # -*- coding: utf-8 -*-
    
    import sys
    import queue
    import asyncio
    import loguru
    
    from threading import Thread
    from loguru._handler import Message
    from loguru._logger import Level, Core
    
    
    class InterceptSink:
    
        def __init__(self,
                     callback,
                     level,
                     format=True,
                     max_submit_num=50,
                     min_submit_num=1,
                     buffer_size=0,
                     buffer_block=False,
                     **kwargs
                     ):
    
            """
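            callback: called with a list of intercepted log entries (plain function or coroutine function)
            level: minimum level to intercept, given as a level name or a loguru Level
            format: True to render each record as multi-line text, like this: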
                `
                2021-03-15T22:25:34.568938+0800
                        elapsed: 0:00:00.009447
                        exception: None
                        extra: {}
                        file: sink.py
                        function: <module>
                        level: ERROR
                        line: 116
                        message: This is an error log
                        module: sink
                        name: __main__
                        process: 88795
                        thread: 4491730240
                        time: 2021-03-15T22:25:34.568938+0800
                `
    
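            buffer_size: maximum number of entries held in the buffer (0 means unbounded)
            buffer_block: intended to control whether write() blocks when the buffer is full
            min_submit_num: minimum number of entries per callback call
            max_submit_num: maximum number of entries per callback call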
    
            """
            self._max_submit_num = max_submit_num
            self._min_submit_num = min_submit_num
            self._format = format
            self._callback = callback
            self._buffer = queue.Queue(buffer_size)
            self._buffer_size = buffer_size
            self._buffer_block = buffer_block
            if isinstance(level, Level):
                self._level = level
            else:
                # A level name was given: validate it and resolve it to loguru's Level object.
                assert level.upper() in ["DEBUG", "INFO", "WARNING", "ERROR", "TRACE", "SUCCESS", "CRITICAL"]
                self._level = Core().levels[level.upper()]
    
            self._worker = Thread(target=self._queued_executer, daemon=True)
            self._running = True
    
            self._worker.start()
    
        def stop(self):
    
            self._running = False
            self._worker.join(10)
    
        def write(self, message: Message):
    
            if message.record['level'].no >= self._level.no:
    
                if self._format:
                    line = [f'        {key}: {val}' for key, val in message.record.items()]
                    line.insert(0, f'{message.record["time"]}')
                    record = '\n'.join(line)
                else:
                    record = message
                try:
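                    # Honour buffer_block: with block=False a full queue raises queue.Full instead of stalling the logger.
                    self._buffer.put(record, block=self._buffer_block)
                except queue.Full:
                    sys.stderr.write(f'{self.__class__.__name__} Log queued writer overflow: {self._buffer_size}\n')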
    
        def _queued_executer(self):
    
            while self._running:
    
                messages = []
                while True:
    
                    try:
                        message = self._buffer.get(block=True, timeout=1)
                        messages.append(message)
                        if len(messages) >= self._max_submit_num:
                            break
                    except queue.Empty as _:
                        break
    
                if (messages and len(messages) >= self._min_submit_num) or (messages and not self._running):
    
                    if asyncio.iscoroutinefunction(self._callback):
                        asyncio.run(self._callback(messages))
                    else:
                        self._callback(messages)
    
    
    if __name__ == '__main__':
    
        async def async_handle(messages):
    
            for msg in messages:
                print('Printed asynchronously\n' + msg)
    
        def sync_handle(messages):
    
            for msg in messages:
                print('Printed synchronously\n' + msg)
    
        loguru.logger.add(
            InterceptSink(
                async_handle,
                level='ERROR'
            )
        )
        loguru.logger.add(
            InterceptSink(
                sync_handle,
                level='ERROR'
            )
        )
    
        loguru.logger.error('This is an error log')
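
    The batching step inside `_queued_executer` can also be looked at in isolation. The following is a minimal standard-library sketch rather than part of the class above: `drain_batch`, `max_items` and the 0.1 second timeout are assumptions made for illustration (the listing itself waits 1 second per read).

```python
import queue


def drain_batch(buffer, max_items, timeout=0.1):
    """Collect up to `max_items` entries, stopping early once the queue stays empty for `timeout` seconds."""
    batch = []
    while len(batch) < max_items:
        try:
            batch.append(buffer.get(block=True, timeout=timeout))
        except queue.Empty:
            # Nothing arrived within the timeout: hand over what we have so far.
            break
    return batch


buffer = queue.Queue()
for i in range(5):
    buffer.put(f"log {i}")

first = drain_batch(buffer, max_items=3)   # stops at the size limit
second = drain_batch(buffer, max_items=3)  # stops when the queue runs dry
print(first)   # ['log 0', 'log 1', 'log 2']
print(second)  # ['log 3', 'log 4']
```

    The size cap bounds how much work a single callback call receives, while the timeout bounds how long a lone entry waits before being delivered.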

    End of article!

    Programmers, be tough on yourselves!
  • Original source: https://www.cnblogs.com/dongxixi/p/14540549.html