zoukankan      html  css  js  c++  java
  • python 处理传输层的报文 TCP/UDP

    总所周知,对于python而言实现tcp/udp的传输一般靠的是socket这个库,而区分两者的是建立socket的参数
    SOCK_STREAM 为TCP连接
    SOCK_DGRAM 为UDP连接
    而一般情况下接收报文需要遵从某一些协议,这样双方可以通过特定的粘包解包操作处理数据。
    很多情况自定义协议都是比较简单,先接收报文头,获取消息长度,再获取消息体。
    但是有很多协议写起来很麻烦,于是就用到scapy这个库,就可以每次获取一个报文,一般报文头信息都是类似的
    都带有源地址,目标地址,消息长度,校验码
     
    这里是一段监听DMS报警系统的代码
     
    from scapy.all import *
    alarm_map = {"211": "未系安全带", "205": "疲劳驾驶", "206": "疲劳驾驶", "208": "抽烟", "209": "出现异常"}
    def scan(target, port):
        has_no_connect = True
        while True:
            # 根据接口进行监听报文
            # 接口获取由 IFACES 决定
            try:
                # sniff开始获取报文,iface是接口,filter可以选择过滤报文类型, count为一次性获取多少个pkt
                pkt = sniff(iface=IFACES.dev_from_index(12), filter="udp", count=1)
                # 每个pkt格式类型于YAML,如果没有IP信息则代表这个包不完整
                if pkt[0][IP].src == "192.168.43.1":
                    if has_no_connect:
                        # 展示建立连接的第一个UDP包
                        pkt[0].show()
                    if Raw in pkt[0]:
                        has_no_connect = False
                        # load的值为bytes类型
                        body = pkt[0][Raw].load
                        if b'alarm":2' in body:
                            result = json.loads(body.decode())
                            if str(result.get('dms').get('alarm')) in alarm_map.keys():
                                now_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
                                message = {
                                    "event_name": alarm_map[str(result.get('dms').get('alarm'))],
                                    "event_time": now_time,
                                }
                                print(message)
                            else:
                                print(result)
            except Exception as e:
                print(pkt[0])
                continue
     
    """            
     调用show()可以展示数据包,抓到第一个数据包格式如下:
    ###[ Ethernet ]### 
      dst       = 20:0d:b0:17:cf:d4
      src       = 02:08:22:b2:bb:fb
      type      = IPv4
    ###[ IP ]### 
         version   = 4
         ihl       = 5
         tos       = 0x0
         len       = 208
         id        = 19582
         flags     = DF
         frag      = 0
         ttl       = 64
         proto     = udp
         chksum    = 0x163a
         src       = 192.168.43.1
         dst       = 192.168.43.19
         options   
    ###[ UDP ]### 
            sport     = 23456
            dport     = 62472
            len       = 188
            chksum    = 0x202f
    ###[ Raw ]### 
               load      = '{"code":3,"dms":{"alarm":0,"num":1,"id":0,"eye":0,"p":15.220612,"y":-29.808479,"r":6.351517,"fr":0.493671,"fmi":0,"fmon":0,"fx":0.389583,"fy":0.201852,"fw":0.192708,"fh":0.457407}}'
    """
     
    scapy这个库确实能够满足一部分需求,不用头疼于各种协议的实现。上手也比较简单,在一些需要抓TCP包或者UDP的场景下使用效果极佳。
    如果知道协议了,这里有段比较不错的监听tcp/udp ip报文的代码。 【需要用管理员权限运行,不要在IDE下运行】
     
    from __future__ import print_function
    import ctypes, sys
    import struct
    import socket
    import traceback
    
    ip_header_fmt = '!BBHHHBBH4s4s'
    tcp_header_fmt = '!HHLLBBHHH'
    udp_header_fmt = '!HHHH'
    IP_HEAD_LEN = struct.calcsize(ip_header_fmt)  # 20字节
    TCP_HEADER_LEN = struct.calcsize(tcp_header_fmt)  # 20字节
    UDP_HEAD_LEN = struct.calcsize(udp_header_fmt)  # 8字节
    IP_MAX_LEN = 65535
    
    def is_admin():
        try:
            return ctypes.windll.shell32.IsUserAnAdmin()
        except:
            return False
    
    
    def unpack_ip_header(buf):
        if len(buf) < IP_HEAD_LEN:
            return
        try:
            iph = struct.unpack(ip_header_fmt, buf[:IP_HEAD_LEN])
            protocol_map = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}
            return {
                'version': iph[0] >> 4,  # 高4位
                'ihl': (iph[0] & 0xF) * 4,  # 低4位,每个长度单位表示4字节,最大为60字节
                'tos': iph[1],  # 8位
                'len': iph[2],  # 16位
                'id': iph[3],  # 16位
                'offset': iph[4],  # 16位
                'ttl': iph[5],  # 8位
                'protocol': protocol_map.get(iph[6], str(iph[6])),  # 8位
                'cks': iph[7],  # 16位
                'src': iph[8],  # 32位
                'dst': iph[9],  # 32位
            }
        except Exception as e:
            raise e
    
    
    def unpack_tcp_header(buf):
        if len(buf) < TCP_HEADER_LEN:
            return
        try:
            tcph = struct.unpack(tcp_header_fmt, buf[:TCP_HEADER_LEN])
            return {
                'src': tcph[0],  # 16位
                'dst': tcph[1],  # 16位
                'seq': tcph[2],  # 32位
                'ack': tcph[3],  # 32位
                'thl': (tcph[4] >> 4) * 4,  # 高4位
                'wlen': tcph[6],  # 16位
            }
        except Exception as e:
            raise e
    
    
    def unpack_udp_header(buf):
        if len(buf) < UDP_HEAD_LEN:
            return
        try:
            udph = struct.unpack(udp_header_fmt, buf[:UDP_HEAD_LEN])
            return {
                'src': udph[0],  # 16位
                'dst': udph[1],  # 16位
                'dlen': udph[2],  # 16位
                'cks': udph[3],  # 16位
            }
        except Exception as e:
            raise e
    
    
    def get_host_ip():
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.connect(('8.8.8.8', 80))
            ip = sock.getsockname()[0]
        except Exception:
            pass
        finally:
            sock.close()
        return ip
    
    
    if __name__ == '__main__':
        if sys.version_info[0] == 3:
            ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, __file__, None, 1)
    
        host = get_host_ip()  # 获取当前联网网卡的ip地址
        try:
            sock = socket.socket(
                socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
            sock.bind((host, 0))
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
            sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
            while True:
                buf = sock.recvfrom(IP_MAX_LEN)[0]
                if len(buf) < IP_HEAD_LEN:
                    continue
                ipheader = unpack_ip_header(buf[:IP_HEAD_LEN])
                if ipheader:
                    buf = buf[ipheader['ihl']:]
                    if ipheader['protocol'] == 'TCP':
                        tcpheader = unpack_tcp_header(buf[:TCP_HEADER_LEN])
                        if tcpheader:
                            data = buf[tcpheader['thl']:(
                                ipheader['len'] - ipheader['ihl'])]
                            if data:
                                print('TCP {}'.format(data))
                    elif ipheader['protocol'] == 'UDP':
                        udpheader = unpack_udp_header(buf[:UDP_HEAD_LEN])
                        if udpheader:
                            data = buf[UDP_HEAD_LEN:][:udpheader['dlen']]
                            if data:
                                print('UDP {}'.format(data))
                    # 可以增加其他协议的头解析
        except Exception as e:
            traceback.print_exc()
        finally:
            sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
            sock.close()
  • 相关阅读:
    ThinkPHP 3 的CURD管理用户信息 修改和删除
    检测一个字符串是否为一个有效的编码格式字符串
    将Buffer对象结合创建为一个新的Buffer对象
    Buffer.byteLength(字符串,编码方式)计算指定字符串的字节数
    TypeError: Buffer.allocUnsafe is not a function
    多个haproxy 之间跳转
    TypeError: Identifier 'assert' has already been declared
    14.5.7 Storing InnoDB Undo Logs in Separate Tablespaces 存储InnoDB Undo logs 到单独的表空间
    14.5.5 Creating a File-Per-Table Tablespace Outside the Data Directory
    php session 管理
  • 原文地址:https://www.cnblogs.com/triangle959/p/12855891.html
Copyright © 2011-2022 走看看