zoukankan      html  css  js  c++  java
  • 利用多线程对大数组进行处理

    一、场景

    在进行模型训练的过程中,会对一些大数据资源进行处理。比如对一个(80000,35)进行数据的替换和选择

    二、涉及内容

    对上述场景进行处理,采取的想法是分解数组,处理完在合并,需要做到以下的要求:

    1. 数据顺序正确
    2. 高维数组不能过长迭代(多个for嵌套)

    因此需要涉及到进程池,数组迭代,数组拼接等内容处理流程如下:

    三、实现

    例子:将一个(10,20)维数组里面大于20的值变为0。

    3.1 数组拆解

    使用numpy提供的数组分割的方法实现分割

    import numpy as np
    # 第一个是需要分解的数组,第二个是分解的个数
    # 注意如果是按列堆叠,元素必须保持一致,因此要尽可能的等分
    grey_scale_chunks = np.array_split(data_array, 5)

    3.2 迭代处理

    迭代是进行元素的获取,如果要修改可以使用枚举

    # 使用nditer遍历数组,设置读写操作,默认是只读
    arr = np.array([1, 2, 3])
    
    # op_dtypes:操作类型
    for x in np.nditer(a, op_flags=['readwrite']):
        if x > 20:
            x[...] = 0
    print(a)
    
    # 使用枚举的方法
    for idx, i in np.ndenumerate(binary_array):
        if i > 20:
            binary_array[idx] = i

    3.3 多线程处理

    使用线程池实现处理

    # number_of_threads设置进程数,可以使用os.getcpu()获取cpu信息。
    pool = ThreadPool(processes=number_of_threads)
    results = pool.map_async(perform_calc, grey_scale_chunks)
    pool.close()
    pool.join()  # Block until all threads exit.
    results.get()  # 获取处理的结果

    3.4 合并数组

    # 使用numpy提供的方法进行数组的堆叠
    # numpy提供了三种堆叠的方式:
    # 沿行的堆叠方式(hstack):将每行数据进行拼接,不要求堆叠的数据同型。
    # 沿列的堆叠方式(vstack):将每行数据按着竖直方向进行堆叠,要求数据同型。
    # 沿深度堆叠的方式(dstack):每次选取第i列的所有数据组成一行,直到遍历完所有列。要求数据同型。
    
    # 根据需求按照沿列的方式进行堆叠。
    np_re = np.vstack(results.get())

    3.5 总代码

    将所有的内容融合在一起:

    from multiprocessing.pool import ThreadPool
    import numpy as np
    import time
    def perform_calc(binary_array):
        # perform some operation
        for idx, i in np.ndenumerate(binary_array):
            if i > 20:
                binary_array[idx] = i
        return binary_array
    
    
    def main(data_array):
        number_of_threads = 6
        pool = ThreadPool(processes=number_of_threads)
        grey_scale_chunks = np.array_split(data_array, 5)
        start = time.clock()
        results = pool.map_async(perform_calc, grey_scale_chunks)
        pool.close()
        pool.join()  # Block until all threads exit.
        end = time.clock()
        print(end - start)
        print(len(results.get()))
        # Final results will be a list of arrays.
        result1 = np.vstack(results.get())
        print(result1)
        print(result1.shape)
    
    
    grey_arr = np.random.randint(1, 40, size=(10, 20))
    print(grey_arr)
    main(grey_arr)

    四、总结

    在大数据中的处理,这种思想有着比较好的方法,主要的问题就是设计线程间的通讯问题,主要是操作系统的内容,这里做了一个简单的介绍。

    参考

    w3school数组连接

    python线程处理切割数组(涉及死锁处理等问题)

    numpy枚举

    numpy遍历

  • 相关阅读:
    LINQ 详解
    oracle下查询的sql已经超出IIS响应时间
    IOC应用之 Ninject
    JSONP ---------跨域
    国内各大互联网公司相关技术站点2.0版 (集合腾讯、阿里、百度、搜狐、新浪、360等共49个)
    IO多路复用,以socket为例
    socket机制下实现的多用户与服务器交互
    在一个进程中定义多个线程
    基于tcp的socketserver,即tcp的多线程
    基于upd的socketserver,即udp的多线程
  • 原文地址:https://www.cnblogs.com/future-dream/p/14721077.html
Copyright © 2011-2022 走看看