#-*- coding:utf8 -*- ''' Created on 2018年9月21日 @author: Administrator ''' import socket import os #监听主机 host="192.168.0.101" #创建原始套接字 if os.name=="nt": socket_protocol=socket.IPPROTO_IP else: socket_protocol=socket.IPPROTO_ICMP sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.IPPROTO_IP) sniffer.bind((host,0)) #设置在补货的数据包中包含IP头 sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) #在window平台上,我们需要设置IOCTL以启用混杂模式 if os.name=="nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) #读取单个数据包 print sniffer.recvfrom(65565) #在window平台上,关闭IOCTL以启用混杂模式 if os.name=="nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
嗅探到包后,解码IP层
1 #-*- coding:utf8 -*- 2 ''' 3 Created on 2018年9月22日 4 @author: Administrator 5 ''' 6 import socket 7 import os 8 import struct 9 from ctypes import * 10 import threading 11 import datetime 12 from _datetime import date 13 14 #监听的主机 15 host="192.168.0.101" 16 17 #IP 头定义 18 19 class IP(Structure): 20 _fields_=[ 21 ("ih1",c_ubyte,4), 22 ("version",c_ubyte,4), 23 ("tos",c_ubyte), 24 ("len",c_ushort), 25 ("id",c_ushort), 26 ("offset",c_ushort), 27 ("ttl",c_ubyte), 28 ("protocol_num",c_ubyte), 29 ("sum",c_ushort), 30 ("src",c_ulong), 31 ("dst",c_ulong) 32 ] 33 def __new__(self, socket_buffer=None): 34 return self.from_buffer_copy(socket_buffer) 35 36 def __init__(self,socket_buff=None): 37 #协议字段与协议名称的对应 38 self.protocol_map={1:"ICMP",6:"TCP",17:"UDP"} 39 #可读性更强的IP地址 40 self.src_address=socket.inet_ntoa(struct.pack("<L",self.src)) 41 self.dst_address=socket.inet_ntoa(struct.pack("<L",self.dst)) 42 43 #协议类型 44 try: 45 self.protocol=self.protocol_map[self.protocol_num] 46 except: 47 self.protocol=str(self.protocol_num) 48 49 #创建原始套接字 50 if os.name=="nt": 51 socket_protocol=socket.IPPROTO_IP 52 else: 53 socket_protocol=socket.IPPROTO_ICMP 54 55 sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol) 56 sniffer.bind((host,0)) 57 58 #设置在补货的数据包中包含IP头 59 sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) 60 #在window平台上,我们需要设置IOCTL以启用混杂模式 61 if os.name=="nt": 62 sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) 63 # def getInfo(obj): 64 # dic={"180.97.33.108:443":"百度"} 65 # if obj in dic: 66 # print("访问了 %s"%dic[obj]) 67 try: 68 while True: 69 70 #读取数据包 71 raw_buffer=sniffer.recvfrom(65565)[0] 72 73 #将缓冲区的前20个字节按IP头进行解析 74 ip_header=IP(raw_buffer[0:20]) 75 #输出协议和通信双方的IP地址 76 print(datetime.datetime.now()) 77 data="协议【Protocol】:%s %s --> %s"%(ip_header.protocol,ip_header.src_address,ip_header.dst_address) 78 print(data) 79 with open("demo.txt", "w+")as f: 80 f.write(data) 81 # print(type(ip_header.dst_address)) 82 if ip_header.dst_address=="180.97.33.108": 83 print("百度") 84 if ip_header.dst_address=="202.102.94.124": 85 print("新浪") 86 87 # t1=threading.Thread(target=getInfo,args=(ip_header.dst_address)) 88 # t1.start() 89 90 91 #处理CTRL -C 92 except KeyboardInterrupt: 93 #在window平台上,关闭IOCTL混杂模式 94 if os.name=="nt": 95 sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) 96 97 98
#-*- coding:utf8 -*- ''' Created on 2018年9月22日 @author: Administrator ''' import socket import os import struct from ctypes import * import threading import time from netaddr import IPNetwork,IPAddress #监听的主机 host="192.168.0.101" #扫描的目标子网 subnet="192.168.0.0/24" #自定义的字符串,我们将在ICMP响应中进行核对 magic_message="PYTHONRULES!" #批量发送UDP数据包 def udp_sender(subnet,magic_message): time.sleep(5) sender=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)#UDP协议 for ip in IPNetwork(subnet): try: sender.sento(magic_message,("%s"%ip,65212)) except: pass #IP 头定义 class IP(Structure): _fields_=[ ("ih1",c_ubyte,4), ("version",c_ubyte,4), ("tos",c_ubyte), ("len",c_ushort), ("id",c_ushort), ("offset",c_ushort), ("ttl",c_ubyte), ("protocol_num",c_ubyte), ("sum",c_ushort), ("src",c_ulong), ("dst",c_ulong) ] def __new__(self, socket_buffer=None): return self.from_buffer_copy(socket_buffer) def __init__(self,socket_buff=None): #协议字段与协议名称的对应 self.protocol_map={1:"ICMP",6:"TCP",17:"UDP"} #可读性更强的IP地址 self.src_address=socket.inet_ntoa(struct.pack("<L",self.src)) self.dst_address=socket.inet_ntoa(struct.pack("<L",self.dst)) #协议类型 try: self.protocol=self.protocol_map[self.protocol_num] except: self.protocol=str(self.protocol_num) class ICMP(Structure): _fields_=[ ("type",c_ubyte), ("code",c_ubyte), ("checksum",c_ushort), ("unused",c_ushort), ("next_hop_mtu",c_ushort) ] def __new__(self, socket_buffer): return self.from_buffer_copy(socket_buffer) def __init__(self,socket_buffer): pass #################利用UDP协议发送数据来扫描子网################################## #开始发送数据包 t=threading.Thread(target=udp_sender,args=(subnet,magic_message)) t.start() ########################################################## #创建原始套接字 if os.name=="nt": socket_protocol=socket.IPPROTO_IP else: socket_protocol=socket.IPPROTO_ICMP sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol) sniffer.bind((host,0)) #设置在补货的数据包中包含IP头 sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) #在window平台上,我们需要设置IOCTL以启用混杂模式 if os.name=="nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: #读取数据包 raw_buffer=sniffer.recvfrom(65565)[0] #将缓冲区的前20个字节按IP头进行解析 ip_header=IP(raw_buffer[0:20]) #输出协议和通信双方的IP地址 # print(datetime.datetime.now()) # print("协议【Protocol】:%s %s --> %s"%(ip_header.protocol,ip_header.src_address,ip_header.dst_address)) # # if ip_header.dst_address=="180.97.33.108": # print("百度") # if ip_header.dst_address=="202.102.94.124": # print("新浪") #如果为ICMP,进行处理 if ip_header.protocol=="ICMP": #计算ICMP包的起始位置 offset=ip_header.ih1*4 buf=raw_buffer[offset:offset+sizeof(ICMP)] #解析ICMP数据 icmp_header=ICMP(buf) print("ICMP --> Type:%s Code: %d"%(icmp_header.type,icmp_header.code)) #检查类型和代码值是否为3 if icmp_header.code==3 and icmp_header.type==3: #确认响应的主机在我们的目标子网之内 if IPAddress(ip_header.src_address) in IPNetwork(subnet): #确认ICMP数据中包含我们发送的自定义的i字符串 if raw_buffer[len(raw_buffer)-len(magic_message):]==magic_message: print("Host Up:%s"%ip_header.src_address) #处理CTRL -C except KeyboardInterrupt: #在window平台上,关闭IOCTL混杂模式 if os.name=="nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)