使用 Python 自带的 hmac 模块,以自定义 token 作为密钥,对随机生成的 32 字节消息计算 HMAC 摘要(HMAC 是带密钥的消息认证码,并不是加密),用来验证连接的合法性。
import os
msg = os.urandom(32) #随机生成
msg
Out[4]: b"F5\xfaD\x19\xfc]\x0f '>V/\xf8\x08}\x1f\x1a\xefR\xdc\x08\xd6Q\x1e3\nT\xe4J\x08\xcf"
Server端处理:
a. 服务端接受一个新的连接后,向客户端发送一段 32 字节的随机数据(挑战值)
b. 服务端以 token 为密钥对这段随机数据计算 HMAC 摘要,然后接收客户端返回的摘要,并将两者进行比较
"""TCP upper-casing echo server that authenticates clients with an HMAC challenge."""
import hmac
import os
import socket

# Shared secret; the client must use the same value.
my_token = b'louis_swift'

# hmac.new() requires an explicit digest since Python 3.8 (omitting it raises
# TypeError). MD5 was the implicit default before that, so naming it keeps the
# handshake wire-compatible with peers written against the old default.
# Switching to a stronger hash such as 'sha256' must be done on both peers at
# the same time.
_DIGEST_NAME = 'md5'


def _recv_exact(conn, size):
    """Read ``size`` bytes from ``conn``, looping because recv() may return less.

    Returns fewer than ``size`` bytes only when the peer closes the connection
    before everything has arrived.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def conn_auth(conn):
    """Validate the connection with a challenge/response handshake.

    Sends 32 random bytes to the peer and expects back the HMAC of those
    bytes keyed with ``my_token``.

    Returns:
        True if the peer's response matches the expected digest, else False
        (including when the peer disconnects before sending a full digest).
    """
    print('validating connection...')
    msg = os.urandom(32)
    conn.sendall(msg)
    digest = hmac.new(my_token, msg, _DIGEST_NAME).digest()
    response = _recv_exact(conn, len(digest))
    # Constant-time comparison; a short response simply compares unequal.
    return hmac.compare_digest(response, digest)


def data_handler(conn, bufsize=1024):
    """Authenticate ``conn``, then echo every received chunk back upper-cased.

    The connection is always closed before returning, whether authentication
    fails, the peer disconnects, or an error is raised.
    """
    try:
        if not conn_auth(conn):
            print('Invalid connection')
            return
        while True:
            data = conn.recv(bufsize)
            if not data:
                break
            conn.sendall(data.upper())
    finally:
        conn.close()


def server_handler(ip_port, bufsize, back_log=5):
    """Listen on ``ip_port`` and serve clients one at a time, forever.

    Args:
        ip_port: ``(host, port)`` tuple to bind to.
        bufsize: Maximum number of bytes read per recv() call.
        back_log: Listen backlog passed to ``listen()``.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_tcp_server:
        socket_tcp_server.bind(ip_port)
        socket_tcp_server.listen(back_log)
        while True:
            conn, _addr = socket_tcp_server.accept()
            try:
                data_handler(conn, bufsize)
            except OSError as exc:
                # A reset or broken client must not take the whole server down.
                print('connection error:', exc)


if __name__ == '__main__':
    ip_port = ('127.0.0.1', 8080)
    bufsize = 1024
    server_handler(ip_port, bufsize)
Client端处理:
接收服务端发来的 32 字节随机数据,以相同的 token 为密钥计算 HMAC 摘要,再把摘要发回服务端进行校验
"""Interactive TCP client that answers the server's HMAC challenge."""
import hmac
import os
import socket

# Shared secret; the server must use the same value.
my_token = b'louis_swift'

# hmac.new() requires an explicit digest since Python 3.8 (omitting it raises
# TypeError). MD5 was the implicit default before that, so naming it keeps the
# handshake wire-compatible with peers written against the old default.
# Switching to a stronger hash such as 'sha256' must be done on both peers at
# the same time.
_DIGEST_NAME = 'md5'

# Number of challenge bytes read from the server before answering.
_CHALLENGE_SIZE = 32


def _recv_exact(conn, size):
    """Read ``size`` bytes from ``conn``, looping because recv() may return less.

    Returns fewer than ``size`` bytes only when the peer closes the connection
    before everything has arrived.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def conn_auth(conn):
    """Answer the server's challenge with its HMAC keyed by ``my_token``.

    Raises:
        ConnectionError: If the connection closes before the full challenge
            has been received.
    """
    msg = _recv_exact(conn, _CHALLENGE_SIZE)
    if len(msg) != _CHALLENGE_SIZE:
        raise ConnectionError('connection closed before the full challenge arrived')
    digest = hmac.new(my_token, msg, _DIGEST_NAME).digest()
    # sendall() rather than send(): send() may transmit only part of the digest.
    conn.sendall(digest)


def client_handler(ip_port, bufsize=1024):
    """Connect to ``ip_port``, authenticate, then relay typed lines to the server.

    Blank input is ignored and ``quit`` ends the session. The socket is closed
    on every exit path.

    Args:
        ip_port: ``(host, port)`` tuple of the server.
        bufsize: Maximum number of bytes read per response.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_tcp_client:
        socket_tcp_client.connect(ip_port)
        conn_auth(socket_tcp_client)
        while True:
            data = input('>>: ').strip()
            if not data:
                continue
            if data == 'quit':
                break
            socket_tcp_client.sendall(data.encode('utf-8'))
            response = socket_tcp_client.recv(bufsize)
            if not response:
                # An empty read means the server closed the connection
                # (for example after a failed authentication).
                print('connection closed by server')
                break
            print('response data from server:', response)


if __name__ == '__main__':
    ip_port = ('127.0.0.1', 8080)
    bufsize = 1024
    client_handler(ip_port, bufsize)