Server-side file:
import base64
import hashlib
import socket
# Parse the request headers into a dictionary
def get_headers(data):
    """
    Parse a raw HTTP request into a flat dictionary.

    The request line contributes the keys ``method``, ``url`` and
    ``protocol`` (only when it consists of exactly three space-separated
    parts).  Every following ``Name: value`` line contributes ``Name``
    mapped to the value with surrounding whitespace stripped.  Header names
    are stored exactly as received; when a name repeats, the last value wins.

    :param data: raw request bytes, UTF-8 encoded; everything after the
        first blank line (the body) is ignored
    :return: dict holding the request-line fields and the header fields
    :raises UnicodeDecodeError: if ``data`` is not valid UTF-8
    """
    header_dict = {}
    text = str(data, encoding='utf-8')

    # The header section ends at the first blank line.  partition() is used
    # instead of an unpacking split() so a request that carries no
    # terminator is still parsed rather than raising ValueError.
    header, _, _ = text.partition('\r\n\r\n')
    request_line, *field_lines = header.split('\r\n')

    parts = request_line.split(' ')
    if len(parts) == 3:
        header_dict['method'], header_dict['url'], header_dict['protocol'] = parts

    for line in field_lines:
        name, colon, value = line.partition(':')
        if not colon:
            # Not a "Name: value" line; skip it instead of crashing.
            continue
        header_dict[name] = value.strip()
    return header_dict
def _build_accept_key(client_key):
    """Return the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key."""
    # Fixed GUID that the WebSocket handshake appends to the client's key.
    magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    digest = hashlib.sha1((client_key + magic_string).encode('utf-8')).digest()
    return base64.b64encode(digest).decode('utf-8')


def _parse_frame(buffer):
    """
    Parse one WebSocket frame from the start of ``buffer``.

    :param buffer: bytes-like object holding the bytes received so far
    :return: ``(opcode, payload, consumed)`` where ``payload`` is the
        unmasked payload as ``bytes`` and ``consumed`` is the total frame
        size in bytes, or ``None`` when ``buffer`` does not yet contain a
        complete frame
    """
    if len(buffer) < 2:
        return None
    opcode = buffer[0] & 0x0F
    masked = bool(buffer[1] & 0x80)
    # Low 7 bits of the second byte: <= 125 is the length itself, 126 means
    # a 2-byte length follows, 127 means an 8-byte length follows.
    payload_len = buffer[1] & 127
    offset = 2
    if payload_len == 126:
        if len(buffer) < offset + 2:
            return None
        payload_len = int.from_bytes(buffer[offset:offset + 2], 'big')
        offset += 2
    elif payload_len == 127:
        if len(buffer) < offset + 8:
            return None
        payload_len = int.from_bytes(buffer[offset:offset + 8], 'big')
        offset += 8

    mask = b''
    if masked:
        if len(buffer) < offset + 4:
            return None
        mask = bytes(buffer[offset:offset + 4])
        offset += 4

    end = offset + payload_len
    if len(buffer) < end:
        return None
    payload = bytes(buffer[offset:end])
    if masked:
        # Each payload byte is XOR-ed with the mask byte at index i mod 4.
        payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
    return opcode, payload, end


def run(host="127.0.0.1", port=8002):
    """
    Serve a single WebSocket client and print every message it sends.

    Listens on ``host``:``port``, accepts one connection, answers the
    HTTP upgrade request with a ``101 Switching Protocols`` response, then
    decodes incoming frames until the peer disconnects or sends a close
    frame.  Both sockets are closed on return.

    A typical upgrade request looks like::

        GET / HTTP/1.1
        Host: 127.0.0.1:8002
        Connection: Upgrade
        Upgrade: websocket
        Origin: http://localhost:63342
        Sec-WebSocket-Version: 13
        Sec-WebSocket-Key: AeywaspR+reHpMX8RK2KBw==
        Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

    which get_headers() turns into
    ``{'method': 'GET', 'url': '/', 'protocol': 'HTTP/1.1', 'Host': ...}``.

    :param host: address to bind to
    :param port: TCP port to bind to
    :return: None
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(5)
        conn, _address = sock.accept()
        with conn:
            # Read until the blank line that ends the request headers; one
            # recv() is not guaranteed to deliver the whole request.  The
            # size cap keeps a misbehaving client from growing it forever.
            request = b''
            while b'\r\n\r\n' not in request and len(request) < 65536:
                chunk = conn.recv(1024)
                if not chunk:
                    break
                request += chunk
            head, separator, pending = request.partition(b'\r\n\r\n')
            if not separator:
                # Peer went away (or overflowed the cap) before finishing.
                return

            try:
                headers = get_headers(head + separator)
            except ValueError:
                # Covers undecodable bytes and malformed header lines.
                headers = {}
            client_key = headers.get('Sec-WebSocket-Key')
            if not client_key:
                conn.sendall(b'HTTP/1.1 400 Bad Request\r\n\r\n')
                return

            # Handshake response; the header block ends with a blank line.
            response_tpl = (
                "HTTP/1.1 101 Switching Protocols\r\n"
                "Upgrade:websocket\r\n"
                "Connection: Upgrade\r\n"
                "Sec-WebSocket-Accept: %s\r\n"
                "WebSocket-Location: ws://%s%s\r\n\r\n"
            )
            response_str = response_tpl % (
                _build_accept_key(client_key),
                headers.get('Host', '%s:%s' % (host, port)),
                headers.get('url', '/'),
            )
            # sendall() keeps writing until the whole response is sent.
            conn.sendall(bytes(response_str, encoding='utf-8'))

            # Receive loop: bytes are accumulated so that frames split
            # across recv() calls, or several frames arriving in one call,
            # are both handled.
            buffer = bytearray(pending)
            while True:
                frame = _parse_frame(buffer)
                if frame is None:
                    try:
                        info = conn.recv(8096)
                    except OSError:
                        info = None
                    if not info:
                        break
                    buffer += info
                    continue

                opcode, payload, consumed = frame
                del buffer[:consumed]
                if opcode == 0x8:
                    # Close frame: its payload is a status code, not text.
                    break
                if opcode in (0x0, 0x1):
                    # Text or continuation frame.  errors='replace' keeps a
                    # character split across fragments from raising.
                    body = str(payload, encoding='utf-8', errors='replace')
                    print("解密结果", body)
                # Other opcodes (binary, ping, pong) are ignored by this demo.
if __name__ == '__main__':
    # Start the demo server only when this file is executed as a script,
    # not when it is imported.
    run()
Front-end file:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    // Creating the WebSocket makes the browser generate a random key
    // automatically, send it to the server, and verify the hashed value
    // the server sends back.
    var ws = new WebSocket("ws://127.0.0.1:8002/");
</script>
</body>
</html>