用oop编程实现文件自动解压整理,同时监控2个目录并处理zip文件,解压后一个文件夹删除zip文件,另一个文件夹将zip文件移动到某一目录中。
记得要在主程序中加入下面2行代码,保证进程一直存在:
while True:
pass
import time
import os
import threading # 使用多线程
import shutil
class ArchiveMonitor:
# 初始化参数
def __init__(self,config):
self.name = config['name']
self.monitor_path = config['monitor_path']
self.interval = config['interval']
# 扫描监控路径文件夹,返回zip文件路径的列表
def scan_file(self):
zips = []
for f in os.listdir(self.monitor_path):
if f.endswith('.zip'):
f_path = os.path.join(self.monitor_path, f)
zips.append(f_path)
if len(zips) == 0:
return None
else:
return zips
# 解压zip文件,存放到同一目录下的同名文件夹中
def unzip_them(self,zips_path):
for zip in zips_path:
unziped_dir = zip.split('.')[0]
os.makedirs(unziped_dir,exist_ok=True)
shutil.unpack_archive(zip,unziped_dir)
print(zip,'已解压,解压文件放在:',unziped_dir)
# 删除zip文件
def del_zips(self,zips_path):
for zip in zips_path:
os.remove(zip)
print(zip,'已删除')
# 移动zip文件
def move_zips(self,zips_path,target_dir='package'):
for zip in zips_path:
target_path = os.path.join(self.monitor_path,target_dir)
os.makedirs(target_path,exist_ok=True)
shutil.move(zip,target_path)
# def run(self):
# if self.name == "download":
# while True:
# zips = self.scan_file()
# if zips:
# self.unzip_them(zips)
# self.move_zips(zips)
# time.sleep(self.interval)
def run(self):
def _run():
print('进入_run函数了')
print('self.name == "download"?',self.name == "download")
print('self.name == "Desktop"?',self.name == "Desktop")
if self.name == "download":
while True:
print('进入download文件夹的while True了')
zips = self.scan_file()
if zips:
self.unzip_them(zips)
self.move_zips(zips)
print('等待20s...')
time.sleep(self.interval)
elif self.name == "Desktop":
while True:
print('进入desktop文件夹的while True了')
zips = self.scan_file()
if zips:
self.unzip_them(zips)
self.del_zips(zips)
print('等待10s...')
time.sleep(self.interval)
print('进入run函数了')
t = threading.Thread(target=_run) #我理解就是开启一个线程,运行_run函数
t.daemon = True # 开启守护线程,即本线程是守护线程,可随程序关闭而关闭
print('thread建好了,还没开启')
t.start()
# am = ArchiveMonitor(CONFIGS[1])
# am.run()
CONFIGS = [
{
"name":"Desktop",
"monitor_path":'C:/Users/lori/Desktop/',
"interval":10
},
{
"name": "download",
"monitor_path": 'C:/Users/lori/Desktop/python52project/test',
"interval":20
},
]
#if __name__ == "__main__":
monitors = [ArchiveMonitor(c) for c in CONFIGS]
for m in monitors:
m.run()
while True:
pass