zoukankan      html  css  js  c++  java
  • g对象、信号、local、请求上下文、flask-session

    from flask import Flask, request, g, redirect
    
    # g对象的特性:
    #   1.当前请求内你设置就可以取,必须先设置,然后取,当前请求可以无限次
    #   2.如果不是当前请求,其他请求,也取不到
    
    app = Flask(__name__)
    
    
    @app.before_request
    def dou():
        if request.path == '/index':
            request.name = 'sb'
            g.name = 'sb'
    
    
    def set_g():
        g.name = '老子最帅'
    
    
    # @app.route('/')
    # def index():
    #     set_g()
    #     return redirect('/index')
    #
    
    @app.route('/index')
    def login():
        print(g.name)
        return '2b'
    
    
    if __name__ == '__main__':
        app.run(port=8000)

    信号:

    from flask import Flask, signals
    
    app = Flask(__name__)
    
    
    # 第一步,编写一个函数
    def func(*args, **kwargs):
        print('触发信号', args, kwargs)
    
    
    # 第二部:注册信号
    signals.request_started.connect(func)  # 请求来之前执行这个
    
    # 第三部:触发信号(内置信号,不需要手动触发,框架自动触发)
    
    
    @app.route('/')
    def index():
    
        return 'index'
    
    if __name__ == '__main__':
        app.run()
    """
    request_started = _signals.signal('request-started')                # 请求到来前执行
    request_finished = _signals.signal('request-finished')              # 请求结束后执行
     
    before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
    template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
     
    got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
     
    request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
    appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
     
    appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
    appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
    message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发
    """

    自定义信号

    from flask import Flask, current_app, flash, render_template
    from flask.signals import _signals
    
    app = Flask(import_name=__name__)
    
    # 1.自定义信号
    xxxxx = _signals.signal('xxxxx')
    
    
    # 2.定义函数
    def func(*args, **kwargs):
        print('信号来了', args, kwargs)
    
    
    # 3.绑定函数
    xxxxx.connect(func)
    
    
    @app.route("/x")
    def index():
        # 触发信号
        xxxxx.send('123123', k1='v1')    # 自定义就在这里,需要什么时候执行(此时就是试图执行后运行信号)
        return 'Index'
    
    
    if __name__ == '__main__':
        app.run()

    local

    """
    多个线程同时修改同一个数据,会造成数据混乱,
    方法一:加线程锁
    方法二:复制多份变量给每个线程用,为每个线程开辟一块空间进行数据存储
    
    """
    # from threading import Thread
    # import time
    #
    # cxw = -1
    #
    #
    # def task(arg):
    #     global cxw
    #     cxw = arg
    #     time.sleep(2)
    #     print(cxw)
    #
    #
    # for i in range(10):
    #     t = Thread(target=task, args=(i,))
    #     t.start()
    # 结果为9,9,9为什么全部是9,当第一进程启动进去,等待2秒,因为异步其他进程也同时启动最终,cxm全局结果赋值为最后的9
    
    # from threading import Thread
    # from threading import local
    # import time
    # from threading import get_ident
    #
    # # 特殊的对象
    # cxw = local()
    #
    #
    # def task(arg):
    #     # 对象.val = 1/2/3/4/5
    #     # cxw['线程id']['value']=arg
    #     cxw.value = arg
    #     time.sleep(2)
    #     # cxw['线程id']['value']
    #     print(cxw.value)
    #
    #
    # for i in range(10):
    #     t = Thread(target=task, args=(i,))
    #     t.start()
    
    # 把数据存储到一个对象中,每个线程分配一个变量进行独立操作
    
    # 函数的方式
    from threading import get_ident, Thread
    import time
    
    storage = {}
    
    
    def set(k, v):
        ident = get_ident()
        if ident in storage:
            storage[ident][k] = v
        else:
            # cxw['线程id']['value']=arg
            # storage[1][val]=arg
            # storage={1:{val:agr}}
            storage[ident] = {k: v}
    
    
    def get(k):
        ident = get_ident()
        # 1
        # storage = {1: {val: agr}}
        return storage[ident][k]
    
    
    def task(arg):
        set('val', arg)
        time.sleep(1)
        v = get('val')
        print(v)
    
    
    for i in range(10):
        t = Thread(target=task, args=(i,))
        t.start()

    请求上下文

    """
    多个线程同时修改同一个数据,会造成数据混乱,
    方法一:加线程锁
    方法二:复制多份变量给每个线程用,为每个线程开辟一块空间进行数据存储
    
    """
    # from threading import Thread
    # import time
    #
    # cxw = -1
    #
    #
    # def task(arg):
    #     global cxw
    #     cxw = arg
    #     time.sleep(2)
    #     print(cxw)
    #
    #
    # for i in range(10):
    #     t = Thread(target=task, args=(i,))
    #     t.start()
    # 结果为9,9,9为什么全部是9,当第一进程启动进去,等待2秒,因为异步其他进程也同时启动最终,cxm全局结果赋值为最后的9
    
    # from threading import Thread
    # from threading import local
    # import time
    # from threading import get_ident
    #
    # # 特殊的对象
    # cxw = local()
    #
    #
    # def task(arg):
    #     # 对象.val = 1/2/3/4/5
    #     # cxw['线程id']['value']=arg
    #     cxw.value = arg
    #     time.sleep(2)
    #     # cxw['线程id']['value']
    #     print(cxw.value)
    #
    #
    # for i in range(10):
    #     t = Thread(target=task, args=(i,))
    #     t.start()
    
    # 把数据存储到一个对象中,每个线程分配一个变量进行独立操作
    
    # 函数的方式
    from threading import get_ident, Thread
    import time
    
    storage = {}
    
    
    def set(k, v):
        ident = get_ident()
        if ident in storage:
            storage[ident][k] = v
        else:
            # cxw['线程id']['value']=arg
            # storage[1][val]=arg
            # storage={1:{val:agr}}
            storage[ident] = {k: v}
    
    
    def get(k):
        ident = get_ident()
        # 1
        # storage = {1: {val: agr}}
        return storage[ident][k]
    
    
    def task(arg):
        set('val', arg)
        time.sleep(1)
        v = get('val')
        print(v)
    
    
    for i in range(10):
        t = Thread(target=task, args=(i,))
        t.start()

    flask-session

    from flask import Flask, session
    from flask_session import RedisSessionInterface
    import redis
    
    app = Flask(__name__)
    app.secret_key = "ajksda"
    conn = redis.Redis(host='127.0.0.1', port=6379)
    # use_signer是否对key签名
    app.session_interface = RedisSessionInterface(conn, key_prefix='jason', use_signer=True, permanent=False)
    
    
    @app.route('/')
    def hello_world():
        session['sb'] = 'jason'
        return 'Hello World!'
    
    
    @app.route("/index")
    def index():
        print(session['sb'])
        return "ok"
    
    
    if __name__ == '__main__':
        app.run()

    session第二种方法

    from flask import Flask, session   # 推荐
    import redis
    from flask_session import Session
    
    app = Flask(__name__)
    app.config['SESSION_TYPE'] = 'redis'
    app.config['SESSION_REDIS'] = redis.Redis(host='127.0.0.1', port='6379')
    app.config['SESSION_KEY_PREFIX'] = "jason"
    Session(app)
    
    
    @app.route('/')
    def hello_world():
        session['sb'] = 'jason'
        return 'Hello World!'
    
    
    @app.route("/index")
    def index():
        print(session['sb'])
        return "ok"
    
    
    if __name__ == '__main__':
        app.run()

    ------------恢复内容结束------------

  • 相关阅读:
    【转】汽车CAN总线
    【转】I2C总线协议
    【转】SPI总线协议
    【转】结构struct 联合Union和枚举Enum的细节讨论
    复合类型变量其首地址的几种表示方式
    【转】四款经典3.7v锂电池充电电路图详解
    【转】crc16几种标准校验算法及c语言代码
    【转】 CRC循环冗余校验码
    对STM32库函数中 assert 函数的认知
    【转】用宏定义代替printf函数
  • 原文地址:https://www.cnblogs.com/wukai66/p/11860954.html
Copyright © 2011-2022 走看看