本文由希希大队长原创
相信用过 loguru 的开发者都觉得它非常好用。这里我们简单实现一个日志拦截(过滤)器:它可以监控指定级别及以上的日志,并把这些日志批量交给你提供的回调函数去处理;回调既可以是普通(同步)函数,也可以是异步(协程)函数。
直接贴代码:
# -*- coding: utf-8 -*-
import asyncio
import queue
import sys
import traceback
from threading import Thread

import loguru
from loguru._handler import Message
from loguru._logger import Core, Level


class InterceptSink:
    """Loguru sink that intercepts records at or above a level and hands
    them, in batches, to a user callback running on a background thread.

    The callback may be a plain function or a coroutine function; coroutine
    functions are executed with ``asyncio.run`` inside the worker thread.
    """

    # Level names accepted when ``level`` is given as a string.
    _LEVEL_NAMES = (
        "DEBUG", "INFO", "WARNING", "ERROR", "TRACE", "SUCCESS", "CRITICAL",
    )

    def __init__(self,
                 callback,
                 level,
                 format=True,
                 max_submit_num=50,
                 min_submit_num=1,
                 buffer_size=0,
                 buffer_block=False,
                 **kwargs):
        """Create the sink and start its daemon worker thread.

        Args:
            callback: Callable (sync or ``async def``) that receives a list
                of intercepted log entries.
            level: Minimum level to intercept, either a loguru ``Level`` or
                one of the standard level names (case-insensitive).
            format: When true, each entry is a single string made of the
                record time followed by every record field rendered as
                `` key: value`` and joined with spaces, e.g.::

                    2021-03-15T22:25:34.568938+0800  elapsed: 0:00:00.009447
                    exception: None  extra: {}  file: sink.py  ...
                    level: ERROR  line: 116  message: <text>  ...

                When false, the loguru ``Message`` object is passed through
                unchanged.
            max_submit_num: Maximum number of entries per callback call.
            min_submit_num: Minimum number of entries per callback call;
                smaller batches are held back until more entries arrive or
                the sink is stopped.
            buffer_size: Capacity of the internal queue (``0`` means
                unbounded, as with ``queue.Queue``).
            buffer_block: When the queue is full, block the logging call if
                true; otherwise drop the entry and report it on stderr.
            **kwargs: Accepted for compatibility and ignored.

        Raises:
            ValueError: If ``level`` is neither a ``Level`` nor a known
                level name.
        """
        self._max_submit_num = max_submit_num
        self._min_submit_num = min_submit_num
        self._format = format
        self._callback = callback
        # Decide once how the callback has to be invoked.
        self._callback_is_async = asyncio.iscoroutinefunction(callback)
        self._buffer = queue.Queue(buffer_size)
        self._buffer_size = buffer_size
        self._buffer_block = buffer_block
        self._level = self._resolve_level(level)

        self._running = True
        self._worker = Thread(target=self._queued_executer, daemon=True)
        self._worker.start()

    @classmethod
    def _resolve_level(cls, level):
        """Turn a ``Level`` instance or a level name into a ``Level``."""
        if isinstance(level, Level):
            return level
        if not isinstance(level, str) or level.upper() not in cls._LEVEL_NAMES:
            raise ValueError(
                f'Unknown log level {level!r}; expected a loguru Level or '
                f'one of {", ".join(cls._LEVEL_NAMES)}'
            )
        return Core().levels[level.upper()]

    def stop(self):
        """Ask the worker to flush what is queued and wait up to 10 seconds."""
        self._running = False
        self._worker.join(10)

    def write(self, message: Message):
        """Queue ``message`` for the callback if it meets the level threshold.

        This is the method loguru calls for every emitted log message.
        """
        if message.record['level'].no < self._level.no:
            return

        if self._format:
            line = [f' {key}: {val}' for key, val in message.record.items()]
            line.insert(0, f'{message.record["time"]}')
            record = ' '.join(line)
        else:
            record = message

        try:
            # Honour ``buffer_block``: a non-blocking put raises queue.Full
            # on a bounded, full queue instead of stalling the logger.
            self._buffer.put(record, block=self._buffer_block)
        except queue.Full:
            sys.stderr.write(f'{self.__class__.__name__} Log queued writer overflow: {self._buffer_size}')

    def _submit(self, batch):
        """Invoke the callback with ``batch``, keeping the worker alive on error."""
        try:
            if self._callback_is_async:
                asyncio.run(self._callback(batch))
            else:
                self._callback(batch)
        except Exception:
            # A failing callback must not kill the worker thread, otherwise
            # every later log entry would pile up unprocessed.
            sys.stderr.write(f'{self.__class__.__name__} callback raised an exception:\n')
            traceback.print_exc(file=sys.stderr)

    def _queued_executer(self):
        """Worker loop: collect queued entries into batches and submit them."""
        # Guard against non-positive or inverted limits so the loop always
        # makes progress.
        max_batch = max(1, self._max_submit_num)
        min_batch = min(self._min_submit_num, max_batch)

        pending = []
        # Keep going after stop() until everything already queued is delivered.
        while self._running or pending or not self._buffer.empty():
            while len(pending) < max_batch:
                try:
                    pending.append(self._buffer.get(block=True, timeout=1))
                except queue.Empty:
                    break

            if not pending:
                continue
            # Entries below the minimum stay in ``pending`` for the next
            # pass instead of being thrown away; on shutdown flush anyway.
            if len(pending) >= min_batch or not self._running:
                self._submit(pending)
                pending = []


if __name__ == '__main__':
    async def async_handle(messages):
        for msg in messages:
            print('这是异步打印的 '+ msg)

    def sync_handle(messages):
        for msg in messages:
            print('这是同步打印的 '+ msg)

    loguru.logger.add(
        InterceptSink(
            async_handle,
            level='ERROR'
        )
    )
    loguru.logger.add(
        InterceptSink(
            sync_handle,
            level='ERROR'
        )
    )
    loguru.logger.error('这是条错误日志')
本文完结!