zoukankan      html  css  js  c++  java
  • python 多进程与多线程配合拷贝文件目录

    版本一:使用shutil进行拷贝

      1 # -*- coding: utf-8 -*-
      2 # @author: Tele
      3 # @Time  : 2019/04/02 下午 3:09
      4 # 待改进:
      5 # 1.拷贝逻辑使用原生的io
      6 # 2.针对大文件在进程内部实现多线程方式进行拷贝
      7 
      8 
      9 import time
     10 import re
     11 import os
     12 import shutil
     13 import multiprocessing
     14 
     15 
     16 # 遍历文件夹
     17 def walk_file(file):
     18     file_list = list()
     19     for root, dirs, files in os.walk(file):
     20         # 遍历文件
     21         for f in files:
     22             file_list.append(f)
     23     return file_list
     24 
     25 
     26 # 计算文件数量
     27 def get_file_count(dir):
     28     return len(walk_file(dir))
     29 
     30 
     31 def copy(src, target, queue):
     32     target_number = 1
     33     if os.path.isdir(src):
     34         target_number = get_file_count(src)
     35         shutil.copytree(src, target)
     36     else:
     37         shutil.copyfile(src, target)
     38     # 将拷贝完成的文件数量放入队列中
     39     queue.put(target_number)
     40 
     41 
     42 def copy_dir(src, desc):
     43     total_number = get_file_count(src)
     44     # 分隔符检测
     45     src = check_separator(src)
     46     desc = check_separator(desc)
     47     # print("src:",src)
     48     # print("desc:",desc)
     49 
     50     file_dir_list = [src + "/" + i for i in os.listdir(src)]
     51     if os.path.exists(desc):
     52         shutil.rmtree(desc)
     53     pool = multiprocessing.Pool(3)
     54 
     55     # 创建队列
     56     queue = multiprocessing.Manager().Queue()
     57 
     58     # 一个文件/目录开启一个进程去拷贝
     59     for f_name in file_dir_list:
     60         target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]
     61         # print(target)
     62         # 创建target目录
     63         parent_path = os.path.split(target)[0]
     64         if not os.path.exists(parent_path):
     65             os.makedirs(parent_path)
     66         pool.apply_async(copy, args=(f_name, target, queue,))
     67 
     68     start = time.time()
     69     pool.close()
     70     #    pool.join()
     71     count = 0
     72     while True:
     73         count += queue.get()
     74         # 格式化输出时两个%输出一个%,不换行,每次定位到行首,实现覆盖
     75         print("
    拷贝进度为 %.2f %%" % (count * 100 / total_number), end="")
     76         if count >= total_number:
     77             break
     78     end = time.time()
     79     print()
     80     print("耗时-----", (end - start), "s")
     81 
     82 
     83 # 查找指定字符出现的全部索引位置
     84 def index_list(c, s):
     85     return [i.start() for i in re.finditer(c, s)]
     86 
     87 
     88 # 检测目录结尾是否有 "/"
     89 def check_separator(path):
     90     if path.rindex("/") == len(path) - 1:
     91         return path[0:path.rindex("/")]
     92     return path
     93 
     94 
     95 def main():
     96     copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")
     97 
     98 
     99 if __name__ == '__main__':
    100     main()

    这样做仍然有些不足:对于大文件,本可以在进程内部再用多线程分段拷贝,但使用shutil进行拷贝时无法按字节切分文件,于是有了下面的版本二。

     版本二:使用原生IO拷贝,大文件在进程内部再用多线程分段拷贝

      1 # -*- coding: utf-8 -*-
      2 # @author: Tele
      3 # @Time  : 2019/04/02 下午 3:09
      4 # 使用多进程拷贝文件夹,对于大文件进程内部又使用了多线程进行拷贝
      5 # 使用进程池实现多进程时,使用的消息队列要使用multiprocessing.Manager().Queue()创建
      6 
      7 import time
      8 import re
      9 import os
     10 import shutil
     11 import multiprocessing
     12 import math
     13 from concurrent.futures import ThreadPoolExecutor, wait
     14 
     15 # 设置单个文件的最大值:209715200 200M
     16 MAX_SINGLE_FILE_SIZE = 209715200
     17 mutex = multiprocessing.Lock()
     18 executor = ThreadPoolExecutor(max_workers=3)
     19 
     20 
     21 # 遍历文件夹
     22 def walk_file(file):
     23     file_list = list()
     24     for root, dirs, files in os.walk(file):
     25         # 遍历文件
     26         for f in files:
     27             file_list.append(f)
     28 
     29         # 空文件夹处理
     30         for d in dirs:
     31             if len(os.listdir(os.path.join(root, d))) == 0:
     32                 file_list.append(d)
     33     return file_list
     34 
     35 
     36 # 计算文件数量
     37 def get_file_count(dir):
     38     return len(walk_file(dir))
     39 
     40 
     41 def copy(src, target, queue):
     42     target_number = 1
     43     buffer = 1024
     44     # 文件夹
     45     if os.path.isdir(src):
     46         target_number = get_file_count(src)
     47         for root, dirs, files in os.walk(src):
     48             # 遍历文件
     49             for f in files:
     50                 drive = os.path.splitdrive(target)[0]
     51                 target = drive + os.path.splitdrive(os.path.join(root, f))[1]
     52                 copy_single_file(buffer, os.path.join(root, f), target)
     53             # 空文件夹
     54             for d in dirs:
     55                 drive = os.path.splitdrive(target)[0]
     56                 target = drive + os.path.splitdrive(os.path.join(root, d))[1]
     57                 # 检查文件的层级目录
     58                 if not os.path.exists(target):
     59                     os.makedirs(target)
     60     else:
     61         copy_single_file(buffer, src, target)
     62     # 将拷贝完成的文件数量放入队列中
     63     queue.put(target_number)
     64 
     65 
     66 # 拷贝单文件
     67 def copy_single_file(buffer, src, target):
     68     file_size = os.path.getsize(src)
     69     rs = open(src, "rb")
     70 
     71     # 检查文件的层级目录
     72     parent_path = os.path.split(target)[0]
     73     if not os.path.exists(parent_path):
     74         os.makedirs(parent_path)
     75 
     76     ws = open(target, "wb")
     77     # 小文件直接读写
     78     if file_size <= MAX_SINGLE_FILE_SIZE:
     79         while True:
     80             content = rs.read(buffer)
     81             ws.write(content)
     82             if len(content) == 0:
     83                 break
     84         ws.flush()
     85     else:
     86         # 设置每个线程拷贝的字节数 50M
     87         PER_THREAD_SIZE = 52428800
     88         # 构造参数并执行
     89         task_list = list()
     90         for i in range(math.ceil(file_size / PER_THREAD_SIZE)):
     91             byte_size = PER_THREAD_SIZE
     92             # 最后一个线程拷贝的字节数应该是取模
     93             if i == math.ceil(file_size / PER_THREAD_SIZE) - 1:
     94                 byte_size = file_size % PER_THREAD_SIZE
     95             start = i * PER_THREAD_SIZE + i
     96             t = executor.submit(copy_file_thread, start, byte_size, rs, ws)
     97             task_list.append(t)
     98         wait(task_list)
     99     if rs:
    100         rs.close()
    101     if ws:
    102         ws.close()
    103 
    104 
    105 # 多线程拷贝
    106 def copy_file_thread(start, byte_size, rs, ws):
    107     mutex.acquire()
    108     buffer = 1024
    109     count = 0
    110     rs.seek(start)
    111     ws.seek(start)
    112     while True:
    113         if count + buffer <= byte_size:
    114             content = rs.read(buffer)
    115             count += len(content)
    116             write(content, ws)
    117         else:
    118             content = rs.read(byte_size % buffer)
    119             count += len(content)
    120             write(content, ws)
    121             break
    122     # global total_count
    123     # total_count += byte_size
    124     # print("
    拷贝进度为%.2f %%" % (total_count * 100 / file_size), end="")
    125     mutex.release()
    126 
    127 
    128 def write(content, ws):
    129     ws.write(content)
    130     ws.flush()
    131 
    132 
    133 def copy_dir(src, desc):
    134     # 获得待拷贝的文件总数(含空文件夹)
    135     total_number = get_file_count(src)
    136     # 分隔符检测
    137     src = check_separator(src)
    138     desc = check_separator(desc)
    139     # print("src:",src)
    140     # print("desc:",desc)
    141 
    142     file_dir_list = [src + "/" + i for i in os.listdir(src)]
    143     if os.path.exists(desc):
    144         shutil.rmtree(desc)
    145 
    146     # 进程池
    147     pool = multiprocessing.Pool(3)
    148 
    149     # 创建队列
    150     queue = multiprocessing.Manager().Queue()
    151 
    152     # 一个文件/目录开启一个进程去拷贝
    153     for f_name in file_dir_list:
    154         target = os.path.splitdrive(desc)[0] + "/" + os.path.splitdrive(f_name)[1]
    155         # target = desc + "/" + f_name[index_list("/", f_name)[1] + 1:]
    156         # print(target)
    157         # 创建target目录
    158         parent_path = os.path.split(target)[0]
    159         if not os.path.exists(parent_path):
    160             os.makedirs(parent_path)
    161         pool.apply_async(copy, args=(f_name, target, queue))
    162 
    163     start = time.time()
    164     pool.close()
    165     # pool.join()
    166     count = 0
    167     while True:
    168         count += queue.get()
    169         # 格式化输出时两个%输出一个%,不换行,每次定位到行首,实现覆盖
    170         print("
    当前进度为 %.2f %%" % (count * 100 / total_number), end="")
    171         if count >= total_number:
    172             break
    173 
    174     executor.shutdown()
    175     end = time.time()
    176     print()
    177     print("耗时-----", (end - start), "s")
    178 
    179 
    180 # 查找指定字符出现的全部索引位置
    181 def index_list(c, s):
    182     return [i.start() for i in re.finditer(c, s)]
    183 
    184 
    185 # 检测目录结尾是否有 "/"
    186 def check_separator(path):
    187     if path.rindex("/") == len(path) - 1:
    188         return path[0:path.rindex("/")]
    189     return path
    190 
    191 
    192 def main():
    193     copy_dir("f:/ftp_mypc/", "e:/ftp_mypc/")
    194 
    195 
    196 if __name__ == '__main__':
    197     main()
  • 相关阅读:
    BestCoder6 1002 Goffi and Squary Partition(hdu 4982) 解题报告
    codeforces 31C Schedule 解题报告
    codeforces 462C Appleman and Toastman 解题报告
    codeforces 460C. Present 解题报告
    BestCoder3 1002 BestCoder Sequence(hdu 4908) 解题报告
    BestCoder3 1001 Task schedule(hdu 4907) 解题报告
    poj 1195 Mobile phones 解题报告
    二维树状数组 探索进行中
    codeforces 460B Little Dima and Equation 解题报告
    通过Sql语句控制SQLite数据库增删改查
  • 原文地址:https://www.cnblogs.com/tele-share/p/10656811.html
Copyright © 2011-2022 走看看