zoukankan      html  css  js  c++  java
  • python-flask框架

    面试的时候聊:
            1. Flask中令你影响深刻的是什么?
                - 路由系统
                    - 装饰器,带参数的装饰器
                    - 额外装饰器
                - 特殊的装饰器
            2.    有没有遇到印象深刻:
                - 本地线程 
                - 最大共享数(文档中写的是最大共享数,但是看源码实现时发现pymysql.threadsafety=1有关),无用。

    1. flask知识点:    - flask依赖wsgi,实现wsgi协议的模块:wsgiref(django),werkzeug,uwsgi

            - 创建Flask对象
                app = Flask(__name__)
              def __init__(self, import_name, static_url_path=None,
                                 static_folder='static', template_folder='templates',
                                 instance_path=None, instance_relative_config=False,
                                 root_path=None):
               - 当前模块名          
    - 静态文件文件前缀 - 静态文件文件夹位置 - 模板路径 - 配置文件寻找位置: from flask import Flask app = Flask(__name__,instance_path=None, instance_relative_config=True) #只有当 引用文件的方式是 配置文件时,instance这两个参数才有用 #instance_path是指定 从哪个位置开始找settings文件
                   #__name__ 是当前模块名,当模块被直接运行时模块名为 __main__ 。这句话的意思就是,当模块被直接运行时,以下代码块将被运行,当模块是被导入时,代码块不被运行。
                   app.config.from_pyfile('settings.py') # C:UsersAdministratorPycharmProjectss6day1201.实例化补充 if __name__ == '__main__': app.run() - 配置文件 - 推荐使用对象方式 方式一: app.config['SESSION_COOKIE_NAME'] = 'session_lvning' # 方式二: app.config.from_pyfile('settings.py') settings.py : XXX=123 方式三: import os os.environ['FLAKS-SETTINGS'] = 'settings.py' app.config.from_envvar('FLAKS-SETTINGS') 方式四: app.config.from_object('settings.DevConfig') settings.py : class BaseConfig(object): NNN = 123 class TestConfig(BaseConfig): DB = '127.0.0.1' class DevConfig(BaseConfig): DB = '192.168.1.1' class ProConfig(BaseConfig): DB = '47.18.1.1' - 路由系统 - 通过带参数的装饰器实现的路由关系 注意:其他的装饰器要写在路由装饰器下面 - 两种形式: 第一种: @app.route('/xxxx') def index(): return "Index" 第二种: def index(): return "Index" app.add_url_rule('/xxx', "n1", index) #n1 是别名 - 将函数和url封装到一个 Rule对象 - 将Role对象添加到 app.url_map(Map对象) - 参数: (url路径,endpoint,视图函数名,method=[],default,redirect_to,strict_slashes,subdomain) endpoint 是路由的别名,不写时默认是被装饰的函数名; 反向生成用url_for 来实现 url_for("aaa",**dic)或者 url_for("aaa",x=1) method 允许的请求方式 /index/<int:nid> 不写类型的时候默认是 字符串 def index(nid): print(nid) defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数 strict_slashes=None, 对URL最后的 / 符号是否严格要求, redirect_to 直接重定向,跳转的url需要参数时,也得传参,注意:不用加类型 redirect_to("/index/<nid>") subdomain=None 二级域名 :详见day118-06 首先你要有一个主域名 动态的二级域名 - 在本地hosts文件中找IP C:WindowsSystem32driversetc ios/lenux系统是在/etc/hosts from flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.config['SERVER_NAME'] = 'bjg.com:5000' @app.route("/index",subdomain='<xxxxx>') def index(xxxxx): return "%s.bjg.com" %(xxxxx,) if __name__ == '__main__': app.run() - 扩展Flask的路由系统,让他支持正则 from flask import Flask,url_for app = Flask(__name__) # 定义转换的类 from werkzeug.routing import BaseConverter class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 :param value: :return: """ return int(value) def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 :param value: :return: """ val = super(RegexConverter, self).to_url(value) return val # 添加到converts中 app.url_map.converters['xxx'] = RegexConverter # 进行使用 @app.route('/index/<xxx("d+"):nid>',endpoint='xx') def index(nid): url_for('xx',nid=123) return "Index" if __name__ == '__main__': app.run() - 视图函数 请求: request.files 文件信息 request.values 所有的信息 request.form post请求 request.args get请求 响应: reutrn render_template() reutrn redirect() return "" return jsonify(name='alex',age='18') 返回json格式数据 make_response 每一个return的时候flask都做处理 → make_response(返回值), 可以设置cookie 及session 或者更多的其他内容 eg: response = make_response('xxxxx') response.headers['xxx'] = '123123' return response - CBV、FBV 回顾django的cbv: url(r'^login_cbv/',views.Login.as_view) from django.views import View class Login(View): def get(self,request): return render(request,"login.html") def post(self,request): return HttpResponse("post...") 如果是get请求就走 get方法,如果是post请求就走post方法。 flask的cbv def auth(func): def inner(*args, **kwargs): result = func(*args, **kwargs) return result return inner class IndexView(views.MethodView): methods = ['POST'] #只写POST时,只有请求时post时才生效,也就是get函数不执行 decorators = [auth,] 为每一个函数都加上auth装饰器 def get(self): v = url_for('index') print(v) return "GET" def post(self): return "GET" app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) if __name__ == '__main__': app.run() 零碎知识点: root_path 根路径 to_dict() 把url变成字典 urllib.parse 引入 urlencode 把字典变成url形式 quote 和 unquote 把汉字变成乱码/把乱码变成汉字 self.__class__ 找到对象的类 - 模板 1.方法不会自动执行,要加括号,字典也可以用get方法 2.Makeup相当于django的 mark_safe , 或者在前端页面上 |safe Markup("<input type='text' />") 引出 xss 攻击 3.自定义的 标签和 过滤器 @ ,在页面上写的区别, 3.1 需要把test传给前端 def test(a1,a2): return a1+a2 return render_template('index.html',test=test) 使用:{{test(1,19)}} 3.2 所有的模板都可以用 @app.template_global() #加上这个装饰器以后就不需要传了,所有的页面直接就可以使用 def sb(a1, a2): return a1 + a2 + 100 使用:{{sb(1,2)}} 3.3 所有的模板都可以用 @app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 使用:{{ 1|db(2,3)}} 4.模板的继承和django一样的,include也是一样的 5.hong: {% macro xxxx(name, type='text', value='') %} <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> <input type="{{ type }}" name="{{ name }}" value="{{ value }}"> {% endmacro %} {{ xxxx('n1') }} - 蓝图 - 项目目录规则化 (把一个py文件分成多个py文件 ) 蓝图:小中型项目:结构 大中型项目:结构 - 特殊装饰器 app.before_first_request app.before_request app.after_request 必须要有返回值,并且得有一个形参 - session 设置值 session['xx']=1 取值 session.get('xx') 超时时间的设置 session超时时间如何设置? PERMANENT_SESSION_LIFETIME app.config['SESSION_COOKIE_NAME'] = 'session_lvning' """ 'SESSION_COOKIE_NAME': 'session', 'SESSION_COOKIE_DOMAIN': None, 'SESSION_COOKIE_PATH': None, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SECURE': False, 'SESSION_REFRESH_EACH_REQUEST': True, #是否实时更新 'PERMANENT_SESSION_LIFETIME': timedelta(days=31) """ session本质是操作的一个字典,在修改session的时候是在内存中操作的,等在return之前 把内存的session字典再返回给你open_session / save_session - flash 同session的原理,区别在于flash取一次就没了 设置值,flash('xxxx') 取值,get_flashed_messages()

    2.上下文管理

         什么是上写文?

      上下文个人理解是:程序运行时相关的周围环境,flask里的上下文是指:在请求刚进来的时候把某个数据或者变量放到一个栈里,等你后面什么时候用就去栈里取就行了。在java 和php中上下文被叫做 HttpRequestContext

            a. 创建Local类:
                {
                    线程或协程唯一标识: { 'stack':[request],'xxx':[session,] },
                    线程或协程唯一标识: { 'stack':[] },
                    线程或协程唯一标识: { 'stack':[] },
                    线程或协程唯一标识: { 'stack':[] },
                }
            b. 本质
                当请求进来之后,将请求相关数据添加到 [request,]
                以后使用时:去读取
                请求完成之后,将request从列表中移除。
                
            c. 关系
                local= 宋康 = {
                    线程或协程唯一标识: { 'stack':[] },
                    线程或协程唯一标识: { 'stack':[] },
                    线程或协程唯一标识: { 'stack':[] },
                    线程或协程唯一标识: { 'stack':[] },
                }
                
                stack=强哥={
                    push
                    top
                    pop
                }
                
                存取数据时,要基于stack来做。
                
            d. Flask和Django区别?
                - 请求相关数据传递方式
                    - Django: 参数 
                    - Flask: 基于 Local,LocalStack对象
                    - 问题:多个请求到来会不会混淆
                        - 单线程
                        - 多线程
                        - 协程
                        解决: from greenlet import getcurrent as get_ident

     3. 数据库连接池

            3.1本地线程    -每一个线程来的时候,都分配一个标示,也就是说每个线程都有自己的数据信息,当取值的时候,只取自己线程的数据,这样实现了线程之间的数据隔离
      
                            import threading
                            import time
                            # 本地线程对象
                            local_values = threading.local()
    
                            def func(num):
    
                                """
                                # 第一个线程进来,本地线程对象会为他创建一个唯一标识
                                # 第二个线程进来,本地线程对象会为他创建一个唯一标识
                                {
                                    线程1的唯一标识:{name:1},
                                    线程2的唯一标识:{name:2},
                                }
    
                                """
                                local_values.name = num 
                                # 线程停下来了
                                time.sleep(2)
                                # local_values.name,去local_values中根据自己的唯一标识作为key,获取value中name对应的值
                                print(local_values.name, threading.current_thread().name)
    
    
                            for i in range(5):
                                th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
                                th.start()
                
            3.2原来连接数据库的方式:   1  #缺点:每次请求反复创建数据库连接,连接数太多
                                             conn = pymysql.connect()
                                             cursor = conn.cursor()
                                             cursor.execute('select * from tb where id > %s',[5,])
                                             result = cursor.fetchall()
                                             cursor.close()
                                             conn.close()
                                             print(result)
    
                                        2     # 公用一个连接,多线程有问题,加锁→缺点,不能支持并发
                                             with LOCK:
                                                 cursor = CONN.cursor()
                                                 cursor.execute('select * from tb where id > %s', [5, ])
                                                 result = cursor.fetchall()
                                                 cursor.close()
                                            
                                                 print(result)
                                                
            3.3数据库连接池:
                    原理:设置连接池中最大连接数、默认启动时连接池中创建的连接数
    
    
                  3.3.1.为每个线程创建一个连接,该线程关闭时,不是真正关闭;本线程再次调用时,还是使用的最开始创建的连接。直到线程终止,数据库连接才关闭。(本质是用本地线程实现的)
                    ;当很多线程进来时还是会创建很多的连接,所以这种方法也不好。
                        from DBUtils.PersistentDB import PersistentDB
                        import pymysql
                        import threading
    
                        POOL = PersistentDB(
                            creator=pymysql,  # 使用链接数据库的模块
                            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
                            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
                            ping=0,
                            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                            closeable=False,
                            # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
                            threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
                            host='127.0.0.1',
                            port=3306,
                            user='root',
                            password='123',
                            database='pooldb',
                            charset='utf8'
                        )
    
                        def func():
                            conn = POOL.connection()
                            cursor = conn.cursor()
                            cursor.execute('select * from tb1')
                            result = cursor.fetchall()
                            cursor.close()
                            conn.close() # 不是真的关闭,而是假的关闭。
    
                        for i in range(10):
                            t = threading.Thread(target=func)
                            t.start()
                  
                  
                  3.3.2.创建一个连接池(10个连接),为所有线程提供连接,使用时来进行获取,使用完毕后,再次放回到连接池。      
                                        注意: 连接池中的所有连接都可以被重新使用,原因是因为pymsql.threadsafety的 个数是1
                                        
                                        
                        数据库连接池样板:  
                                xxx.py---------------    
                                        from flask import Flask
                                        from db import POOL
    
                                        app = Flask(__name__)
    
                                        @app.route('/index')
                                        def index():
                                          
                                            conn = POOL.connection()
                                            cursor = conn.cursor()
                                            cursor.execute('select * from tb1')
                                            result = cursor.fetchall()
                                            conn.close()
                                            
                                            return '执行成功'
    
                                        if __name__ == '__main__':
                                            app.run()
                        
                        
                                db.py---------------
                                        import pymysql
                                        from DBUtils.PooledDB import PooledDB, SharedDBConnection
                                        POOL = PooledDB(
                                            creator=pymysql,  # 使用链接数据库的模块
                                            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
                                            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
                                            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
                                            maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的     threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
                                            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                                            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
                                            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
                                            ping=0,
                                            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                                            #ping : 0 或 4最常用
                                            host='127.0.0.1',
                                            port=3306,
                                            user='root',
                                            password='123',
                                            database='pooldb',
                                            charset='utf8'
                                        )
                                        
                                        
                                        
                                        
                        如果有三个线程来连接池(最大连接数:9个,初始化创建了5个)拿连接,有三种情况:
                            1. 他们三个先后顺序来,  同一个连接就够三个线程用了
                            2. 他们来的顺序不定,    有可能需要两个连接
                            3. 他们同时来的,        需要三个链接

    4. 单例模式 :    - 推荐:__new__

            - 文件
            - 基于类方法
                            
                # 单例模式:无法支持多线程情况
                """
                class Singleton(object):
    
                    def __init__(self):
                        import time
                        time.sleep(1)
    
                    @classmethod
                    def instance(cls, *args, **kwargs):
                        if not hasattr(Singleton, "_instance"):
                            Singleton._instance = Singleton(*args, **kwargs)
                        return Singleton._instance
    
                import threading
    
                def task(arg):
                    obj = Singleton.instance()
                    print(obj)
    
                for i in range(10):
                    t = threading.Thread(target=task,args=[i,])
                    t.start()
                """
    
                # # 单例模式:支持多线程情况
                """
                import time
                import threading
                class Singleton(object):
                    _instance_lock = threading.Lock()
    
                    def __init__(self):
                        time.sleep(1)
    
                    @classmethod
                    def instance(cls, *args, **kwargs):
                        if not hasattr(Singleton, "_instance"):
                            with Singleton._instance_lock:
                                if not hasattr(Singleton, "_instance"):
                                    Singleton._instance = Singleton(*args, **kwargs)
                        return Singleton._instance
    
    
                def task(arg):
                    obj = Singleton.instance()
                    print(obj)
                for i in range(10):
                    t = threading.Thread(target=task,args=[i,])
                    t.start()
                time.sleep(20)
                obj = Singleton.instance()
                print(obj)
                """
    
            - 基于__new__方法
            
                    引出:new,call,init 方法及区别
                        """
                    1.对象是类创建,创建对象时候类的__init__方法自动执行,对象()执行类的 __call__ 方法
                    2.类是type创建,创建类时候type的__init__方法自动执行,类() 执行type的 __call__方法(类的__new__方法,类的__init__方法)
    
                    # 第0步: 执行type的 __init__ 方法【类是type的对象】
                    class Foo:
                        def __init__(self):
                            pass
    
                        def __call__(self, *args, **kwargs):
                            pass
    
                    # 第1步: 执行type的 __call__ 方法
                    #        1.1  调用 Foo类(是type的对象)的 __new__方法,用于创建对象。
                    #        1.2  调用 Foo类(是type的对象)的 __init__方法,用于对对象初始化。
                    obj = Foo()
                    # 第2步:执行Foodef __call__ 方法
                    obj()
                    """
                    
                    
                    """
                    class SingletonType(type):
                        def __init__(self,*args,**kwargs):
                            super(SingletonType,self).__init__(*args,**kwargs)
    
                        def __call__(cls, *args, **kwargs):
                            obj = cls.__new__(cls,*args, **kwargs)
                            cls.__init__(obj,*args, **kwargs) # Foo.__init__(obj)
                            return obj
    
                    class Foo(metaclass=SingletonType):
                        def __init__(self,name):
                            self.name = name
                        def __new__(cls, *args, **kwargs):
                            return object.__new__(cls, *args, **kwargs)
    
                    obj = Foo('name')
                    """
                    
                    import threading
                    class Singleton(object):
                        _instance_lock = threading.Lock()
    
                        def __init__(self):
                            pass
    
                        def __new__(cls, *args, **kwargs):
                            if not hasattr(Singleton, "_instance"):
                                with Singleton._instance_lock:
                                    if not hasattr(Singleton, "_instance"):
                                        Singleton._instance = object.__new__(cls, *args, **kwargs)
                            return Singleton._instance
    
                    obj1 = Singleton()
                    obj2 = Singleton()
                    print(obj1,obj2)   #  <__main__.Singleton object at 0x0000029B7FE127F0>    <__main__.Singleton object at 0x0000029B7FE127F0>
    
            - 基于metaclass方法
                import threading
    
                class SingletonType(type):
                    _instance_lock = threading.Lock()
                    def __call__(cls, *args, **kwargs):
                        if not hasattr(cls, "_instance"):
                            with SingletonType._instance_lock:
                                if not hasattr(cls, "_instance"):
                                    cls._instance = super(SingletonType,cls).__call__(*args, **kwargs)
                        return cls._instance
    
                class Foo(metaclass=SingletonType):
                    def __init__(self,name):
                        self.name = name
    
    
                obj1 = Foo('name')
                obj2 = Foo('name')
                print(obj1,obj2)   #  <__main__.Foo object at 0x0000017B6A612898>   <__main__.Foo object at 0x0000017B6A612898>
            
            在哪应用了单例模式:
                a. stark组件
                b. 数据库连接池 

    5.Session

            5.1- 读Flask session源码
    
            5.2- 流程:
                当请求第一次进来,生成随机字符串:sidfnsdfisdfs
                    - 发给用户cookie
                    - 保存到session字典中
                    
                    PS: 调用 stack(强哥)将随机字符串和对应的值放到 local(宋康)
                    
                视图函数使用
                    session = LocalProxy(partial(_lookup_req_object, 'session'))
                    
                请求处理完毕
                    将session做持久化:
                        - 存到数据
                        - 存到redis
                        - 存到加密cookie中
            
            5.3- 自定义session:
                from flask import Flask,request,session
                app = Flask(__name__)
                app.secret_key = 'sdfsdfsd'
                from flask.sessions import SessionInterface,SessionMixin
                import uuid
                import json
                from flask.sessions import SessionInterface
                from flask.sessions import SessionMixin
                from itsdangerous import Signer, BadSignature, want_bytes
    
                class MySession(dict, SessionMixin):
                    def __init__(self, initial=None, sid=None):
                        self.sid = sid
                        self.initial = initial
                        super(MySession, self).__init__(initial or ())
    
                    def __setitem__(self, key, value):
                        super(MySession, self).__setitem__(key, value)
    
                    def __getitem__(self, item):
                        return super(MySession, self).__getitem__(item)
    
                    def __delitem__(self, key):
                        super(MySession, self).__delitem__(key)
    
    
                class MySessionInterface(SessionInterface):
                    session_class = MySession
                    container = {
                        # 'asdfasdfasdfas':{'k1':'v1','k2':'v2'}
                        # 'asdfasdfasdfas':"{'k1':'v1','k2':'v2'}"
                    }
    
                    def __init__(self):
                        pass
                        # import redis
                        # self.redis = redis.Redis()
    
                    def _generate_sid(self):
                        return str(uuid.uuid4())
    
                    def _get_signer(self, app):
                        if not app.secret_key:
                            return None
                        return Signer(app.secret_key, salt='flask-session',
                                      key_derivation='hmac')
    
                    def open_session(self, app, request):
                        """
                        程序刚启动时执行,需要返回一个session对象
                        """
                        sid = request.cookies.get(app.session_cookie_name)
                        if not sid:
                            # 生成随机字符串,并将随机字符串添加到 session对象中
                            sid = self._generate_sid()
                            return self.session_class(sid=sid)
    
                        signer = self._get_signer(app)
                        try:
                            sid_as_bytes = signer.unsign(sid)
                            sid = sid_as_bytes.decode()
                        except BadSignature:
                            sid = self._generate_sid()
                            return self.session_class(sid=sid)
    
                        # session保存在redis中
                        # val = self.redis.get(sid)
                        # session保存在内存中
                        val = self.container.get(sid)
    
                        if val is not None:
                            try:
                                data = json.loads(val)
                                return self.session_class(data, sid=sid)
                            except:
                                return self.session_class(sid=sid)
                        return self.session_class(sid=sid)
    
                    def save_session(self, app, session, response):
                        """
                        程序结束前执行,可以保存session中所有的值
                        如:
                            保存到resit
                            写入到用户cookie
                        """
                        domain = self.get_cookie_domain(app)
                        path = self.get_cookie_path(app)
                        httponly = self.get_cookie_httponly(app)
                        secure = self.get_cookie_secure(app)
                        expires = self.get_expiration_time(app, session)
    
                        val = json.dumps(dict(session))
    
                        # session保存在redis中
                        # self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)
                        # session保存在内存中
                        self.container.setdefault(session.sid, val)
    
                        session_id = self._get_signer(app).sign(want_bytes(session.sid))
    
                        response.set_cookie(app.session_cookie_name, session_id,
                                            expires=expires, httponly=httponly,
                                            domain=domain, path=path, secure=secure)
    
    
    
                app.session_interface = MySessionInterface()
                # app.session_interface = Foo()
                # app.session_interface
                # app.make_null_session()
                @app.route('/index')
                def index():
                    print('网站的所有session',MySessionInterface.container)
                    print(session)
                    session['k1'] = 'v1'
                    session['k2'] = 'v2'
                    del session['k1']
    
                    # 在内存中操作字典....
                    # session['k1'] = 'v1'
                    # session['k2'] = 'v2'
                    # del session['k1']
    
                    return "xx"
    
                if __name__ == '__main__':
                    app.__call__
                    app.run()
            
            5.4 最常用
                pip3 install flask-session
                
                #!/usr/bin/env python
                # -*- coding:utf-8 -
                from flask import Flask,current_app,session
                from flask_session import Session
                app = Flask(__name__)
                app.debug = True
                app.secret_key = 'xxxx'
    
                # 为Flask-session组件提供的配置
                # import redis
                # app.config['SESSION_TYPE'] = 'redis'  # session类型为redis
                # app.config['SESSION_REDIS'] = redis.Redis(host='127.0.0.1', port='6379', password='123123')  # 用于连接redis的配置
                # app.config['SESSION_KEY_PREFIX'] = 'session:'  # 保存到session中的值的前缀
                # app.config['SESSION_PERMANENT'] = False  # 如果设置为True,则关闭浏览器session就失效。
                # app.config['SESSION_USE_SIGNER'] = False  # 是否对发送到浏览器上 session:cookie值进行加密
                # Session(app)
                #
                # import memcache
                # app.config['SESSION_TYPE'] = 'memcached' # session类型为memcached
                # app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。
                # app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密
                # app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀
                # app.config['SESSION_MEMCACHED'] = memcache.Client(['10.211.55.4:12000'])
                # Session(app)
    
                app.config['SESSION_TYPE'] = 'filesystem'  # session类型为filesystem
                app.config['SESSION_FILE_DIR'] = r'C:UsersAdministratorPycharmProjectsday1212.flask-session组件'  # session类型为redis
                app.config['SESSION_FILE_THRESHOLD'] = 500  # 存储session的个数如果大于这个值时,就要开始进行删除了
                app.config['SESSION_FILE_MODE'] = 384  # 文件权限类型
    
                app.config['SESSION_PERMANENT'] = True  # 如果设置为True,则关闭浏览器session就失效。
                app.config['SESSION_USE_SIGNER'] = False  # 是否对发送到浏览器上session的cookie值进行加密
                app.config['SESSION_KEY_PREFIX'] = 'session:'  # 保存到session中的值的前缀
    
                Session(app)
    
    
                @app.route('/index')
                def index():
                    session['k1'] = 'v1'
                    return 'xx'
    
                if __name__ == '__main__':
                    app.run()

    6.blinker  信号

            pip3 install blinker
            
            6.1 内置信号
                10个信号:
                    2. request_started = _signals.signal('request-started')                # 请求到来前执行
                    5. request_finished = _signals.signal('request-finished')              # 请求结束后执行
                     
                    3. before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
                    4. template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
                     
                    2/3/4/5或不执行 got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
                     
                    6. request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
                    7. appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 请求上下文执行完毕后自动执行(无论成功与否)
                     
                     
                    1. appcontext_pushed = _signals.signal('appcontext-pushed')            # 请求app上下文push时执行
                    
                    8. appcontext_popped = _signals.signal('appcontext-popped')            # 请求上下文pop时执行
                    
                    message_flashed = _signals.signal('message-flashed')                   # 调用flask在其中添加数据时,自动触发
                
                
                问题:
                    特殊的装饰器和信号有什么区别?
                        - 装饰器返回值有意义
                        - 信号用于做什么呢?
                            - 降低代码之间的耦合
                            
            6.2 自定义信号 :创建信号→注册→触发
                    from flask import Flask,flash
                    from flask.signals import _signals
                    app = Flask(__name__)
    
                    wh = _signals.signal('wh')
    
    
                    # 定义函数
                    def luominwen(*args,**kwargs):
                        print('罗姑娘',args,kwargs)
    
                    # 定义函数
                    def shaowei(*args,**kwargs):
                        print('少姑娘',args,kwargs)
    
                    # 将函数注册到request_started信号中: 添加到这个列表
                    wh.connect(luominwen)
                    wh.connect(shaowei)
    
    
                    @app.route('/index')
                    def index():
                        # 触发这个信号:执行注册到列表中的所有函数
                        # 发送短信,邮件,微信
                        wh.send(sender='xxx',a1=123,a2=456)
                        return "xx"
    
                    if __name__ == '__main__':
                        app.__call__
                        app.run()

        
         6.3Django内置
                        Request/response signals
                            request_started             # 请求到来前,自动触发
                            request_finished            # 请求结束后,自动触发
                            got_request_exception       # 请求异常后,自动触发
                        
                        Model signals
                            pre_init                    # django的modal执行其构造方法前,自动触发
                            post_init                   # django的modal执行其构造方法后,自动触发
                            
                            pre_save                    # django的modal对象保存前,自动触发
                            post_save                   # django的modal对象保存后,自动触发, 它可以判断是增加的还是修改的
                            
                            pre_delete                  # django的modal对象删除前,自动触发
                            post_delete                 # django的modal对象删除后,自动触发
                            
                            m2m_changed                 # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
                            
                            class_prepared              # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
                            
                        Management signals
                            pre_migrate                 # 执行migrate命令前,自动触发
                            post_migrate                # 执行migrate命令后,自动触发
                        
                        Test signals
                            setting_changed             # 使用test测试修改配置文件时,自动触发
                            template_rendered           # 使用test测试渲染模板时,自动触发
                        Database Wrappers
                            connection_created          # 创建数据库连接时,自动触发
                    
                    需求:新浪面试题,数据库12张表,每张表创建一条数据时,记录一条日志。
                 答案: 可以重写 django里面orm操作的save 方法,或者 使用信号

    7. wtforms

            - wtforms的源码
            - 使用
        metaclass的另外一种方式:
            class MyType(type):
                def __init__(self,*args,**kwargs):
                    print('xxxx')
                    super(MyType,self).__init__(*args,**kwargs)
    
                def __call__(cls, *args, **kwargs):
                    obj = cls.__new__(cls,*args, **kwargs)
                    cls.__init__(obj,*args, **kwargs) # Foo.__init__(obj)
                    return obj
    
            def with_metaclass(base):
                return MyType("MyType",(base,),{})
    
            class Foo(with_metaclass(object)):
                def __init__(self,name):
                    self.name = name
    
            #打印结果:  xxxx    xxxx

    8. flask的简单 使用:

            - 蓝图
            - 配置 from_objects
            - flask-session
            - 记录:请求到来写日志
            - wtforms
            - DBUtils
            - 单例模式
            PS: 连接数据库,对用户表进行:登录和注册

     9. 导出程序内应用的所有模块

    # 获取环境中所有安装的模块
            pip3 freeze          #在终端查看
            pip3 freeze > requirements.txt    #写入requirements.txt文件中
            
    # pip3 install pipreqs
    # 获取当前所在程序目录中涉及到的所有模块,并自动生成 requirements.txt 且写入内容。
    pipreqs ./
            
            
    以后别人给你程序:
         requirements.txt
                
    进入程序目录:
         pip3 install -r requirements.txt
  • 相关阅读:
    【转载,待整理】初学 springmvc整合shiro
    【转载并整理】javaweb单点登录
    【转载】linux 测试机器端口连通性方法
    Intellij idea 复制粘贴查找快捷键失效
    intellij 打开node项目 一直停留在scanning files to index....,或跳出内存不够的提示框
    【转载】Hibernate之hbm.xml集合映射的使用(Set集合映射,list集合映射,Map集合映射)
    【转载并整理】mysql 1293错误 建表两个timestamp
    作用域与闭包:this,var
    在MongoDB中使用JOIN操作
    linux下用top命令查看cpu利用率超过100%
  • 原文地址:https://www.cnblogs.com/liuwei0824/p/8259410.html
Copyright © 2011-2022 走看看