This logging module depends on none of my other packages: copy the file and it runs. It can also be downloaded from PyPI or installed with pip.
1. Seven formatter templates are built in. Templates 4 and 5 make a log line clickable, jumping straight to the file and line that produced it, an approach I have not seen anywhere else.
2. ColorHandler is used as the default console handler instead of the official StreamHandler, so logs print in five colors and, in an ocean of output, you can see at a glance which lines are debug, which are errors, which are warnings and which are critical. Green means DEBUG, sky blue means INFO, yellow means WARNING, pink means ERROR and blood red means CRITICAL, which matches normal intuition. The exact colors rendered depend on your own PyCharm theme and color scheme (a dark theme is recommended), and also vary somewhat with the PyCharm version.
3. Process-safe log file rotation, implemented with a third-party handler, ConcurrentRotatingFileHandler from the concurrent-log-handler package (a direct-use sketch follows this list).
4. A mail handler that supports domestic providers such as QQ Mail and 163, with rate limiting on how often mail is sent.
5. A MongoHandler that automatically splits log records into fields and inserts them into MongoDB.
6.1 None of these handlers has to be added by hand; everything is driven by parameters. Set a file name and a file logger is generated automatically; pass a mongo connection url and a MongoHandler is added; and so on (see the usage sketch after this list).
6.2 To understand how a logger relates to its various handlers, you need to understand logger namespaces and the observer pattern from the 23 classic design patterns: the logger is the subject and its handlers are the observers it notifies. Once that clicks, you can write handlers of your own to meet any need (a minimal custom-handler sketch follows the list).
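For item 3, here is a sketch of what the file handler does under the hood, using the third-party concurrent-log-handler package directly; the logger name and file name are placeholders, and the constructor mirrors the stdlib RotatingFileHandler:

import logging
from concurrent_log_handler import ConcurrentRotatingFileHandler

logger = logging.getLogger('rotation_demo')
# a file lock makes rotation safe when several processes write the same file,
# which is exactly the stdlib RotatingFileHandler's weak spot
handler = ConcurrentRotatingFileHandler('app.log', 'a', maxBytes=10 * 1024 * 1024, backupCount=3)
logger.addHandler(handler)
logger.warning('safe to write from many processes at once')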
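To make 6.1 concrete, here is a minimal usage sketch built from the calls shown in the module docstring further down; the module name, logger names, paths, addresses and mongo url are placeholders of mine, not values from the original:

from log_manager_module import LogManager  # hypothetical module name for the code below

# console + rotating file + mongo handlers, all attached purely by passing parameters
logger = LogManager('my_logger').get_and_add_handlers(
    log_level_int=1,                          # log level as a number from 1 to 5 (the module validates this range)
    is_add_stream_handler=True,               # colored console output via ColorHandler
    log_path='/var/log/myapp',                # placeholder directory
    log_filename='myapp.log',                 # setting a file name adds the rotating file handler
    log_file_size=10,                         # rotation size threshold, per the module's convention
    mongo_url='mongodb://localhost:27017/',   # passing a url adds the MongoHandler
    formatter_template=5)                     # templates 4 and 5 support click-to-jump in PyCharm

logger.debug('shown in green')
logger.error('shown in pink, and clickable back to this line')

# a mail logger has its own construction method (signature from the docstring;
# addresses and subject are placeholders); mail_time_interval rate-limits sending
mail_logger = LogManager.bulid_a_logger_with_mail_handler(
    'mail_logger_name', mail_time_interval=10,
    toaddrs=('909686xxx@qq.com', 'yangxx4508@dingtalk.com'), subject='your subject')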
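And for 6.2, a minimal sketch of the observer pattern as the logging module embodies it: the logger is the subject, each handler is an observer, and addHandler subscribes one. Only the stdlib logging API is used here; the handler class itself is a made-up example:

import logging

class MsgLenHandler(logging.Handler):
    """A made-up observer: reports the length of every message it sees."""
    def emit(self, record):
        print(f'observed a {record.levelname} message of length {len(str(record.msg))}')

logger = logging.getLogger('demo_namespace')
logger.setLevel(logging.DEBUG)
logger.addHandler(MsgLenHandler())  # subscribe an observer to the subject
logger.warning('hello')             # the logger notifies every attached handler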
Why use logging at all?
A former colleague used print for everything, which was maddening. The project ran to hundreds of thousands of lines; once it started up, with modules importing each other all over the place and print calls scattered everywhere, you had no idea where a given line of output came from. You could not silence it selectively and you could not extend it with handlers. In every respect it was far worse than logging.
Using print as your logging is a mark of a very beginner-level Python developer.
This version is long because it adds many kinds of handlers, plus automatic de-duplication of same-type handlers.
The simplified version just monkey-patches print to colorize output automatically, with no need for this logging module at all; a sketch of that follows.
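A minimal sketch of that monkey-patch approach, assuming the module code below is importable as log_manager_module (a hypothetical name; the function very_nb_print is real and defined below). Patching it over the builtin print gives every print call a clickable "file:line" prefix and a color code:

import builtins
from log_manager_module import very_nb_print  # hypothetical module name for the code below

# the module itself hints at this with its commented-out `print = very_nb_print`
builtins.print = very_nb_print
print('hello')  # rendered roughly as: "your_file.py:6"  12:00:00  hello  (colored)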
# coding=utf8
"""
Log management. Supports printing logs to the console and writing them to
rotating files, MongoDB, email, DingTalk robots, Elasticsearch and Kafka.

Usage:
logger = LogManager('logger_name').get_and_add_handlers(log_level_int=1, is_add_stream_handler=True, log_path=None, log_filename=None, log_file_size=10, mongo_url=None, formatter_template=2)
or
logger = LogManager('logger_name').get_without_handlers()
The second form attaches no handlers and records nothing immediately; later, at one
central place, you can call get_and_add_handlers for each logger name to attach
whatever handlers you need.

Creating a mail logger uses its own construction method:
logger = LogManager.bulid_a_logger_with_mail_handler('mail_logger_name', mail_time_interval=10, toaddrs=('909686xxx@qq.com', 'yangxx4508@dingtalk.com'), subject='your subject')

concurrent_log_handler's ConcurrentRotatingFileHandler fixes the multi-process
rotation bug of the stdlib RotatingFileHandler; its file rotation is correct in
multi-process scenarios on both Windows and Linux.

1. Uses ColorHandler instead of StreamHandler to print logs in 5 colors by
   level, so severe logs stand out at a glance.
2. Ships several handlers: email, mongo, stream and file.
3. Supports clicking a log line in PyCharm to jump to the corresponding line of
   the corresponding source file.
4. Handlers of the same type can be "added" to a logger of the same namespace any
   number of times without logging twice through the same kind of handler; users
   never need to check for duplicates themselves.
"""
import json
import traceback
from queue import Queue
import socket
import datetime
import sys
import os
from elasticsearch import Elasticsearch, helpers
from threading import Lock, Thread
import unittest
import time
from collections import OrderedDict
import pymongo
import logging
from logging import handlers
from concurrent_log_handler import ConcurrentRotatingFileHandler  # needs to be installed: concurrent-log-handler==0.9.1
from kafka import KafkaProducer
from app import config as app_config

os_name = os.name

DING_TALK_TOKEN = '3ddxxxxxxxxx'  # DingTalk alert robot

EMAIL_HOST = ('smtp.sohu.com', 465)
EMAIL_FROMADDR = 'yxx@sohu.com'
EMAIL_TOADDRS = ('chao.xx@ab.com', 'yxx@cd.com',)
EMAIL_CREDENTIALS = ('yxx@sohu.com', 'acb1xxx')

ELASTIC_HOST = '1xx.90.89.xx'
ELASTIC_PORT = 9200
ALWAYS_ADD_ES_HANDLER_IN_TEST_ENVIRONENT = True

KAFKA_BOOTSTRAP_SERVERS = ['1xx.90.89.xx:9092']


# noinspection PyProtectedMember,PyUnusedLocal,PyIncorrectDocstring
def very_nb_print(*args, sep=' ', end='\n', file=None):
    """
    A souped-up print patch that prepends a clickable "file:line" prefix.
    """
    # line number at the call site
    line = sys._getframe().f_back.f_lineno
    # file name of the module that made the call
    file_name = sys._getframe(1).f_code.co_filename
    # sys.stdout.write(f'"{__file__}:{sys._getframe().f_lineno}"  {x}\n')
    args = (str(arg) for arg in args)  # REMIND: cast first, numbers cannot be joined
    sys.stdout.write(f'"{file_name}:{line}"  {time.strftime("%H:%M:%S")}  \033[0;94m{"".join(args)}\033[0m\n')  # 36 93 96 94


# noinspection PyShadowingBuiltins
# print = very_nb_print

formatter_dict = {
    1: logging.Formatter(
        'log time【%(asctime)s】 - logger name【%(name)s】 - file【%(filename)s】 - line【%(lineno)d】 - level【%(levelname)s】 - message【%(message)s】',
        "%Y-%m-%d %H:%M:%S"),
    2: logging.Formatter(
        '%(asctime)s - %(name)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s',
        "%Y-%m-%d %H:%M:%S"),
    3: logging.Formatter(
        '%(asctime)s - %(name)s - 【 File "%(pathname)s", line %(lineno)d, in %(funcName)s 】 - %(levelname)s - %(message)s',
        "%Y-%m-%d %H:%M:%S"),  # a template that mimics a traceback, so you can jump to where the log was printed
    4: logging.Formatter(
        '%(asctime)s - %(name)s - "%(filename)s" - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s - File "%(pathname)s", line %(lineno)d ',
        "%Y-%m-%d %H:%M:%S"),  # this one also supports click-to-jump
    5: logging.Formatter(
        '%(asctime)s - %(name)s - "%(pathname)s:%(lineno)d" - %(funcName)s - %(levelname)s - %(message)s',
        "%Y-%m-%d %H:%M:%S"),  # in my opinion the best template; recommended
    6: logging.Formatter('%(name)s - %(asctime)-15s - %(filename)s - %(lineno)d - %(levelname)s: %(message)s',
                         "%Y-%m-%d %H:%M:%S"),
    7: logging.Formatter('%(levelname)s - %(filename)s - %(lineno)d - %(message)s'),  # a template showing only the short file name and line number
}


# noinspection PyMissingOrEmptyDocstring
class LogLevelException(Exception):
    def __init__(self, log_level):
        err = 'the log level was set to {0}, which is invalid; please use a number in the range 1 2 3 4 5'.format(log_level)
        Exception.__init__(self, err)


# noinspection PyMissingOrEmptyDocstring
class MongoHandler(logging.Handler):
    """
    A MongoDB log handler; writes logs into MongoDB, creating a separate
    collection per logger name.
    """
    # msg_pattern = re.compile('(\d+-\d+-\d+ \d+:\d+:\d+) - (\S*?) - (\S*?) - (\d+) - (\S*?) - ([\s\S]*)')

    def __init__(self, mongo_url, mongo_database='logs'):
        """
        :param mongo_url: mongo connection url
        :param mongo_database: database that stores the logs; 'logs' by default
        """
        logging.Handler.__init__(self)
        mongo_client = pymongo.MongoClient(mongo_url)
        self.mongo_db = mongo_client.get_database(mongo_database)

    def emit(self, record):
        # noinspection PyBroadException, PyPep8
        try:
            """The commented block below extracts the fields by parsing the formatted log template instead."""
            # msg = self.format(record)
            # logging.LogRecord
            # msg_match = self.msg_pattern.search(msg)
            # log_info_dict = {'time': msg_match.group(1),
            #                  'name': msg_match.group(2),
            #                  'file_name': msg_match.group(3),
            #                  'line_no': msg_match.group(4),
            #                  'log_level': msg_match.group(5),
            #                  'detail_msg': msg_match.group(6),
            #                  }
            level_str = None
            if record.levelno == 10:
                level_str = 'DEBUG'
            elif record.levelno == 20:
                level_str = 'INFO'
            elif record.levelno == 30:
                level_str = 'WARNING'
            elif record.levelno == 40:
                level_str = 'ERROR'
            elif record.levelno == 50:
                level_str = 'CRITICAL'
            log_info_dict = OrderedDict()
            log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S')
            log_info_dict['name'] = record.name
            log_info_dict['file_path'] = record.pathname
            log_info_dict['file_name'] = record.filename
            log_info_dict['func_name'] = record.funcName
            log_info_dict['line_no'] = record.lineno
            log_info_dict['log_level'] = level_str
            log_info_dict['detail_msg'] = record.msg
            col = self.mongo_db.get_collection(record.name)
            col.insert_one(log_info_dict)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)


class KafkaHandler(logging.Handler):
    """
    Writes logs to Kafka in batches.
    """
    ES_INTERVAL_SECONDS = 0.5

    host_name = socket.gethostname()
    host_process = f'{host_name} -- {os.getpid()}'
    script_name = sys.argv[0]

    task_queue = Queue()
    last_es_op_time = time.time()
    has_start_do_bulk_op = False
    kafka_producer = None
    es_index_prefix = 'pylog-'

    def __init__(self, bootstrap_servers, **configs):
        """
        :param bootstrap_servers: kafka bootstrap servers
        :param configs: extra keyword arguments passed through to KafkaProducer
        """
        logging.Handler.__init__(self)
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers, **configs)
        if not self.__class__.kafka_producer:
            self.__class__.kafka_producer = producer
        t = Thread(target=self._do_bulk_op)
        t.setDaemon(True)
        t.start()

    @classmethod
    def __add_task_to_bulk(cls, task):
        cls.task_queue.put(task)

    # noinspection PyUnresolvedReferences
    @classmethod
    def __clear_bulk_task(cls):
        cls.task_queue.queue.clear()

    @classmethod
    def _do_bulk_op(cls):
        if cls.has_start_do_bulk_op:
            return
        cls.has_start_do_bulk_op = True
        # very_nb_print(cls.kafka_producer)
        while 1:
            try:
                if cls.task_queue.qsize() > 10000:
                    very_nb_print('kafka: too many logs piled up unexpectedly; clearing to avoid a memory leak')
                    cls.__clear_bulk_task()
                    return
                # noinspection PyUnresolvedReferences
                tasks = list(cls.task_queue.queue)
                cls.__clear_bulk_task()
                for task in tasks:
                    topic = (cls.es_index_prefix + task['name']).replace('.', '').replace('_', '').replace('-', '')
                    # very_nb_print(topic)
                    cls.kafka_producer.send(topic, json.dumps(task).encode())
                cls.last_es_op_time = time.time()
            except Exception as e:
                very_nb_print(e)
            finally:
                time.sleep(cls.ES_INTERVAL_SECONDS)

    def emit(self, record):
        # noinspection PyBroadException, PyPep8
        try:
            level_str = None
            if record.levelno == 10:
                level_str = 'DEBUG'
            elif record.levelno == 20:
                level_str = 'INFO'
            elif record.levelno == 30:
                level_str = 'WARNING'
            elif record.levelno == 40:
                level_str = 'ERROR'
            elif record.levelno == 50:
                level_str = 'CRITICAL'
            log_info_dict = OrderedDict()
            log_info_dict['@timestamp'] = datetime.datetime.utcfromtimestamp(record.created).isoformat()
            log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S')
            log_info_dict['name'] = record.name
            log_info_dict['host'] = self.host_name
            log_info_dict['host_process'] = self.host_process
            log_info_dict['file_path'] = record.pathname
            log_info_dict['file_name'] = record.filename
            log_info_dict['func_name'] = record.funcName
            log_info_dict['line_no'] = record.lineno
            log_info_dict['log_level'] = level_str
            log_info_dict['msg'] = str(record.msg)
            log_info_dict['script'] = self.script_name
            log_info_dict['es_index'] = f'{self.es_index_prefix}{record.name.lower()}'
            self.__add_task_to_bulk(log_info_dict)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)


class ElasticHandler000(logging.Handler):
    """
    Writes logs to Elasticsearch in batches.
    """
    ES_INTERVAL_SECONDS = 2

    host_name = socket.gethostname()

    def __init__(self, elastic_hosts: list, elastic_port, index_prefix='pylog-'):
        """
        :param elastic_hosts: es host ips, as a list
        :param elastic_port: es port
        :param index_prefix: index name prefix
        """
        logging.Handler.__init__(self)
        self._es_client = Elasticsearch(elastic_hosts, port=elastic_port)
        self._index_prefix = index_prefix
        self._task_list = []
        self._task_queue = Queue()
        self._last_es_op_time = time.time()
        t = Thread(target=self._do_bulk_op)
        t.setDaemon(True)
        t.start()

    def __add_task_to_bulk(self, task):
        self._task_queue.put(task)

    def __clear_bulk_task(self):
        # noinspection PyUnresolvedReferences
        self._task_queue.queue.clear()

    def _do_bulk_op(self):
        while 1:
            try:
                if self._task_queue.qsize() > 10000:
                    very_nb_print('too many logs piled up unexpectedly; no longer inserting into es.')
                    self.__clear_bulk_task()
                    return
                # noinspection PyUnresolvedReferences
                tasks = list(self._task_queue.queue)
                self.__clear_bulk_task()
                helpers.bulk(self._es_client, tasks)
                self._last_es_op_time = time.time()
            except Exception as e:
                very_nb_print(e)
            finally:
                time.sleep(1)

    def emit(self, record):
        # noinspection PyBroadException, PyPep8
        try:
            level_str = None
            if record.levelno == 10:
                level_str = 'DEBUG'
            elif record.levelno == 20:
                level_str = 'INFO'
            elif record.levelno == 30:
                level_str = 'WARNING'
            elif record.levelno == 40:
                level_str = 'ERROR'
            elif record.levelno == 50:
                level_str = 'CRITICAL'
            log_info_dict = OrderedDict()
            log_info_dict['@timestamp'] = datetime.datetime.utcfromtimestamp(record.created).isoformat()
            log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S')
            log_info_dict['name'] = record.name
            log_info_dict['host'] = self.host_name
            log_info_dict['file_path'] = record.pathname
            log_info_dict['file_name'] = record.filename
            log_info_dict['func_name'] = record.funcName
            log_info_dict['line_no'] = record.lineno
            log_info_dict['log_level'] = level_str
            log_info_dict['msg'] = str(record.msg)
            self.__add_task_to_bulk({
                "_index": f'{self._index_prefix}{record.name.lower()}',
                "_type": f'{self._index_prefix}{record.name.lower()}',
                "_source": log_info_dict
            })
            # if time.time() - self._last_es_op_time > self.ES_INTERVAL_SECONDS:
            #     self._do_bulk_op()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)


# noinspection PyUnresolvedReferences
class ElasticHandler(logging.Handler):
    """
    Writes logs to Elasticsearch in batches.
    """
    ES_INTERVAL_SECONDS = 0.5

    host_name = socket.gethostname()
    host_process = f'{host_name} -- {os.getpid()}'
    script_name = sys.argv[0]

    task_queue = Queue()
    last_es_op_time = time.time()
    has_start_do_bulk_op = False

    def __init__(self, elastic_hosts: list, elastic_port, index_prefix='pylog-'):
        """
        :param elastic_hosts: es host ips, as a list
        :param elastic_port: es port
        :param index_prefix: index name prefix
        """
        logging.Handler.__init__(self)
        self._es_client = Elasticsearch(elastic_hosts, port=elastic_port)
        self._index_prefix = index_prefix
        t = Thread(target=self._do_bulk_op)
        t.setDaemon(True)
        t.start()

    @classmethod
    def __add_task_to_bulk(cls, task):
        cls.task_queue.put(task)

    # noinspection PyUnresolvedReferences
    @classmethod
    def __clear_bulk_task(cls):
        cls.task_queue.queue.clear()

    def _do_bulk_op(self):
        if self.__class__.has_start_do_bulk_op:
            return
        self.__class__.has_start_do_bulk_op = True
        while 1:
            try:
                if self.__class__.task_queue.qsize() > 10000:
                    very_nb_print('too many logs piled up unexpectedly; no longer inserting into es.')
                    self.__clear_bulk_task()
                    return
                tasks = list(self.__class__.task_queue.queue)
                self.__clear_bulk_task()
                helpers.bulk(self._es_client, tasks)
                self.__class__.last_es_op_time = time.time()
            except Exception as e:
                very_nb_print(e)
            finally:
                time.sleep(self.ES_INTERVAL_SECONDS)

    def emit(self, record):
        # noinspection PyBroadException, PyPep8
        try:
            level_str = None
            if record.levelno == 10:
                level_str = 'DEBUG'
            elif record.levelno == 20:
                level_str = 'INFO'
            elif record.levelno == 30:
                level_str = 'WARNING'
            elif record.levelno == 40:
                level_str = 'ERROR'
            elif record.levelno == 50:
                level_str = 'CRITICAL'
            log_info_dict = OrderedDict()
            log_info_dict['@timestamp'] = datetime.datetime.utcfromtimestamp(record.created).isoformat()
            log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S')
            log_info_dict['name'] = record.name
            log_info_dict['host'] = self.host_name
            log_info_dict['host_process'] = self.host_process
            log_info_dict['file_path'] = record.pathname
            log_info_dict['file_name'] = record.filename
            log_info_dict['func_name'] = record.funcName
            log_info_dict['line_no'] = record.lineno
            log_info_dict['log_level'] = level_str
            log_info_dict['msg'] = str(record.msg)
            log_info_dict['script'] = self.script_name
            self.__add_task_to_bulk({
                "_index": f'{self._index_prefix}{record.name.lower()}',
                "_type": f'{self._index_prefix}{record.name.lower()}',
                "_source": log_info_dict
            })
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)


class ColorHandler000(logging.Handler):
    """Colored log handler; shows a different color for each log level."""
    bule = 96 if os_name == 'nt' else 36
    yellow = 93 if os_name == 'nt' else 33

    def __init__(self):
        logging.Handler.__init__(self)
        self.formatter_new = logging.Formatter(
            '%(asctime)s - %(name)s - "%(filename)s" - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s',
            "%Y-%m-%d %H:%M:%S")
        # Optimizes console display and click-to-jump separately, coloring only part of
        # the string. Mainly used with template 4, so the file handler and mongo handler
        # do not end up with \033 escape codes in what they record.

    @classmethod
    def _my_align(cls, string, length):
        if len(string) > length * 2:
            return string
        custom_length = 0
        for w in string:
            custom_length += 1 if cls._is_ascii_word(w) else 2
        if custom_length < length:
            place_length = length - custom_length
            string += ' ' * place_length
        return string

    @staticmethod
    def _is_ascii_word(w):
        if ord(w) < 128:
            return True

    def emit(self, record):
        """
        Terminal color codes (foreground / background):
        30 40 black
        31 41 red
        32 42 green
        33 43 yellow
        34 44 blue
        35 45 magenta
        36 46 cyan
        37 47 white
        :type record: logging.LogRecord
        :return:
        """
        if self.formatter is formatter_dict[4] or self.formatter is self.formatter_new:
            self.formatter = self.formatter_new
            if os.name == 'nt':
                self.__emit_for_fomatter4_pycahrm(record)  # template 4 under pycharm
            else:
                self.__emit_for_fomatter4_linux(record)  # template 4 under linux
        else:
            self.__emit(record)  # other templates

    def __emit_for_fomatter4_linux(self, record):
        """
        Optimized display for template 4 on a linux terminal.
        :param record:
        :return:
        """
        # noinspection PyBroadException,PyPep8
        try:
            msg = self.format(record)
            file_formatter = ' ' * 10 + '