zoukankan      html  css  js  c++  java
  • Flask 工厂模式注册初始化logger

    工厂模式的意义

    引用官方解释:

    If you are already using packages and blueprints for your application (Modular Applications with Blueprints) there are a couple of really nice ways to further improve the experience. A common pattern is creating the application object when the blueprint is imported. But if you move the creation of this object into a function, you can then create multiple instances of this app later.

    So why would you want to do this?

    1. Testing. You can have instances of the application with different settings to test every case.
    2. Multiple instances. Imagine you want to run different versions of the same application. Of course you could have multiple instances with different configs set up in your webserver, but if you use factories, you can have multiple instances of the same application running in the same application process which can be handy.

    大概意思是:

    1. 为了方便测试,你可以随心所欲的切换不同的配置实例来测试你的用例。
    2. 相同的应用,你可以利用不同的配置实例来进行初始化,达到运行不同版本应用的目的。

    官方实例的应用(autoapp.py):

    def create_app(config_filename):
        app = Flask(__name__)
        app.config.from_pyfile(config_filename)
    
        from yourapplication.model import db
        db.init_app(app)
    
        from yourapplication.views.admin import admin
        from yourapplication.views.frontend import frontend
        app.register_blueprint(admin)
        app.register_blueprint(frontend)
    
        return app
    

    初始化

    为了能让我们的日志初始化添加如下代码(autoapp.py):

    def register_logging(app):
        app.config.setdefault("LOG_PATH", "application.log")
    
        log_formatter = "%(asctime)s [%(thread)d:%(threadName)s] %(filename)s:%(module)s:%(funcName)s in %(lineno)d] [%(levelname)s]: %(message)s"
        app.config.setdefault("LOG_FORMATTER", log_formatter)
        app.config.setdefault("LOG_MAX_BYTES", 50 * 1024 * 1024)
        app.config.setdefault("LOG_BACKUP_COUNT", 10)
        app.config.setdefault("LOG_INTERVAL", 1)
        app.config.setdefault("LOG_WHEN", "D")
        app.config.setdefault("LOG_LEVEL", "INFO")
    
        formatter = logging.Formatter(app.config["LOG_FORMATTER"])
        # 将日志输出到文件
        # 指定间隔时间自动生成文件的处理器
        # 实例化TimedRotatingFileHandler
        # interval是时间间隔,
        # backupCount是备份文件的个数,如果超过这个个数,就会自动删除
        # when是间隔的时间单位,单位有以下几种:
        # S 秒
        # M 分
        # H 小时、
        # D 天、
        # W 每星期(interval==0时代表星期一)
        # midnight 每天凌晨
        timed_rotating_file_handler = TimedRotatingFileHandler(
            filename=app.config["LOG_PATH"],
            interval=app.config["LOG_INTERVAL"],
            when=app.config["LOG_WHEN"],
            backupCount=app.config["LOG_BACKUP_COUNT"],
            encoding="utf-8",
        )
    
        timed_rotating_file_handler.setFormatter(formatter)  # 设置文件里写入的格式
        timed_rotating_file_handler.setLevel(app.config["LOG_LEVEL"])
    
        # StreamHandler
        stream_handler = StreamHandler()
        stream_handler.setFormatter(formatter)
        stream_handler.setLevel(app.config["LOG_LEVEL"])
    
        # SMTPHandler
        mail_handler = DelaySMTPHandler(
            mailhost=app.config["MAILHOST"],
            credentials=app.config["CREDENTIALS"],
            fromaddr=app.config["FROMADDR"],
            toaddrs=app.config["TOADDRS"],
            subject=app.config["SUBJECT"],
        )
        mail_handler.setLevel(logging.ERROR)
        mail_handler.setFormatter(formatter)
    
        # 删除默认的handler
        # app.logger.removeHandler(default_handler)
    
        # 设置logger
        for logger in (
            app.logger,
            logging.getLogger("sqlalchemy"),
            logging.getLogger("werkzeug"),
        ):
            logger.addHandler(stream_handler)
            logger.addHandler(timed_rotating_file_handler)
            if os.getenv("FLASK_ENV") == "production":
                logger.addHandler(mail_handler)
    
        # set logger for elk
        # stash_handler = logstash.LogstashHandler(
        #     app.config.get('ELK_HOST'),
        #     app.config.get('ELK_PORT')
        # )
        # root_logger.addHandler(stashHandler)
    

    使用log

    from flask import current_app
    
    current_app.logger.info("hello world")
    

    这样做的好处

    1. 可以多环境随意切换配置,而不用更改代码
    2. 日志配置在setting.py统一配置管理

    完整代码

    autoapp.py:

    def create_app(config_filename):
        app = Flask(__name__)
        app.config.from_pyfile(config_filename)
    
        from yourapplication.model import db
        db.init_app(app)
        
        register_logging(app)
    
        from yourapplication.views.admin import admin
        from yourapplication.views.frontend import frontend
        app.register_blueprint(admin)
        app.register_blueprint(frontend)
    
        return app
    
    
    def register_logging(app):
        app.config.setdefault("LOG_PATH", "application.log")
    
        log_formatter = "%(asctime)s [%(thread)d:%(threadName)s] %(filename)s:%(module)s:%(funcName)s in %(lineno)d] [%(levelname)s]: %(message)s"
        app.config.setdefault("LOG_FORMATTER", log_formatter)
        app.config.setdefault("LOG_MAX_BYTES", 50 * 1024 * 1024)
        app.config.setdefault("LOG_BACKUP_COUNT", 10)
        app.config.setdefault("LOG_INTERVAL", 1)
        app.config.setdefault("LOG_WHEN", "D")
        app.config.setdefault("LOG_LEVEL", "INFO")
    
        formatter = logging.Formatter(app.config["LOG_FORMATTER"])
        # 将日志输出到文件
        # 指定间隔时间自动生成文件的处理器
        # 实例化TimedRotatingFileHandler
        # interval是时间间隔,
        # backupCount是备份文件的个数,如果超过这个个数,就会自动删除
        # when是间隔的时间单位,单位有以下几种:
        # S 秒
        # M 分
        # H 小时、
        # D 天、
        # W 每星期(interval==0时代表星期一)
        # midnight 每天凌晨
        timed_rotating_file_handler = TimedRotatingFileHandler(
            filename=app.config["LOG_PATH"],
            interval=app.config["LOG_INTERVAL"],
            when=app.config["LOG_WHEN"],
            backupCount=app.config["LOG_BACKUP_COUNT"],
            encoding="utf-8",
        )
    
        timed_rotating_file_handler.setFormatter(formatter)  # 设置文件里写入的格式
        timed_rotating_file_handler.setLevel(app.config["LOG_LEVEL"])
    
        # StreamHandler
        stream_handler = StreamHandler()
        stream_handler.setFormatter(formatter)
        stream_handler.setLevel(app.config["LOG_LEVEL"])
    
        # SMTPHandler
        mail_handler = DelaySMTPHandler(
            mailhost=app.config["MAILHOST"],
            credentials=app.config["CREDENTIALS"],
            fromaddr=app.config["FROMADDR"],
            toaddrs=app.config["TOADDRS"],
            subject=app.config["SUBJECT"],
        )
        mail_handler.setLevel(logging.ERROR)
        mail_handler.setFormatter(formatter)
    
        # 删除默认的handler
        # app.logger.removeHandler(default_handler)
    
        # 设置logger
        for logger in (
            app.logger,
            logging.getLogger("sqlalchemy"),
            logging.getLogger("werkzeug"),
        ):
            logger.addHandler(stream_handler)
            logger.addHandler(timed_rotating_file_handler)
            if os.getenv("FLASK_ENV") == "production":
                logger.addHandler(mail_handler)
    
        # set logger for elk
        # stash_handler = logstash.LogstashHandler(
        #     app.config.get('ELK_HOST'),
        #     app.config.get('ELK_PORT')
        # )
        # root_logger.addHandler(stashHandler)
    

    settings.py

    # -*- coding: utf-8 -*-
    """Application configuration.
    """
    import os
    
    BASE_DIR = os.path.dirname(__file__)
    
    
    class Config(object):
        # Base settings #################################################
        DEBUG = False
        TESTING = False
        SECRET_KEY = ""
        BUNDLE_ERRORS = True
    
        # 日志配置 ###############################################################
        LOG_PATH = os.path.join(BASE_DIR, "logs", "falling-wind-service.log")
        LOG_FORMATTER = (
            "%(asctime)s [%(name)s] [%(thread)d:%(threadName)s] "
            "%(filename)s:%(module)s:%(funcName)s "
            "in %(lineno)d] "
            "[%(levelname)s]: %(message)s"
        )
        LOG_MAX_BYTES = 50 * 1024 * 1024  # 日志文件大小
        LOG_BACKUP_COUNT = 10  # 备份文件数量
        LOG_INTERVAL = 1
        LOG_WHEN = "D"
    
        # 数据库配置 ####################################################
        SQLALCHEMY_ENGINE_OPTIONS = {
            "pool_timeout": 10,  # 默认链接超时时长
            "pool_size": 10,  # 数据库链接池大小
        }
        # 提供多库链接 使用其他库进行链接的时候需要使用bind指定那个库使用
        SQLALCHEMY_BINDS = {}
        SQLALCHEMY_TRACK_MODIFICATIONS = True
    
        # Celery ##################################################################
        enable_utc = True
        timezone = "Asia/Shanghai"
        # or the actual content-type (MIME)
        accept_content = ["application/json"]
        # or the actual content-type (MIME)
        result_accept_content = ["application/json"]
        include = ["app_tasks.user_tasks"]
        result_expires = 3600
    
        # JWT ####################################################################
        JWT_SECRET_KEY = ""
        # JWT_BLACKLIST_ENABLED = False
        # JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh']
    
    
    class Pro(Config):
        ENV = "product"
    
        # 日志 ##################################################################
        LOG_PATH = "your application log path"
    
        # DB ##################################################################
        DB_HOST = ""
        DB_PORT = 3306
        DB_DATABASE = ""
        DB_USER = ""
        DB_PASSWORD = ""
        SQLALCHEMY_DATABASE_URI = "mysql+pymysql://{}:{}@{}:{}/{}".format(
            DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_DATABASE
        )
    
        # Redis ####################################################
        REDIS_URL = ""
    
        # Mail ########################################################
        MAIL_SERVER = "smtp.qq.com"
        MAIL_PORT = 465
        MAIL_USE_SSL = True
        MAIL_USERNAME = ""
        MAIL_PASSWORD = ""
    
        # Celery ###########################################################
        # Broker settings.
        broker_url = ""
        # Using the redis to store task state and results.
        result_backend = ""
    
        # JWT ###############################################################
        # JWT_ACCESS_TOKEN_EXPIRES = 60 * 60
        JWT_SECRET_KEY = ""
    
        # SMTPHandler ######################################################
        MAILHOST = ("smtp.qq.com", 465)
        CREDENTIALS = ("", "")
        FROMADDR = ""
        TOADDRS = [""]
        SUBJECT = ""
        SECURE = ("SSL",)
    
    
    class Dev(Config):
        DEBUG = True
        ENV = "dev"
    
        # 日志 ##################################################################
        LOG_PATH = "your path"
    
        # DB ####################################################################
        DB_HOST = "localhost"
        DB_PORT = 3306
        DB_DATABASE = ""
        DB_USER = ""
        DB_PASSWORD = ""
        SQLALCHEMY_DATABASE_URI = "mysql+pymysql://{}:{}@{}:{}/{}".format(
            DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_DATABASE
        )
    
        # Redis ####################################################
        REDIS_URL = ""
    
        # Mail ########################################################
        MAIL_SERVER = "smtp.qq.com"
        MAIL_PORT = 25
        MAIL_USE_TLS = True
        MAIL_USERNAME = ""
        MAIL_PASSWORD = ""
    
        # Celery ###################################################################
        # Broker settings.
        broker_url = ""
        # Using the redis to store task state and results.
        result_backend = ""
    
        # JWT ###############################################################
        JWT_ACCESS_TOKEN_EXPIRES = 60 * 60
        JWT_SECRET_KEY = ""
    
        # SMTPHandler ######################################################
        MAILHOST = ("smtp.qq.com", 465)
        CREDENTIALS = ("", "")
        FROMADDR = ""
        TOADDRS = [""]
        SUBJECT = ""
    
    
    class Test(Config):
        TESTING = True
        DEBUG = True
        ENV = "test"
    
    

    app.py

    # -*- coding: utf-8 -*-
    """Create an application instance."""
    from autoapp import create_app
    import os
    from extensions import celery
    
    if os.getenv("FLASK_ENV") == "development":
        app = create_app("settings.Dev")
    elif os.getenv("FLASK_ENV") == "production":
        app = create_app("settings.Pro")
    else:
        raise EnvironmentError("Please set FLASK_ENV !!!")
    
    celery = celery
    

    运行flask

    windows

    $ set FLASK_ENV=development
    $ flask run
    

    Linux

    $ export FLASK_ENV=development
    $ flask run
    

    Enjoy your code!

    不积跬步,无以至千里。
  • 相关阅读:
    你所不了解的静态路由特点及配置
    程序员进阶中--说说这一年的“酸甜苦辣”
    前序、中序、后序遍历的多种非递归实现
    spring依赖注入单元测试:expected single matching bean but found 2
    汉语-汉字:効、效
    汉语-词语:悃愊
    汉语-词语:宽容
    System.Threading.Tasks.TaskFactory.cs
    汉语-词语:高明
    唐-诗:《山居秋暝》
  • 原文地址:https://www.cnblogs.com/DeaconOne/p/12557456.html
Copyright © 2011-2022 走看看