zoukankan      html  css  js  c++  java
  • Python中logging在多进程环境下打印日志

            因为涉及到进程间互斥与通信问题,因此默认情况下Python中的logging无法在多进程环境下打印日志。但是查询了官方文档可以发现,推荐了一种利用logging.SocketHandler的方案来实现多进程日志打印。

            其原理很简单,概括一句话就是说:多个进程将各自环境下的日志通过Socket发送给一个专门打印日志的进程,这样就可以防止多进程打印的冲突与混乱情况。

            本文主要记录下SocketHandler真实的用法情况:

    1 时序图

           简单说明下逻辑:主进程(MainProcess)启动一个专门打印日志的进程(LogReceiverProcess),并且将自己(主进程)环境下的日志都“重定向”给LogReceiverProcess。同理,在后续逻辑中启动的所有工作子进程(WorkerProcess)都做一样的操作,把自己环境下的日志都“重定向”给日志进程去打印。

    2 实现代码

    2.1 日志进程

      日志进程的代码核心在于要建立一个TCP Server来接收并处理Log record,代码如下:

     1 import os
     2 import logging
     3 import logging.handlers
     4 import traceback
     5 import cPickle
     6 import struct
     7 import SocketServer
     8 from multiprocessing import Process
     9 
    10 class LogRecordStreamHandler(SocketServer.StreamRequestHandler):
    11     def handle(self):
    12         while True:
    13             try:
    14                 chunk = self.connection.recv(4)
    15                 if len(chunk) < 4:
    16                     break
    17                 slen = struct.unpack(">L", chunk)[0]
    18                 chunk = self.connection.recv(slen)
    19                 while len(chunk) < slen:
    20                     chunk = chunk + self.connection.recv(slen - len(chunk))
    21                 obj = self.unpickle(chunk)
    22                 record = logging.makeLogRecord(obj)
    23                 self.handle_log_record(record)
    24 
    25             except:
    26                 break
    27 
    28     @classmethod
    29     def unpickle(cls, data):
    30         return cPickle.loads(data)
    31 
    32     def handle_log_record(self, record):
    33         if self.server.logname is not None:
    34             name = self.server.logname
    35         else:
    36             name = record.name
    37         logger = logging.getLogger(name)
    38         logger.handle(record)
    39 
    40 
    41 class LogRecordSocketReceiver(SocketServer.ThreadingTCPServer):
    42     allow_reuse_address = 1
    43 
    44     def __init__(self, host='localhost', port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler):
    45         SocketServer.ThreadingTCPServer.__init__(self, (host, port), handler)
    46         self.abort = 0
    47         self.timeout = 1
    48         self.logname = None
    49 
    50     def serve_until_stopped(self):
    51         import select
    52         abort = 0
    53         while not abort:
    54             rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout)
    55             if rd:
    56                 self.handle_request()
    57             abort = self.abort
    58 
    59 
    60 def _log_listener_process(log_format, log_time_format, log_file):
    61     log_file = os.path.realpath(log_file)
    62     logging.basicConfig(level=logging.DEBUG, format=log_format, datefmt=log_time_format, filename=log_file, filemode='a+')
    63 
    64     # Console log
    65     console = logging.StreamHandler()
    66     console.setLevel(logging.INFO)
    67     console.setFormatter(logging.Formatter(fmt=log_format, datefmt=log_time_format))
    68     logging.getLogger().addHandler(console)
    69 
    70     tcp_server = LogRecordSocketReceiver()
    71 
    72     logging.debug('Log listener process started ...')
    73     tcp_server.serve_until_stopped()
    View Code

       关键点:

    (1)TCPServer的构建逻辑,拆包还原Log记录;

    (2)在日志进程中设定好logging记录级别和打印方式,这里除了指定文件存储还添加了Console打印。 

    2.2 其他进程

      除了日志进程之外的进程,设置logging都“重定向”给日志进程,并且要关闭当前进程的日志在Console打印(默认会显示Warning级别及以上的日志到Console),否则Console上日志展示会有重复凌乱的感觉。

      1 class LogHelper:
      2     # 默认日志存储路径(相对于当前文件路径)
      3     default_log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'logs')
      4     
      5     # 记录当前实际的日志所在目录
      6     current_log_path = ''
      7     
      8     # 记录当前实际的日志完整路径
      9     current_log_file = ''
     10 
     11     # 日志文件内容格式
     12     log_format = '[%(asctime)s.%(msecs)03d][%(processName)s][%(levelname)s][%(filename)s:%(lineno)d] %(message)s'
     13 
     14     # 日志中时间格式
     15     log_time_format = '%Y%m%d %H:%M:%S'
     16 
     17     # 日志进程
     18     log_process = None
     19 
     20     def __init__(self):
     21         pass
     22 
     23     @staticmethod
     24     def print_console_log(level, message):
     25         print '--------------------------------------------------'
     26         if level == logging.WARN:
     27             level_str = '[WARN]'
     28         elif level == logging.ERROR:
     29             level_str = '[ERROR]'
     30         elif level == logging.FATAL:
     31             level_str = '[FATAL]'
     32         else:
     33             level_str = '[INFO]'
     34         print '	%s %s' % (level_str, message)
     35         print '--------------------------------------------------'
     36 
     37     @staticmethod
     38     def init(clear_logs=True, log_path=''):
     39         #
     40         console = logging.StreamHandler()
     41         console.setLevel(logging.FATAL)
     42         logging.getLogger().addHandler(console)
     43 
     44         try:
     45             # 如果外部没有指定日志存储路径则默认在common同级路径存储
     46             if log_path == '':
     47                 log_path = LogHelper.default_log_path
     48                 if not os.path.exists(log_path):
     49                     os.makedirs(log_path)
     50             LogHelper.current_log_path = log_path
     51 
     52             # 清理旧的日志并初始化当前日志路径
     53             if clear_logs:
     54                 LogHelper.clear_old_log_files()
     55             LogHelper.current_log_file = LogHelper._get_latest_log_file()
     56 
     57             socket_handler = logging.handlers.SocketHandler('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
     58             logging.getLogger().setLevel(logging.DEBUG)
     59             logging.getLogger().addHandler(socket_handler)
     60 
     61             #
     62             LogHelper.start()
     63 
     64         except Exception, ex:
     65             LogHelper.print_console_log(logging.FATAL, 'init() exception: %s' % str(ex))
     66             traceback.print_exc()
     67 
     68     @staticmethod
     69     def start():
     70         if LogHelper.log_process is None:
     71             LogHelper.log_process = Process(target=_log_listener_process, name='LogRecorder', args=(LogHelper.log_format, LogHelper.log_time_format, LogHelper.current_log_file))
     72             LogHelper.log_process.start()
     73         else:
     74             pass
     75 
     76     @staticmethod
     77     def stop():
     78         if LogHelper.log_process is None:
     79             pass
     80         else:
     81             LogHelper.log_process.terminate()
     82             LogHelper.log_process.join()
     83 
     84     @staticmethod
     85     def _get_latest_log_file():
     86         latest_log_file = ''
     87         try:
     88             if os.path.exists(LogHelper.current_log_path):
     89                 for maindir, subdir, file_name_list in os.walk(LogHelper.current_log_path):
     90                     for file_name in file_name_list:
     91                         apath = os.path.join(maindir, file_name)
     92                         if apath > latest_log_file:
     93                             latest_log_file = apath
     94 
     95             if latest_log_file == '':
     96                 latest_log_file = LogHelper.current_log_path + os.sep + 'system_'
     97                 latest_log_file += time.strftime("%Y%m%d_%H%M%S", time.localtime(time.time())) + '.log'
     98 
     99         except Exception, ex:
    100             logging.error('EXCEPTION: %s' % str(ex))
    101             traceback.print_exc()
    102 
    103         finally:
    104             return latest_log_file
    105 
    106     @staticmethod
    107     def get_log_file():
    108         return LogHelper.current_log_file
    109 
    110     @staticmethod
    111     def clear_old_log_files():
    112         if not os.path.exists(LogHelper.current_log_path):
    113             logging.warning('clear_old_log_files() Not exist: %s' % LogHelper.current_log_path)
    114             return
    115 
    116         try:
    117             for maindir, subdir, file_name_list in os.walk(LogHelper.current_log_path):
    118                 for file_name in file_name_list:
    119                     apath = os.path.join(maindir, file_name)
    120                     if apath != LogHelper.current_log_file:
    121                         logging.info('DEL -> %s' % str(apath))
    122                         os.remove(apath)
    123                     else:
    124                         with open(LogHelper.current_log_file, 'w') as f:
    125                             f.write('')
    126 
    127             logging.debug('Clear log done.')
    128 
    129         except Exception, ex:
    130             logging.error('EXCEPTION: %s' % str(ex))
    131             traceback.print_exc()
    View Code
  • 相关阅读:
    查看系统的所有port的状态
    python技巧26[python的egg包的安装和制作]
    python类库31[进程subprocess与管道pipe]
    [BuildRelease Management]hudson插件
    python类库31[使用xml.etree.ElementTree读写xml]
    开机自动运行VMWare
    python实例26[计算MD5]
    2021年最新大厂php+go面试题集(四)
    Jumpserver开源跳板机系统
    报错:ImportError: libmysqlclient.so.20: cannot open shared object file: No such file or director(亲测可用)
  • 原文地址:https://www.cnblogs.com/kuliuheng/p/11190485.html
Copyright © 2011-2022 走看看