def main():
f = open('致橡树.txt', 'r', encoding='utf-8')
print(f.read()) # 一次性读取整个文件内容
f.close()
if __name__ == '__main__':
main()
for line in f: # 通过for-in循环逐行读取
lines = f.readlines() # 读取文件按行读取到列表中
模式 | 具体含义 |
---|---|
'r' | 读取 (默认) |
'w' | 写入(会先截断之前的内容) |
'x' | 写入,如果文件已经存在会产生异常 |
'a' | 追加,将内容写入到已有文件的末尾 |
'b' | 二进制模式 |
't' | 文本模式(默认) |
'+' | 更新(既可以读又可以写) |
json模块主要有四个比较重要的函数,分别是:
- dump - 将Python对象按照JSON格式序列化到文件中. eg: json.dump(mydict, fs)
- dumps - 将Python对象处理成JSON格式的字符串
- load - 将文件中的JSON数据反序列化成对象
- loads - 将字符串的内容反序列化成Python对象
在Python中要实现序列化和反序列化除了使用json模块之外,还可以使用pickle和shelve模块,但是这两个模块是使用特有的序列化协议来序列化数据,因此序列化后的数据只能被Python识别。
读csv文件:
import csv
# with open(filename, 'r', encoding='utf-8') as f: 指定文件对象的上下文环境并在离开上下文环境时自动释放文件资源
def read_csv(filename): # for row in read_csv(filename):
with open(filename, encoding='gbk') as f: # UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 0: invalid continuation byte
reader = csv.reader(f) # f.readline() 读取到“title,poster”
for row in reader:
yield _dict(row)
写入csv文件:
import csv, StringIO
content = StringIO.StringIO()
writer = csv.writer(content)
writer.writerow([u'ID', u'Name'])
for _ in Author.objects.filter(status=1).iterator():
writer.writerow([
_.id,
_.name,
])
content = content.getvalue().encode('gbk', 'ignore')
with open(filename, "wb") as f:
f.write(content)
- 当生成的csv文件很大时,为避免超时和数据存入内存所带来的性能损耗,推荐使用StreamingHttpResponse---StreamingHttpResponse会启动一个进程来和客户端保持长连接,会消耗资源。所以如果不是特殊要求,尽量少用这种方法。
读取本地上文件
import mimetypes
from wsgiref.util import FileWrapper
from django.http import HttpResponse, StreamingHttpResponse
# < a href="api" target="_blank" >首页</a> 不加target="_blank"的话超链接会覆盖本页面,加上则重新打开一个窗体
class LogDownloadHandler(View):
def get(self, request, *args, **kwargs):
file_path, hostName = '**', socket.gethostname() # 机器名
ret = {
'status': 32,
'hostName': hostName,
}
try:
wrapper = FileWrapper(open(file_path, 'rb'))
content_type = mimetypes.guess_type(file_path)[0]
response = StreamingHttpResponse(wrapper, content_type)
# response['Content-Disposition'] = "attachment; filename='%s'" % file_name
ret.update({
'msg': 'succ',
'log': response,
'status': 0,
})
except FileNotFoundError:
ret['msg'] = "Not Found File. Task maybe be executing, please wait for a moment"
except Exception as e:
print(e)
ret['msg'] = "Error. Please contact to developer"
return ret
读取另一台机器上文件:
# 用sftp可以在两台机器之间互相传输拷贝文件,不能拷贝文件夹.
# Paramiko实现ssh账号密码登陆
import paramiko
def get(self, request):
try:
hostname, port, username, password = '*', 22, '*', '*'
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, port, username, password, compress=True)
sftp_client = client.open_sftp()
with sftp_client.open(file_path) as remote_file: # 文件路径
ret.update({
'msg': 'Success',
'log': remote_file.read(),
'status': ServerStatus.OK,
})
except FileNotFoundError:
ret['msg'] = "Not Found File. Task maybe be executing, please wait for a moment"
except paramiko.ssh_exception.AuthenticationException:
ret['msg'] = "Error. Not allowed to login online machine"
except Exception as e:
print(e)
ret['msg'] = "Error. Please contact to developer"
finally:
client.close()
# Paramiko实现ssh账号密码登陆
# ssh-copy-id user@host 然后ssh user@host验证是否可登录
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
client = paramiko.SSHClient()
client .set_missing_host_key_policy(paramiko.AutoAddPolicy())
client .connect(hostname = '*', port=22, username="root", pkey=private_key)
stdin, stdout, stderr = client .exec_command('ifconfig')
- https://cloud.tencent.com/developer/article/1566897
- SSH与Python模块paramiko: https://www.jianshu.com/p/486dd9993125