zoukankan      html  css  js  c++  java
  • websockert工作原理

    转自DragenFire

    手写websocket握手

    import socket, base64, hashlib
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 9527))
    sock.listen(5)
    # 获取客户端socket对象
    conn, address = sock.accept()
    # 获取客户端的【握手】信息
    data = conn.recv(1024)
    print(data)
    """
    b'GET /ws HTTP/1.1
    
    Host: 127.0.0.1:9527
    
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0
    
    Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
    
    Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2
    
    Accept-Encoding: gzip, deflate
    
    Sec-WebSocket-Version: 13
    
    Origin: http://localhost:63342
    
    Sec-WebSocket-Extensions: permessage-deflate
    
    Sec-WebSocket-Key: jocLOLLq1BQWp0aZgEWL5A==
    
    Cookie: session=6f2bab18-2dc4-426a-8f06-de22909b967b
    
    Connection: keep-alive, Upgrade
    
    Pragma: no-cache
    
    Cache-Control: no-cache
    
    Upgrade: websocket
    
    '
    """
    
    # magic string为:258EAFA5-E914-47DA-95CA-C5AB0DC85B11
    magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    
    
    def get_headers(data):
        header_dict = {}
        header_str = data.decode("utf8")
        for i in header_str.split("
    "):
            if str(i).startswith("Sec-WebSocket-Key"):
                header_dict["Sec-WebSocket-Key"] = i.split(":")[1].strip()
    
        return header_dict
    
    
    def get_header(data):
        """
         将请求头格式化成字典
         :param data:
         :return:
         """
        header_dict = {}
        data = str(data, encoding='utf-8')
    
        header, body = data.split('
    
    ', 1)
        header_list = header.split('
    ')
        for i in range(0, len(header_list)):
            if i == 0:
                if len(header_list[i].split(' ')) == 3:
                    header_dict['method'], header_dict['url'], header_dict['protocol'] = header_list[i].split(' ')
            else:
                k, v = header_list[i].split(':', 1)
                header_dict[k] = v.strip()
        return header_dict
    
    
    headers = get_headers(data)  # 提取请求头信息
    # 对请求头中的sec-websocket-key进行加密
    response_tpl = "HTTP/1.1 101 Switching Protocols
    " 
                   "Upgrade:websocket
    " 
                   "Connection: Upgrade
    " 
                   "Sec-WebSocket-Accept: %s
    " 
                   "WebSocket-Location: ws://127.0.0.1:9527
    
    "
    
    value = headers['Sec-WebSocket-Key'] + magic_string
    print(value)
    ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())
    response_str = response_tpl % (ac.decode('utf-8'))
    # 响应【握手】信息
    conn.send(response_str.encode("utf8"))
    
    while True:
        msg = conn.recv(8096)
        print(msg)
    

    解密过程:

    # b'x81x83xceHxb6x85xffzx85'
    
    hashstr = b'x81x83xceHxb6x85xffzx85'
    # b'x81    x83    xceHxb6x85xffzx85'
    
    # 将第二个字节也就是 x83 第9-16位 进行与127进行位运算
    payload = hashstr[1] & 127
    print(payload)
    if payload == 127:
        extend_payload_len = hashstr[2:10]
        mask = hashstr[10:14]
        decoded = hashstr[14:]
    # 当位运算结果等于127时,则第3-10个字节为数据长度
    # 第11-14字节为mask 解密所需字符串
    # 则数据为第15字节至结尾
    
    if payload == 126:
        extend_payload_len = hashstr[2:4]
        mask = hashstr[4:8]
        decoded = hashstr[8:]
    # 当位运算结果等于126时,则第3-4个字节为数据长度
    # 第5-8字节为mask 解密所需字符串
    # 则数据为第9字节至结尾
    
    
    if payload <= 125:
        extend_payload_len = None
        mask = hashstr[2:6]
        decoded = hashstr[6:]
    
    # 当位运算结果小于等于125时,则这个数字就是数据的长度
    # 第3-6字节为mask 解密所需字符串
    # 则数据为第7字节至结尾
    
    str_byte = bytearray()
    
    for i in range(len(decoded)):
        byte = decoded[i] ^ mask[i % 4]
        str_byte.append(byte)
    
    print(str_byte.decode("utf8"))
    

    加密过程:

    import struct
    msg_bytes = "hello".encode("utf8")
    token = b"x81"
    length = len(msg_bytes)
    
    if length < 126:
        token += struct.pack("B", length)
    elif length == 126:
        token += struct.pack("!BH", 126, length)
    else:
        token += struct.pack("!BQ", 127, length)
    
    msg = token + msg_bytes
    
    print(msg)
  • 相关阅读:
    Mac上的USB存储设备使用痕迹在新版操作系统有所变化
    Beware of the encrypted VM
    A barrier for Mobile Forensics
    Second Space could let suspect play two different roles easily
    Take advantage of Checkra1n to Jailbreak iDevice for App analysis
    Find out "Who" and "Where"
    Where is the clone one and how to extract it?
    Downgrade extraction on phones running Android 7/8/9
    高版本安卓手机的取证未来
    How to extract WeChat chat messages from a smartphone running Android 7.x or above
  • 原文地址:https://www.cnblogs.com/Wj-Li/p/10864900.html
Copyright © 2011-2022 走看看