zoukankan      html  css  js  c++  java
  • M1-Flask-Day2

    内容概要:

      1.flask

        - 蓝图

        - 中间件

        - 闪现

      2.扩展

        - session

        - wtfrom

      3.上下文管理

        - local-threading

      4.websocket

        - 轮训

        - 长轮训

        - websocket

    一.谈谈你对面向对象的理解

      1.三大特性,继承,封装,多态(初级水平)

        封装分两种:

            (1).函数的封装

              class db1():

                def func():

                  pass

                def func2():

                  pass

              (2).数据的封装(一个类的对象)

              

      2.__str__...方法

        __str__,

        __new__,

        __init__,

        __getattr__,

        __setattr__,

        __delattr__,

        __getitem__,

        __setitem__,

        __delitem__,

        __enter__,

        __exit__,

        __del__,

        __call__,

        __dict__,

        这些方法有特有的执行场景,类加()执行__init__方法,对象加(),__call__方法....

      ****一个对象和一个对象能不能相加、减、乘、除

        可以 加用__add__方法 

    class Foo(object):
        def __add__(self, other):
            pass
    
    
    obj1 = Foo()
    obj2 = Foo()
    
    res = obj1 + obj2  

      3.metaclass

        1.创建类的两种方法   

    #创建类的方式一
    class Foo(object):
        pass
    
    #创建类的方式二
    #第一个参数是类名,第二个元祖是继承谁,第三个字典是构造字段
    Bar = type("MyFoo",(object,),{})
    g = Bar()
    print(g)
    

         2.验证类是由type创建

    class Mytype(type):
        def __init__(self,*args,**kwargs):
            print("from my type")
            super(Mytype,self).__init__(*args,**kwargs)
    
    class Foo(metaclass=Mytype):
        pass
    
    g = Foo()
    
    """result
    from my type
    """
    

     二、flask-蓝图

       1.目录结构:

        

      2.__init__.py  

    from flask import Flask
    
    app = Flask(__name__)
    
    from .views import account
    from .views import user
    app.register_blueprint(account.ac) #注册之后可以进行路由分发
    app.register_blueprint(user.us)
    
    
    """
    这里特殊的装饰器,是全局app,每个function都需要经过
    @app.before_request
    def check_login():
        print('.....')
    """
    

       3.user.py

    from flask import Blueprint
    
    us = Blueprint('us',__name__,url_prefix='/xx') #生成蓝图对象,url_prefix是前缀,加上之后请求index就变为/xx/index
    
    #这里是局部的装饰器,用于特定视图需要权限等
    # @us.before_request
    # def check_login():
    #     print('.....')
    
    
    @us.route('/index')
    def index():
        return 'index'
    

      4.manage.py

    from pro_flask import app
    
    if __name__ == '__main__':
        app.run()
    

     三、flask-闪现

       1.用于1次请求之后删除,基于session,取的时候用pop

    from flask import Blueprint,redirect,request,flash,get_flashed_messages
    
    ac = Blueprint('ac',__name__)
    
    @ac.route('/login')
    def login():
        flash("登录成功1",category="x1") #category多了一层分组
        flash("登录成功2",category="x2")
        return  redirect("/logout")
    
    
    @ac.route('/logout')
    def logout():
        res = get_flashed_messages(category_filter="x1") #默认不加category_filter取所有
        print(res)
        return 'logout'
    

       2.get_flask_message源码

    def get_flashed_messages(with_categories=False, category_filter=[]):
        
        flashes = _request_ctx_stack.top.flashes
        if flashes is None:
            #存入local里的stack里对象的flashes
            _request_ctx_stack.top.flashes = flashes = session.pop('_flashes') 
                if '_flashes' in session else []
        if category_filter:
            flashes = list(filter(lambda f: f[0] in category_filter, flashes))
        if not with_categories:
            return [x[1] for x in flashes]
        return flashes    
    

     三、flask-middleware

    from flask import Flask
    app = Flask(__name__)
    
    @app.route('/user',methods=['GET','POST'],endpoint='xxx')
    def user():
        return "login"
    
    
    class MiddleWare(object): #主要理解对象加括号执行__call__方法
        def __init__(self,old_wsgi_app):
    
            self.old_wsgi_app = old_wsgi_app
    
        def __call__(self, *args, **kwargs):
            #这是个时候还没有request
            print("我做一些数据库连接check,或者清理缓存操作")
            return self.old_wsgi_app(*args,**kwargs)
    
    if __name__ == '__main__':
        app.wsgi_app = MiddleWare(app.wsgi_app)
        app.run("0.0.0.0",9999)
    

     四、flask-session

    from flask import Flask,session
    app = Flask(__name__)
    
    
    #如下几行操作就成功将session写入redis中了
    from flask.ext.session import Session
    from redis import Redis
    app.config["SESSION_TYPE"] = 'redis'
    app.config["SESSION_REDIS"] = Redis(host='192.16.1.1',port="6379",)
    Session(app)
    
    
    @app.route('/user',methods=['GET','POST'],endpoint='xxx')
    def user():
        return "login"
    
    
    
    if __name__ == '__main__':
        app.run("0.0.0.0",9999)
    

       源码剖析:

    #session_interface = RedisSessionInterface() #程序刚开始加载时候执行
    def _get_interface(self, app):
        config = app.config.copy()
        config.setdefault('SESSION_TYPE', 'null')
        config.setdefault('SESSION_PERMANENT', True)
        config.setdefault('SESSION_USE_SIGNER', False)
        config.setdefault('SESSION_KEY_PREFIX', 'session:')
        config.setdefault('SESSION_REDIS', None)
        config.setdefault('SESSION_MEMCACHED', None)
        config.setdefault('SESSION_FILE_DIR',
                          os.path.join(os.getcwd(), 'flask_session'))
        config.setdefault('SESSION_FILE_THRESHOLD', 500)
        config.setdefault('SESSION_FILE_MODE', 384)
        config.setdefault('SESSION_MONGODB', None)
        config.setdefault('SESSION_MONGODB_DB', 'flask_session')
        config.setdefault('SESSION_MONGODB_COLLECT', 'sessions')
        config.setdefault('SESSION_SQLALCHEMY', None)
        config.setdefault('SESSION_SQLALCHEMY_TABLE', 'sessions')
    
        if config['SESSION_TYPE'] == 'redis':
            session_interface = RedisSessionInterface(
                config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
                config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
        elif config['SESSION_TYPE'] == 'memcached':
            session_interface = MemcachedSessionInterface(
                config['SESSION_MEMCACHED'], config['SESSION_KEY_PREFIX'],
                config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
        elif config['SESSION_TYPE'] == 'filesystem':
            session_interface = FileSystemSessionInterface(
                config['SESSION_FILE_DIR'], config['SESSION_FILE_THRESHOLD'],
                config['SESSION_FILE_MODE'], config['SESSION_KEY_PREFIX'],
                config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
        elif config['SESSION_TYPE'] == 'mongodb':
            session_interface = MongoDBSessionInterface(
                config['SESSION_MONGODB'], config['SESSION_MONGODB_DB'],
                config['SESSION_MONGODB_COLLECT'],
                config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
                config['SESSION_PERMANENT'])
        elif config['SESSION_TYPE'] == 'sqlalchemy':
            session_interface = SqlAlchemySessionInterface(
                app, config['SESSION_SQLALCHEMY'],
                config['SESSION_SQLALCHEMY_TABLE'],
                config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
                config['SESSION_PERMANENT'])
        else:
            session_interface = NullSessionInterface()
    
        return session_interface
    

       程序执行session["xx"]=123时候先执行session_interface的open_session方法:

        def open_session(self, app, request):
            sid = request.cookies.get(app.session_cookie_name)
            if not sid:
                sid = self._generate_sid() #根据uuid生成一个随机字符串
                #第一次登陆进,返回{"sid":"asdasdada","xx":123}
                return self.session_class(sid=sid, permanent=self.permanent)
            if self.use_signer:
                signer = self._get_signer(app)
                if signer is None:
                    return None
                try:
                    sid_as_bytes = signer.unsign(sid)
                    sid = sid_as_bytes.decode()
                except BadSignature:
                    sid = self._generate_sid()
                    return self.session_class(sid=sid, permanent=self.permanent)
    
            if not PY2 and not isinstance(sid, text_type):
                sid = sid.decode('utf-8', 'strict')
            val = self.redis.get(self.key_prefix + sid)
            if val is not None:
                try:
                    data = self.serializer.loads(val)
                    return self.session_class(data, sid=sid)
                except:
                    return self.session_class(sid=sid, permanent=self.permanent)
            return self.session_class(sid=sid, permanent=self.permanent)    
    

        程序返回之前会执行save_session动作:

        def save_session(self, app, session, response):
            domain = self.get_cookie_domain(app)
            path = self.get_cookie_path(app)
            if not session:
                if session.modified:
                    self.redis.delete(self.key_prefix + session.sid)
                    response.delete_cookie(app.session_cookie_name,
                                           domain=domain, path=path)
                return
            httponly = self.get_cookie_httponly(app)
            secure = self.get_cookie_secure(app)
            expires = self.get_expiration_time(app, session)
            val = self.serializer.dumps(dict(session))
            self.redis.setex(name=self.key_prefix + session.sid, value=val,
                             time=total_seconds(app.permanent_session_lifetime))  #往redis里写数据ex为过期时间
            if self.use_signer:
                session_id = self._get_signer(app).sign(want_bytes(session.sid))
            else:
                session_id = session.sid
            #最后将随机字符串写到cookie里
            response.set_cookie(app.session_cookie_name, session_id, 
                                expires=expires, httponly=httponly,
                                domain=domain, path=path, secure=secure)
    

     五、上下文管理

      1.threading.local

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    
    
    #本地线程,保证即使是多个线程,自己的值也是互相隔离。
    local_values = threading.local()
    
    def func(num):
        #创建数据库连接
        local_values.name = num
        import time
        time.sleep(1)
        print(local_values.name, threading.current_thread().name)
    
    
    for i in range(20):
        th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
        th.start()
    

      2.自定义local类实现本地线程,也可以用setitem

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    from threading import get_ident
    
    #通过自定义模拟本地线程原理
    class Local(object):
        def __init__(self):
            object.__setattr__(self, 'storage', {})
            #self.storage = {} #这样设置就会产生递归,触发settattr
    
        def __setattr__(self, key, value):
            ident = get_ident()
            if ident in self.storage:
                self.storage[ident][key] = value
            else:
                self.storage[ident] = {key: value}
    
        def __getattr__(self, item):
            ident = get_ident()
            return self.storage[ident][item]
    
    obj = Local()
    def func(num):
        #创建数据库连接
        obj.value = num
        import time
        time.sleep(1)
        print(obj.value, threading.current_thread().name)
    
    
    for i in range(20):
        th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
        th.start()
    

        3.带协程判断加入

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    from threading import get_ident
    
    try: #协程获取唯一id方法
        from greenlet import getcurrent as get_ident
    except ImportError:
        try:
            from thread import get_ident
        except ImportError:
            from _thread import get_ident
    
    #通过自定义模拟本地线程原理
    class Local(object):
        def __init__(self):
            object.__setattr__(self, 'storage', {})
            #self.storage = {} #这样设置就会产生递归,触发settattr
    
        def __setattr__(self, key, value):
            ident = get_ident()
            if ident in self.storage:
                self.storage[ident][key] = value
            else:
                self.storage[ident] = {key: value}
    
        def __getattr__(self, item):
            ident = get_ident()
            return self.storage[ident][item]
    
    obj = Local()
    def func(num):
        #创建数据库连接
        obj.value = num
        import time
        time.sleep(1)
        print(obj.value, threading.current_thread().name)
    
    
    for i in range(20):
        th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
        th.start()
    

      

  • 相关阅读:
    UVa532 Dungeon Master 三维迷宫
    6.4.2 走迷宫
    UVA 439 Knight Moves
    UVa784 Maze Exploration
    UVa657 The die is cast
    UVa572 Oil Deposits DFS求连通块
    UVa10562 Undraw the Trees
    UVa839 Not so Mobile
    327
    UVa699 The Falling Leaves
  • 原文地址:https://www.cnblogs.com/liujiliang/p/8972966.html
Copyright © 2011-2022 走看看