zoukankan      html  css  js  c++  java
  • M1-Flask-Day2

    内容概要:

      1.flask

        - 蓝图

        - 中间件

        - 闪现

      2.扩展

        - session

        - wtforms

      3.上下文管理

        - local-threading

      4.websocket

        - 轮询

        - 长轮询

        - websocket

    一.谈谈你对面向对象的理解

      1.三大特性,继承,封装,多态(初级水平)

        封装分两种:

            (1).函数的封装

              class db1():
                  """Example of encapsulating related functions inside one class."""

                  def func(self):
                      """Placeholder method; takes ``self`` so it can be called on an instance."""
                      pass

                  def func2(self):
                      """Second placeholder method; also takes ``self``."""
                      pass

              (2).数据的封装(一个类的对象)

              

      2.__str__...方法

        __str__,

        __new__,

        __init__,

        __getattr__,

        __setattr__,

        __delattr__,

        __getitem__,

        __setitem__,

        __delitem__,

        __enter__,

        __exit__,

        __del__,

        __call__,

        __dict__,

        这些方法有特有的执行场景,类加()执行__init__方法,对象加(),__call__方法....

      ****一个对象和一个对象能不能相加、减、乘、除

        可以 加用__add__方法 

    class Foo(object):
        """Minimal class showing that ``a + b`` on objects dispatches to ``__add__``."""

        def __add__(self, other):
            # Placeholder: the addition is accepted but evaluates to None.
            return None


    obj1 = Foo()
    obj2 = Foo()

    # ``+`` between the two instances calls obj1.__add__(obj2)
    res = obj1 + obj2

      3.metaclass

        1.创建类的两种方法   

    # Way 1 to create a class: the ordinary ``class`` statement
    class Foo(object):
        pass

    # Way 2 to create a class: call ``type`` directly.
    # Argument 1 is the class name, argument 2 is the tuple of base classes,
    # argument 3 is the dict that supplies the class attributes.
    Bar = type("MyFoo", (object,), dict())
    g = Bar()
    print(g)
    

         2.验证类是由type创建

    class Mytype(type):
        """Metaclass that prints a message whenever a class using it is initialised."""

        def __init__(cls, *args, **kwargs):
            print("from my type")
            super().__init__(*args, **kwargs)


    class Foo(metaclass=Mytype):
        # Creating this class is what runs Mytype.__init__
        pass


    g = Foo()

    """result
    from my type
    """
    

     二、flask-蓝图

       1.目录结构:

        

      2.__init__.py  

    from flask import Flask
    
    app = Flask(__name__)
    
    # The view modules are imported only after ``app`` has been created.
    from .views import account
    from .views import user
    app.register_blueprint(account.ac) # once registered, requests can be routed to the blueprint's views
    app.register_blueprint(user.us)
    
    
    # The bare string below is disabled example code: per the author's note, a
    # before_request hook registered on the global ``app`` applies to every view function.
    """
    这里特殊的装饰器,是全局app,每个function都需要经过
    @app.before_request
    def check_login():
        print('.....')
    """
    

       3.user.py

    from flask import Blueprint
    
    us = Blueprint('us',__name__,url_prefix='/xx') # create the blueprint object; url_prefix is prepended, so a request for index becomes /xx/index
    
    # A blueprint-local hook, useful when only these views need e.g. a permission check
    # @us.before_request
    # def check_login():
    #     print('.....')
    
    
    @us.route('/index')
    def index():
        """Return the text 'index' for the blueprint's /index route."""
        return 'index'
    

      4.manage.py

    from pro_flask import app
    
    # Start the server only when this file is run as a script
    if __name__ == '__main__':
        app.run()
    

     三、flask-闪现

       1.用于1次请求之后删除,基于session,取的时候用pop

    from flask import Blueprint,redirect,request,flash,get_flashed_messages
    
    ac = Blueprint('ac',__name__)
    
    @ac.route('/login')
    def login():
        """Flash two messages under different categories, then redirect to /logout."""
        flash("登录成功1",category="x1") # category adds one more level of grouping
        flash("登录成功2",category="x2")
        return  redirect("/logout")
    
    
    @ac.route('/logout')
    def logout():
        """Print the flashed messages of category "x1" and return the text 'logout'."""
        # category_filter must be a collection of category names. A bare string
        # would turn the ``in`` check into a substring match (e.g. "x" would match "x1").
        # Without category_filter every message is returned.
        res = get_flashed_messages(category_filter=["x1"])
        print(res)
        return 'logout'
    

       2.get_flashed_messages源码

    def get_flashed_messages(with_categories=False, category_filter=()):
        """Return the messages flashed for the current request.

        Each stored entry is indexed as ``entry[0]`` (category) and ``entry[1]``
        (message).

        :param with_categories: when true, return the stored entries unchanged;
            otherwise return only the message part of each entry.
        :param category_filter: collection of category names; when non-empty,
            only entries whose category is in it are kept.
        :return: a list of messages, or a list of stored entries.
        """
        flashes = _request_ctx_stack.top.flashes
        if flashes is None:
            # First access in this request: pop the messages out of the session
            # and remember them on the request context so later calls reuse them.
            flashes = session.pop('_flashes') if '_flashes' in session else []
            _request_ctx_stack.top.flashes = flashes
        if category_filter:
            flashes = [f for f in flashes if f[0] in category_filter]
        if not with_categories:
            return [x[1] for x in flashes]
        return flashes
    

     三、flask-middleware

    from flask import Flask
    app = Flask(__name__)
    
    @app.route('/user',methods=['GET','POST'],endpoint='xxx')
    def user():
        """Answer GET and POST requests on /user with the text "login"."""
        return "login"
    
    
    class MiddleWare(object):
        """Wrapper around a WSGI callable.

        The key point is that calling an instance with parentheses runs ``__call__``.
        """

        def __init__(self, old_wsgi_app):
            # Keep the wrapped callable so __call__ can delegate to it.
            self.old_wsgi_app = old_wsgi_app

        def __call__(self, *args, **kwargs):
            # Author's note: at this point there is no request object yet.
            print("我做一些数据库连接check,或者清理缓存操作")
            wrapped = self.old_wsgi_app
            response = wrapped(*args, **kwargs)
            return response
    
    if __name__ == '__main__':
        # Swap the app's WSGI callable for the wrapper, so each call passes
        # through MiddleWare.__call__ before reaching the original wsgi_app.
        app.wsgi_app = MiddleWare(app.wsgi_app)
        app.run("0.0.0.0",9999)
    

     四、flask-session

    from flask import Flask,session
    app = Flask(__name__)
    
    
    # These few lines are all that is needed to have sessions written to redis.
    # ``flask.ext.*`` imports were removed from Flask; the extension is imported
    # from its own package, ``flask_session``.
    from flask_session import Session
    from redis import Redis
    app.config["SESSION_TYPE"] = 'redis'
    # The port is passed as an integer rather than a string.
    app.config["SESSION_REDIS"] = Redis(host='192.16.1.1', port=6379)
    Session(app)
    
    
    @app.route('/user',methods=['GET','POST'],endpoint='xxx')
    def user():
        """Answer GET and POST requests on /user with the text "login"."""
        return "login"
    
    
    
    if __name__ == '__main__':
        app.run("0.0.0.0",9999)
    

       源码剖析:

    #session_interface = RedisSessionInterface() #程序刚开始加载时候执行
    def _get_interface(self, app):
        """Build the session interface selected by ``SESSION_TYPE``.

        Works on a copy of ``app.config``, fills in a default for every
        ``SESSION_*`` key that is missing, then instantiates the interface
        class matching ``SESSION_TYPE``. Any unrecognised type falls back to
        ``NullSessionInterface``.
        """
        config = app.config.copy()
        defaults = (
            ('SESSION_TYPE', 'null'),
            ('SESSION_PERMANENT', True),
            ('SESSION_USE_SIGNER', False),
            ('SESSION_KEY_PREFIX', 'session:'),
            ('SESSION_REDIS', None),
            ('SESSION_MEMCACHED', None),
            ('SESSION_FILE_DIR', os.path.join(os.getcwd(), 'flask_session')),
            ('SESSION_FILE_THRESHOLD', 500),
            ('SESSION_FILE_MODE', 384),
            ('SESSION_MONGODB', None),
            ('SESSION_MONGODB_DB', 'flask_session'),
            ('SESSION_MONGODB_COLLECT', 'sessions'),
            ('SESSION_SQLALCHEMY', None),
            ('SESSION_SQLALCHEMY_TABLE', 'sessions'),
        )
        for option, fallback in defaults:
            config.setdefault(option, fallback)

        backend = config['SESSION_TYPE']
        # Every backend takes these three settings as its trailing arguments.
        shared = (config['SESSION_KEY_PREFIX'],
                  config['SESSION_USE_SIGNER'],
                  config['SESSION_PERMANENT'])

        if backend == 'redis':
            return RedisSessionInterface(config['SESSION_REDIS'], *shared)
        if backend == 'memcached':
            return MemcachedSessionInterface(config['SESSION_MEMCACHED'], *shared)
        if backend == 'filesystem':
            return FileSystemSessionInterface(
                config['SESSION_FILE_DIR'], config['SESSION_FILE_THRESHOLD'],
                config['SESSION_FILE_MODE'], *shared)
        if backend == 'mongodb':
            return MongoDBSessionInterface(
                config['SESSION_MONGODB'], config['SESSION_MONGODB_DB'],
                config['SESSION_MONGODB_COLLECT'], *shared)
        if backend == 'sqlalchemy':
            return SqlAlchemySessionInterface(
                app, config['SESSION_SQLALCHEMY'],
                config['SESSION_SQLALCHEMY_TABLE'], *shared)
        return NullSessionInterface()
    

       程序执行session["xx"]=123时候先执行session_interface的open_session方法:

        def open_session(self, app, request):
            """Load the session for this request, or start a new one.

            Reads the session id from the cookie named ``app.session_cookie_name``.
            With no cookie, or with a cookie whose signature is invalid, a new
            session with a freshly generated id is returned. Otherwise the
            stored value is fetched from redis under ``key_prefix + sid`` and
            deserialised; if nothing is stored, or the stored value cannot be
            deserialised, a session carrying the same sid is returned instead.

            Returns ``None`` only when signing is enabled but no signer is
            available.
            """
            sid = request.cookies.get(app.session_cookie_name)
            if not sid:
                # No session cookie yet: create a new sid (per the original note,
                # a uuid-based random string) and return a fresh session bound to it.
                sid = self._generate_sid()
                return self.session_class(sid=sid, permanent=self.permanent)
            if self.use_signer:
                signer = self._get_signer(app)
                if signer is None:
                    return None
                try:
                    sid_as_bytes = signer.unsign(sid)
                    sid = sid_as_bytes.decode()
                except BadSignature:
                    # Tampered or stale cookie: discard it and start over.
                    sid = self._generate_sid()
                    return self.session_class(sid=sid, permanent=self.permanent)

            if not PY2 and not isinstance(sid, text_type):
                sid = sid.decode('utf-8', 'strict')
            val = self.redis.get(self.key_prefix + sid)
            if val is not None:
                try:
                    data = self.serializer.loads(val)
                    return self.session_class(data, sid=sid)
                except Exception:
                    # Corrupt payload: fall back to a session with the same sid.
                    # Only ``Exception`` is caught, so KeyboardInterrupt and
                    # SystemExit still propagate.
                    return self.session_class(sid=sid, permanent=self.permanent)
            return self.session_class(sid=sid, permanent=self.permanent)
    

        程序返回之前会执行save_session动作:

        def save_session(self, app, session, response):
            """Persist the session to redis and set (or clear) the session cookie.

            An empty session is not stored: if it was modified, its redis key
            and its cookie are deleted; either way nothing else happens.
            A non-empty session is serialised and written to redis with an
            expiry taken from ``app.permanent_session_lifetime``, and the
            session id (signed when ``use_signer`` is set) is written to the
            cookie.
            """
            cookie_domain = self.get_cookie_domain(app)
            cookie_path = self.get_cookie_path(app)

            if not session:
                if not session.modified:
                    return
                self.redis.delete(self.key_prefix + session.sid)
                response.delete_cookie(app.session_cookie_name,
                                       domain=cookie_domain, path=cookie_path)
                return

            cookie_httponly = self.get_cookie_httponly(app)
            cookie_secure = self.get_cookie_secure(app)
            cookie_expires = self.get_expiration_time(app, session)

            payload = self.serializer.dumps(dict(session))
            # Write to redis; ``time`` is the expiry of the key.
            self.redis.setex(name=self.key_prefix + session.sid, value=payload,
                             time=total_seconds(app.permanent_session_lifetime))

            if self.use_signer:
                cookie_value = self._get_signer(app).sign(want_bytes(session.sid))
            else:
                cookie_value = session.sid

            # Finally hand the session id back to the client in the cookie.
            cookie_options = dict(expires=cookie_expires, httponly=cookie_httponly,
                                  domain=cookie_domain, path=cookie_path,
                                  secure=cookie_secure)
            response.set_cookie(app.session_cookie_name, cookie_value,
                                **cookie_options)
    

     五、上下文管理

      1.threading.local

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time


    # Thread-local storage: even with many threads, each thread's value stays isolated.
    local_values = threading.local()


    def func(num):
        """Store ``num`` on the thread-local object, wait a second, then print it with the thread name."""
        # Author's note: this is where e.g. a database connection would be created
        local_values.name = num
        time.sleep(1)
        current = threading.current_thread()
        print(local_values.name, current.name)


    # Start 20 threads, each running func with its own index.
    for i in range(20):
        th = threading.Thread(target=func, name='线程%s' % i, args=(i,))
        th.start()
    

      2.自定义local类实现本地线程,也可以用setitem

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    from threading import get_ident
    
    #通过自定义模拟本地线程原理
    class Local(object):
        """Hand-written imitation of thread-local storage.

        Attribute values are kept in ``storage``, a dict keyed by the value
        ``get_ident()`` returns for the caller, so each caller only sees the
        attributes it set itself.
        """

        def __init__(self):
            # Go through object.__setattr__: a plain ``self.storage = {}`` would
            # trigger our own __setattr__, which reads self.storage and recurses.
            object.__setattr__(self, 'storage', {})

        def __setattr__(self, key, value):
            """Store ``value`` under ``key`` in the current caller's namespace."""
            self.storage.setdefault(get_ident(), {})[key] = value

        def __getattr__(self, item):
            """Return the current caller's value for ``item``.

            Raises AttributeError (not KeyError) when the caller has not set
            it, so ``hasattr`` and ``getattr(obj, name, default)`` work.
            """
            try:
                return self.storage[get_ident()][item]
            except KeyError:
                raise AttributeError(item)
    
    obj = Local()
    def func(num):
        """Store ``num`` on ``obj``, wait a second, then print it with the thread name."""
        # Author's note: this is where e.g. a database connection would be created
        obj.value = num
        import time
        time.sleep(1)
        print(obj.value, threading.current_thread().name)
    
    
    # Start 20 threads, each running func with its own index
    for i in range(20):
        th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
        th.start()
    

        3.带协程判断加入

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    from threading import get_ident
    
    try: #协程获取唯一id方法
        from greenlet import getcurrent as get_ident
    except ImportError:
        try:
            from thread import get_ident
        except ImportError:
            from _thread import get_ident
    
    #通过自定义模拟本地线程原理
    class Local(object):
        """Hand-written imitation of thread-local storage.

        Attribute values are kept in ``storage``, a dict keyed by the value
        ``get_ident()`` returns for the caller, so each caller only sees the
        attributes it set itself.
        """

        def __init__(self):
            # Go through object.__setattr__: a plain ``self.storage = {}`` would
            # trigger our own __setattr__, which reads self.storage and recurses.
            object.__setattr__(self, 'storage', {})

        def __setattr__(self, key, value):
            """Store ``value`` under ``key`` in the current caller's namespace."""
            self.storage.setdefault(get_ident(), {})[key] = value

        def __getattr__(self, item):
            """Return the current caller's value for ``item``.

            Raises AttributeError (not KeyError) when the caller has not set
            it, so ``hasattr`` and ``getattr(obj, name, default)`` work.
            """
            try:
                return self.storage[get_ident()][item]
            except KeyError:
                raise AttributeError(item)
    
    obj = Local()
    def func(num):
        """Store ``num`` on ``obj``, wait a second, then print it with the thread name."""
        # Author's note: this is where e.g. a database connection would be created
        obj.value = num
        import time
        time.sleep(1)
        print(obj.value, threading.current_thread().name)
    
    
    # Start 20 threads, each running func with its own index
    for i in range(20):
        th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
        th.start()
    

      

  • 相关阅读:
    一个仿windows泡泡屏保的实现
    易语言中锐浪报表绿色发布指南(免COM组件DLL注册)
    服务器被攻击小记
    aidl.exe'' finished with non-zero exit value 1问题解决【转载】
    给APP增加RSA签名
    fastreport中文乱码问题
    EF join
    ActionFilter、IAuthorizationFilter 权限验证重定向跳转到其它页面
    EF 调试跟踪生成的SQL语句
    Asp.net MVC 权限验证,以及是否允许匿名访问
  • 原文地址:https://www.cnblogs.com/liujiliang/p/8972966.html
Copyright © 2011-2022 走看看