zoukankan      html  css  js  c++  java
  • python中调用多线程加速处理文件

    问题背景是这样的,我有一批需要处理的文件,对于每一个文件,都需要调用同一个函数进行处理,相当耗时

    有没有加速的办法呢?当然有啦,比如说你将这些文件分成若干批,每一个批次都调用自己写的python脚本进行处理,这样同时运行若干个python程序也可以进行加速

    但是,有没有更简单的方法呢?比如说,我一个运行的一个程序里面,同时分为多个线程,然后进行处理?

    实际上是有的

    大概思路是这样,将这些个文件路径的list,分成若干个,至于分成多少,要看自己cpu核心有多少,比如你的cpu有32核的,理论上就可以加速32倍

    直接上代码:

    # -*-coding:utf-8-*-
    import numpy as np
    from glob import glob
    import math
    import os
    import torch
    from tqdm import tqdm
    import multiprocessing
    
    label_path = '/home/ying/data/shiyongjie/distortion_datasets/new_distortion_dataset/train/label.txt'
    file_path = '/home/ying/data/shiyongjie/distortion_datasets/new_distortion_dataset/train/distortion_image'
    save_path = '/home/ying/data/shiyongjie/distortion_datasets/new_distortion_dataset/train/flow_field'
    
    r_d_max = 128
    image_index = 0
    txt_file = open(label_path)
    file_list = txt_file.readlines()
    txt_file.close()
    file_label = {}
    for i in file_list:
        i = i.split()
        file_label[i[0]] = i[1]
    
    r_d_max = 128
    eps = 1e-32
    H = 256
    W = 256
    
    def generate_flow_field(image_list):
        for image_file_path in ((image_list)):
            pixel_flow = np.zeros(shape=tuple([256, 256, 2]))  # 按照pytorch中的grid来写
            image_file_name = os.path.basename(image_file_path)
            # print(image_file_name)
            k = float(file_label[image_file_name])*(-1)*1e-7
            # print(k)
            r_u_max = r_d_max/(1+k*r_d_max**2)  # 计算出畸变校正之后的对角线的理论长度
            scale = r_u_max/128  # 将这个长度压缩到256的尺寸,会有一个scale,实际上这里写128*sqrt(2)可能会更加直观
            for i_u in range(256):
                for j_u in range(256):
                    x_u = float(i_u - 128)
                    y_u = float(128 - j_u)
                    theta = math.atan2(y_u, x_u)
                    r = math.sqrt(x_u ** 2 + y_u ** 2)
                    r = r * scale  # 实际上得到的r,即没有resize到256×256的图像尺寸size,并且带入公式中
                    r_d = (1.0 - math.sqrt(1 - 4.0 * k * r ** 2)) / (2 * k * r + eps)  # 对应在原图(畸变图)中的r
                    x_d = int(round(r_d * math.cos(theta)))
                    y_d = int(round(r_d * math.sin(theta)))
                    i_d = int(x_d + W / 2.0)
                    j_d = int(H / 2.0 - y_d)
                    if i_d < W and i_d >= 0 and j_d < H and j_d >= 0:  # 只有求的的畸变点在原图中的时候才进行赋值
                        value1 = (i_d - 128.0)/128.0
                        value2 = (j_d - 128.0)/128.0
                        pixel_flow[j_u, i_u, 0] = value1  # mesh中存储的是对应的r的比值,在进行畸变校正的时候,给定一张这样的图,进行找像素即可
                        pixel_flow[j_u, i_u, 1] = value2
    
    # 保存成array格式
            saved_image_file_path = os.path.join(save_path, image_file_name.split('.')[0] + '.npy')
            pixel_flow = pixel_flow.astype('f2')  # 将数据的格式转换成float16类型, 节省空间
            # print(saved_image_file_path)
            # print(pixel_flow)
            np.save(saved_image_file_path, pixel_flow)
        return
    
    
    if __name__ == '__main__':
        file_list = glob(file_path + '/*.JPEG')
        m = 32
        n = int(math.ceil(len(file_list) / float(m)))  # 向上取整
        result = []
        pool = multiprocessing.Pool(processes=m)  # 32进程
        for i in range(0, len(file_list), n):
            result.append(pool.apply_async(generate_flow_field, (file_list[i: i+n],)))
        pool.close()
        pool.join()

    在上面的代码中,我的函数

    generate_flow_field(image_list)

    需要传入一个list,然后对于这个list进行操作,之后对操作的结果进行保存

    所以,只需要将你需要处理的多个文件,切分成尽量等大小的list,然后再对每一个list,开一个线程进行处理即可

    看我上面的主函数

    if __name__ == '__main__':
        file_list = glob(file_path + '/*.JPEG')  # 将文件夹下所有的JPEG文件列成一个list
        m = 32  # 假设CPU有32个核心
        n = int(math.ceil(len(file_list) / float(m)))  # 每一个核心需要处理的list的数目
        result = []
        pool = multiprocessing.Pool(processes=m)  # 开32线程的线程池
        for i in range(0, len(file_list), n):
            result.append(pool.apply_async(generate_flow_field, (file_list[i: i+n],)))  # 对每一个list都用上面我们定义的函数进行处理
        pool.close()  # 处理结束之后,关闭线程池
        pool.join()

    主要是这样的两行代码,一行是

    pool = multiprocessing.Pool(processes=m)  # 开32线程的线程池

    用来开辟线程池

    另外一行是

    result.append(pool.apply_async(generate_flow_field, (file_list[i: i+n],)))  # 对每一个list都用上面我们定义的函数进行处理

    对于线程池,用apply_async()同时跑generate_flow_field这个函数,传入的参数是:file_list[i: i+n]

    实际上apply_async()这个函数的作用是所有的线程同时跑,速度是比较快的

    怎么样,讲到这里,是不是学会如何使用pool.apply_async()来进行多线程加速了呢?

  • 相关阅读:
    97. Interleaving String
    96. Unique Binary Search Trees
    95. Unique Binary Search Trees II
    94. Binary Tree Inorder Traversal
    odoo many2many字段 指定打开的form视图
    docker sentry 配置文件位置
    postgres 计算时差
    postgres 字符操作补位,字符切割
    postgres判断字符串是否为时间,数字
    odoo fields_view_get
  • 原文地址:https://www.cnblogs.com/yongjieShi/p/9599015.html
Copyright © 2011-2022 走看看