IO 多路复用、paramiko 及 pymysql 模块
1、引入 IO 模式
对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
- 等待数据准备 (Waiting for the data to be ready)
- 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
正式因为这两个阶段,Linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)
1.1 阻塞 I/O
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来
1.2 非阻塞 I/O
linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。
1.3 I/O 多路复用
IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。
1.4 异步 I/O(asynchronous IO)
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
水平触发(Level Triggered):
将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,但是会增加消耗
边缘触发(Edge Triggered):
只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
2、 select/poll/epoll 模块
2.1 Python IO复用之select
格式:
rList,wList,eList = select.select(argv1,argv2,argv3,timeout)
参数:
- argv1 标准输入
- argv2 如果监听序列中句柄发生变化 则将变化句柄返回至wList
- argv3 如果监听序列中句柄有错误时 则将错误句柄返回到eList
- timeout 设置阻塞时间,如果为2那么将阻塞2s,如果不设置则默认一直阻塞
import socket
import select
# IO多路复用:8002,8001
#
sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001,))
sk1.listen(5)
sk2 = socket.socket()
sk2.bind(('127.0.0.1',8002,))
sk2.listen(5)
inputs = [sk1,sk2,]
while True:
# IO多路复用,同时监听多个socket对象
# - select,内部进行循环操作(1024) 主动查看
# - poll, 内部进行循环操作 主动查看
# - epoll, 被动告知
r,w,e = select.select(inputs,[],[],0.05)#读、写、异常时间
# r = [sk2,]
# r = [sk1,]
# r = [sk1,sk2]
# r = []
# r = [conn,]
# r = [sk1,Wconn]
#######?
for obj in r:
if obj in [sk1,sk2]:
# 新连接捡来了...
print('新连接来了:',obj)
conn,addr = obj.accept()
inputs.append(conn)
else:
# 有连接用户发送消息来了..
print('有用户发送数据了:',obj)
data = obj.recv(1024)
obj.sendall(data)
2.2 HTTP 服务
设置连接的默认方式为 sock.setblocking(False)
<pre style="background-color:#ffffff;color:#000000;font-family:'宋体';font-size:11.3pt;">#!/usr/bin/env python
import select,socket
def process_data(conn):
data = bytes()
while True:
try:
chunk = client.recv(1024)
except Exception as e:
chunk = None
if not chunk:
break
data += chunk
data_str = str(data,encoding='utf-8')
# print(data_str)
header,body = data_str.split('
',1)
header_list = header.split('
')
header_dict = {}
for line in header_list:
value = line.split(':',1)
if len(value) == 2:
k,v = value
header_dict[k] = v
else:
header_dict['方法'],header_dict['URL'],header_dict['HTTP版本'] = line.split(' ')
return header_dict,body
sock = socket.socket()
sock.setblocking(False)#默认阻塞式,改为非阻塞
sock.bind(('127.0.0.1',8888,))
sock.listen(5)
inputs = [sock,]
while True:
r,w,e = select.select(inputs,[],[],0.5)
for client in r:
if client == sock:
conn,addr = client.accept()
conn.setblocking(False)
inputs.append(conn)
else:
header_dict,body = process_data(client)
request_url = header_dict['URL']
client.sendall(request_url.encode('utf-8'))
inputs.remove(client)
client.close()</pre>
开源 web 服务器 tornado 就是 HTTP 原理的封装
2.3 Web 客户端模拟
filno 函数
fileno()方法返回所使用的底层实现,要求从操作系统I/O操作的整数文件描述符。
import socket,select
class Foo(object):
def __init__(self):
self.sock = socket.socket()
def fileno(self):
return self.sock.fileno()
r,w,e = select.select([Foo()],[],[],0.5)
用法
原先使用的 select 内部就封装了 fileno 函数,实现socket 对象的使用,如果我们要在 r 中对象添加其他属性、信息就需要自己封装这个函数Foo
Web 客户端
import select,socket
def f1(data):
print('f1',data)
def f2(data):
print('f2',data)
url_list = [
{'host':"www.baidu.com",'url': '/','callback':f1 }, # socket
{'host':"www.cnblogs.com",'url': '/wupeiqi','callback':f1 },
{'host':"www.oldboyedu.com",'url': '/','callback':f2 },
]
class Foo(object):
def __init__(self,sock,callback,url,host):
self.sock = sock
self.callback = callback
self.url = url
self.host = host
def fileno(self):
return self.sock.fileno()
class NbIO(object):
def __init__(self):
self.fds = []
self.connections = []
def connect(self,url_list):
for item in url_list:
conn = socket.socket()
conn.setblocking(False)
# 1. 发送链接请求
try:
conn.connect((item['host'],80))
except BlockingIOError as e:
pass
obj = Foo(conn,item['callback'],item['url'],item['host'])
self.fds.append(obj)
# print(self.fds)
self.connections.append(obj)
def send(self):
while True:
# wList,有对象;当前socket已经创建链接
try:
if len(self.fds) == 0:
return
rList,wList,eList = select.select(self.fds,self.connections,[],0.5)
# 【1,11】
for obj in rList:
# 4.有数据响应回来了
conn = obj.sock
data = bytes()
while True:
try:
d = conn.recv(1024)
data = data + d
except BlockingIOError as e:
d = None
if not d:
break
obj.callback(data) # 自定义操作 f1 f2
self.fds.remove(obj)
# print(len(self.fds),len(self.connections))
# 执行当前请求 函数:f1 f2
# 【1,2,3,】
for obj in wList:
# 2.已经连接上远程
conn = obj.sock
# 3. 发送数据
# HTTP/1.0
Host: %s
template = "GET %s HTTP/1.1
Host: %s
" %(obj.url,obj.host,)
# template = "POST %s HTTP/1.1
Host: 127.0.0.1:8888
k1=v1&k2=v2" %(obj.url,)
conn.sendall(template.encode('utf-8'))
self.connections.remove(obj)
except OSError as e:
pass
obj = helei_new.NbIO()
obj.connect(url_list)
obj.send()
3、paramiko 模块
http://www.cnblogs.com/wupeiqi/articles/5095821.html
Python 的 paramiko 模块基于 SSH 用于连接远程服务器并执行相关操作
3.1 paramiko.SSHClient
用于连接远程服务器并执行基本命令
import paramiko
#基于秘钥连接时使用
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')#密码连接时使用
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)#秘钥连接时使用
#还可以基于秘钥字符串连接
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()
封装
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)
ssh = paramiko.SSHClient()
ssh._transport = transport
stdin, stdout, stderr = ssh.exec_command('df')
transport.close()
3.2 paramiko.SFTPClient
用于连接远程服务器并执行上传下载
import paramiko
transport = paramiko.Transport(('hostname',22))
transport.connect(username='wupeiqi',password='123')
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()
同样可以根据秘钥连接
4、pymysql 模块
pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
# 创建连接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
# 创建游标
cursor = conn.cursor()
# 执行SQL,并返回收影响行数
effect_row = cursor.execute("update hosts set host = '1.1.1.2'")
# 执行SQL,并返回受影响行数
#effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,))
# 执行SQL,并返回受影响行数
#effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
# 获取第一行数据
row_1 = cursor.fetchone()
# 获取前n行数据
# row_2 = cursor.fetchmany(3)
# 获取所有数据
# row_3 = cursor.fetchall()
# 获取最新自增ID
new_id = cursor.lastrowid
# 提交,不然无法保存新建或者修改的数据
conn.commit()
# 关闭游标
cursor.close()
# 关闭连接
conn.close()