一、场景
在进行模型训练的过程中,会对一些大数据资源进行处理。比如对一个(80000,35)进行数据的替换和选择
二、涉及内容
对上述场景进行处理,采取的想法是分解数组,处理完在合并,需要做到以下的要求:
1. 数据顺序正确
2. 高维数组不能过长迭代(多个for嵌套)
因此需要涉及到进程池,数组迭代,数组拼接等内容处理流程如下:
三、实现
例子:将一个(10,20)维数组里面大于20的值变为0。
3.1 数组拆解
使用numpy提供的数组分割的方法实现分割
import numpy as np
# 第一个是需要分解的数组,第二个是分解的个数
# 注意如果是按列堆叠,元素必须保持一致,因此要尽可能的等分
grey_scale_chunks = np.array_split(data_array, 5)
3.2 迭代处理
迭代是进行元素的获取,如果要修改可以使用枚举
# 使用nditer遍历数组,设置读写操作,默认是只读
arr = np.array([1, 2, 3])
# op_dtypes:操作类型
for x in np.nditer(a, op_flags=['readwrite']):
if x > 20:
x[...] = 0
print(a)
# 使用枚举的方法
for idx, i in np.ndenumerate(binary_array):
if i > 20:
binary_array[idx] = i
3.3 多线程处理
使用线程池实现处理
# number_of_threads设置进程数,可以使用os.getcpu()获取cpu信息。
pool = ThreadPool(processes=number_of_threads)
results = pool.map_async(perform_calc, grey_scale_chunks)
pool.close()
pool.join() # Block until all threads exit.
results.get() # 获取处理的结果
3.4 合并数组
# 使用numpy提供的方法进行数组的堆叠
# numpy提供了三种堆叠的方式:
# 沿行的堆叠方式(hstack):将每行数据进行拼接,不要求堆叠的数据同型。
# 沿列的堆叠方式(vstack):将每行数据按着竖直方向进行堆叠,要求数据同型。
# 沿深度堆叠的方式(dstack):每次选取第i列的所有数据组成一行,直到遍历完所有列。要求数据同型。
# 根据需求按照沿列的方式进行堆叠。
np_re = np.vstack(results.get())
3.5 总代码
将所有的内容融合在一起:
from multiprocessing.pool import ThreadPool
import numpy as np
import time
def perform_calc(binary_array):
# perform some operation
for idx, i in np.ndenumerate(binary_array):
if i > 20:
binary_array[idx] = i
return binary_array
def main(data_array):
number_of_threads = 6
pool = ThreadPool(processes=number_of_threads)
grey_scale_chunks = np.array_split(data_array, 5)
start = time.clock()
results = pool.map_async(perform_calc, grey_scale_chunks)
pool.close()
pool.join() # Block until all threads exit.
end = time.clock()
print(end - start)
print(len(results.get()))
# Final results will be a list of arrays.
result1 = np.vstack(results.get())
print(result1)
print(result1.shape)
grey_arr = np.random.randint(1, 40, size=(10, 20))
print(grey_arr)
main(grey_arr)
四、总结
在大数据中的处理,这种思想有着比较好的方法,主要的问题就是设计线程间的通讯问题,主要是操作系统的内容,这里做了一个简单的介绍。
参考