主要内容有:
文本文件读写
open的参数
f = open('test.txt', 'r', encoding='utf-8') # r = read, w = write, a = append, b = binary, +表示文件不存在就创建
文件名 方式 编码方式(有时候需要写,有时不需要写)
(与此文件在同一个目录下)
texts = f.read()
print(texts)
f.close() 文件关闭
# w+覆盖文件重新写入
f = open(...)
try:
do sth
except:
pass
finally:
if f:
f.close()
使用with简化异常处理
with open('sample.txt', 'r') as f:
line = f.readline()
while line:
print(line.strip())
line = f.readline()
with open('sample.txt', 'r') as f:
for line in f.readlines():
print(line.strip())
文件内容读取
自己实现readlines功能
texts = ['New line #1 hello python', 'Line #2 first to learn']
with open('new_sample.txt', 'w') as f:
for text in texts:
f.write(text + '
') #每写入一行,自己手写换行符
with open('new_sample.txt', 'a') as f:
f.write('Something new
')
json与CSV文件操作
json 的文件的字符串一定要 用双引号括住
dump把字典写入json文件
json.loads与json.dump都可以复制
import json
# 模拟dumps的实现
def json_dumps(di): # 回去自己实现带嵌套的情况
s = '{
'
lines = []
for k, v in di.items():
_s = '"' + k + '": '
if t·ype(v) != list:
_s += '"' + str(v) + '"'
else:
items = ['"' + i + '"' for i in v]
_s += '[' + ', '.join(items) + ']'
lines.append(_s)
s += ',
'.join(lines)
s += '
}'
return s
config = {'ip': '192.168.1.1', 'port': ['9100', '9101', '9102']}
print(json_dumps(config))
# 模拟dumps的实现
def json_dumps(di): # 回去自己实现带嵌套的情况
s = '{
'
lines = []
for k, v in di.items():
_s = '"' + k + '": '
if type(v) != list:
_s += '"' + str(v) + '"'
else:
items = ['"' + i + '"' for i in v]
_s += '[' + ', '.join(items) + ']'
lines.append(_s)
s += ',
'.join(lines)
s += '
}'
return s
;
config = {'ip': '192.168.1.1', 'port': ['9100', '9101', '9102']}
print(json_dumps(config))
csv文件的读取
import csv
序列化及应用(不太懂)
import pickle
多进程与多线程
Python没有线程ID这个概念,只好提出了线程名字。
# 多进程
from multiprocessing import Process
import os
# 多线程
import time, threading
进程池与线程池
在notebook里运行显示不出来,放在DOS窗口运行
# 进程池
from multiprocessing import Pool
import os, time, random
# 多线程的应用
def top3(data):
data.sort()
temp_result[threading.current_thread().name] = data[-3:]
data_set = [[1, 7, 8, 9, 20, 11, 14, 15],
[19, 21, 23, 24, 45, 12, 45, 56, 31],
[18, 28, 64, 22, 17, 28]]
temp_result = {} #全局变量
threads = []
for i in range(len(data_set)):
t = threading.Thread(target=top3, name=str(i), args=(data_set[i], ))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
result = []
for k, v in temp_result.items():
result.extend(v)
result.sort()
print(result[-3:])
数据共享与锁
# 锁
import threading
lock = threading.Lock()
系统库
后面的东西比较难理解,多学多练