zoukankan      html  css  js  c++  java
  • Flask(二)

    一.threading.local

    多个线程修改同一个数据,复制多份变量给每个线程用,为每个线程开辟一块空间进行数据存储

    不用threading.local (多线程资源共享,sleep后执行取到的都是最后一次修改的值)

    # 不用local
    from threading import Thread
    import time
    cxw = -1
    def task(arg):
        global cxw
        cxw = arg
        # time.sleep(2)
        print(cxw)
    ​
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

    ###threading.local使用 (给每一个线程都指定了一个id存储各自修改的属性值,彼此数据隔离)

    from threading import Thread
    from threading import local
    import time
    from threading import get_ident
    # 特殊的对象
    cxw = local()
    def task(arg):
        # 对象.val = 1/2/3/4/5
        cxw.value = arg
        time.sleep(2)
        print(cxw.value)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

    通过字典自定义threading.local(函数)

    from threading import get_ident,Thread
    import time
    storage = {}
    def set(k,v):
        ident = get_ident()
        if ident in storage:
            storage[ident][k] = v
        else:
            storage[ident] = {k:v}
    def get(k):
        ident = get_ident()
        return storage[ident][k]
    def task(arg):
        set('val',arg)
        v = get('val')
        print(v)
    ​
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

    面向对象版

    from threading import get_ident,Thread
    import time
    class Local(object):
        storage = {}
        def set(self, k, v):
            ident = get_ident()
            if ident in Local.storage:
                Local.storage[ident][k] = v
            else:
                Local.storage[ident] = {k: v}
        def get(self, k):
            ident = get_ident()
            return Local.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.set('val',arg) 
        v = obj.get('val')
        print(v)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

    通过setattr和getattr实现

    from threading import get_ident,Thread
    import time
    class Local(object):
        storage = {}
        def __setattr__(self, k, v):
            ident = get_ident()
            if ident in Local.storage:
                Local.storage[ident][k] = v
            else:
                Local.storage[ident] = {k: v}
        def __getattr__(self, k):
            ident = get_ident()
            return Local.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.val = arg
        print(obj.val)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

    每个对象有自己的存储空间(字典)

    from threading import get_ident,Thread
    import time
    class Local(object):
        def __init__(self):
            object.__setattr__(self,'storage',{})
            self.storage={}
        def __setattr__(self, k, v):
            ident = get_ident()
            if ident in self.storage:
                self.storage[ident][k] = v
            else:
                self.storage[ident] = {k: v}
        def __getattr__(self, k):
            ident = get_ident()
            return self.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.val = arg
        obj.xxx = arg
        print(obj.val)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

    兼容线程和协程(源码到request中去看,看local的getattr,setattr)

    try:
        from greenlet import getcurrent as get_ident
    except Exception as e:
        from threading import get_ident
    from threading import Thread
    import time
    class Local(object):
        def __init__(self):
            object.__setattr__(self,'storage',{})
        def __setattr__(self, k, v):
            ident = get_ident()
            if ident in self.storage:
                self.storage[ident][k] = v
            else:
                self.storage[ident] = {k: v}
        def __getattr__(self, k):
            ident = get_ident()
            return self.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.val = arg
        obj.xxx = arg
        print(obj.val)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()   

    partial偏函数

    #偏函数的第二个部分(可变参数),按原有函数的参数顺序进行补充,参数将作用在原函数上,最后偏函数返回一个新函数
    from functools import partial
    def test(a,b,c,d):
        return a+b+c+d
    ​
    tes=partial(test,1,2)
    print(tes(3,4))

    二.请求上下文

    '''
    全局的变量
    _request_ctx_stack = LocalStack()
    _app_ctx_stack = LocalStack()
    current_app = LocalProxy(_find_app)
    #local就是我们的partial(_lookup_req_object, "request")
    request = LocalProxy(partial(_lookup_req_object, "request"))
    session = LocalProxy(partial(_lookup_req_object, "session"))
    g = LocalProxy(partial(_lookup_app_object, "g"))
    
    self.wsgi_app(environ, start_response)源码:
        def wsgi_app(self, environ, start_response):
            #ctx是ResquestContext的对象,里面request
            ctx = self.request_context(environ)
            error = None
            try:
                try:
                    #就是ctx放到了Local对象
                    ctx.push()
                    #所有请求的执行函数的,包括请求扩展,真正的视图函数
                    response = self.full_dispatch_request()
                except Exception as e:
                    error = e
                    response = self.handle_exception(e)
                except:  # noqa: B001
                    error = sys.exc_info()[1]
                    raise
                # 请求之后的函数
                return response(environ, start_response)
            finally:
                if self.should_ignore_error(error):
                    error = None
                ctx.auto_pop(error)
    1 ctx = self.request_context(environ) environ,请求相关的,ctx现在是包含request,session的RequestContext的对象
        源码:
            1.1RequestContext(self, environ) self ,是app对象 environ,请求相关的
            1.2 RequestContext在实例化的时候的源码:
            def __init__(self, app, environ, request=None, session=None):
                self.app = app
                if request is None:
                    request = app.request_class(environ)
                self.request = request
                self.url_adapter = None
                try:
                    self.url_adapter = app.create_url_adapter(self.request)
                except HTTPException as e:
                    self.request.routing_exception = e
                self.flashes = None
                self.session = session
                self._implicit_app_ctx_stack = []
                self.preserved = False
                self._after_request_functions = []
            这个RequestContext对象封装了,request 和seesoin
    
    2  ctx.push()这个ctx是RequestContext,那就执行RequestContext.push方法
        2.1RequestContext.push()的源码
            def push(self):
                #_request_ctx_stack是localStack的对象
                #self是ctx,把self也就ctx放入到local对象里面
                _request_ctx_stack.push(self)
                if self.session is None:
                    session_interface = self.app.session_interface
                    self.session = session_interface.open_session(self.app, self.request)
    
                    if self.session is None:
                        self.session = session_interface.make_null_session(self.app)
    
                if self.url_adapter is not None:
                    self.match_request()
            2.1.1 _request_ctx_stack.push(self)现在的self是ctx
            2.1.2 _request_ctx_stack是LocalStack()的对象
            2.1.3 LocalStack()的push把ctx传过来
            2.1.4 LocalStack()的push方法
                源码:
                #obj是ctx
                def push(self, obj):
                    #obj是ctx,requestContext的对象
                    rv = getattr(self._local, "stack", None)
                    if rv is None:
                        # self._local是Local()的对象
                        # storage[“线程id或者协程id”][stack] = [ctx,]
                        self._local.stack = rv = []
                    rv.append(obj)
                    return rv
    
        2的最终也就是ctx.push()他的最终目的:把当前的ctx放入到Local()里面
    
    3  response = self.full_dispatch_request()
        源码:
            def full_dispatch_request(self):
                #这是服务器第一次请求时候执行的函数
                self.try_trigger_before_first_request_functions()
                try:
                    request_started.send(self)
                    #执行请求之前所有的函数,并且拿到请求之前的返回值
                    rv = self.preprocess_request()
                    if rv is None:
                        #这个是真正视图函数,如果我的请求之前函数没有返回值才会执行
                        rv = self.dispatch_request()
                except Exception as e:
                    rv = self.handle_user_exception(e)
                return self.finalize_request(rv)
        3.1 return self.finalize_request(rv)的源码:
            def finalize_request(self, rv, from_error_handler=False):
                response = self.make_response(rv)
                try:
                    #请求之后的函数,after_request
                    response = self.process_response(response)
                    request_finished.send(self, response=response)
                except Exception:
                    if not from_error_handler:
                        raise
                    self.logger.exception(
                        "Request finalizing failed with an error while handling an error"
                    )
                return response
    4 我们的现在已经在2步的时候把我们request已经方法Locald对象中了,我们第三步的任意一个地方
    都能使用我们的request,session,那他是怎么获取的?
        4.1 我们在flask导入request,这个request是一个全局的变量,我们怎么通过request区分我当前的request对象(environ)
         我们发现request是LocalProxy的对象
        4.2 当我们用全局的request.属性的时候,就会去找LocalProxy的对象,但是我们发现里面根本就没有
            那他一定执行LocalProxy对象的__getattr__方法
        4.3 我们现在来看LocalProxy对象的__getattr__方法的源码:
            #name我们要获取属性名
            def __getattr__(self, name):
                if name == "__members__":
                    return dir(self._get_current_object())
                #form
                #self._get_current_object()就是ctx里面的request,
                return getattr(self._get_current_object(), name)
            4.3.1 通过反射self._get_current_object()对象,来找我们属性,也就是name
               self._get_current_object()的源码:
                    def _get_current_object(self):
                        if not hasattr(self.__local, "__release_local__"):
                            return self.__local()
                        try:
                            #self.__local就实例化传过来的偏函数,
                            return getattr(self.__local, self.__name__)
                        except AttributeError:
                            raise RuntimeError("no object bound to %s" % self.__name__)
    
               4.3.1.1 return getattr(self.__local, self.__name__)那这里self.__local是谁?
                        def __init__(self, local, name=None):
                              object.__setattr__(self, "_LocalProxy__local", local)
                        self.___local为local
                        这个local为实例化的时候传的
                    4.3.1.1.1 这个实例化的时候的操作
                       request = LocalProxy(partial(_lookup_req_object, "request"))
                       4.3.1.1的local就是 partial(_lookup_req_object, "request")的地址
                    4.3.1.1.2 _lookup_req_object的源码:
                        #调用的时候 partial(_lookup_req_object, "request")
                        #现在的name就是"request"
                        def _lookup_req_object(name):
                            # top是当前线程的ctx
                            top = _request_ctx_stack.top
                            if top is None:
                                raise RuntimeError(_request_ctx_err_msg)
                            #找top里面的request
                            # ctx找request
                            return getattr(top, name)
                    4.3.1.1.2 我们来看这个_request_ctx_stack.top的top方法
                            def top(self):
                                try:
                                    return self._local.stack[-1]
                                except (AttributeError, IndexError):
                                    return None
                            我们发现这个self._local是Local()对象,这样就把ctx拿到了
    
    '''

    三.蓝图

    对程序进行目录结构划分

    不使用蓝图,自己分文件

    目录结构:

    -templates
    -views
    -__init__.py
       -user.py
       -order.py
    -app.py

    app.py

    from views import app
    if __name__ == '__main__':
        app.run()

    init.py

    from flask import Flask,request
    app = Flask(__name__)
    #不导入这个不行
    from . import account
    from . import order
    from . import user

    user.py

    from . import app
    @app.route('/user')
    def user():
        return 'user'

    order.py

    from . import app
    @app.route('/order')
    def order():
        return 'order'

    使用蓝图之中小型系统

    详见代码:pro_flask_简单应用程序目录示例.zip

    目录结构:

    -flask_pro
    -flask_test
      -__init__.py
      -static
           -templates
           -views
          -order.py
               -user.py
        -manage.py
           

    _init.py

    from flask import  Flask
    app=Flask(__name__)
    from flask_test.views import user
    from flask_test.views import order
    app.register_blueprint(user.us)
    app.register_blueprint(order.ord)

    manage.py

    from flask_test import  app
    if __name__ == '__main__':
        app.run(port=8008)

    user.py

    from flask import Blueprint
    us=Blueprint('user',__name__)
    ​
    @us.route('/login')
    def login():
        return 'login'

    order.py

    from flask import Blueprint
    ord=Blueprint('order',__name__)
    ​
    @ord.route('/test')
    def test():
        return 'order test'

    使用蓝图之大型系统

    详见代码:pro_flask_大型应用目录示例.zip
    
    总结:
    
    1 xxx = Blueprint('account', __name__,url_prefix='/xxx') :蓝图URL前缀,表示url的前缀,在该蓝图下所有url都加前缀
    
    2 xxx = Blueprint('account', name,url_prefix='/xxx',template_folder='tpls'):给当前蓝图单独使用templates,向上查找,当前找不到,会找总templates
    
    3 蓝图的befort_request,对当前蓝图有效
    
    4 大型项目,可以模拟出类似于django中app的概念

    四.g对象

    from flask import Flask,views,session,request,g
    app=Flask(__name__)
    ''''
    session 只要设置,在任意请求中都能拿到,无论你拿多少次
    flash   一旦设置,可在任意一次请求中获取,但是只能取一次
    g     一旦设置,只能在当前请求中获取,其它的请求都不能获取
    
    
    '''
    
    @app.after_request
    def test(response):
        print(g.name)
        return response
    
    
    
    @app.route("/")
    def index():
        g.name = "suv"
        return "ok"
    
    @app.route("/index")
    def index1():
    
        return "ojbk"
    
    
    
    if __name__ == '__main__':
    
        app.run()

    五.flask-session

    作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy

    安装:pip3 install flask-session

     

    from flask import Flask,session
    from flask_session import RedisSessionInterface
    import redis
    app = Flask(__name__)
    conn=redis.Redis(host='127.0.0.1',port=6379)
    
    #use_signer是否对key签名
    #如果use_siginer为False,这表示不需要配置app.secret_key
    #permanent=False表示关闭浏览器cookie失效
    app.secret_key="aksdhkajs" app.session_interface=RedisSessionInterface(conn,key_prefix='lqz', use_signer=True,permanent=False) ''' 之前的session seesion名字为配置文件中的名字 存 seesion ->加密-->cookie 取 session -->值 ----》解密 redis的seesion: seesion名字为配置文件中的名字(默认为session)

    前台cookie => session:session.id

    redis => name:value name=self.key_prefix + session.sid, value=val 是redis的值
    ''' @app.route('/') def hello_world(): session['name']='lqz' return 'Hello World!' @app.route("/index") def index(): print(session['name']) return "ok" if __name__ == '__main__': app.run()

    第二种简便方式:

    from flask import Flask,session
    from redis import Redis
    from flask_session import Session
    app = Flask(__name__)
    app.config['SESSION_TYPE'] = 'redis'
    app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
    Session(app)
    
    @app.route('/')
    def hello_world():
        session['name']='lqz'
        return 'Hello World!'
    
    @app.route("/index")
    def index():
        print(session['name'])
        return "ok"
    
    
    if __name__ == '__main__':
        app.run()

    六.信号

    Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

    安装:pip3 install blinker

    内置信号:
    
    ```python
    request_started = _signals.signal('request-started')                # 请求到来前执行
    request_finished = _signals.signal('request-finished')              # 请求结束后执行
     
    before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
    template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
     
    got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
     
    request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
    appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
     
    appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
    appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
    message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发
    ```
    
    
    使用信号:

    from flask import Flask,signals,render_template
    
    app = Flask(__name__)
    
    # 往信号中注册函数
    def func12123(*args,**kwargs):
        print('触发型号',args,kwargs)
    signals.request_started.connect(func12123)
    
    # 触发信号: signals.request_started.send()
    @app.before_first_request
    def before_first1(*args,**kwargs):
        pass
    @app.before_first_request
    def before_first2(*args,**kwargs):
        pass
    
    @app.before_request
    def before_first3(*args,**kwargs):
        pass
    
    @app.route('/',methods=['GET',"POST"])
    def index():
        print('视图')
        return "ok"
    
    
    if __name__ == '__main__':
        app.wsgi_app
        app.run()

    自定义信号:

    from flask import Flask, current_app, flash, render_template
    from flask.signals import _signals
    
    app = Flask(import_name=__name__)
    
    # 自定义信号
    xxxxx = _signals.signal('xxxxx')
    
    
    # 必须有一个位置参数,去接收他的发送者,
    def func(sender,name):
        print(sender)
        print(name)
        print("123")
    
    
    
    # 自定义信号中注册函数
    xxxxx.connect(func)
    
    
    @app.route("/x")
    def index():
        # 触发信号,这里的第一是发送者,第二个参数可选的话,必须是键值对
        xxxxx.send("sb",name="wb")
        return 'Index'
    
    
    if __name__ == '__main__':
        app.run()

     

     

  • 相关阅读:
    SQL语法:查询此表有另外一个表没有的数据
    .NET平台开源项目速览-最快的对象映射组件Tiny Mapper之项目实践
    win7 64 安装Oracle 11G 、使用PLSQL进行连接 标准实践
    json 筛选数据 $.grep过滤数据
    bootstrap table 行号 显示行号 添加行号 bootstrap-table 行号
    ajax 请求二进制流 图片 文件 XMLHttpRequest 请求并处理二进制流数据 之最佳实践
    JS中判断JSON数据是否存在某字段的方法 JavaScript中判断json中是否有某个字段
    json 数字key json 数字作为主键
    ajax 跨域 headers JavaScript ajax 跨域请求 +设置headers 实践
    扩展:gridview 空数据时显示表头
  • 原文地址:https://www.cnblogs.com/sima-3/p/11604839.html
Copyright © 2011-2022 走看看