配置文件如下:支持多节点:
{ "dns":[{"domainname":"www.baidu.com","dnsserver":"114.114.114.114"}], "icmp":[{"ip_address":"114.114.114.114"},{"ip_address":"172.17.1.1"},{"ip_address":"192.168.101.1"},{"ip_address":"192.168.9.1"}], "http":[{"url":"www.baidu.com","title":"百度"}] }
# 读取配置文件,配置文件,每个监控类型允许多个监控节点,函数返回包含多个节点的列表 def read_config(): try: file = open("config.json") config = file.read() file.close() config = json.loads(config) http = config["http"] icmp = config["icmp"] dns = config["dns"] return http,icmp,dns except Exception: print("read config error") write_logs("read config error") return None
# 用来写result结果,只有网络异常的时候才会输出 def write_resules(filename,results): try : file = open(filename,"a") file.write(time.asctime(time.localtime(time.time())) + " : " + results + " ") file.close() except Exception: print(time.asctime(time.localtime(time.time())) + "write results is error")
# 写日志文件,方便在出现错误的时候判断错误原因,只有在出错的时候才会输出 def write_logs(logs): try : file = open("logs.txt","a") file.write(time.asctime(time.localtime(time.time())) + " : " + logs + " ") file.close() except Exception: print(time.asctime(time.localtime(time.time())) + "write log is error")
# 调用requests发送数据包 def send_http_packet(url): # requests.packages.urllib3.disable_warnings() user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36" headers = {'User-Agent': user_agent} url = "http://" + url try: response = requests.get(url, headers) response_html = response.content.decode() return response_html except Exception: print('send http packet error,error') write_logs('send http packet error,error') return None
# 使用bs4解析获取到的数据,并和给出的网页标题对比 def check_http(url,title): response_html = send_http_packet(url) if response_html != None and title != None : soup = bs4.BeautifulSoup(response_html, 'lxml') html_title = soup.title.text if title in html_title: return True else: write_logs("title not same ,web title is : " + html_title) return False else: print('html or title is None') return False
# 发送数ping数据包 def send_icmp_packet(ip_address): try: response_packet = sr1(IP(dst=ip_address)/ICMP(),timeout=2,verbose = False) return response_packet except Exception: print('send icmp packet error,error') write_logs('send icmp packet error,error') return None
# 检查icmp数据包有没有正常恢复 def check_icmp(ip_address): response_packet = send_icmp_packet(ip_address) if response_packet != None: return True else: return False
# 发送DNS请求 def send_dns_packet(domainname,dnsserver): i = IP(dst=dnsserver) u = UDP(dport = 53) d = DNS(rd=1) d.qd = DNSQR(qname = domainname , qtype = "A" ,qclass = "IN") dns_request_packet = i/u/d try: dns_response_packet = sr1(dns_request_packet,timeout=2,verbose = False) return dns_response_packet except Exception: print("send dns packet error ") write_logs('send icmp packet error,error') return None
# 检查DNS请求是否正常返回 def check_dns(domainname,dnsserver): dns_response_packet = send_dns_packet(domainname,dnsserver) if dns_response_packet != None: return True else: return False
# 为了方便多线程调用,每个监控类型都设置一个启动程序 def start_http(url, title): while True: if check_http(url, title) : print("http True : " + url + "," + title) else: print("http False : " + url + "," + title) write_resules("http_results.txt","http False : " + url + "," + title) time.sleep(2)
# 为了方便多线程调用,每个监控类型都设置一个启动程序 def start_icmp(ip_address): while True: if check_icmp(ip_address) : print("icmp True : " + ip_address) else: print("icmp False : " + ip_address) write_resules("icmp_results.txt","icmp False : " + ip_address) time.sleep(2)
# 为了方便多线程调用,每个监控类型都设置一个启动程序 def start_dns(domainname,dnsserver): while True: if check_dns(domainname,dnsserver) : print("dns True : " + domainname + "," + dnsserver) else: print("dns False : " + domainname + "," + dnsserver) write_resules("dns_results.txt","dns False : " + domainname + "," +dnsserver) time.sleep(2)
# 主函数 def main(): http,icmp,dns= read_config() # 循环取出http监控节点,每个节点启动一个线程 for h in http: url = h["url"] title = h["title"] moniter_http = threading.Thread(target=start_http,args=(url, title)) moniter_http.start() # 循环取出icmp监控节点,每个节点启动一个线程 for i in icmp: ip_address = i["ip_address"] moniter_icmp = threading.Thread(target=start_icmp,args=(ip_address,)) moniter_icmp.start() # 循环取出dns监控节点,每个节点启动一个线程 for d in dns: domainname = d["domainname"] dnsserver = d["dnsserver"] moniter_dns = threading.Thread(target=start_dns,args=(domainname,dnsserver)) moniter_dns.start()
if __name__ == '__main__': main()