工具开发-端口扫描工具-多线程-协程
背景
有时候,在进行网络相关的研究的时候,我们需要执行一些有目的的参数测量。而端口扫描就是其中比较普遍也比较重要的一项。所谓的端口扫描,就是指通过TCP握手或者别的方式来判别一个给定主机上的某些端口是否处理开放,或者说监听的状态。现有的使用比较广泛的端口扫描工具是nmap。毋庸置疑,nmap是一款非常强大且易于使用的软件。但nmap是一款运行于terminal中的软件,有时在别的代码中调用并不是很方便,甚至没有相应的库。另外,nmap依赖的其他库较多,在较老的系统中可能无法使用较新的nmap,这样会造成扫描的不便。另外,nmap在扫描时需要root权限。
参考
链接:https://www.jianshu.com/p/b1994a370660
https://github.com/windard/Port_Scan
。
- 每个进程背后都有一个端口
- 入侵者通常会用扫描器对目标主机的端口进行扫描,以确定哪些端口是开放的,从开放的端口,入侵者可以知道目标主机大致提供了哪些服务,进而猜测可能存在的漏洞
- 因此对端口的扫描可以帮助我们更好的了解目标主机。而对应管理员,扫描本机的开放端口也是做好安全防范的第一步
为什么有那么多开源工具,我们为什么还要花费大量精力取开发工具呢?nmap
不好定制化,比如我们需要发送一个邮件,企业微信,钉钉,开源工具做不了
首先你的工具是否支持这个功能,还有功能跟web平台集成就无法搞
可以参考站长工具中的端口扫描进行开发
- 端口扫描原理:尝试与目标主机建立连接,如果目标主机有回复,则说明端口开放
- TCP连接:这种方法使用三次握手与目标主机建立标准的TCP连接。但是这种放放风容易被发现,被目标主机记录。
- 实现简单,对操作者的权限没有严格的要求
设计方案规划
- 1、建立TCP连接
- 2、查看连接返回
- 3、判断连接返回值
- 4、循环扫描剩余的端口
代码实现功能
import socket
#s1=socket.socket(family,type)
TCP_sock =socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#family参数代码地址家族,可为AF_INET或AF_UNIX.
#AF_INET家族包括internet地址,AF_UNIX家族用于同一台机器上的进程间通讯.
#type参数代表套接字类型,可为SOCK_STREAM(流套接字,就是TCP套接字)和SOCK_DGRAM(数据报套接字,就是UDP套接字)
#默认为family=AF_INET,type=SOCK_STREAM
#返回一个整数描述符,用这个描述符来标识这个套接字
客户端套接字
s.connect() 主动初始化TCP服务器连接,一般address格式为元组(hostname,port),如果连接出错,返回socket.error错误
s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常----使用这个
在项目发布之前做一个端口扫描--为团队做一个工具
如果连接正确,返回0
如果连接错误,会返回错误码
请输入需要扫描的IP>>>192.168.222.128
请输入需要扫描的port>>>6666
10035
Process finished with exit code 0
V1.0实现一个端口扫描
import socket
def scan_tool_v1():
#1-创建socket对象
sk=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(0.5)
#2-接收用户输入IP 和 port
ip = input('请输入需要扫描的IP>>>')
port = int(input('请输入需要扫描的port>>>').strip())
#3-建立连接
try:
conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
#4-判断返回值
if conn==0:
print(f'主机:{ip},端口{port}已开放')
else:
print(f'主机:{ip},端口{port}未开放')
#5-关闭连接
sk.close()
except:
pass
if __name__ == '__main__':
scan_tool_v1()
调试优化
V2.0 循环扫描多个端口
import socket
def scan_tool_v2():
# 2-接收用户输入IP 和 port
ip = input('请输入需要扫描的IP>>>')
ports = input('请输入需要扫描的ports(0-65535)>>>').strip()
port_begin,port_end = ports.split('-')
# 3-建立连接
for port in range(int(port_begin),int(port_end)+1):
try:
# 1-创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.settimeout(0.5)
conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
# 4-判断返回值
if conn==0:
print(f'主机:{ip},端口{port}已开放')
else:
print(f'主机:{ip},端口{port}未开放')
except:
pass
# 5-关闭连接
sk.close()
if __name__ == '__main__':
scan_tool_v2()
V3.0 IP和域名判断
-------------------------------------------
"""
反馈:
1、只能输入IP,不能使用域名进行扫描
2、没有对IP做有效判断
优化方案:
1-IP有效判断
2-进行端口扫描
3-使用ip进行扫描
4-使用域名进行扫描
5-用户可以选择,逻辑规划
6-域名有效判断
192.168.222.128
IPv4地址:((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}
http://c.runoob.com/front-end/854 菜鸟
"""
-------------------------------------------
import socket
1-IP有效判断
def check_ip(ip):
"""
:param ip: 输入IP地址
:return: 返回IP判断结果
"""
ip_address=re.compile('((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}')#表达式对象
if ip_address.match(ip) and len(ip) !=0:
return True
else:
return False
6-域名有效判断
def check_domain(domain):
"""
:param domain: 输入域名
:return: 返回domain判断结果
"""
ip_address=re.compile('[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')#表达式对象
if ip_address.match(domain) and len(domain) !=0:
return True
else:
return False
2-定义端口扫描
def scan_port(ip):
port_begin,port_end = (9090,9100)
# 3-建立连接
for port in range(int(port_begin),int(port_end)+1):
try:
# 1-创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.settimeout(0.5)
conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
# 4-判断返回值
if conn==0:
print(f'主机:{ip},端口{port}已开放')
else:
pass
except:
pass
# 5-关闭连接
sk.close()
3-使用ip进行扫描
def scan_ip(ip):
if check_ip(ip):
#进行端口扫描
scan_port(ip)
else:
print('ip输入有误')
4-使用域名进行扫描
def scan_domain(domain_name):
# 1-通过正则过滤域名
if 'http://' in domain_name or 'https://' in domain_name:
domain_name=re.findall(r'://(.*)',domain_name)[0]
if check_domain(domain_name):
# 2-通过域名解析出IP地址
server_ip= socket.gethostbyname(domain_name)
# 3-进行端口扫描
scan_port(server_ip)
else:
print('域名输入有误')
5-用户可以选择,逻辑规划
def main():
# 1-提示信息
info="""
1.请输入IP
2.请输入域名
"""
print(info)
# 2-接收用户的输入
select = input('请输入>>> ')
# 3-判断用户的输入,选择对应的扫描方式
if select =='1':
ip=input('请输入IP>>> ')
scan_ip(ip)
elif select=='2':
domain = input('请输入域名>>> ')
scan_domain(domain)
else:
print('扫描方式输入有误')
V4.0多线程
#-----------------v4.0----------------------
"""
反馈:
1、只能输入IP,不能使用域名进行扫描
2、没有对IP做有效判断
优化方案:
1-IP有效判断
2-进行端口扫描
3-使用ip进行扫描
4-使用域名进行扫描
5-用户可以选择,逻辑规划
6-域名有效判断
192.168.222.128
IPv4地址:((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}
http://c.runoob.com/front-end/854 菜鸟
7-使用多线程
"""
#-------------------------------------------
import socket
#1-IP有效判断
def check_ip(ip):
"""
:param ip: 输入IP地址
:return: 返回IP判断结果
"""
ip_address=re.compile('((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}')#表达式对象
if ip_address.match(ip) and len(ip) !=0:
return True
else:
return False
#6-域名有效判断
def check_domain(domain_name):
"""
:param domain: 输入域名
:return: 返回domain判断结果
"""
domain=re.compile('[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')#表达式对象
if domain.match(domain_name) and len(domain_name) !=0:
return True
else:
return False
#2-定义端口扫描--只扫一个
def scan_port(ip,port):
port_begin,port_end = (9090,9100)
# 3-建立连接
try:
# 1-创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.settimeout(0.1)
conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
# 4-判断返回值
if conn==0:
print(f'主机:{ip},端口{port}已开放')
else:
pass
except:
pass
# 5-关闭连接
sk.close()
#3-使用ip进行扫描
def threading_scan_port(ip):
start_time =time.time()
threads=[]#存储线程组
#创建线程组
for one in range(1,65535+1):
t =threading.Thread(target=scan_port,args=(ip,one))
threads.append(t)
t.setDaemon(True)
t.start()
# #启动线程组
# for i in range(65535):
# threads[i].start()
# #阻塞线程
# for i in range(65535):
# threads[i].join()
end_time=time.time()
print('端口扫描总共耗时>>> ',end_time-start_time)
def scan_ip(ip):
#进行端口扫描
threading_scan_port(ip)
#4-使用域名进行扫描
def scan_domain(domain_name):
# 1-通过正则过滤域名
if 'http://' in domain_name or 'https://' in domain_name:
domain_name = re.findall(r'://(.*)', domain_name)[0]
# 2-通过域名解析出IP地址
server_ip= socket.gethostbyname(domain_name)
# 3-进行端口扫描
threading_scan_port(server_ip)
#5-不需要用户选择,逻辑规划
def main():
# 1-提示信息
info="""
请输入IP或者域名
"""
# 2-接收用户的输入
info = input('请输入IP或者域名>>> ')
# 3-判断用户的输入,选择对应的扫描方式
if check_ip(info):
scan_ip(info)
elif check_domain(info):
scan_domain(info)
else:
print('输入格式有误')
if __name__ == '__main__':
main()
V5.0 协程基础
有没有可以比多线程技术更优化的并发方案
-
协程的概述
- python线程的性能问题,在python中使用多线程运行代码经常不能达到预期的效果。而有些时候我们的逻辑中又需要开更高的并发,让代码跑的更快,在同样的时间内执行更多的有效逻辑,减少无用的等待。
- 协程是一种用户级轻量级的线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
- 协程又称为微线程,纤程
- 优点1:协程极高的执行效率。因为子程序切换不是线程的切换,而是由程序自身控制。因此,没有线程切换的开销。和多线程比,线程数量越多,协程的性能优势就越明显。
- 优点2:不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多
- 缺点1:无法利用多核资源:协程的本质是个单线程,它不能同时将多个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上,当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用
- 缺点2:进行阻塞操作(如IO)会阻塞掉整个程序
-
gevent
- gevent就是一个现在很火的,支持很全面的python第三方协程库
- pip install gevent
- 由于切换是在IO操作时自动完成,所以gevent需要修改python自带的一些标准库,这一过程在启动时通过monkey patch完成
协程的基本用法:
import time
import gevent
#time,默认是阻塞的
from gevent import monkey
monkey.patch_all()#解决:能不能改成非阻塞方式,在操作IO的时候,立即切换到其他的协程运行
def f1():
for i in range(5):
print('f1---在运行---')
time.sleep(1)
def f2():
for i in range(5):
print('f2---在运行---')
time.sleep(1)
#创建协程对象
t1 =gevent.spawn(f1)
t2 =gevent.spawn(f2)
gevent.joinall([t1,t2])
控制台输出
f1---在运行---
f2---在运行---
f1---在运行---
f2---在运行---
f1---在运行---
f2---在运行---
def get(url):
print(f'{datetime.now()}开始--GET--DATA:{url}')
resp=requests.get(url)
print(f'{datetime.now()}结束--GET--DATA:{url}')
gevent.joinall(
[
gevent.spawn(get,'http://www.baidu.com'),
gevent.spawn(get,'http://c.runoob.com/')
]
)
控制台输出:
2021-07-09 21:23:42.628202开始--GET--DATA:http://www.baidu.com
2021-07-09 21:23:42.631228开始--GET--DATA:http://c.runoob.com/
2021-07-09 21:23:42.718241结束--GET--DATA:http://www.baidu.com
2021-07-09 21:23:42.798265结束--GET--DATA:http://c.runoob.com/
V6.0协程+文件
#-----------------v6.0----------------------
"""
反馈:
1、扫描效率低
2、有些端口没有扫出来
优化方案:
1-使用协程方案
192.168.222.128
IPv4地址:((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}
http://c.runoob.com/front-end/854 菜鸟
[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')
47.96.181.17
2-扫描结果写入文件
"""
#-------------------------------------------
import socket
#1-IP有效判断
def check_ip(ip):
"""
:param ip: 输入IP地址
:return: 返回IP判断结果
"""
ip_address=re.compile('((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})(.((2(5[0-5]|[0-4]d))|[0-1]?d{1,2})){3}')#表达式对象
if ip_address.match(ip) and len(ip) !=0:
return True
else:
return False
#6-域名有效判断
def check_domain(domain_name):
"""
:param domain: 输入域名
:return: 返回domain判断结果
"""
domain=re.compile('[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+.?')#表达式对象
if domain.match(domain_name) and len(domain_name) !=0:
return True
else:
return False
#导出文件
def write_file(info,path='./scan_port_result.txt'):
with open(path,'a',encoding='utf-8') as f:
f.write(info)
#2-定义端口扫描--只扫一个
def scan_port(ip,port):
#3-建立连接
try:
#1-创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.settimeout(0.1)
conn = sk.connect_ex((ip,port)) #有返回值,('127.0.0.1',int类型)
#4-判断返回值
if conn==0:
print(f'主机:{ip},端口{port}已开放')
write_file(f'主机:{ip},端口{port}已开放
')
else:
pass
except:
pass
#5-关闭连接
sk.close()
#3-使用ip进行扫描
monkey.patch_all()#非阻塞方式执行
def gevent_scan_port(ip):
write_file(f'主机:{ip},端口扫描开始,请耐心等待......
')
start_time =time.time()
runList=[]#存储协程
g =gevent.pool.Pool(200)#并发的协程数
#创建协程
for port in range(1,65535+1):
runList.append(g.spawn(scan_port,ip,port))
gevent.joinall(runList)
end_time=time.time()
write_file(f'主机:{ip},端口扫描结束
')
write_file(f'端口扫描总共耗时{end_time-start_time}
')
print('端口扫描总共耗时>>> ',end_time-start_time)
def scan_ip(ip):
#进行端口扫描
gevent_scan_port(ip)
#4-使用域名进行扫描
def scan_domain(domain_name):
#1-通过正则过滤域名
if 'http://' in domain_name or 'https://' in domain_name:
domain_name = re.findall(r'://(.*)', domain_name)[0]
#2-通过域名解析出IP地址
try:
server_ip= socket.gethostbyname(domain_name)
#3-进行端口扫描
gevent_scan_port(server_ip)
except Exception as e:
print(f'输入有误,请检查:{e}')
#5-不需要用户选择,逻辑规划
def main():
#1-提示信息
info="""
请输入IP或者域名
"""
#2-接收用户的输入
info = input('请输入IP或者域名>>> ')
#3-判断用户的输入,选择对应的扫描方式
if check_ip(info):
scan_ip(info)
elif check_domain(info):
scan_domain(info)
else:
print('输入格式有误')
if __name__ == '__main__':
main()
问题记录
-
1、端口扫描结果不正确
请输入需要扫描的IP>>>192.168.222.128
请输入需要扫描的ports(0-65535)>>>9089-9100
主机:192.168.222.128,端口9089未开放
主机:192.168.222.128,端口9090未开放
主机:192.168.222.128,端口9091未开放
主机:192.168.222.128,端口9092未开放
主机:192.168.222.128,端口9093未开放
主机:192.168.222.128,端口9094未开放
主机:192.168.222.128,端口9095未开放
主机:192.168.222.128,端口9096未开放
主机:192.168.222.128,端口9097未开放
主机:192.168.222.128,端口9098未开放
主机:192.168.222.128,端口9099未开放
主机:192.168.222.128,端口9100未开放- 原因分析
socket已经关闭了,导致后面的端口都无法扫描
- 原因分析
第一次debug结果
<socket.socket fd=736, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
第二次debug结果
<socket.socket fd=836, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
每次扫描都需要使用新的socket对象来建立连接
* 解决方案
把建立socket对象放到循环中,每次扫描都要重新建立socket对象
验证结果:
请输入需要扫描的IP>>>192.168.222.128
请输入需要扫描的ports(0-65535)>>>9089-9100
主机:192.168.222.128,端口9089未开放
主机:192.168.222.128,端口9090已开放
主机:192.168.222.128,端口9091未开放
主机:192.168.222.128,端口9092未开放
主机:192.168.222.128,端口9093未开放
主机:192.168.222.128,端口9094未开放
主机:192.168.222.128,端口9095未开放
主机:192.168.222.128,端口9096未开放
主机:192.168.222.128,端口9097未开放
主机:192.168.222.128,端口9098未开放
主机:192.168.222.128,端口9099未开放
主机:192.168.222.128,端口9100已开放