zoukankan      html  css  js  c++  java
  • scapy抓包使用

    # coding=utf-8
    import json
    import time
    import os
    import dpkt
    import socket
    import datetime
    import uuid
    import traceback
    
    from dpkt.ethernet import Ethernet
    from scapy.layers.l2 import Ether
    from scapy.sendrecv import sniff
    from scapy.utils import wrpcap
    
    from BigData.data_common.utils.file_util import FileUtil
    
    
    def get_local_ip():
        hostname = socket.gethostname()
        # 获取本机内网ip
        local_ips = socket.gethostbyname_ex(hostname)[-1]
        return local_ips
    
    def body_transfer(body):
        str_body = body.decode()
        body_ls = str_body.split("&")
        d = {}
        for item in body_ls:
            key_, value_ = item.split("=")
            d[key_.strip()] = value_.strip()
    
    
    def analysis_pcap(timestamp, buf):
        data = {}
        if isinstance(buf, dpkt.ip.IP):
            eth = buf
        else:
            eth = dpkt.ethernet.Ethernet(buf)
        # print(eth.data.__dict__)
        # print("ip layer:"+eth.data.__class__.__name__) #以太包的数据既是网络层包
        # print("tcp layer:"+eth.data.data.__class__.__name__) #网络层包的数据既是传输层包
        # print("http layer:" + eth.data.data.data.__class__.__name__) #传输层包的数据既是应用层包
        # print('Timestamp: ',str(datetime.datetime.utcfromtimestamp(timestamp))) #打印出包的抓取时间
        if isinstance(eth.data, dpkt.ip.IP) or isinstance(eth.data, dpkt.ip6.IP6):
            #     # print('%d Non IP Packet type not supported %s' % (int(timestamp), eth.data.__class__.__name__))
            #     print('ip.data type is {}'.format(eth.data.__class__.__name__))
            #     print(repr(eth.data))
            #     return data
            ip = eth.data
            if isinstance(eth.data, dpkt.ip.IP):
                src_ip = socket.inet_ntoa(ip.src)
                dst_ip = socket.inet_ntoa(ip.dst)
                # do_not_fragment =bool(ip.off & dpkt.ip.IP_DF)
                # more_fragments =bool(ip.off & dpkt.ip.IP_MF)
                # fragment_offset = ip.off & dpkt.ip.IP_OFFMASK
                # key = 'IPV4' if isinstance(eth.data, dpkt.ip.IP) else 'IPV6'
                data.update({
                    'time': timestamp,
                    'IPV4': {'src': src_ip, 'dst': dst_ip}
                })
            else:
                src_ip = ip.src
                dst_ip = ip.dst
                data.update({
                    'time': timestamp,
                    'IPV6': {'src': src_ip, 'dst': dst_ip}
                })
            print('ip.data type is {}'.format(ip.data.__class__.__name__))
            if isinstance(ip.data, dpkt.tcp.TCP):
                layer = ip.data
                data.update(analysis_tcp(layer))
            elif isinstance(ip.data, dpkt.udp.UDP):
                layer = ip.data
                data.update(analysis_udp(layer))
            elif isinstance(ip.data, dpkt.icmp.ICMP) or isinstance(ip.data, dpkt.icmp6.ICMP6):
                layer = ip.data
                data.update(analysis_icmp(layer))
            else:
                print('analysis_pcap ip.data {}'.format(repr(ip.data)))
                data = {'time': timestamp, eth.data.__class__.__name__: {}}
        else:
            print('analysis_pcap eth.data {}'.format(repr(eth.data)))
            data = {'time': timestamp, eth.data.__class__.__name__: eth.data.__dict__}
        return data
    
    def analysis_udp(udp, key="UDP"):
        try:
            data_dict = {}
            try:
                data_str = udp.data.decode('utf-8')
                if data_str.startswith('M-SEARCH'):
                    data_list = data_str.strip().split('\n')[1:]
                    for item in data_list:
                        k, v = item.split(':')[0], ':'.join(item.split(':')[1:])
                        data_dict[k.strip()] = v.strip()
                else:
                    data_dict = data_str
            except:
                pass
            return {
                key: {
                    'sport': udp.sport,
                    'dport': udp.dport,
                    'ulen': udp.ulen,
                    'sum': udp.sum,
                    'data': data_dict
                }
            }
        except:
            pass
        print('analysis_udp udp {}'.format(repr(udp)))
        return {}
    
    def analysis_tcp(tcp, key='TCP'):
        data = {
            key: {
                'dport': tcp.dport,
                'sport': tcp.sport,
                'ack': tcp.ack,
                'seq': tcp.seq
            }
        }
        try:
            request = dpkt.http.Request(tcp.data)
            data['HTTP'] = {
                    'type': 'request',
                    'uri': request.uri,
                    'Method': request.method.upper(),
                    'Headers': dict(request.headers),
                    'Body': body_transfer(request.body),
                    'Data': body_transfer(request.data)
                }
        except:
            pass
        try:
            response = dpkt.http.Response(tcp)
            data['HTTP'] = {
                    'type': 'response',
                    'Headers': dict(response.headers),
                    'Body': body_transfer(response.body),
                    'Data': body_transfer(response.data)
                }
        except:
            pass
        try:
            data_dict = {}
            data_str = tcp.data.decode('utf-8')
            if data_str.startswith('M-SEARCH'):
                data_list = data_str.strip().split('\n')[1:]
                for item in data_list:
                    k, v = item.split(':')[0], ':'.join(item.split(':')[1:])
                    data_dict[k.strip()] = v.strip()
            else:
                data_dict = data_str
            if key in data and data_dict:
                data[key]['Data'] = data_dict
        except:
            pass
        if data:
            return data
        print('analysis_tcp tcp {}'.format(repr(tcp)))
        return {}
    
    def analysis_icmp(icmp, key='ICMP'):
        try:
            if isinstance(icmp, dpkt.icmp.ICMP):
                return {
                    'ICMP': {
                        'type': icmp.type,
                        'sum': icmp.sum,
                        'Data': analysis_pcap(int(time.time()), icmp.data.data)
                    }
                }
            else:
                data_str = ''
                try:
                    data_str = icmp.data.decode('utf-8')
                except:
                    pass
                return {
                    'ICMP6': {
                        'type': icmp.type,
                        'sum': icmp.sum,
                        'Data': data_str
                    }
                }
        except:
            pass
        return {}
    
    
    def analysis_pcap2(timestamp, buf):
        e = Ether(buf)
        e.show()
    
    def get_dpkt():
        # 这里是针对单网卡的机子, 多网卡的可以在参数中指定网卡, 例:iface=Qualcomm QCA9377 802.11ac Wireless Adapter
        dpkt_ = sniff(count = 10)
        _uuid = uuid.uuid1()
        filename = f"{_uuid}.pcap"
        wrpcap(filename, dpkt_)
        return filename
    
    
    def main():
        while True:
            filename = get_dpkt()
            with open(filename, "rb") as f:
                pcap = dpkt.pcap.Reader(f)
                local_ips = get_local_ip()
                for timestamp, buf in pcap:
                    res = analysis_pcap(timestamp, buf)
                    # analysis_pcap2(timestamp, buf)
                    print(res)
                    # FileUtil.write_lines('test.txt', json.dumps(res))
    
            os.remove(filename)
            # FileUtil.flush()
    
    if __name__ =='__main__':
        main()
    https://github.com/jiangsiwei2018/BigData.git 实例代码git仓库地址
  • 相关阅读:
    STM32关于优先级设定的理解 NVIC_SetPriority()
    linux6.2安装mysql
    【PAT】1009. Product of Polynomials (25)
    Android的重力传感器(3轴加速度传感器)简单实例
    out/target/common/obj/PACKAGING/public_api.txt android.view.KeyEvent.KEYCODE_has changed value from
    框架技术--Spring自动加载配置
    NSDate conversion utilities
    Poj 1201 Intervals
    asp导航条子菜单横向
    大学四年,你必须做的事---这些计算机科学
  • 原文地址:https://www.cnblogs.com/satansz/p/15575292.html
Copyright © 2011-2022 走看看