zoukankan      html  css  js  c++  java
  • 2.工具开发端口扫描工具

    工具开发-端口扫描工具-多线程-协程

    背景
    有时候,在进行网络相关的研究的时候,我们需要执行一些有目的的参数测量。而端口扫描就是其中比较普遍也比较重要的一项。所谓的端口扫描,就是指通过TCP握手或者别的方式来判别一个给定主机上的某些端口是否处理开放,或者说监听的状态。现有的使用比较广泛的端口扫描工具是nmap。毋庸置疑,nmap是一款非常强大且易于使用的软件。但nmap是一款运行于terminal中的软件,有时在别的代码中调用并不是很方便,甚至没有相应的库。另外,nmap依赖的其他库较多,在较老的系统中可能无法使用较新的nmap,这样会造成扫描的不便。另外,nmap在扫描时需要root权限。

    参考
    链接:https://www.jianshu.com/p/b1994a370660
    https://github.com/windard/Port_Scan

    • 每个进程背后都有一个端口
    • 入侵者通常会用扫描器对目标主机的端口进行扫描,以确定哪些端口是开放的,从开放的端口,入侵者可以知道目标主机大致提供了哪些服务,进而猜测可能存在的漏洞
    • 因此对端口的扫描可以帮助我们更好的了解目标主机。而对应管理员,扫描本机的开放端口也是做好安全防范的第一步

    为什么有那么多开源工具,我们为什么还要花费大量精力取开发工具呢?nmap
    不好定制化,比如我们需要发送一个邮件,企业微信,钉钉,开源工具做不了
    首先你的工具是否支持这个功能,还有功能跟web平台集成就无法搞

    可以参考站长工具中的端口扫描进行开发
    图 1

    • 端口扫描原理:尝试与目标主机建立连接,如果目标主机有回复,则说明端口开放
    • TCP连接:这种方法使用三次握手与目标主机建立标准的TCP连接。但是这种放放风容易被发现,被目标主机记录。
    • 实现简单,对操作者的权限没有严格的要求

    设计方案规划

    • 1、建立TCP连接
    • 2、查看连接返回
    • 3、判断连接返回值
    • 4、循环扫描剩余的端口

    代码实现功能

    import socket
    #s1=socket.socket(family,type)
    TCP_sock =socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #family参数代码地址家族,可为AF_INET或AF_UNIX.
    #AF_INET家族包括internet地址,AF_UNIX家族用于同一台机器上的进程间通讯.
    #type参数代表套接字类型,可为SOCK_STREAM(流套接字,就是TCP套接字)和SOCK_DGRAM(数据报套接字,就是UDP套接字)
    #默认为family=AF_INET,type=SOCK_STREAM
    #返回一个整数描述符,用这个描述符来标识这个套接字
    
    客户端套接字
    s.connect() 主动初始化TCP服务器连接,一般address格式为元组(hostname,port),如果连接出错,返回socket.error错误
    
    s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常----使用这个
    

    在项目发布之前做一个端口扫描--为团队做一个工具

    如果连接正确,返回0
    图 2

    如果连接错误,会返回错误码
    请输入需要扫描的IP>>>192.168.222.128
    请输入需要扫描的port>>>6666
    10035

    Process finished with exit code 0

    V1.0实现一个端口扫描

    import socket
    def scan_tool_v1():
        #1-创建socket对象
        sk=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        sk.settimeout(0.5)
        #2-接收用户输入IP 和 port
        ip = input('请输入需要扫描的IP>>>')
        port = int(input('请输入需要扫描的port>>>').strip())
        #3-建立连接
        try:
            conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
            #4-判断返回值
            if conn==0:
                print(f'主机:{ip},端口{port}已开放')
            else:
                print(f'主机:{ip},端口{port}未开放')
            #5-关闭连接
            sk.close()
        except:
            pass
    if __name__ == '__main__':
        scan_tool_v1()
    

    调试优化

    V2.0 循环扫描多个端口

        import socket
    def scan_tool_v2():
        # 2-接收用户输入IP 和 port
        ip = input('请输入需要扫描的IP>>>')
        ports = input('请输入需要扫描的ports(0-65535)>>>').strip()
        port_begin,port_end = ports.split('-')
        # 3-建立连接
        for port in range(int(port_begin),int(port_end)+1):
            try:
                # 1-创建socket对象
                sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sk.settimeout(0.5)
                conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
                # 4-判断返回值
                if conn==0:
                    print(f'主机:{ip},端口{port}已开放')
                else:
                    print(f'主机:{ip},端口{port}未开放')
    
            except:
                pass
            # 5-关闭连接
            sk.close()
    if __name__ == '__main__':
        scan_tool_v2()
    

    V3.0 IP和域名判断

    -------------------------------------------

    """
    反馈:
    1、只能输入IP,不能使用域名进行扫描
    2、没有对IP做有效判断
    优化方案:
    1-IP有效判断
    2-进行端口扫描
    3-使用ip进行扫描
    4-使用域名进行扫描
    5-用户可以选择,逻辑规划
    6-域名有效判断
    192.168.222.128
    IPv4地址:((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}
    http://c.runoob.com/front-end/854 菜鸟
    """

    -------------------------------------------

    import socket
     1-IP有效判断
    def check_ip(ip):
        """
    
        :param ip: 输入IP地址
        :return: 返回IP判断结果
        """
        ip_address=re.compile('((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}')#表达式对象
        if ip_address.match(ip) and len(ip) !=0:
            return True
        else:
            return False
     6-域名有效判断
    def check_domain(domain):
        """
    
        :param domain: 输入域名
        :return: 返回domain判断结果
        """
        ip_address=re.compile('[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')#表达式对象
        if ip_address.match(domain) and len(domain) !=0:
            return True
        else:
            return False
     2-定义端口扫描
    def scan_port(ip):
        port_begin,port_end = (9090,9100)
        # 3-建立连接
        for port in range(int(port_begin),int(port_end)+1):
            try:
                # 1-创建socket对象
                sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sk.settimeout(0.5)
                conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
                # 4-判断返回值
                if conn==0:
                    print(f'主机:{ip},端口{port}已开放')
                else:
                    pass
    
            except:
                pass
            # 5-关闭连接
            sk.close()
     3-使用ip进行扫描
    def scan_ip(ip):
        if check_ip(ip):
            #进行端口扫描
            scan_port(ip)
        else:
            print('ip输入有误')
    
     4-使用域名进行扫描
    def scan_domain(domain_name):
        # 1-通过正则过滤域名
        if 'http://' in domain_name or 'https://' in domain_name:
            domain_name=re.findall(r'://(.*)',domain_name)[0]
        if check_domain(domain_name):
            # 2-通过域名解析出IP地址
            server_ip= socket.gethostbyname(domain_name)
            # 3-进行端口扫描
            scan_port(server_ip)
        else:
            print('域名输入有误')
    
     5-用户可以选择,逻辑规划
    def main():
        # 1-提示信息
        info="""
            1.请输入IP
            2.请输入域名   
        """
        print(info)
        # 2-接收用户的输入
        select = input('请输入>>> ')
        # 3-判断用户的输入,选择对应的扫描方式
        if select =='1':
            ip=input('请输入IP>>> ')
            scan_ip(ip)
        elif select=='2':
            domain = input('请输入域名>>> ')
            scan_domain(domain)
        else:
            print('扫描方式输入有误')
    

    V4.0多线程

    #-----------------v4.0----------------------
    
    
    """
    反馈:
        1、只能输入IP,不能使用域名进行扫描
        2、没有对IP做有效判断
    优化方案:
        1-IP有效判断
        2-进行端口扫描
        3-使用ip进行扫描
        4-使用域名进行扫描
        5-用户可以选择,逻辑规划
        6-域名有效判断
        192.168.222.128
        IPv4地址:((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}
        http://c.runoob.com/front-end/854    菜鸟
        7-使用多线程
    """
    #-------------------------------------------
    import socket
    #1-IP有效判断
    def check_ip(ip):
        """
    
        :param ip: 输入IP地址
        :return: 返回IP判断结果
        """
        ip_address=re.compile('((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}')#表达式对象
        if ip_address.match(ip) and len(ip) !=0:
            return True
        else:
            return False
    #6-域名有效判断
    def check_domain(domain_name):
        """
    
        :param domain: 输入域名
        :return: 返回domain判断结果
        """
        domain=re.compile('[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')#表达式对象
        if domain.match(domain_name) and len(domain_name) !=0:
            return True
        else:
            return False
    #2-定义端口扫描--只扫一个
    def scan_port(ip,port):
        port_begin,port_end = (9090,9100)
        # 3-建立连接
        try:
            # 1-创建socket对象
            sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sk.settimeout(0.1)
            conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
            # 4-判断返回值
            if conn==0:
                print(f'主机:{ip},端口{port}已开放')
            else:
                pass
    
        except:
            pass
        # 5-关闭连接
        sk.close()
    #3-使用ip进行扫描
    
    def threading_scan_port(ip):
        start_time =time.time()
        threads=[]#存储线程组
        #创建线程组
        for one in range(1,65535+1):
            t =threading.Thread(target=scan_port,args=(ip,one))
            threads.append(t)
    
            t.setDaemon(True)
    
            t.start()
    
        # #启动线程组
        # for i in range(65535):
        #     threads[i].start()
        # #阻塞线程
        # for i in range(65535):
        #     threads[i].join()
    
        end_time=time.time()
        print('端口扫描总共耗时>>> ',end_time-start_time)
    
    def scan_ip(ip):
            #进行端口扫描
            threading_scan_port(ip)
    
    #4-使用域名进行扫描
    def scan_domain(domain_name):
        # 1-通过正则过滤域名
        if 'http://' in domain_name or 'https://' in domain_name:
            domain_name = re.findall(r'://(.*)', domain_name)[0]
            # 2-通过域名解析出IP地址
        server_ip= socket.gethostbyname(domain_name)
        # 3-进行端口扫描
        threading_scan_port(server_ip)
    
    #5-不需要用户选择,逻辑规划
    def main():
        # 1-提示信息
        info="""
            请输入IP或者域名  
        """
        # 2-接收用户的输入
        info = input('请输入IP或者域名>>> ')
        # 3-判断用户的输入,选择对应的扫描方式
        if check_ip(info):
            scan_ip(info)
        elif check_domain(info):
            scan_domain(info)
        else:
            print('输入格式有误')
    
    
    
    if __name__ == '__main__':
        main()
    

    V5.0 协程基础

    有没有可以比多线程技术更优化的并发方案

    • 协程的概述

      • python线程的性能问题,在python中使用多线程运行代码经常不能达到预期的效果。而有些时候我们的逻辑中又需要开更高的并发,让代码跑的更快,在同样的时间内执行更多的有效逻辑,减少无用的等待。
      • 协程是一种用户级轻量级的线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
      • 协程又称为微线程,纤程
        • 优点1:协程极高的执行效率。因为子程序切换不是线程的切换,而是由程序自身控制。因此,没有线程切换的开销。和多线程比,线程数量越多,协程的性能优势就越明显。
        • 优点2:不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多
        • 缺点1:无法利用多核资源:协程的本质是个单线程,它不能同时将多个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上,当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用
        • 缺点2:进行阻塞操作(如IO)会阻塞掉整个程序
    • gevent

      • gevent就是一个现在很火的,支持很全面的python第三方协程库
      • pip install gevent
      • 由于切换是在IO操作时自动完成,所以gevent需要修改python自带的一些标准库,这一过程在启动时通过monkey patch完成

    协程的基本用法:

    import time
    import gevent
    #time,默认是阻塞的
    
    from gevent import monkey
    monkey.patch_all()#解决:能不能改成非阻塞方式,在操作IO的时候,立即切换到其他的协程运行
    def f1():
        for i in range(5):
            print('f1---在运行---')
            time.sleep(1)
    def f2():
        for i in range(5):
            print('f2---在运行---')
            time.sleep(1)
    
    #创建协程对象
    t1 =gevent.spawn(f1)
    t2 =gevent.spawn(f2)
    
    gevent.joinall([t1,t2])
    
    控制台输出
    f1---在运行---
    f2---在运行---
    f1---在运行---
    f2---在运行---
    f1---在运行---
    f2---在运行---
    
    def get(url):
        print(f'{datetime.now()}开始--GET--DATA:{url}')
        resp=requests.get(url)
        print(f'{datetime.now()}结束--GET--DATA:{url}')
    
    gevent.joinall(
        [
            gevent.spawn(get,'http://www.baidu.com'),
            gevent.spawn(get,'http://c.runoob.com/')
        ]
    )
    
    控制台输出:
    2021-07-09 21:23:42.628202开始--GET--DATA:http://www.baidu.com
    2021-07-09 21:23:42.631228开始--GET--DATA:http://c.runoob.com/
    2021-07-09 21:23:42.718241结束--GET--DATA:http://www.baidu.com
    2021-07-09 21:23:42.798265结束--GET--DATA:http://c.runoob.com/
    

    V6.0协程+文件

    #-----------------v6.0----------------------
    
    
    """
    反馈:
        1、扫描效率低
        2、有些端口没有扫出来
    优化方案:
        1-使用协程方案
        192.168.222.128
        IPv4地址:((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}
        http://c.runoob.com/front-end/854    菜鸟
        [a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')
        47.96.181.17
        2-扫描结果写入文件
    """
    #-------------------------------------------
    import socket
    #1-IP有效判断
    def check_ip(ip):
        """
    
        :param ip: 输入IP地址
        :return: 返回IP判断结果
        """
        ip_address=re.compile('((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}')#表达式对象
        if ip_address.match(ip) and len(ip) !=0:
            return True
        else:
            return False
    #6-域名有效判断
    def check_domain(domain_name):
        """
    
        :param domain: 输入域名
        :return: 返回domain判断结果
        """
        domain=re.compile('[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')#表达式对象
        if domain.match(domain_name) and len(domain_name) !=0:
            return True
        else:
            return False
    
    #导出文件
    def write_file(info,path='./scan_port_result.txt'):
        with open(path,'a',encoding='utf-8') as f:
            f.write(info)
    
    #2-定义端口扫描--只扫一个
    def scan_port(ip,port):
        #3-建立连接
        try:
            #1-创建socket对象
            sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sk.settimeout(0.1)
            conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
            #4-判断返回值
            if conn==0:
                print(f'主机:{ip},端口{port}已开放')
                write_file(f'主机:{ip},端口{port}已开放
    ')
            else:
                pass
    
        except:
            pass
        #5-关闭连接
        sk.close()
    #3-使用ip进行扫描
    monkey.patch_all()#非阻塞方式执行
    def gevent_scan_port(ip):
        write_file(f'主机:{ip},端口扫描开始,请耐心等待......
    ')
        start_time =time.time()
        runList=[]#存储协程
        g =gevent.pool.Pool(200)#并发的协程数
        #创建协程
        for port in range(1,65535+1):
            runList.append(g.spawn(scan_port,ip,port))
        gevent.joinall(runList)
    
        end_time=time.time()
        write_file(f'主机:{ip},端口扫描结束
    ')
        write_file(f'端口扫描总共耗时{end_time-start_time}
    ')
        print('端口扫描总共耗时>>> ',end_time-start_time)
    
    def scan_ip(ip):
            #进行端口扫描
            gevent_scan_port(ip)
    
    #4-使用域名进行扫描
    def scan_domain(domain_name):
        #1-通过正则过滤域名
        if 'http://' in domain_name or 'https://' in domain_name:
            domain_name = re.findall(r'://(.*)', domain_name)[0]
            #2-通过域名解析出IP地址
        try:
            server_ip= socket.gethostbyname(domain_name)
            #3-进行端口扫描
            gevent_scan_port(server_ip)
        except Exception as e:
            print(f'输入有误,请检查:{e}')
    
    #5-不需要用户选择,逻辑规划
    def main():
        #1-提示信息
        info="""
            请输入IP或者域名  
        """
        #2-接收用户的输入
        info = input('请输入IP或者域名>>> ')
        #3-判断用户的输入,选择对应的扫描方式
        if check_ip(info):
            scan_ip(info)
        elif check_domain(info):
            scan_domain(info)
        else:
            print('输入格式有误')
    
    
    
    if __name__ == '__main__':
        main()
    

    问题记录

    • 1、端口扫描结果不正确
      请输入需要扫描的IP>>>192.168.222.128
      请输入需要扫描的ports(0-65535)>>>9089-9100
      主机:192.168.222.128,端口9089未开放
      主机:192.168.222.128,端口9090未开放
      主机:192.168.222.128,端口9091未开放
      主机:192.168.222.128,端口9092未开放
      主机:192.168.222.128,端口9093未开放
      主机:192.168.222.128,端口9094未开放
      主机:192.168.222.128,端口9095未开放
      主机:192.168.222.128,端口9096未开放
      主机:192.168.222.128,端口9097未开放
      主机:192.168.222.128,端口9098未开放
      主机:192.168.222.128,端口9099未开放
      主机:192.168.222.128,端口9100未开放

      • 原因分析
        socket已经关闭了,导致后面的端口都无法扫描
        图 3

    第一次debug结果
    <socket.socket fd=736, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>

    第二次debug结果
    <socket.socket fd=836, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
    图 4

    每次扫描都需要使用新的socket对象来建立连接

    * 解决方案
    把建立socket对象放到循环中,每次扫描都要重新建立socket对象
    
    验证结果:
    
    请输入需要扫描的IP>>>192.168.222.128
    请输入需要扫描的ports(0-65535)>>>9089-9100
    主机:192.168.222.128,端口9089未开放
    主机:192.168.222.128,端口9090已开放
    主机:192.168.222.128,端口9091未开放
    主机:192.168.222.128,端口9092未开放
    主机:192.168.222.128,端口9093未开放
    主机:192.168.222.128,端口9094未开放
    主机:192.168.222.128,端口9095未开放
    主机:192.168.222.128,端口9096未开放
    主机:192.168.222.128,端口9097未开放
    主机:192.168.222.128,端口9098未开放
    主机:192.168.222.128,端口9099未开放
    主机:192.168.222.128,端口9100已开放
  • 相关阅读:
    oracle 11g ocp 笔记(7)-- DDL和模式对象
    oracle 11g ocp 笔记(6)-- oracle安全
    oracle 11g ocp 笔记(5)-- oracle存储结构
    oracle 11g ocp 笔记(4)-- 网络服务
    oracle 11g ocp 笔记(3)-- 实例管理
    oracle 11g ocp 笔记(2)-- 安装和创建数据库
    oracle 11g ocp 笔记(1)-- oracle 11g体系结构概述
    https://blog.csdn.net/gyming/article/details/46611369
    AWR管理
    3、实例管理
  • 原文地址:https://www.cnblogs.com/xiehuangzhijia/p/14979311.html
Copyright © 2011-2022 走看看