zoukankan      html  css  js  c++  java
  • 批量弱口令升级版本

    自己写的才好用

    import asyncio
    import aiohttp
    import time
    
    
    # Coroutine: sends the login request for a single target.
    async def func(url):
        """Probe ``{url}/login`` with the default ``admin`` / ``123456`` login.

        Args:
            url: Base URL of the target, including the scheme.

        Returns:
            The "weak password found" message when the JSON reply is an object
            with ``code == 200`` and no ``msg`` value; ``None`` otherwise
            (request failure or rejected login).
        """
        full_url = f"{url}/login"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
        }
        data = {
            "userName": "admin",
            "password": "123456"
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    full_url,
                    headers=headers,
                    data=data,
                    # ``ssl=False`` is the replacement for the deprecated
                    # ``verify_ssl=False`` keyword.
                    ssl=False,
                    # Pass the timeout as a ClientTimeout object rather than
                    # a bare number.
                    timeout=aiohttp.ClientTimeout(total=5),
                ) as response:
                    data_dict = await response.json()
        except Exception:
            # Deliberately broad: one unreachable or misbehaving host must not
            # propagate out of the task and abort the caller's whole batch.
            print(f"{url} <===请求失败===>")
            return None
        # A JSON body that is not an object (e.g. a list) has no ``.get`` and
        # is treated like any other non-matching reply.
        is_weak = (
            isinstance(data_dict, dict)
            and data_dict.get("code") == 200
            and data_dict.get("msg") is None
        )
        if is_weak:
            msg = f"{url} ===> 存在弱口令"
            print(msg)
            return msg
        print(f"{url} <===密码错误===>")
        return None
    
    
    # Entry coroutine: schedules one probe task per target and records the hits.
    async def main():
        """Read targets from ``ips.txt``, probe them concurrently, save the hits.

        Each non-empty line of ``ips.txt`` is one target; ``http://`` is
        prepended when the line does not already start with an HTTP(S) scheme.
        Every truthy result returned by ``func`` is written to ``结果.txt``,
        one per line.
        """
        with open('ips.txt', mode='r', encoding='u8') as f1, open('结果.txt', mode='w', encoding='u8') as f2:
            task_list = []
            for raw in f1:
                target = raw.strip()
                if not target:
                    # Blank lines would otherwise become a bogus "http://" target.
                    continue
                # Check the prefix, not a substring: a host such as
                # "httpbin.org" contains "http" but still needs a scheme.
                if not target.lower().startswith(("http://", "https://")):
                    target = f"http://{target}"
                task_list.append(asyncio.create_task(func(target)))
            results = await asyncio.gather(*task_list)
            # ``func`` returns None for failures, so only hits are written.
            f2.writelines(f"{hit}\n" for hit in results if hit)
    
    
    if __name__ == "__main__":
        # Time the whole scan and print the elapsed seconds.
        began_at = time.time()
        asyncio.run(main())
        elapsed = time.time() - began_at
        print(elapsed)
    
    

    多线程版本

    import requests
    from concurrent.futures import ThreadPoolExecutor
    import queue
    import threading
    
    
    def save_file(msg):
        """Append ``msg`` followed by a newline to ``res.txt`` (UTF-8).

        Args:
            msg: Text of the line to append.
        """
        with open('res.txt', mode='a', encoding="utf-8") as f:
            # The newline must be an escape sequence; a literal line break
            # inside a single-quoted f-string is a syntax error.
            f.write(f"{msg}\n")
    
    
    def check_pass(lock, q):
        """Take one URL from ``q`` and test it for the default credentials.

        Posts ``admin`` / ``123456`` to ``{url}/login``. When the JSON reply is
        an object with ``code == 200`` and no ``msg`` value, the hit is printed
        and appended to the results file while ``lock`` is held.

        Args:
            lock: Context-manager lock guarding the results file.
            q: Queue from which exactly one base URL is taken.
        """
        url = q.get()
        full_url = f"{url}/login"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
        }
        data = {
            "userName": "admin",
            "password": "123456"
        }
        try:
            # Without a timeout requests waits indefinitely, so one silent
            # host could tie up a pool worker forever.
            res = requests.post(url=full_url, headers=headers, data=data, timeout=5)
            data_dict = res.json()
        except Exception:
            # Deliberately broad: a failing host must not kill the worker.
            print("<===请求失败===>")
            return

        # A JSON body that is not an object has no ``.get``; treat it as a miss.
        is_weak = (
            isinstance(data_dict, dict)
            and data_dict.get("code") == 200
            and data_dict.get("msg") is None
        )
        if is_weak:
            msg = f"{url} ===> 存在弱口令"
            print(msg)
            with lock:
                save_file(msg)
        else:
            print("<===密码错误===>")
    
    
    def main():
        """Queue every target from ``ips.txt`` and submit one pool job per target.

        Each non-empty line is one target; ``http://`` is prepended when the
        line does not already start with an HTTP(S) scheme. Leaving the
        ``ThreadPoolExecutor`` context waits for all submitted jobs.
        """
        lock = threading.RLock()
        q = queue.Queue()
        with ThreadPoolExecutor() as pool:
            with open('ips.txt', mode='r', encoding='utf-8') as f:
                for raw in f:
                    line = raw.strip()
                    if not line:
                        # Blank lines would otherwise become a bogus "http://" target.
                        continue
                    # Check the prefix, not a substring: a host such as
                    # "httpbin.org" contains "http" but still needs a scheme.
                    if not line.lower().startswith(("http://", "https://")):
                        line = f"http://{line}"
                    # One put per submit: each worker call takes exactly one URL.
                    q.put(line)
                    pool.submit(check_pass, lock, q)
    
    
    # Run the scan only when this file is executed as a script.
    if __name__ == "__main__":
        main()
    
    

    协程版本

    import asyncio
    import aiofiles
    import aiohttp
    
    
    async def save_file(msg):
        """Append ``msg`` followed by a newline to ``res1.txt`` (UTF-8).

        Args:
            msg: Text of the line to append.
        """
        async with aiofiles.open('res1.txt', mode='a', encoding="utf-8") as f:
            # The newline must be an escape sequence; a literal line break
            # inside a single-quoted f-string is a syntax error.
            await f.write(f"{msg}\n")
    
    
    async def check_pass(url):
        """Test ``{url}/login`` for the default ``admin`` / ``123456`` login.

        When the JSON reply is an object with ``code == 200`` and no ``msg``
        value, the hit is printed and appended to the results file.

        Args:
            url: Base URL of the target, including the scheme.
        """
        full_url = f"{url}/login"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
        }
        data = {
            "userName": "admin",
            "password": "123456"
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    full_url,
                    headers=headers,
                    data=data,
                    # ``ssl=False`` is the replacement for the deprecated
                    # ``verify_ssl=False`` keyword.
                    ssl=False,
                    # Explicit short timeout so a silent host cannot stall
                    # the scan.
                    timeout=aiohttp.ClientTimeout(total=5),
                ) as response:
                    data_dict = await response.json()
        except Exception:
            # Deliberately broad: a failing host must not abort the scan.
            print("<===请求失败===>")
            return

        # A JSON body that is not an object has no ``.get``; treat it as a miss.
        is_weak = (
            isinstance(data_dict, dict)
            and data_dict.get("code") == 200
            and data_dict.get("msg") is None
        )
        if is_weak:
            msg = f"{url} ===> 存在弱口令"
            print(msg)
            await save_file(msg)
        else:
            print("<===密码错误===>")
    
    
    async def main():
        """Check every target listed in ``ips.txt``, one after another.

        Each non-empty line is one target; ``http://`` is prepended when the
        line does not already start with an HTTP(S) scheme. Every check is
        awaited before the next line is read, so the targets are processed
        sequentially.
        """
        async with aiofiles.open("ips.txt", mode="r", encoding="utf-8") as f:
            async for raw in f:
                line = raw.strip()
                if not line:
                    # Blank lines would otherwise become a bogus "http://" target.
                    continue
                # Check the prefix, not a substring: a host such as
                # "httpbin.org" contains "http" but still needs a scheme.
                if not line.lower().startswith(("http://", "https://")):
                    line = f"http://{line}"
                await check_pass(line)
    
    
    if __name__ == "__main__":
        # Create the top-level coroutine and drive it to completion.
        asyncio.run(main())
    
    

    协程最终版本

    import asyncio
    import aiofiles
    import aiohttp
    
    
    async def save_file(msg):
        """Append ``msg`` followed by a newline to ``res1.txt`` (UTF-8).

        Args:
            msg: Text of the line to append.
        """
        async with aiofiles.open('res1.txt', mode='a', encoding="utf-8") as f:
            # The newline must be an escape sequence; a literal line break
            # inside a single-quoted f-string is a syntax error.
            await f.write(f"{msg}\n")
    
    
    async def check_pass(url):
        """Test ``{url}/login`` for the default ``admin`` / ``123456`` login.

        When the JSON reply is an object with ``code == 200`` and no ``msg``
        value, the hit is printed and appended to the results file.

        Args:
            url: Base URL of the target, including the scheme.
        """
        full_url = f"{url}/login"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
        }
        data = {
            "userName": "admin",
            "password": "123456"
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    full_url,
                    headers=headers,
                    data=data,
                    # ``ssl=False`` is the replacement for the deprecated
                    # ``verify_ssl=False`` keyword.
                    ssl=False,
                    # Explicit short timeout so a silent host cannot keep
                    # its task pending for minutes.
                    timeout=aiohttp.ClientTimeout(total=5),
                ) as response:
                    data_dict = await response.json()
        except Exception:
            # Deliberately broad: a failing host must not raise out of its task.
            print("<===请求失败===>")
            return

        # A JSON body that is not an object has no ``.get``; treat it as a miss.
        is_weak = (
            isinstance(data_dict, dict)
            and data_dict.get("code") == 200
            and data_dict.get("msg") is None
        )
        if is_weak:
            msg = f"{url} ===> 存在弱口令"
            print(msg)
            await save_file(msg)
        else:
            print("<===密码错误===>")
    
    
    async def main():
        """Check every target in ``ips.txt`` concurrently and print the task sets.

        Each non-empty line is one target; ``http://`` is prepended when the
        line does not already start with an HTTP(S) scheme. After all tasks
        finish, the sets of done and pending tasks are printed.
        """
        task_list = []
        async with aiofiles.open("ips.txt", mode="r", encoding="utf-8") as f:
            async for raw in f:
                line = raw.strip()
                if not line:
                    # Blank lines would otherwise become a bogus "http://" target.
                    continue
                # Check the prefix, not a substring: a host such as
                # "httpbin.org" contains "http" but still needs a scheme.
                if not line.lower().startswith(("http://", "https://")):
                    line = f"http://{line}"
                # create_task schedules the coroutine immediately; keep the
                # task object so it can be awaited below.
                task_list.append(asyncio.create_task(check_pass(line)))
        if task_list:
            # asyncio.wait blocks until every task has finished and returns
            # two sets: the completed tasks and the still-pending ones.
            done, pending = await asyncio.wait(task_list)
        else:
            # asyncio.wait raises ValueError on an empty collection, which
            # happens when ips.txt contains no targets.
            done, pending = set(), set()
        print(done)
        print(pending)
    
    if __name__ == "__main__":
        import time

        # Time the whole scan and print the elapsed seconds.
        began_at = time.time()
        asyncio.run(main())
        finished_at = time.time()
        print(finished_at - began_at)
    
    
    
    import requests
    from concurrent.futures import ThreadPoolExecutor
    import queue
    import threading
    
    
    def save_file(msg):
        """Append ``msg`` followed by a newline to ``res.txt`` (UTF-8).

        Args:
            msg: Text of the line to append.
        """
        with open('res.txt', mode='a', encoding="utf-8") as f:
            # The newline must be an escape sequence; a literal line break
            # inside a single-quoted f-string is a syntax error.
            f.write(f"{msg}\n")
    
    
    def check_pass(lock, url):
        """Test ``{url}/login`` for the default ``admin`` / ``123456`` login.

        When the JSON reply is an object with ``code == 200`` and no ``msg``
        value, the hit is printed and appended to the results file while
        ``lock`` is held.

        Args:
            lock: Context-manager lock guarding the results file.
            url: Base URL of the target, including the scheme.
        """
        full_url = f"{url}/login"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
        }
        data = {
            "userName": "admin",
            "password": "123456"
        }
        try:
            # Without a timeout requests waits indefinitely, so one silent
            # host could tie up a pool worker forever.
            res = requests.post(url=full_url, headers=headers, data=data, timeout=5)
            data_dict = res.json()
        except Exception:
            # Deliberately broad: a failing host must not kill the worker.
            print("<===请求失败===>")
            return

        # A JSON body that is not an object has no ``.get``; treat it as a miss.
        is_weak = (
            isinstance(data_dict, dict)
            and data_dict.get("code") == 200
            and data_dict.get("msg") is None
        )
        if is_weak:
            msg = f"{url} ===> 存在弱口令"
            print(msg)
            with lock:
                save_file(msg)
        else:
            print("<===密码错误===>")
    
    
    def main():
        """Load every target from ``ips.txt``, then check them on a thread pool.

        Each non-empty line is one target; ``http://`` is prepended when the
        line does not already start with an HTTP(S) scheme. Targets are first
        collected in a queue and then drained into one pool job each. Leaving
        the ``ThreadPoolExecutor`` context waits for all submitted jobs.
        """
        lock = threading.RLock()
        q = queue.Queue()
        with ThreadPoolExecutor() as pool:
            with open('ips.txt', mode='r', encoding='utf-8') as f:
                for raw in f:
                    line = raw.strip()
                    if not line:
                        # Blank lines would otherwise become a bogus "http://" target.
                        continue
                    # Check the prefix, not a substring: a host such as
                    # "httpbin.org" contains "http" but still needs a scheme.
                    if not line.lower().startswith(("http://", "https://")):
                        line = f"http://{line}"
                    q.put(line)
            # Only this thread consumes the queue, so qsize() is a reliable
            # loop condition here.
            while q.qsize():
                url = q.get()
                pool.submit(check_pass, lock, url)
    
    
    if __name__ == "__main__":
        import time

        # Time the whole scan and print the elapsed seconds.
        began_at = time.time()
        main()
        finished_at = time.time()
        print(finished_at - began_at)
    
  • 相关阅读:
    github设置添加SSH
    pythonanywhere笔记
    Python3x 爬取妹子图
    python3.4 百度API接口
    简易博客开发(8)----django1.9 博客部署到pythonanywhere上
    Python3.4+Django1.9+Bootstrap3
    docker搭建私有仓库之harbor
    docker新手常见问题和知识点
    node之sinopia搭建本地npm仓库
    rancher-HA快速搭建
  • 原文地址:https://www.cnblogs.com/xcymn/p/15336360.html
Copyright © 2011-2022 走看看