http://www.cnblogs.com/wupeiqi/articles/4963027.html
Module concept: a collection of code that implements a certain piece of functionality.
Think of functional versus procedural programming: a function implements one piece of functionality and other code simply calls it, which improves code reuse and reduces coupling between pieces of code. A complex feature may require several functions (which can live in different .py files); a collection of n .py files organized this way is called a module.
Modules fall into three categories:
1. Custom modules
2. Built-in modules
3. Open-source (third-party) modules
Note: do not give your .py file the same name as a module you import, otherwise that module will not be found (your file shadows it).
1. Custom modules
1.1 Importing modules
Ways to import a module:
import module
from module.xx.xx import xx
from module.xx.xx import xx as rename   # alias the imported name
from module.xx.xx import *              # import all functions, variables, classes, etc. from module.xx.xx
Importing a module essentially tells the Python interpreter to go and execute that .py file:
- Importing a .py file: the interpreter executes that .py file
- Importing a package: the interpreter executes the package's __init__.py file
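A minimal sketch of the package case (the package, module, and function names here are invented for illustration):

# Hypothetical layout:
#   mypkg/
#       __init__.py   # executed on "import mypkg"
#       tools.py      # defines add()

# mypkg/__init__.py
print("initializing mypkg")

# mypkg/tools.py
def add(a, b):
    return a + b

# main.py (next to mypkg/)
import mypkg                   # runs mypkg/__init__.py
from mypkg.tools import add    # runs mypkg/tools.py and binds add
print(add(1, 2))               # 3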
Which paths does the interpreter search when importing a module? The ones in sys.path.
import sys
print(sys.path)

# Output:
['E:\\Lab\\python\\s12\\day6', 'E:\\Lab\\python\\s12', 'E:\\Python\\Python35\\python35.zip', 'E:\\Python\\Python35\\DLLs', 'E:\\Python\\Python35\\lib', 'E:\\Python\\Python35', 'E:\\Python\\Python35\\lib\\site-packages']
# sys.path is a list; it contains the directory of the script being run, the project root, and the interpreter's standard library and site-packages directories
Example:
Suppose the directory structure is as follows (the original post shows a screenshot of the project tree here):
(1) A .py file in the project root calling a .py file in a subdirectory
Note: from DIR_PATH import <py file>, or functions/classes defined inside that .py file
What follows import can only be a .py file, or names defined inside a .py file
You can also import just a single function (etc.) from a .py file; see the sketch below.
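A minimal sketch of this case (the directory and function names are invented, since the original screenshot is not reproduced here):

# Hypothetical layout:
#   project/
#       main.py
#       lib/
#           __init__.py
#           common.py    # defines login()

# project/lib/common.py
def login(user):
    return "welcome %s" % user

# project/main.py -- run from the project root
from lib import common          # import the whole module
from lib.common import login    # or only one function
print(common.login("alex"))
print(login("alex"))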
(2) A .py file in one subdirectory calling a package in another subdirectory
Before importing the package you must add the project root to sys.path, otherwise the import fails with a module-not-found error. The approach below only works when the working directory is the directory containing the .py file, i.e. you have to run the script from its own directory.
pre_path = os.path.abspath('../')   # relative path from the current working directory up to the project root
sys.path.append(pre_path)           # append the project root to sys.path
To get the project root no matter what the working directory is:
pre_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# __file__                                                    : the path of the current .py file
# os.path.abspath(__file__)                                   : the absolute path of the current .py file
# os.path.dirname(os.path.abspath(__file__))                  : the directory containing the current .py file
# os.path.dirname(os.path.dirname(os.path.abspath(__file__))) : the directory one level above that
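Putting the two pieces together, a minimal sketch of a subdirectory script that makes the project root importable regardless of the working directory (the paths and module names are illustrative):

# project/scripts/report.py
import os
import sys

# project root = two levels above this file (scripts/ -> project/)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from lib import common    # lib/ sits directly under the project root
print(common.login("alex"))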
2. Open-source modules
There are two ways to download and install them:
Install with a package manager:
yum
pip
apt-get
2.1 Installing from source
Download the source
Unpack the source
cd into the directory
Build: python setup.py build
Install: python setup.py install
Note: building from source requires gcc and the Python development headers, so install gcc and python-devel first:
yum install gcc
yum install python-devel
or
apt-get install python-dev
After a successful install, the module ends up in one of the directories on sys.path, for example:
/usr/lib/python2.7/site-packages/
2.2 Importing the module
Imported in the same way as custom modules.
2.3 The paramiko module
paramiko is a module for remote control: with it you can run commands on and transfer files to/from remote servers over SSH. Notably, the remote management inside fabric and ansible is implemented with paramiko.
2.3.1 Installing paramiko
Install on CentOS: pip3 install paramiko
# paramiko depends on pycrypto internally, so download and install pycrypto first
wget http://files.cnblogs.com/files/wupeiqi/pycrypto-2.6.1.tar.gz
tar -xvf pycrypto-2.6.1.tar.gz
cd pycrypto-2.6.1
python setup.py build
python setup.py install
# start a Python shell and import Crypto to verify the installation
# download and install paramiko
wget http://files.cnblogs.com/files/wupeiqi/paramiko-1.10.1.tar.gz
tar -xvf paramiko-1.10.1.tar.gz
cd paramiko-1.10.1
python setup.py build
python setup.py install
# start a Python shell and import paramiko to verify the installation
Run a command -- connect with a username and password:

#!/usr/bin/env python
# coding: utf-8
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('192.168.1.108', 22, 'alex', '123')
stdin, stdout, stderr = ssh.exec_command('df')
print(stdout.read())
ssh.close()
Run a command -- connect to the server with an SSH key
You need to set up the SSH key first.
Steps:
Generate a key pair on the controlling host:
ssh-keygen -t rsa    # creates two files under the user's home directory (e.g. /root/.ssh): id_rsa (private key) and id_rsa.pub (public key)
Copy the public key to the managed host:
ssh-copy-id -i .ssh/id_rsa.pub "-p 22 root@10.10.50.30"    # creates/updates authorized_keys under the target user's ~/.ssh on the server
import paramiko

private_key_path = '/home/auto/.ssh/id_rsa'
key = paramiko.RSAKey.from_private_key_file(private_key_path)

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect('hostname', 22, username='username', pkey=key)   # pass the key via pkey=
stdin, stdout, stderr = ssh.exec_command('df')
print(stdout.read())
ssh.close()
Upload/download files -- with a username and password
import paramiko

# upload
t = paramiko.Transport(('182.92.219.86', 22))
t.connect(username='wupeiqi', password='123')
sftp = paramiko.SFTPClient.from_transport(t)
sftp.put('/tmp/test.py', '/tmp/test.py')
t.close()

# download
t = paramiko.Transport(('182.92.219.86', 22))
t.connect(username='wupeiqi', password='123')
sftp = paramiko.SFTPClient.from_transport(t)
sftp.get('/tmp/test.py', '/tmp/test2.py')
t.close()
Upload/download files -- with a key
import paramiko

private_key_path = '/home/auto/.ssh/id_rsa'
key = paramiko.RSAKey.from_private_key_file(private_key_path)

# upload
t = paramiko.Transport(('182.92.219.86', 22))
t.connect(username='wupeiqi', pkey=key)
sftp = paramiko.SFTPClient.from_transport(t)
sftp.put('/tmp/test3.py', '/tmp/test3.py')
t.close()

# download
t = paramiko.Transport(('182.92.219.86', 22))
t.connect(username='wupeiqi', pkey=key)
sftp = paramiko.SFTPClient.from_transport(t)
sftp.get('/tmp/test3.py', '/tmp/test4.py')
t.close()
3. Built-in modules
3.1 The os module
Provides operating-system-level operations.
os.getcwd()                  # get the current working directory, i.e. the directory the Python script runs in
os.chdir("dirname")          # change the current working directory; like cd in a shell
os.curdir                    # the string for the current directory: '.'
os.pardir                    # the string for the parent directory: '..'
os.makedirs('dir1/dir2')     # makedirs(name, mode=0o777, exist_ok=False); create directories recursively, like mkdir -p
os.removedirs('path')        # remove the directory if it is empty, then try its parent, and so on; e.g. after os.makedirs('dir1/dir2'), os.removedirs('dir1/dir2') removes dir1/dir2 and then dir1 if both are empty
os.mkdir('path')             # os.mkdir(path, mode=0o777); create a single directory, like mkdir dirname
os.rmdir('path')             # remove a single empty directory; raises an error if it is not empty, like rmdir dirname
os.listdir(path)             # return a list of all files and subdirectories in the given directory, including hidden ones
os.remove(path)              # delete a file
os.rename('src', 'dst')      # rename a file/directory; os.renames('old', 'new') does the same, creating intermediate directories as needed
os.stat(path)                # get file/directory metadata
os.sep                       # the OS-specific path separator: "\\" on Windows, "/" on Linux
os.linesep                   # the line terminator of the current platform: "\r\n" on Windows, "\n" on Linux
os.pathsep                   # the separator between entries in path lists: ";" on Windows, ":" on Linux
os.name                      # the name of the current platform: 'nt' on Windows, 'posix' on Linux
os.system(command)           # run a shell command; output goes straight to the terminal and the return value is the exit status (0 on success); each call spawns a new temporary shell process
os.environ                   # the operating-system environment variables
os.path.abspath(path)        # return the normalized absolute version of path
os.path.split(path)          # split path into a (directory, filename) tuple; os.path.split(path)[0] gives the parent directory
os.path.dirname(path)        # return the directory part of path, i.e. the first element of os.path.split(path)
os.path.basename(path)       # return the final component of path; if path ends with / or \ this is an empty string, i.e. the second element of os.path.split(path)
os.path.exists(path)         # True if path exists, otherwise False
os.path.isabs(path)          # True if path is an absolute path
os.path.isfile(path)         # True if path is an existing file, otherwise False
os.path.isdir(path)          # True if path is an existing directory, otherwise False
os.path.join(path1[, path2[, ...]])   # join paths; components that come before an absolute component are thrown away
os.path.getatime(filename)   # return the last access time of the file or directory at path
os.path.getmtime(filename)   # return the last modification time of the file or directory at path
os.path.getctime(filename)   # return the creation time of the file or directory at path (on Unix, the time of the last metadata change)
os.path.getsize(filename)    # return the size of the file at path
os.popen(command)
a = os.popen("dir").read() # a = 执行执行命令dir的结果
b = os.system("dir") # a = 执行命令dir的状态,正常执行后的状态为0
For more, see the official os module documentation.
3.2 The sys module
sys.argv                     # the command-line arguments as a list; the first element is the path of the program itself
sys.exit(n)                  # sys.exit(status=None); exit the program; exit(0) is a normal exit
sys.version                  # version information of the Python interpreter
sys.maxsize                  # the largest value a Py_ssize_t can hold (2.x has sys.maxint instead)
sys.path                     # the module search path, initialized from the PYTHONPATH environment variable
sys.platform                 # the platform name: 'win32' on Windows, 'linux2' on Linux (just 'linux' since Python 3.3)
sys.stdout.write('please:')  # write to standard output
val = sys.stdin.readline()[:-1]   # read a line from standard input and strip the trailing newline
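A minimal sys.argv sketch (the script name and arguments are illustrative):

# args.py
import sys

print(sys.argv)        # e.g. ['args.py', 'hello', '123'] when run as: python args.py hello 123
if len(sys.argv) < 2:
    sys.exit(1)        # exit with a non-zero status when no argument is given
print('first argument:', sys.argv[1])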
For more, see the official sys module documentation.
A small progress-bar program:
import time, sys

for i in range(10):
    sys.stdout.write(">")
    sys.stdout.flush()
    time.sleep(0.3)
3.3 The hashlib module
Used for hash/digest operations; it replaces the old md5 and sha modules and mainly provides the SHA1, SHA224, SHA256, SHA384, SHA512, and MD5 algorithms.
md5 -- deprecated (Python 2 only)
# Python 2
import md5

hash = md5.new()
hash.update('admin')
print hash.hexdigest()
# output: 21232f297a57a5a743894a0e4a801fc3
sha -- deprecated (Python 2 only)
# Python 2
import sha

hash = sha.new()
hash.update('admin')
print hash.hexdigest()
# output: d033e22ae348aeb5660fc2140aec35850c4da997
In Python 3.x, hashlib replaces the md5 and sha modules and provides the SHA1, SHA224, SHA256, SHA384, SHA512, and MD5 algorithms:
import hashlib

m = hashlib.md5()
m.update(b"Hello")
m.update(b"It's me")
print(m.digest())          # binary digest: b']\xde\xb4{/\x92Z\xd0\xbf$\x9cR\xe3Br\x8a'
m.update(b"It's been a long time since last time we ...")
print(m.digest())          # binary digest: b'\xa0\xe9\x89E\x03\xcb\x9f\x1a\x14\xaa\x07?<\xae\xfa\xa5'
print(len(m.hexdigest()))  # hex digest, 32 characters long

# md5
h_md5 = hashlib.md5()
h_md5.update(b'admin')     # Python 3.x requires bytes ("Unicode-objects must be encoded before hashing");
                           # Python 2.x accepts h_md5.update('admin')
print(h_md5.hexdigest())   # 21232f297a57a5a743894a0e4a801fc3

# sha1
h_sha1 = hashlib.sha1()
h_sha1.update(b'admin')    # or h_sha1.update('admin'.encode('utf-8'))
print(h_sha1.hexdigest())  # d033e22ae348aeb5660fc2140aec35850c4da997

# sha224
h_sha224 = hashlib.sha224()
h_sha224.update(b'admin')
print(h_sha224.hexdigest())  # 58acb7acccce58ffa8b953b12b5a7702bd42dae441c1ad85057fa70b

# sha256
h_sha256 = hashlib.sha256()
h_sha256.update(b'admin')
print(h_sha256.hexdigest())  # 8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918

# sha384
h_sha384 = hashlib.sha384()
h_sha384.update(b'admin')
print(h_sha384.hexdigest())  # 9ca694a90285c034432c9550421b7b9dbd5c0f4b6673f05f6dbce58052ba20e4248041956ee8c9a2ec9f10290cdc0782

# sha512
h_sha512 = hashlib.sha512()
h_sha512.update(b'admin')
print(h_sha512.hexdigest())  # c7ad44cbad762a5da0a452f9e854fdc1e0e7a52a38015f23f3eab1d80b931dd472634dfac71cd34ebc35d16ab7fb8a90c81f975113d6c7538dc69dd8de9077ec
The algorithms above are still strong, but they have a weakness: hashes of common strings can be reversed by lookups against precomputed tables. It is therefore worth adding a custom key (salt) of your own when hashing.
import hashlib

# md5 with a custom key (salt)
h_m = hashlib.md5(b'mykey_1001')
h_m.update(b'admin')
print(h_m.hexdigest())   # f4e45c78302ae746e0b20c4ad1b889a6
Still not enough? Python also has an hmac module: internally it processes the key you supply together with the message and then computes the digest.
import hmac

h = hmac.new(b'jiami')
h.update(b'hello')
print(h.hexdigest())   # 96e8ad3c5dc8c1eeb0bf510f40d0393e
More background on md5, sha1, sha256, etc.: https://www.tbs-certificates.co.uk/FAQ/en/sha256.html
3.4 json and pickle
Used for serialization.
json: converts between strings and Python data types (usable across languages, but it can only serialize basic data types such as strings, dicts, and lists; functions and the like cannot be serialized)
pickle: converts between Python-specific objects and Python data types (Python-only, not portable to other languages, but it can write strings, dicts, functions, classes, etc. to a file)
Both json and pickle provide four functions: dumps, dump, loads, load
If you dump into the same file n times with pickle, you can load n times and the objects come back in the order they were written (see the sketch after the example below).
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json, pickle

data = {'k1': 123, 'k2': 'hello'}

## json
# json.dumps converts the data into a string that any language can understand
j_str = json.dumps(data)
print(j_str)            # {"k2": "hello", "k1": 123}

# json.loads parses a string produced by json.dumps and returns the object
j_str_loads = json.loads(j_str)
print(j_str_loads)      # {'k2': 'hello', 'k1': 123}

# json.dump converts the data and writes it to a file
with open('file.json', 'w') as fp:
    json.dump(data, fp)

# json.load reads the data back from the file and returns the object
with open('file.json', 'r') as fp:
    data_j_load = json.load(fp)
print(data_j_load)      # {'k2': 'hello', 'k1': 123}

## pickle
# pickle.dumps converts the data into a byte string that only Python understands
p_str = pickle.dumps(data)
print(p_str)            # b'\x80\x03}q\x00(X\x02\x00\x00\x00k2q\x01X\x05\x00\x00\x00helloq\x02X\x02\x00\x00\x00k1q\x03K{u.'

# pickle.loads parses a byte string produced by pickle.dumps and returns the object
p_loads = pickle.loads(p_str)
print(p_loads)          # {'k2': 'hello', 'k1': 123}

# pickle.dump converts the data and writes it to a file (binary mode)
with open('file.pickle', 'wb') as fp:
    pickle.dump(data, fp)

# pickle.load reads the pickled data back from the file and returns the object
with open('file.pickle', 'rb') as fp:
    data_p_load = pickle.load(fp)
print(data_p_load)      # {'k2': 'hello', 'k1': 123}
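A minimal sketch of the "dump n times, load n times in the same order" behaviour mentioned above (the file name is illustrative):

import pickle

with open('multi.pickle', 'wb') as fp:
    pickle.dump({'id': 1}, fp)
    pickle.dump({'id': 2}, fp)
    pickle.dump({'id': 3}, fp)

with open('multi.pickle', 'rb') as fp:
    print(pickle.load(fp))   # {'id': 1}  -- objects come back in the order they were written
    print(pickle.load(fp))   # {'id': 2}
    print(pickle.load(fp))   # {'id': 3}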
3.5 The subprocess module
Used to run system commands, including complex ones.
The modules and functions that can run shell commands include:
- os.system
- os.spawn*
- os.popen* -- deprecated
- popen2.* -- deprecated
- commands.* -- deprecated, removed in 3.x
The commands module (removed in 3.x):
import commands
result1 = commands.getoutput('ls')
result2 = commands.getstatus('filename')
result3 = commands.getstatusoutput('hostname')
The functionality of all of the modules and functions above is implemented by the subprocess module, which also provides much richer features; a quick sketch of the Python 3 equivalents of the commands functions follows.
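A minimal sketch, assuming Python 3 (these helpers exist in the standard subprocess module):

import subprocess

out = subprocess.getoutput('ls')                        # like commands.getoutput
status, out = subprocess.getstatusoutput('hostname')    # like commands.getstatusoutput
print(status, out)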
The subprocess module allows you to spawn new processes, connect to their input/output/error pipes, and obtain their return codes. This module intends to replace several older modules and functions:
os.system
os.spawn*
The recommended approach to invoking subprocesses is to use the run() function for all use cases it can handle. For more advanced use cases, the underlying Popen interface can be used directly.
The run() function was added in Python 3.5; if you need to retain compatibility with older versions, see the Older high-level API section.

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False)

Run the command described by args. Wait for command to complete, then return a CompletedProcess instance.
The arguments shown above are merely the most common ones, described below in Frequently Used Arguments (hence the use of keyword-only notation in the abbreviated signature). The full function signature is largely the same as that of the Popen constructor - apart from timeout, input and check, all the arguments to this function are passed through to that interface.
This does not capture stdout or stderr by default. To do so, pass PIPE for the stdout and/or stderr arguments.
The timeout argument is passed to Popen.communicate(). If the timeout expires, the child process will be killed and waited for. The TimeoutExpired exception will be re-raised after the child process has terminated.
The input argument is passed to Popen.communicate() and thus to the subprocess's stdin. If used it must be a byte sequence, or a string if universal_newlines=True. When used, the internal Popen object is automatically created with stdin=PIPE, and the stdin argument may not be used as well.
If check is True, and the process exits with a non-zero exit code, a CalledProcessError exception will be raised. Attributes of that exception hold the arguments, the exit code, and stdout and stderr if they were captured.
subprocess.run()
subprocess.run(["ls", "-l"]) # doesn't capture output(不捕获输出) subprocess.run("exit 1", shell=True, check=True) '''#运行提示: Traceback (most recent call last): ... subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1) ''' subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE) '''#运行提示: CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null ') '''
subprocess.call()
Run a command and return its exit status code.
ret = subprocess.call(["ls", "-l"], shell=False)
ret = subprocess.call("ls -l", shell=True)   # shell=True allows passing the command as a single string
subprocess.check_call()
Run a command; return 0 if the exit status is 0, otherwise raise an exception.
subprocess.check_call(["ls", "-l"])
subprocess.check_call("exit 1", shell=True)
subprocess.check_output()
Run a command; return its output if the exit status is 0, otherwise raise an exception.
subprocess.check_output(["echo", "Hello World!"])
subprocess.check_output("exit 1", shell=True)   # raises CalledProcessError
subprocess.Popen(...)
Used to run more complex commands.
Parameters:
args: the shell command, given as a string or a sequence (list or tuple)
bufsize: buffering: 0 unbuffered, 1 line-buffered, any other positive value is the buffer size, a negative value means the system default
stdin, stdout, stderr: handles for the program's standard input, output, and error
preexec_fn: Unix only; a callable object that is invoked in the child process just before the command runs
close_fds: on Windows, if close_fds is True, the newly created child process does not inherit the parent's input, output, and error pipes;
so close_fds cannot be True while also redirecting the child's standard input, output, and error (stdin, stdout, stderr).
shell: as above
cwd: sets the working directory of the child process
env: the environment variables of the child process; if env=None they are inherited from the parent
universal_newlines: line endings differ between platforms; True means handle them uniformly (text mode)
startupinfo and creationflags: Windows only;
they are passed to the underlying CreateProcess() call to set child-process attributes such as the main window's appearance or the process priority.
Running ordinary commands:
import subprocess

ret1 = subprocess.Popen(["mkdir", "t1"])
ret2 = subprocess.Popen("mkdir t2", shell=True)
Commands typed at a terminal fall into two kinds:
- those that produce output as soon as they are entered, e.g. ifconfig
- those that enter an interactive environment and then accept further input, e.g. python
import subprocess obj = subprocess.Popen("mkdir t3", shell=True, cwd='/home/dev',)
import subprocess obj = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) obj.stdin.write('print 1 ') obj.stdin.write('print 2 ') obj.stdin.write('print 3 ') obj.stdin.write('print 4 ') obj.stdin.close() cmd_out = obj.stdout.read() obj.stdout.close() cmd_error = obj.stderr.read() obj.stderr.close() print cmd_out print cmd_error
import subprocess
obj = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
obj.stdin.write(b"print(1) ")
obj.stdin.write(b"print(2) ")
obj.stdin.write(b"print(3) ")
obj.stdin.write(b"print(4) ")
out_error_list = obj.communicate(timeout=10)
print(out_error_list) #(b'1 2 3 4 ', b'')
import subprocess obj = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out_error_list = obj.communicate('print "hello"') print out_error_list
Capturing the output of a command:
a = subprocess.Popen("ipconfig /all", shell=True, stdout=subprocess.PIPE)   # use a PIPE to capture stdout
print(a.stdout.read())
For more, see the official subprocess documentation.
3.6 The shutil module
High-level operations on files, directories, and archives (copying, compressing, decompressing).
shutil.copyfileobj(fsrc, fdst[, length])
Copy the contents of one file object to another; partial content can be copied (controlled by length).
def copyfileobj(fsrc, fdst, length=16*1024):
    """copy data from file-like object fsrc to file-like object fdst"""
    while 1:
        buf = fsrc.read(length)
        if not buf:
            break
        fdst.write(buf)
shutil.copyfileobj example:
import shutil

with open('f1.txt', 'r') as f1, open('f2.txt', 'a') as f2:
    shutil.copyfileobj(f1, f2)
shutil.copyfile(src, dst)
Copy a file.
def copyfile(src, dst):
    """Copy data from src to dst"""
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))
    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)
    with open(src, 'rb') as fsrc:
        with open(dst, 'wb') as fdst:
            copyfileobj(fsrc, fdst)
shutil.copyfile example:
import shutil

shutil.copyfile('f1.txt', 'f3.txt')
shutil.copymode(src, dst)
Copy only the permission bits; the file content, owner, and group are unchanged.
def copymode(src, dst):
    """Copy mode bits from src to dst"""
    if hasattr(os, 'chmod'):
        st = os.stat(src)
        mode = stat.S_IMODE(st.st_mode)
        os.chmod(dst, mode)
shutil.copystat(src, dst)
Copy the stat info: mode bits, atime, mtime, flags.
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError, why:
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise
shutil.copy(src, dst)
Copy the file and its permission bits.
def copy(src, dst):
    """Copy data and mode bits ("cp src dst").

    The destination may be a directory.
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst)
    copymode(src, dst)
shutil.copy2(src, dst)
Copy the file and its stat info.
def copy2(src, dst):
    """Copy data and all stat info ("cp -p src dst").

    The destination may be a directory.
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst)
    copystat(src, dst)
shutil.copytree(src, dst, symlinks=False, ignore=None)
Recursively copy a directory tree.
For example: copytree(source, destination, ignore=ignore_patterns('*.pyc', 'tmp*'))
def copytree(src, dst, symlinks=False, ignore=None):
    """Recursively copy a directory tree using copy2().

    The destination directory must not already exist.
    If exception(s) occur, an Error is raised with a list of reasons.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    XXX Consider this example code rather than the ultimate tool.
    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    os.makedirs(dst)
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                linkto = os.readlink(srcname)
                os.symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy2(srcname, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error, err:
            errors.extend(err.args[0])
        except EnvironmentError, why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError, why:
        if WindowsError is not None and isinstance(why, WindowsError):
            # Copying file access times may fail on Windows
            pass
        else:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error, errors
shutil.ignore_patterns(*patterns)
Meant to be used as the ignore argument of shutil.copytree, to exclude certain files or file types.
def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files"""
    def _ignore_patterns(path, names):
        ignored_names = []
        for pattern in patterns:
            ignored_names.extend(fnmatch.filter(names, pattern))
        return set(ignored_names)
    return _ignore_patterns
shutil.rmtree(path[, ignore_errors[, onerror]])
Recursively delete a directory tree.
def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.
    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    os.rmdir(path)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            os.close(fd)
    else:
        return _rmtree_unsafe(path, onerror)
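A minimal usage sketch (the directory name is made up):

import shutil

shutil.rmtree('/tmp/old_build', ignore_errors=True)   # remove the whole tree, ignoring errors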
shutil.move(src, dst)
Recursively move a file or directory (similar to the Unix mv command).
def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.
    """
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            return

        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst
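A minimal usage sketch (the paths are made up):

import shutil

shutil.move('/tmp/report.txt', '/data/archive/')   # moves the file into the target directory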
shutil.make_archive(base_name, format,...)
Create an archive and return its path, e.g. a zip or tar file.
Parameters:
base_name: the file name of the archive, or a full path. If it is just a file name, the archive is created in the current directory; otherwise it is created at the given path,
    e.g. www => saved in the current directory
    e.g. /Users/wupeiqi/www => saved under /Users/wupeiqi/
format: the archive type: "zip", "tar", "bztar", "gztar"
root_dir: the directory to archive (defaults to the current directory)
owner: owner, defaults to the current user
group: group, defaults to the current group
logger: used for logging, usually a logging.Logger object
Example:
# archive the files under /Users/wupeiqi/Downloads/test into the current program directory
# (make_archive appends the .tar.gz extension itself, so base_name should not include it)
import shutil
ret = shutil.make_archive("test", 'gztar', root_dir='/Users/wupeiqi/Downloads/test')

# archive the files under /Users/wupeiqi/Downloads/test into /Users/wupeiqi/
import shutil
ret = shutil.make_archive("/Users/wupeiqi/test", 'gztar', root_dir='/Users/wupeiqi/Downloads/test')
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "bztar"
    or "gztar".

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.
    """
    save_cwd = os.getcwd()
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    if base_dir is None:
        base_dir = os.curdir

    kwargs = {'dry_run': dry_run, 'logger': logger}

    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format)

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if format != 'zip':
        kwargs['owner'] = owner
        kwargs['group'] = group

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if root_dir is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename
shutil handles archives by delegating to the ZipFile and TarFile modules; in detail:
Compressing and extracting with zipfile:
import zipfile

# compress
z = zipfile.ZipFile('laxi.zip', 'w')
z.write('a.log')
z.write('data.data')
z.close()

# extract
z = zipfile.ZipFile('laxi.zip', 'r')
z.extractall()
z.close()
Compressing and extracting with tarfile:
import tarfile

# compress
tar = tarfile.open('your.tar', 'w')
tar.add('/Users/wupeiqi/PycharmProjects/bbs2.zip', arcname='bbs2.zip')
tar.add('/Users/wupeiqi/PycharmProjects/cmdb.zip', arcname='cmdb.zip')
tar.close()

# extract
tar = tarfile.open('your.tar', 'r')
tar.extractall()   # an extraction path can be passed here
tar.close()
zipfile source:
""" Read and write ZIP files. XXX references to utf-8 need further investigation. """ import io import os import re import importlib.util import sys import time import stat import shutil import struct import binascii try: import threading except ImportError: import dummy_threading as threading try: import zlib # We may need its compression method crc32 = zlib.crc32 except ImportError: zlib = None crc32 = binascii.crc32 try: import bz2 # We may need its compression method except ImportError: bz2 = None try: import lzma # We may need its compression method except ImportError: lzma = None __all__ = ["BadZipFile", "BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA", "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"] class BadZipFile(Exception): pass class LargeZipFile(Exception): """ Raised when writing a zipfile, the zipfile requires ZIP64 extensions and those extensions are disabled. """ error = BadZipfile = BadZipFile # Pre-3.2 compatibility names ZIP64_LIMIT = (1 << 31) - 1 ZIP_FILECOUNT_LIMIT = (1 << 16) - 1 ZIP_MAX_COMMENT = (1 << 16) - 1 # constants for Zip file compression methods ZIP_STORED = 0 ZIP_DEFLATED = 8 ZIP_BZIP2 = 12 ZIP_LZMA = 14 # Other ZIP compression methods not supported DEFAULT_VERSION = 20 ZIP64_VERSION = 45 BZIP2_VERSION = 46 LZMA_VERSION = 63 # we recognize (but not necessarily support) all features up to that version MAX_EXTRACT_VERSION = 63 # Below are some formats and associated data for reading/writing headers using # the struct module. The names and structures of headers/records are those used # in the PKWARE description of the ZIP file format: # http://www.pkware.com/documents/casestudies/APPNOTE.TXT # (URL valid as of January 2008) # The "end of central directory" structure, magic number, size, and indices # (section V.I in the format document) structEndArchive = b"<4s4H2LH" stringEndArchive = b"PK 05 06" sizeEndCentDir = struct.calcsize(structEndArchive) _ECD_SIGNATURE = 0 _ECD_DISK_NUMBER = 1 _ECD_DISK_START = 2 _ECD_ENTRIES_THIS_DISK = 3 _ECD_ENTRIES_TOTAL = 4 _ECD_SIZE = 5 _ECD_OFFSET = 6 _ECD_COMMENT_SIZE = 7 # These last two indices are not part of the structure as defined in the # spec, but they are used internally by this module as a convenience _ECD_COMMENT = 8 _ECD_LOCATION = 9 # The "central directory" structure, magic number, size, and indices # of entries in the structure (section V.F in the format document) structCentralDir = "<4s4B4HL2L5H2L" stringCentralDir = b"PK 01 02" sizeCentralDir = struct.calcsize(structCentralDir) # indexes of entries in the central directory structure _CD_SIGNATURE = 0 _CD_CREATE_VERSION = 1 _CD_CREATE_SYSTEM = 2 _CD_EXTRACT_VERSION = 3 _CD_EXTRACT_SYSTEM = 4 _CD_FLAG_BITS = 5 _CD_COMPRESS_TYPE = 6 _CD_TIME = 7 _CD_DATE = 8 _CD_CRC = 9 _CD_COMPRESSED_SIZE = 10 _CD_UNCOMPRESSED_SIZE = 11 _CD_FILENAME_LENGTH = 12 _CD_EXTRA_FIELD_LENGTH = 13 _CD_COMMENT_LENGTH = 14 _CD_DISK_NUMBER_START = 15 _CD_INTERNAL_FILE_ATTRIBUTES = 16 _CD_EXTERNAL_FILE_ATTRIBUTES = 17 _CD_LOCAL_HEADER_OFFSET = 18 # The "local file header" structure, magic number, size, and indices # (section V.A in the format document) structFileHeader = "<4s2B4HL2L2H" stringFileHeader = b"PK 03 04" sizeFileHeader = struct.calcsize(structFileHeader) _FH_SIGNATURE = 0 _FH_EXTRACT_VERSION = 1 _FH_EXTRACT_SYSTEM = 2 _FH_GENERAL_PURPOSE_FLAG_BITS = 3 _FH_COMPRESSION_METHOD = 4 _FH_LAST_MOD_TIME = 5 _FH_LAST_MOD_DATE = 6 _FH_CRC = 7 _FH_COMPRESSED_SIZE = 8 _FH_UNCOMPRESSED_SIZE = 9 _FH_FILENAME_LENGTH = 10 _FH_EXTRA_FIELD_LENGTH = 
11 # The "Zip64 end of central directory locator" structure, magic number, and size structEndArchive64Locator = "<4sLQL" stringEndArchive64Locator = b"PKx06x07" sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator) # The "Zip64 end of central directory" record, magic number, size, and indices # (section V.G in the format document) structEndArchive64 = "<4sQ2H2L4Q" stringEndArchive64 = b"PKx06x06" sizeEndCentDir64 = struct.calcsize(structEndArchive64) _CD64_SIGNATURE = 0 _CD64_DIRECTORY_RECSIZE = 1 _CD64_CREATE_VERSION = 2 _CD64_EXTRACT_VERSION = 3 _CD64_DISK_NUMBER = 4 _CD64_DISK_NUMBER_START = 5 _CD64_NUMBER_ENTRIES_THIS_DISK = 6 _CD64_NUMBER_ENTRIES_TOTAL = 7 _CD64_DIRECTORY_SIZE = 8 _CD64_OFFSET_START_CENTDIR = 9 def _check_zipfile(fp): try: if _EndRecData(fp): return True # file has correct magic number except OSError: pass return False def is_zipfile(filename): """Quickly see if a file is a ZIP file by checking the magic number. The filename argument may be a file or file-like object too. """ result = False try: if hasattr(filename, "read"): result = _check_zipfile(fp=filename) else: with open(filename, "rb") as fp: result = _check_zipfile(fp) except OSError: pass return result def _EndRecData64(fpin, offset, endrec): """ Read the ZIP64 end-of-archive records and use that to update endrec """ try: fpin.seek(offset - sizeEndCentDir64Locator, 2) except OSError: # If the seek fails, the file is not large enough to contain a ZIP64 # end-of-archive record, so just return the end record we were given. return endrec data = fpin.read(sizeEndCentDir64Locator) if len(data) != sizeEndCentDir64Locator: return endrec sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data) if sig != stringEndArchive64Locator: return endrec if diskno != 0 or disks != 1: raise BadZipFile("zipfiles that span multiple disks are not supported") # Assume no 'zip64 extensible data' fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2) data = fpin.read(sizeEndCentDir64) if len(data) != sizeEndCentDir64: return endrec sig, sz, create_version, read_version, disk_num, disk_dir, dircount, dircount2, dirsize, diroffset = struct.unpack(structEndArchive64, data) if sig != stringEndArchive64: return endrec # Update the original endrec using data from the ZIP64 record endrec[_ECD_SIGNATURE] = sig endrec[_ECD_DISK_NUMBER] = disk_num endrec[_ECD_DISK_START] = disk_dir endrec[_ECD_ENTRIES_THIS_DISK] = dircount endrec[_ECD_ENTRIES_TOTAL] = dircount2 endrec[_ECD_SIZE] = dirsize endrec[_ECD_OFFSET] = diroffset return endrec def _EndRecData(fpin): """Return data from the "End of Central Directory" record, or None. The data is a list of the nine items in the ZIP "End of central dir" record followed by a tenth item, the file seek offset of this record.""" # Determine file size fpin.seek(0, 2) filesize = fpin.tell() # Check to see if this is ZIP file with no archive comment (the # "end of central directory" structure should be the last item in the # file if this is the case). try: fpin.seek(-sizeEndCentDir, 2) except OSError: return None data = fpin.read() if (len(data) == sizeEndCentDir and data[0:4] == stringEndArchive and data[-2:] == b"