#!/usr/local/bin/python3.8
# -*- coding:utf-8 -*-
# Author: 上善
# Time: 2019-10-25 13:37
#
import os
import sys
import re
import time
import hashlib
import requests
import datetime
import subprocess
import dns.resolver
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
class box (object):
"""运维工具"""
def check_ip(self):
iplist = open("/Users/sean/Desktop/Module/cdnetwork/tools_box/module/node.list", encoding="utf-8")
ip_list = iplist.readlines()
for ip in ip_list:
res = os.popen('curl -s http://freeapi.ipip.net/' + ip).read()
ip_address = ip.replace("
", "") + " " + "" + (res[1:-1]).replace(""", "").replace(",", " ")
print(ip_address)
def beian(self):
'''
备案查询
:return:
'''
while True:
try:
domain = input("请输入二级域名(eg baidu.com 按q or Q退出) :")
if re.match('(([\w-]{1,62})?(\.[\w-]{1,62})+)', domain):
# 新网api接口
http_uri = "http://icp.fleacloud.com/api/v1/icp?domain="
http_url = "http://icp.fleacloud.com/api/v1/icp?domain=" + domain
gmt_format = '%a, %d %b %Y %H:%M:%S GMT'
date_gmt = datetime.datetime.utcnow().strftime(gmt_format)
headers = {
'Date': date_gmt,
'Accept': 'application/json',
'Content-type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleW '
'ebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'
}
res = requests.get(http_url, headers=headers)
if res.json()['status'] == 3:
print('域名 : ' + domain + " 备案号 : " + res.json()['icp'])
elif res.json()['status'] == 2:
print('域名 : ' + domain + " 备案号 : " + res.json()['icp'])
else:
print('查询失败或未知')
elif domain.upper() == 'Q':
break
else:
print('域名输入有误;')
except AttributeError:
sys.exit('系统错误')
def oscp(self):
'''
OCSP Stapling 测试
:return:
'''
with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/domain_list', 'r') as domain_list:
for domain_list in domain_list.readlines():
(ip, domain) = domain_list.strip("
").split()
print(domain + ":
" + subprocess.Popen(('openssl s_client -connect ' + domain + ':443 -servername '
+ domain + ' -status -tlsextdebug < /dev/null 2>&1 | grep -i '
'"OCSP response"'),
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE).stdout.read().decode())
def datetime(self):
'''
日期和md5加密
md5加密
:return:
'''
date_time = datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')
print(date_time)
print(subprocess.Popen(('echo'+date_time+'|md5'), shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).
stdout.read().decode())
def openssl(self):
'''
证书校验
:return:
'''
taps = '''
domain_cname: images-cn.ssl-images-amazon.com.wtxcdn.com
domain_pad: images-cn.ssl-images-amazon.com
'''
print(taps)
c_domain = input('domain_cname: ')
s_domain = input('domain_pad: ')
print(os.popen('openssl s_client -connect c_domain:443 '
'-servername s_domain< /dev/null 2> /dev/null | openssl x509 -text | grep -C2 Altern'))
def ab_test(self):
'''
apachc ab 压力测试
:return:
'''
url = input('Http_Url: ')
proxy = input('Proxy: ')
port = input('Port: ')
print('ab -X ' + proxy + ':' + port + ' -n 100 -c 50 ' + url)
if (subprocess.Popen(('ab -X ' + proxy + ':' + port + ' -n 100 -c 50 ' + url), shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE).stdout.read().decode()):
print('命令执行完成100链接10并发')
def timestamp_datetime(self, value):
'''
时间戳转时间
:param value:
:return:
'''
format = '%Y-%m-%d %H:%M:%S'
value = time.localtime(value)
dt = time.strftime(format, value)
return dt
def datetime_timestamp(self, dt):
'''
时间转时间戳
:param dt:
:return:
'''
# dt为字符串
# 中间过程,一般都需要将字符串转化为时间数组
time.strptime(dt, '%Y-%m-%d %H:%M:%S')
# 10位 (秒)
s = int(time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S')))
# 13位(毫秒)
# s = (time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S')))*1000
print('unix时间戳(10位秒级):' + str(s))
return int(s)
def md5sum_encryption(self, unix_time, uri, key, encryption):
'''
秘文组合方式需要在变量上修改
:return:
'''
wstime = str(600)
if encryption == 'key+unixtime+wstime':
uri_encryption = bytes((key + str(unix_time) + wstime).encode('utf-8'))
md5encryption = hashlib.md5(uri_encryption).hexdigest()
print('加密方式方式:' + encryption)
print('md5: ' + md5encryption)
elif encryption == 'uri+key+unixtime':
uri_encryption = bytes((uri + key + str(unix_time)).encode('utf-8'))
md5encryption = hashlib.md5(uri_encryption).hexdigest()
print('加密方式方式:' + encryption)
print('md5: ' + md5encryption)
elif encryption == 'key+time+uri':
uri_encryption = bytes((key + str(unix_time)+ uri).encode('utf-8'))
md5encryption = hashlib.md5(uri_encryption).hexdigest()
print('加密方式方式:' + encryption)
print('md5: ' + md5encryption)
elif encryption == 'time+key+uri':
uri_encryption = bytes((str(unix_time) +key+ uri +str(3600)).encode('utf-8'))
md5encryption = hashlib.md5(uri_encryption).hexdigest()
print(unix_time)
print('加密方式方式:' + encryption)
print('md5: ' + md5encryption)
else:
print('暂不支持的加密方式,请检查或修改python')
def unix_timestamp_create(self):
'''
变量获取并用md5 加密
:return:
'''
if os.path.exists('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/uri_information'):
unix_time = box.datetime_timestamp(self, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/uri_information', "r") as f:
for uri_line in f.readlines():
uri_information = uri_line
(uri, key, encryption) = uri_information.strip('
').split()
box.md5sum_encryption(self,unix_time, uri, key, encryption)
else:
with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/uri_information', "w+") as f:
f.write('origin/111 SEd0kljv5M key+unixtime+wstime ')
sys.exit('uri_information文件不存在,将在当前目录生成,请退出后')
def http_requests(self):
'''
request 测试
:return:
'''
print(datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
s = requests.session()
s.keep_alive = False # 关闭多余连接
with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/url', 'r') as f:
for i in f.readlines():
(http_url, proxy) = i.strip().split()
print(http_url)
proxies = {
'http': proxy + ':80',
'https': proxy + ':443'
}
headers = {
'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'),
'Accept': 'application/json',
'Content-type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36',
'cdn-src-ip': proxy
}
res = requests.get(http_url, headers=headers, proxies=proxies, verify=False)
headers_post = dict(res.headers)
tmp_str = "statusCode:{}
Date:{}
Content-Length:{}
Connection:{}
Cache-Control:{}
Via:{}
X-px:{}
".format(
res.status_code,
headers_post.get('Date'),
headers_post.get('Content-Length'),
headers_post.get('Connection'),
headers_post.get('Cache-Control'),
headers_post.get('Via'),
headers_post.get('X-Px'),
)
print(tmp_str)
def hosts(self):
domain = input("Please input an domain: ").strip() # 输入一个域名
try:
A = dns.resolver.query(domain, 'A') # 指定查看类型为A记录
except Exception as e:
print(e)
else:
for i in A.response.answer: # 通过response.answer方法获取查询回应信息
for j in i.items: # 遍历回应信息
hosts = (j.address + " " + domain)
os.system("echo "+hosts+"|sed 's/.whecloud.com//g' | sed 's/.wtxcdn.com//g'>> /etc/hosts")
os.system('cat /etc/hosts')