自己写的才好用
import asyncio
import aiohttp
import time
# Coroutine worker: sends the login request for a single target.
async def func(url):
    """Try the ``admin`` / ``123456`` login against ``{url}/login``.

    Args:
        url: Base URL of the target (scheme included); ``/login`` is
            appended to it.

    Returns:
        The hit message ``f"{url} ===> 存在弱口令"`` when the JSON reply has
        ``code == 200`` and no ``msg`` value; ``None`` when the request
        fails, the reply is not a JSON object, or the login is rejected.
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        async with aiohttp.ClientSession() as session:
            # ssl=False replaces the deprecated verify_ssl=False, and the
            # 5 second budget is expressed as an explicit ClientTimeout.
            async with session.post(
                full_url,
                headers=headers,
                data=data,
                ssl=False,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                data_dict = await response.json()
    except Exception:
        # Best-effort scan: any failure for one target is reported and
        # skipped so the remaining targets are still checked.
        print(f"{url} <===请求失败===>")
        return None
    if not isinstance(data_dict, dict):
        # A JSON list/string/number has no .get(); treat it as an unusable
        # reply instead of letting AttributeError escape to the caller.
        print(f"{url} <===请求失败===>")
        return None
    if data_dict.get("code") == 200 and data_dict.get("msg") is None:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        return msg
    print(f"{url} <===密码错误===>")
    return None
# Main coroutine: schedules one scan task per target and stores the hits.
async def main():
    """Scan every target listed in ``ips.txt`` and write hits to ``结果.txt``.

    Each non-empty line of ``ips.txt`` is one target. Lines without an
    ``http://`` or ``https://`` prefix get ``http://`` prepended. All targets
    are scanned concurrently with ``func``; every non-empty message it
    returns is written to ``结果.txt`` on its own line.
    """
    # Build the task list.
    task_list = []
    with open('ips.txt', mode='r', encoding='utf-8') as f1:
        for raw in f1:
            target = raw.strip()
            if not target:
                # Blank lines would otherwise become the bogus URL "http://".
                continue
            # Check for a real scheme prefix; a plain substring test is
            # fooled by hosts that merely contain "http".
            if not target.lower().startswith(("http://", "https://")):
                target = f"http://{target}"
            task_list.append(asyncio.create_task(func(target)))
    results = await asyncio.gather(*task_list)
    with open('结果.txt', mode='w', encoding='utf-8') as f2:
        f2.writelines(f"{hit}\n" for hit in results if hit)
if __name__ == "__main__":
    # Time the whole scan and print the elapsed wall-clock seconds.
    began = time.time()
    asyncio.run(main())
    elapsed = time.time() - began
    print(elapsed)
多线程版本
import requests
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
def save_file(msg):
    """Append ``msg`` followed by a newline to ``res.txt`` (UTF-8).

    Args:
        msg: Text of the line to append.
    """
    with open('res.txt', mode='a', encoding="utf-8") as f:
        f.write(f"{msg}\n")
def check_pass(lock, q, timeout=5):
    """Take one URL from ``q`` and try the ``admin`` / ``123456`` login.

    Args:
        lock: Lock held while the hit is written through ``save_file``.
        q: Queue holding the base URLs; exactly one item is taken.
        timeout: Seconds passed to ``requests.post`` as its timeout.

    A hit (JSON reply with ``code == 200`` and no ``msg`` value) is printed
    and saved; every other outcome is only printed.
    """
    url = q.get()
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        # requests waits indefinitely unless a timeout is given, which
        # would pin a pool worker on an unresponsive host.
        res = requests.post(url=full_url, headers=headers, data=data, timeout=timeout)
        data_dict = res.json()
    except Exception:
        # Best-effort scan: report the failure and move on.
        print("<===请求失败===>")
        return
    if not isinstance(data_dict, dict):
        # Non-object JSON has no .get(); treat it as an unusable reply.
        print("<===请求失败===>")
        return
    if data_dict.get("code") == 200 and data_dict.get("msg") is None:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        with lock:
            save_file(msg)
    else:
        print("<===密码错误===>")
def main():
    """Submit one ``check_pass`` job to a thread pool per target in ``ips.txt``.

    Each non-empty line is one target. Lines without an ``http://`` or
    ``https://`` prefix get ``http://`` prepended. Every URL is put on a
    queue and a worker job is submitted to consume it. Leaving the pool's
    ``with`` block waits for all jobs to finish.
    """
    lock = threading.RLock()
    q = queue.Queue()
    with ThreadPoolExecutor() as pool:
        with open('ips.txt', mode='r', encoding='utf-8') as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    # Blank lines would otherwise become the bogus URL "http://".
                    continue
                # Check for a real scheme prefix; a plain substring test is
                # fooled by hosts that merely contain "http".
                if not line.lower().startswith(("http://", "https://")):
                    line = f"http://{line}"
                # One put per submit: each job takes exactly one URL.
                q.put(line)
                pool.submit(check_pass, lock, q)
if __name__ == "__main__":
    # Run the scan only when executed as a script, not when imported.
    main()
协程版本
import asyncio
import aiofiles
import aiohttp
async def save_file(msg):
    """Append ``msg`` followed by a newline to ``res1.txt`` (UTF-8).

    Args:
        msg: Text of the line to append.
    """
    async with aiofiles.open('res1.txt', mode='a', encoding="utf-8") as f:
        await f.write(f"{msg}\n")
async def check_pass(url):
    """Try the ``admin`` / ``123456`` login against ``{url}/login``.

    Args:
        url: Base URL of the target (scheme included); ``/login`` is
            appended to it.

    A hit (JSON reply with ``code == 200`` and no ``msg`` value) is printed
    and appended to the results file via ``save_file``; every other outcome
    is only printed.
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        async with aiohttp.ClientSession() as session:
            # ssl=False replaces the deprecated verify_ssl=False.
            async with session.post(full_url, headers=headers, data=data, ssl=False) as response:
                data_dict = await response.json()
    except Exception:
        # Best-effort scan: report the failure and move on.
        print("<===请求失败===>")
        return
    if not isinstance(data_dict, dict):
        # Non-object JSON has no .get(); treat it as an unusable reply.
        print("<===请求失败===>")
        return
    if data_dict.get("code") == 200 and data_dict.get("msg") is None:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        await save_file(msg)
    else:
        print("<===密码错误===>")
async def main():
    """Check the targets listed in ``ips.txt`` one after another.

    Each non-empty line is one target. Lines without an ``http://`` or
    ``https://`` prefix get ``http://`` prepended. Every ``check_pass`` call
    is awaited before the next line is read, so requests do not overlap.
    """
    async with aiofiles.open("ips.txt", mode="r", encoding="utf-8") as f:
        async for raw in f:
            line = raw.strip()
            if not line:
                # Blank lines would otherwise become the bogus URL "http://".
                continue
            # Check for a real scheme prefix; a plain substring test is
            # fooled by hosts that merely contain "http".
            if not line.lower().startswith(("http://", "https://")):
                line = f"http://{line}"
            await check_pass(line)
if __name__ == "__main__":
    # Create the main() coroutine and run it to completion.
    asyncio.run(main())
协程最终版本
import asyncio
import aiofiles
import aiohttp
async def save_file(msg):
    """Append ``msg`` followed by a newline to ``res1.txt`` (UTF-8).

    Args:
        msg: Text of the line to append.
    """
    async with aiofiles.open('res1.txt', mode='a', encoding="utf-8") as f:
        await f.write(f"{msg}\n")
async def check_pass(url):
    """Try the ``admin`` / ``123456`` login against ``{url}/login``.

    Args:
        url: Base URL of the target (scheme included); ``/login`` is
            appended to it.

    A hit (JSON reply with ``code == 200`` and no ``msg`` value) is printed
    and appended to the results file via ``save_file``; every other outcome
    is only printed.
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        async with aiohttp.ClientSession() as session:
            # ssl=False replaces the deprecated verify_ssl=False.
            async with session.post(full_url, headers=headers, data=data, ssl=False) as response:
                data_dict = await response.json()
    except Exception:
        # Best-effort scan: report the failure and move on.
        print("<===请求失败===>")
        return
    if not isinstance(data_dict, dict):
        # Non-object JSON has no .get(); treat it as an unusable reply.
        print("<===请求失败===>")
        return
    if data_dict.get("code") == 200 and data_dict.get("msg") is None:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        await save_file(msg)
    else:
        print("<===密码错误===>")
async def main():
    """Check every target listed in ``ips.txt`` concurrently.

    Each non-empty line is one target. Lines without an ``http://`` or
    ``https://`` prefix get ``http://`` prepended. One task per target is
    created, all tasks are awaited together, and the resulting ``done`` and
    ``pending`` sets are printed.
    """
    task_list = []
    async with aiofiles.open("ips.txt", mode="r", encoding="utf-8") as f:
        async for raw in f:
            line = raw.strip()
            if not line:
                # Blank lines would otherwise become the bogus URL "http://".
                continue
            # Check for a real scheme prefix; a plain substring test is
            # fooled by hosts that merely contain "http".
            if not line.lower().startswith(("http://", "https://")):
                line = f"http://{line}"
            # create_task schedules the coroutine immediately; keep the task
            # in the list so it can be awaited below.
            task_list.append(asyncio.create_task(check_pass(line)))
    if task_list:
        # Wait until every task has finished. asyncio.wait returns two sets:
        # the finished tasks and the still-pending ones.
        done, pending = await asyncio.wait(task_list)
    else:
        # asyncio.wait raises ValueError on an empty collection, which is
        # what happens when ips.txt contains no targets.
        done, pending = set(), set()
    print(done)
    print(pending)
if __name__ == "__main__":
    import time

    # Time the whole concurrent scan and print the elapsed seconds.
    started_at = time.time()
    asyncio.run(main())
    finished_at = time.time()
    print(finished_at - started_at)
import requests
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
def save_file(msg):
    """Append ``msg`` followed by a newline to ``res.txt`` (UTF-8).

    Args:
        msg: Text of the line to append.
    """
    with open('res.txt', mode='a', encoding="utf-8") as f:
        f.write(f"{msg}\n")
def check_pass(lock, url, timeout=5):
    """Try the ``admin`` / ``123456`` login against ``{url}/login``.

    Args:
        lock: Lock held while the hit is written through ``save_file``.
        url: Base URL of the target (scheme included); ``/login`` is
            appended to it.
        timeout: Seconds passed to ``requests.post`` as its timeout.

    A hit (JSON reply with ``code == 200`` and no ``msg`` value) is printed
    and saved; every other outcome is only printed.
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        # requests waits indefinitely unless a timeout is given, which
        # would pin a pool worker on an unresponsive host.
        res = requests.post(url=full_url, headers=headers, data=data, timeout=timeout)
        data_dict = res.json()
    except Exception:
        # Best-effort scan: report the failure and move on.
        print("<===请求失败===>")
        return
    if not isinstance(data_dict, dict):
        # Non-object JSON has no .get(); treat it as an unusable reply.
        print("<===请求失败===>")
        return
    if data_dict.get("code") == 200 and data_dict.get("msg") is None:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        with lock:
            save_file(msg)
    else:
        print("<===密码错误===>")
def main():
    """Check every target listed in ``ips.txt`` on a thread pool.

    Each non-empty line is one target. Lines without an ``http://`` or
    ``https://`` prefix get ``http://`` prepended. One ``check_pass`` job is
    submitted per target. Leaving the pool's ``with`` block waits for all
    jobs to finish.
    """
    lock = threading.RLock()
    targets = []
    with open('ips.txt', mode='r', encoding='utf-8') as f:
        for raw in f:
            line = raw.strip()
            if not line:
                # Blank lines would otherwise become the bogus URL "http://".
                continue
            # Check for a real scheme prefix; a plain substring test is
            # fooled by hosts that merely contain "http".
            if not line.lower().startswith(("http://", "https://")):
                line = f"http://{line}"
            targets.append(line)
    # The URLs are handed straight to the workers, so the intermediate
    # Queue and its qsize() polling loop are not needed.
    with ThreadPoolExecutor() as pool:
        for url in targets:
            pool.submit(check_pass, lock, url)
if __name__ == "__main__":
    import time

    # Time the whole thread-pool scan and print the elapsed seconds.
    started_at = time.time()
    main()
    finished_at = time.time()
    print(finished_at - started_at)