zoukankan      html  css  js  c++  java
  • python中调用多线程加速处理文件

    问题背景是这样的,我有一批需要处理的文件,对于每一个文件,都需要调用同一个函数进行处理,相当耗时

    有没有加速的办法呢?当然有啦,比如说你将这些文件分成若干批,每一个批次都调用自己写的python脚本进行处理,这样同时运行若干个python程序也可以进行加速

    但是,有没有更简单的方法呢?比如说,我一个运行的一个程序里面,同时分为多个线程,然后进行处理?

    实际上是有的

    大概思路是这样,将这些个文件路径的list,分成若干个,至于分成多少,要看自己cpu核心有多少,比如你的cpu有32核的,理论上就可以加速32倍

    直接上代码:

    # -*-coding:utf-8-*-
    import numpy as np
    from glob import glob
    import math
    import os
    import torch
    from tqdm import tqdm
    import multiprocessing
    
    label_path = '/home/ying/data/shiyongjie/distortion_datasets/new_distortion_dataset/train/label.txt'
    file_path = '/home/ying/data/shiyongjie/distortion_datasets/new_distortion_dataset/train/distortion_image'
    save_path = '/home/ying/data/shiyongjie/distortion_datasets/new_distortion_dataset/train/flow_field'
    
    r_d_max = 128
    image_index = 0
    txt_file = open(label_path)
    file_list = txt_file.readlines()
    txt_file.close()
    file_label = {}
    for i in file_list:
        i = i.split()
        file_label[i[0]] = i[1]
    
    r_d_max = 128
    eps = 1e-32
    H = 256
    W = 256
    
    def generate_flow_field(image_list):
        for image_file_path in ((image_list)):
            pixel_flow = np.zeros(shape=tuple([256, 256, 2]))  # 按照pytorch中的grid来写
            image_file_name = os.path.basename(image_file_path)
            # print(image_file_name)
            k = float(file_label[image_file_name])*(-1)*1e-7
            # print(k)
            r_u_max = r_d_max/(1+k*r_d_max**2)  # 计算出畸变校正之后的对角线的理论长度
            scale = r_u_max/128  # 将这个长度压缩到256的尺寸,会有一个scale,实际上这里写128*sqrt(2)可能会更加直观
            for i_u in range(256):
                for j_u in range(256):
                    x_u = float(i_u - 128)
                    y_u = float(128 - j_u)
                    theta = math.atan2(y_u, x_u)
                    r = math.sqrt(x_u ** 2 + y_u ** 2)
                    r = r * scale  # 实际上得到的r,即没有resize到256×256的图像尺寸size,并且带入公式中
                    r_d = (1.0 - math.sqrt(1 - 4.0 * k * r ** 2)) / (2 * k * r + eps)  # 对应在原图(畸变图)中的r
                    x_d = int(round(r_d * math.cos(theta)))
                    y_d = int(round(r_d * math.sin(theta)))
                    i_d = int(x_d + W / 2.0)
                    j_d = int(H / 2.0 - y_d)
                    if i_d < W and i_d >= 0 and j_d < H and j_d >= 0:  # 只有求的的畸变点在原图中的时候才进行赋值
                        value1 = (i_d - 128.0)/128.0
                        value2 = (j_d - 128.0)/128.0
                        pixel_flow[j_u, i_u, 0] = value1  # mesh中存储的是对应的r的比值,在进行畸变校正的时候,给定一张这样的图,进行找像素即可
                        pixel_flow[j_u, i_u, 1] = value2
    
    # 保存成array格式
            saved_image_file_path = os.path.join(save_path, image_file_name.split('.')[0] + '.npy')
            pixel_flow = pixel_flow.astype('f2')  # 将数据的格式转换成float16类型, 节省空间
            # print(saved_image_file_path)
            # print(pixel_flow)
            np.save(saved_image_file_path, pixel_flow)
        return
    
    
    if __name__ == '__main__':
        file_list = glob(file_path + '/*.JPEG')
        m = 32
        n = int(math.ceil(len(file_list) / float(m)))  # 向上取整
        result = []
        pool = multiprocessing.Pool(processes=m)  # 32进程
        for i in range(0, len(file_list), n):
            result.append(pool.apply_async(generate_flow_field, (file_list[i: i+n],)))
        pool.close()
        pool.join()

    在上面的代码中,我的函数

    generate_flow_field(image_list)

    需要传入一个list,然后对于这个list进行操作,之后对操作的结果进行保存

    所以,只需要将你需要处理的多个文件,切分成尽量等大小的list,然后再对每一个list,开一个线程进行处理即可

    看我上面的主函数

    if __name__ == '__main__':
        file_list = glob(file_path + '/*.JPEG')  # 将文件夹下所有的JPEG文件列成一个list
        m = 32  # 假设CPU有32个核心
        n = int(math.ceil(len(file_list) / float(m)))  # 每一个核心需要处理的list的数目
        result = []
        pool = multiprocessing.Pool(processes=m)  # 开32线程的线程池
        for i in range(0, len(file_list), n):
            result.append(pool.apply_async(generate_flow_field, (file_list[i: i+n],)))  # 对每一个list都用上面我们定义的函数进行处理
        pool.close()  # 处理结束之后,关闭线程池
        pool.join()

    主要是这样的两行代码,一行是

    pool = multiprocessing.Pool(processes=m)  # 开32线程的线程池

    用来开辟线程池

    另外一行是

    result.append(pool.apply_async(generate_flow_field, (file_list[i: i+n],)))  # 对每一个list都用上面我们定义的函数进行处理

    对于线程池,用apply_async()同时跑generate_flow_field这个函数,传入的参数是:file_list[i: i+n]

    实际上apply_async()这个函数的作用是所有的线程同时跑,速度是比较快的

    怎么样,讲到这里,是不是学会如何使用pool.apply_async()来进行多线程加速了呢?

  • 相关阅读:
    snmp安装zabbix
    〖Demo〗-- 用Django实现Video页面分类查询
    〖Python〗-- Django的ORM性能优化建议
    〖Web〗-- 新特性之WebSocket
    〖Python〗-- 数据结构
    〖缓存〗-- Memcached 与 Redis
    〖算法〗-- NB二人组:堆排序、归并排序
    〖算法〗-- 快速排序 、希尔排序、计数排序
    〖算法〗-- 排序lowB三人组:冒泡排序、选择排序、 插入排序
    〖算法〗-- 递归、二分查找、列表查找
  • 原文地址:https://www.cnblogs.com/yongjieShi/p/9599015.html
Copyright © 2011-2022 走看看