zoukankan      html  css  js  c++  java
  • Flask-session,WTForms,POOL,Websocket通讯原理 -握手,加密解密过程

    1.Flask-session

    Flask中的session 需要执行 session_interface - open_session
    存储到redis中,存的key:session:d3f07db2-940b-4ee3-adfb-cdd7956dd921(前端浏览器里session的vlaue)

    from flask import Flask, session
    from flask_session import Session
    from redis import Redis
    
    app = Flask(__name__)
    # app.secret_key = "asdasd"
    app.config["SESSION_TYPE"] = "redis"
    app.config["SESSION_REDIS"] = Redis(host="127.0.0.1", port=6379, db=6)
    
    Session(app)
    
    @app.route("/")
    def index():
        session["user"] = "1111111"
        return "hello"
    
    @app.route("/go")
    def go():
        print(session.get("user"))
        return "ok"
    
    
    if __name__ == '__main__':
        app.run("0.0.0.0", 5000, debug=True)

    2.WTForms

    类似django的modelform

    .py

    from flask import Flask, request, render_template
    from wtforms import Form
    from wtforms.fields import simple, core
    from wtforms import validators
    
    
    app = Flask(__name__)
    
    
    class LoginFrom(Form):
        username = simple.StringField(
            # 标签标记
            label="用户名",
            # 校验条件,可迭代对象
            validators=[validators.DataRequired(message="用户名不能为空"),
                        validators.Length(min=3, max=8, message="密码必须在3-8位之间")],
            # 标签id
            id="user_name",
            # 默认值
            default=None,
            # 默认组件(input type="text") 在StringField中已经被实例化了
            widget=None,
            # 标签属性
            render_kw={"class": "login"}
        )
        password = simple.PasswordField(
            label="密码",
            validators=[validators.DataRequired(message="密码不能为空"),
                        validators.Length(min=6, max=16, message="密码必须在6-16位之间"),
                        validators.Email(message="密码必须为邮箱格式")],
            id="pwd",
            default=None,
            # simple.PasswordField里面为我们设置了widget为widgets.PasswordInput()
            widget=None,
            render_kw={"class": "login"}
        )
    
    
    class RegForm(Form):
        username = simple.StringField(
            label="用户名",
            validators=[validators.DataRequired(message="用户名不能为空"),
                        validators.Length(min=3, max=8, message="密码必须在3-8位之间")]
        )
        password = simple.PasswordField(
            label="密码",
            validators=[validators.DataRequired(message="密码不能为空"),
                        validators.Length(min=6, max=16, message="密码必须在6-16位之间"),
                        validators.Email(message="密码必须为邮箱格式")],
        )
        repassword = simple.PasswordField(
            label="确认密码",
            validators=[validators.EqualTo(fieldname="password", message="两次密码不一致!")]
        )
        gender = core.RadioField(
            label="性别",
            coerce=str,
            choices=(
                ("1", ""),
                ("2", "")
            ),
            default="1"
        )
        hobby = core.SelectMultipleField(
            label="爱好",
            validators=[validators.Length(min=2, max=4, message="请选择2-4个爱好")],
            coerce=int,
            choices=(
                (1, "fengjie"),
                (2, "luoyufeng"),
                (3, "lixueqin"),
                (4, "wuyifan"),
                (5, "panta")
            ),
            default=(1, 3, 5)
        )
    
    
    @app.route("/login", methods=["GET", "POST"])
    def login():
        if request.method == "GET":
            fm = LoginFrom()
            return render_template("login.html", fm=fm)
        else:
            fm = LoginFrom(request.form)
            if fm.validate():
                return "登录成功" + request.form.get("password")
            else:
                return render_template("login.html", fm=fm)
    
    
    @app.route("/reg", methods=["GET", "POST"])
    def reg():
        if request.method == "GET":
            fm = RegForm()
            return render_template("reg.html", fm=fm)
        else:
            fm = RegForm(request.form)
            if fm.validate():
                print(type(fm.data.get("gender")), fm.data.get("gender"))
                return "登录成功" + request.form.get("password")
            else:
                return render_template("reg.html", fm=fm)
    
    
    if __name__ == '__main__':
        app.run("0.0.0.0", 5000, debug=True)

    .html

    login
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <form action="" method="post" novalidate>
        {{ fm.username.label }}
        {{ fm.username }}
        <p><h1>{{ fm.username.errors.0}}</h1></p>
        {{ fm.password.label }}
        {{ fm.password }}
        <p><h1>{{ fm.password.errors.0 }}</h1></p>
        <input type="submit" value="登录">
    </form>
    </body>
    </html>
    reg
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <form action="" method="post" novalidate>
        {% for field in fm %}
            <p>{{ field.label }}{{ field }}{{ field.errors.0 }}</p>
        {% endfor %}
        <input type="submit" value="注册">
    </form>
    </body>
    </html>

    3.pool

    import pymysql
    from DBUtils.PooledDB import PooledDB
    
    
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。
        #  如:0 = None = never,
        # 1 = default = whenever it is requested,
        # 2 = when a cursor is created,
        # 4 = when a query is executed,
        # 7 = always
        host="127.0.0.1",
        port=3306,
        user="root",
        password="",
        charset="utf8",
        db="day"
    )
    
    conn = POOL.connection() # pymysql - conn
    cur = conn.cursor(cursor=pymysql.cursors.DictCursor) #游标,用于查询(里面的参数会将查询结果优化)
    sql = "select * from users WHERE name='jwb' and age=73"
    res = cur.execute(sql) #有几行响应
    print(res)
    print(cur.fetchall())
    conn.close()

    4.sqlhelper

    import pymysql
    from dbpool import POOL
    
    
    def create_conn():
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor
    
    
    def close_conn(conn, cursor):
        cursor.close()
        conn.close()
    
    
    def insert(sql, args):
        conn, cursor = create_conn()
        res = cursor.execute(sql, args)
        conn.commit()
        close_conn(conn, cursor)
        return res
    
    
    def fetch_one(sql, args):
        conn, cursor = create_conn()
        cursor.execute(sql, args)
        res = cursor.fetchone()
        close_conn(conn, cursor)
        return res
    
    
    def fetch_all(sql, args):
        conn, cursor = create_conn()
        cursor.execute(sql, args)
        res = cursor.fetchall()
        close_conn(conn, cursor)
        return res
    
    
    sql = "insert into users(name, age) VALUES (%s, %s)"
    insert(sql, ("mjj", 9))

    5.websocket握手

    import socket, base64, hashlib
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 9527))
    sock.listen(5)
    # 获取客户端socket对象
    conn, address = sock.accept()
    # 获取客户端的【握手】信息
    data = conn.recv(1024)
    print(data)
    """
    GET /ws HTTP/1.1
    
    Host: 127.0.0.1:9527
    
    Connection: Upgrade
    
    Pragma: no-cache
    
    Cache-Control: no-cache
    
    User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36
    
    Upgrade: websocket
    
    Origin: http://localhost:63342
    
    Sec-WebSocket-Version: 13
    
    Accept-Encoding: gzip, deflate, br
    
    Accept-Language: zh-CN,zh;q=0.9
    
    Cookie: session=d3f07db2-940b-4ee3-adfb-cdd7956dd921
    
    Sec-WebSocket-Key: JScC6nJU67n3fwkp2LdA0w==
    
    Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
    
    
    """
    # 以下动作是有websockethandler完成的
    # magic string为:258EAFA5-E914-47DA-95CA-C5AB0DC85B11
    
    
    def get_headers(data):
        header_dict = {}
        header_str = data.decode("utf8")
        for i in header_str.split("
    "):
            if str(i).startswith("Sec-WebSocket-Key"):
                header_dict["Sec-WebSocket-Key"] = i.split(":")[1].strip()
    
        return header_dict
    
    
    headers = get_headers(data)  # 提取请求头信息
    #
    magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    #Sec-WebSocket-Key: MAZZU5DPIxWmhk/UWL2+BA==
    value = headers['Sec-WebSocket-Key'] + magic_string
    print(value)
    ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())
    
    # 对请求头中的sec-websocket-key进行加密
    response_tpl = "HTTP/1.1 101 Switching Protocols
    " 
                   "Upgrade:websocket
    " 
                   "Connection: Upgrade
    " 
                   "Sec-WebSocket-Accept: %s
    " 
                   "WebSocket-Location: ws://127.0.0.1:9527
    
    "
    print(ac.decode('utf-8'))
    response_str = response_tpl % (ac.decode('utf-8'))
    # 响应【握手】信息
    conn.send(response_str.encode("utf8"))
    #
    while True:
        msg = conn.recv(8096)
        print(msg)

    6.解密

    #b'x81x89xf3x99x81-x15x05x01xcbOx1bex97]'
    #b'x81x85sx92ax10x1bxf7
    |x1c'
    #b'x81x83Hxc0xxa6yxf2K'
    
    hashstr = b'x81x85sx92ax10x1bxf7
    |x1c'
    # b'x81 x85s x92ax10x1bxf7  
    |x1c' <126
    # x85s = 5
    hashstr = b'x81xfex02xdcx8dxe8-xb2hmxa5W5uxc8:x16x0cx95(ktx87Wx00bxc52x01x0cx95x1fdixbeW9Axcbx1cx0fx07x91>iSxa7W)Axc9
    x06x0cx95;h`xab]1dxca)x07
    x9a,j~x9fW1bxc2x0ex01x0ex80x16eGxb7Wx00Yxcb2(
    x80*iRx8cV4cxcax15x06x0cx94-nhxafU	^xc9x0cx00
    xa0x19iQxa6Z
    Kxc9
    x00x0exaa:iRxa3Wx0bmxc2x0ex01
    x92x12hWxbaV4cxc8x11&
    x92*eRx86V7fxc8x16x1bx00xad7bTxa1Ux16~xc5
    0
    xa8:hPxb0V4cxcbx1cx07x01xac5bTxa1T!Zxcb8(x0cx949iRxa3[x14sxc9
    x06x0cx94-nhxafZ"rxc8x1cx11
    x912hTx8dWx11Kxc8"!x07x91>iSx88Wx08axc87x05
    x95/dixbaW3_xc2x0ex01x0exacx10hTxb5W2x7fxc8x11&x0cx949kXxb9]1dxc9
    x00
    x83.hNxa9Z
    Bxc5=?x00xbb6bTxa1W1}xc8$6
    x89x03iQxa4]1dxc9	(
    x8c,hWx8dZ=gxc9x0bx06x00x9ax1diQxb2Q
    jxc8x1c&x0cx95x1fhRxb1V5Exc2x0ex01x0cx92x03iPx97V5hxc9x0fx1ex07x91>dqxb2U0rxc55*
    xbdx14bTxa1V5exc8x1cx11
    x910hxxa1Q
    jxc59(x0exb1;iUxb1W(Pxca8"x0fx8a#hgxa7V5Rxc8
    -
    xbb6ehxa8]1dxc8x1cx11x0cx96*ktxa4Wx02Pxc5x1c7
    xa8x04h`xbcZ8gxc2x0ex01x0cx96x17kpx80[x14sxc9
    x06
    x94x01kpxa3V4cxca"x0bx07x91>iPxa0W#txc83x02x0fx8a3bTxa1V0Wxc84x08
    x89$hTxafT>}xc9x0bx12x0bxad0iVxa0V5Exce2x0cx0cx93?dkxa3[x0eExcb&5x0cx949nhxacZ9Qxcax17x03x0bxad3eyx8eWx08ixcax1fx04x07x91>kEx89Ux17nxc5;"
    x83,bTxa1W2x7fxc5+x1c
    x92x12jRx82]1dxcb*"x0cx96x17hmxa5W5uxcax1c
    x0exa6&iSx88[x0cx7fxc4+x16x0cx959nhxafT	rxc9	(x0cx95x08hFx86V5Exc9x0bx06x0cx979bTxa1V7cxcb%-
    x89x15hXxa2]1dxcb0x04x0cx96x17hzx85V4cxc2x0ex01x0fxa9x04hxxa3Tx1bUxc5x13x01x07x91>hWxa8Zx0eUxc5x11%x00x8cx17dpxb4T1gxc2x0ex01x0exb1;kaxadW4Wxca)x07x0bxad0'
    # print(chushibiao[1],chushibiao[1]&127)
    # print(chushibiao[2:4],chushibiao[4:8])
    # 将第二个字节也就是 x83 第9-16位 进行与127进行位运算
    payload = hashstr[1] & 127
    print(payload)
    if payload == 127:
        extend_payload_len = hashstr[2:10]
        mask = hashstr[10:14]
        decoded = hashstr[14:]
    # 当位运算结果等于127时,则第3-10个字节为数据长度
    # 第11-14字节为mask 解密所需字符串
    # 则数据为第15字节至结尾
    
    if payload == 126:
        extend_payload_len = hashstr[2:4]
        mask = hashstr[4:8]
        decoded = hashstr[8:]
    # 当位运算结果等于126时,则第3-4个字节为数据长度
    # 第5-8字节为mask 解密所需字符串
    # 则数据为第9字节至结尾
    
    
    if payload <= 125:
        extend_payload_len = None
        mask = hashstr[2:6]
        decoded = hashstr[6:]
    
    # 当位运算结果小于等于125时,则这个数字就是数据的长度
    # 第3-6字节为mask 解密所需字符串
    # 则数据为第7字节至结尾
    
    str_byte = bytearray()
    # b'x81 x85s x92ax10x1b xf7
    |x1c' <126
    for i in range(len(decoded)): # 0  xf7 ^ x92a 1 
     ^ x10 x1c ^ x1b
        byte = decoded[i] ^ mask[i % 4]
        str_byte.append(byte)
    
    print(str_byte.decode("utf8"))

    7.加密

    import struct
    msg_bytes = "the emperor has not been half-baked in the early days of the collapse of the road, today down three points, yizhou weakness, this serious crisis autumn".encode("utf8")
    token = b"x81" # + 数据长度/运算位 + mask/数据长度 + mask/数据 + 数据
    length = len(msg_bytes)
    
    if length < 126:
        token += struct.pack("B", length)
    elif length == 126:
        token += struct.pack("!BH", 126, length)
    else:
        token += struct.pack("!BQ", 127, length)
    
    msg = token + msg_bytes
    
    print(msg)
  • 相关阅读:
    omnibus gitlab-ce安装
    Helm
    pod状态为Back-off
    我的博客即将入驻“云栖社区”,诚邀技术同仁一同入驻。
    云主机搭建Kubernetes 1.10集群
    Linux清除Windows密码
    Nginx负载均衡之健康检查
    MariaDB主从复制搭建
    Python基础
    Tomcat URL重写
  • 原文地址:https://www.cnblogs.com/qq849784670/p/10274068.html
Copyright © 2011-2022 走看看