Date: 2020-09-02
多核处理概述
图像增强通常是一个缓慢的过程,特别是当在大量的图像数据下进行多种不同的增强技术时。
提升性能的一种方式就是在多个CPU内核上同时进行数据增强。imgaug中提供了一个实现多线程的原生系统,大致遵循下述步骤:
-
将数据集拆分为批次。每批包含一个或多个图像以及与它们相关的附加数据,如,边界框或分割图。
-
启动一个或多个子进程。它们中的每一个都在自己的CPU核心上运行。
-
将批次发送到子进程。尝试在子进程上平均分配它们,以便它们中的每一个都有相似的工作量。
-
让子进程增强数据;
-
从子进程接收增强的批处理。
Important: imgaug中提供了多核功能,并且也建议使用提供的功能进行多核处理。不建议使用python的multiprocess库的多进程处理,可能会产生不可预估的错误。
示例:augment_batches(..., background=True)
在imgaug中进行多核增强最简单的方式就是调用augment_batches(..., background=True).其工作原理相似于augment_images()。区别在于以下几点:
-
其需要一个imgaug.augmentables.batches.Batch或者imgaug.augmentables.batches.UnnormalizedBatch实例构成的列表。这些实例中均包含批次的数据,例如图像或者是bbox信息。而实例中包含的批次batch可以通过batch = UnnormalizedBatch(images=<list of numpy arrays>, bounding_boxes=<list of imgaug.BoundingBoxOnImages>)生成;
-
另一个不同在于augment_batches()返回的是一个生成器,当从子进程接收到时会不断产生批处理;
-
最后一各却别在于augment_batches()当前不使用增强器中设置的随机状态,而是选择一个新的。不这样做的话,所有子进程都会应用相同的增强。
Tip: 可还记得augment_images()是干什么用的?
答:augment_images()是通过aug.augment_images(images=images)进行使用的。也就是说,是使用某个图像增强方法的实例对图像进行图像增强时使用!
使用单核处理
-
首先,生成一些实例数据:
1 import numpy as np 2 import imgaug as ia 3 import imageio 4 from imgaug import augmenters as iaa 5 %matplotlib inline 6 7 BATCH_SIZE = 16 8 NB_BATCHES = 100 # number of batch 9 10 image = imageio.imread("./pick1.jpg") 11 images = [np.copy(image) for _ in range(BATCH_SIZE)]
2.使用UnnormalizedBatch将图像生成批次batch:
1 from imgaug.augmentables.batches import UnnormalizedBatch 2 batches = [UnnormalizedBatch(images=images) for _ in range(NB_BATCHES)]
3.定义增强序列
这里选用处理时长较长的PiecewiseAffine,在图像上使用更密集的点网格会加剧减慢速度。 每个这样的点将导致更多的局部仿射变换被应用。
1 seq = iaa.Sequential([ 2 iaa.PiecewiseAffine(scale=0.05, nb_cols=6, nb_rows=6), # very slow 3 iaa.Fliplr(0.5), # very fast 4 iaa.CropAndPad(px=(-10, 10)) # very fast 5 ])
从上述代码中可以看出,增强序列中选择的3个增强方法具有不同的速度。
4.对生成的batches进行增强:
首先我们不使用多核增强的方式,看看使用单个CPU核进行处理需要多长时间。
augment_batches()返回一个Batch示例的生成器,然后我们可以通过UnnormalizedBatch.images_aug访问增强后的图像
1 import time 2 3 time_start = time.time() 4 batches_aug = list(seq.augment_batches(batches, background=False)) # list() converts generator to list 5 time_end = time.time() 6 7 print("Augmentation done in %.2fs" % (time_end - time_start,)) 8 ia.imshow(batches_aug[0].images_aug[0])
Augmentation done in 398.36s
通过上述运行结果的反馈,对于100个batch每个batch16张图片(500*313)进行数据增强,居然用了398.36s。
太慢了,官方教程上说,使用GPU会再快一些。
使用多核处理
从下述代码中可以看出, 使用多核处理和单核处理的区别 在于augment_batches(..., background=)中background参数为True还是False。
1 time_start = time.time() 2 batches_aug = list(seq.augment_batches(batches, background=True)) # list() converts generator to list 3 time_end = time.time() 4 5 print("Augmentation done in %.2fs" % (time_end - time_start,)) 6 ia.imshow(batches_aug[0].images_aug[0])
Augmentation done in 214.00s
使用多核处理,处理时间从398.36s降低到214.00s,节省用时184.36s。
如下图所示,本节教程使用的CUP具有2核,4个线程。对于更高核数的CPU和线程数,多核处理会有更好的效果。
Tip:如何查看自己电脑的核数和线程数?
答:在cmd命令中输入"wmic",然后在生成的新一行中输入"cpu get *"。
-
NumberOfCores: 表示CPU核心数;
-
NumberOfLogicalProcessors:表示CPU线程数。
处理非图像数据
之前的示例只展示了如何增强图像,通常还需要增强关键点,bbox等非图像的数据,以保持增强前后的一致性。
对非数据类型进行多核处理与图像的区别在于使用UnnormalizedBatch生成batch实例时的一些小的不同。并且在对非图像数据进行多核增强处理时,不必再像之前那样考虑随机/确定模式的情况。imgaug会自动处理,并确保图像和相关数据之间的增强一致。
关键点示例
生成关键点
1 import numpy as np 2 import imgaug as ia 3 import imageio 4 from imgaug import augmenters as iaa 5 %matplotlib inline 6 7 BATCH_SIZE = 16 8 NB_BATCHES = 100 9 image = imageio.imread("./pick1.jpg") 10 images = [np.copy(image) for _ in range(BATCH_SIZE)] 11 12 keypoint = ia.KeypointsOnImage([ 13 ia.Keypoint(x=151, y=106), 14 ia.Keypoint(x=247, y=78), 15 ia.Keypoint(x=179, y=140), 16 ia.Keypoint(x=206, y=134) 17 ], shape=image.shape) # KeyPointsOnImage class 18 keypoints = [keypoint.deepcopy() for _ in range(BATCH_SIZE)] # has deepcopy func 19 20 seq = iaa.Sequential([ 21 iaa.PiecewiseAffine(scale=0.05, nb_cols=6, nb_rows=6), # very slow 22 iaa.Fliplr(0.5), # very fast 23 iaa.CropAndPad(px=(-10, 10)) # very fast 24 ])
关键点增强处理
之前在UnnormalizedBatch()中只添加了images,此时由于具有keypoints,也同时添加了keypoint字段。
多核的利用分为两步走, 其一,对自己生成的批次图像、真值数据使用UnnormalizedBatch()生成可使用的batches数据, 其二,用数据增强方法中包含的augment_batches()方法(background要设置为True)
1 from imgaug.augmentables.batches import UnnormalizedBatch 2 import time 3 4 batches = [UnnormalizedBatch(images=images, keypoints=keypoints) for _ in range(NB_BATCHES)] 5 6 time_start = time.time() 7 batched_aug = list(seq.augment_batches(batches, background=True)) # background=True for multicore aug 8 time_end = time.time() 9 10 print("Augmentation done in %.2fs" % (time_end - time_start,)) 11 ia.imshow( 12 batched_aug[0].keypoints_aug[0].draw_on_image( 13 batched_aug[0].images_aug[0] 14 ) 15 )
Augmentation done in 370.16s
从结果可以看出,仅仅增加了keypoint,运行时间便从214.00s增加至370.16s,增加了155.84s,近一倍的时间。
同理,UnnormalizedBatch()函数同样可以使用
-
bbox,即bounding_boxes=[list of imgaug.augmentables.bbs.BoundingBoxesOnImage]
-
polygons(多边形),即polygons=[list of imgaug.augmentables.polygons.PolygonsOnImage]
-
heatmaps(热力图),即heatmaps=[list of imgaug.augmentables.heatmaps.HeatmapsOnImage]
-
segmentation maps(分割图), 即segmentation_maps=[list of imgaug.augmentables.segmaps.SegmentationMapOnImage] 等图像中使用的真值类型。
关于augment_batches()的使用感觉还比较简单,但是有些无脑。例如,关于控制使用CPU的核数仿佛并没有自定义设置的方式。感觉一切都交给它,私人设置的少,缺乏可控性和安全感。
Tip: 基于坐标的数据(如关键点或边界框)时,避免使用PiecewiseAffine,时间代价会增加很多。
Pool
通过前两个案例的使用来看,augment_batches()虽然使用简单,但是没有提供更多定制化的设置,比如控制使用CPU的核心数量。
augmenter.pool()可以轻松的替代augment_batches()的使用,并且具有以下两种优点:
-
增加可控性,可以具体设置CPU的使用;
-
可以使用生成器避免大数据量一次性读入造成内存不足的问题。
pool()的使用也是使用之前定义的batches。 pool()方法是通过增强方法或者增强序列产生的。
(一) 增加可控性
将pool配置为使用除一个(processes=-1)之外的所有CPU核心,在执行20个任务后重启子进程(maxtasksperchild=20),并以随机数种子1开始。
如果处理导致越来越多的内存泄漏问题,参数maxtasksperchild可能会很有用。
1 with seq.pool(processes=-1, maxtasksperchild=20, seed=1) as pool: 2 batches_aug = pool.map_batches(batches) 3 ia.imshow(batches_aug[0].images_aug[0])
如果电脑上安装有任意杀毒软件,在内存使用悬浮窗上可以看到两者在内存使用的区别。
注意,这里只调用了一次map_batch()来增加输入批。在实践中,可以使用不同的输入批对每个生成的pool多次调用该命令——建议这样做,因为创建新pool需要重新生成子进程,这会花费一些时间。
augmeter.pool()是创建imgaug.multicore.Pool()实例的一条捷径,imgaug.multicore.Pool()类是对于python的多进程multiprocessing.Pool()的一个关于随机状态处理的封装。通过imgaug.multicore.Pool进行处理的代码如下,(可略过)
1 from imgaug import multicore 2 3 with multicore.Pool(aug, processes=-1, maxtasksperchild=20, seed=1) as pool: 4 batches_aug = pool.map_batches(batches) 5 6 ia.imshow(batches_aug[0].images_aug[0])
(二) 数据批次读入内存
在处理图像数据时,往往会遇到一个问题,即图像数据量较大,难以一次性将图像数据读入内存。
在深度学习各种框架(Tensorflow,Pytorch)都有关于图像分批读入的方式。图像增强imgaug同样需要这样的操作。
在imgaug的Pool中,使用方式也相对容易,就是将map_batches([list])替换为imap_batches([generator])。此时的输出batches_aug将会是一个生成器。
-
batches to generator
由于imap_batches()的输入是生成器,则首先需要将batches构造成一个generator。这里会使用到yield。
1 def create_generator(lst): 2 for list_entry in lst: 3 yield list_entry 4 5 my_generator = create_generator(batches)
2.使用pool中的imap_batches()接收生成器
以下代码由于涉及到多进程的处理,如果在pycharm中编写,需要将下属代码放到"if name == 'main':"之后。否则会报
The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.
1 with seq.pool(processes=-1, seed=1) as pool: 2 batches_aug = pool.imap_batches(my_generator) 3 4 for i, batch_aug in enumerate(batches_aug): 5 if i == 0: 6 ia.imshow(batch_aug.images_aug[0]) 7 # do something else with the batch here
通过上述的示例,个人认为pool中包含的map_batches(<list>)和imap_batches(<generator>)实际的作用是将seq(增强方法/序列)作用于图像,建立其图像-进程-增强操作之间的桥梁。
完整示例
1 import imageio 2 import imgaug as ia 3 import imgaug.augmenters as iaa 4 from imgaug.augmentables.batches import UnnormalizedBatch 5 import numpy as np 6 import time 7 8 BATCH_SIZE = 10 9 BATCH_NB = 8 10 img = imageio.imread("./pick1.jpg") 11 images = [np.copy(img) for _ in range(BATCH_SIZE)] 12 13 aug = iaa.Affine(rotate=(-20, 20)) 14 15 batches = [UnnormalizedBatch(images=images) for _ in range(BATCH_NB)] # 形成batch 16 17 18 def create_generator(batches): 19 for i in batches: 20 yield i 21 22 23 gen = create_generator(batches) 24 25 # if __name__ == '__main__': # pycharm中需要添加! 26 27 with aug.pool(processes=-1, seed=1) as pool: 28 batches_aug = pool.imap_batches(gen) 29 30 for i, batch in enumerate(batches_aug): 31 if i == 0: 32 ia.imshow(batch.images_aug[0])
图像增强与模型训练步调不一致
在上一小节介绍的pool.imap_batches()在解决数据同时读入内存的问题。看起来这样处理之后已经近乎完美,但实际上仍然存在问题。
这个问题体现在图像增强后会使用模型进行训练,由于数据增强处理速度相对较快,模型训练的速度相对较慢,就会存在二者步调不一致的问题。 同时,pool总是贪婪的从生成器中导入batch,步调不一致的问题会造成pool将数据抢先都读入到RAM中,造成RAM资源紧张的问题。
解决这一问题的方法:在imap_batches()中添加output_buffer_size 参数,限制pool从gen中最多导入batch的数量。也就是说,会让pipeline中最多存在output_buffer_size个batch,不再是没上限的读取。
1 import time 2 3 # We use a single, very fast augmenter here to show that batches 4 # are only loaded once there is space again in the buffer. 5 pipeline = iaa.Fliplr(0.5) 6 7 def create_generator(lst): 8 for list_entry in lst: 9 print("Loading next unaugmented batch...") 10 yield list_entry 11 12 # only use 25 batches here, which is enough to show the effect 13 my_generator = create_generator(batches[0:25]) 14 15 with pipeline.pool(processes=-1, seed=1) as pool: 16 batches_aug = pool.imap_batches(my_generator, output_buffer_size=5) 17 18 print("Requesting next augmented batch...") 19 for i, batch_aug in enumerate(batches_aug): 20 # sleep here for a while to simulate a slowly training model 21 time.sleep(0.1) 22 23 if i < len(batches)-1: 24 print("Requesting next augmented batch...")
Requesting next augmented batch...
Loading next unaugmented batch...
Loading next unaugmented batch...
Loading next unaugmented batch...
Loading next unaugmented batch...
Loading next unaugmented batch...
Loading next unaugmented batch...
Requesting next augmented batch...
Loading next unaugmented batch...
Requesting next augmented batch...
Loading next unaugmented batch...
Requesting next augmented batch...
Requesting next augmented batch...
Requesting next augmented batch...
Requesting next augmented batch...
Requesting next augmented batch...
整理总结
本节主要介绍了不可控多核augment_batches()和可控多核pool()的使用。二者都是基于UnnormalizedBatch()方法生成的batches(该函数可以通过参数将image,各种真值组成batch)。
-
augment_batches()不可控是因为其只能通过background=True/False控制是否进行多核处理,并不能调节使用核的数量。功能较简单。
-
pool中具有map_batches和imap_batches两种选项,前者输入为list列表,后者输入为生成器generator。由于输入的不同,后者在功能上还可以实现数据分批次的读入内存,并且通过output_buffer_size参数控制图像增强与模型训练步调不一致的问题(限制pipeline中batch的数量,从而抑制pool贪婪的从生成器中获取batch)。
值得注意的是上述两种方法都是通过增强方法/序列点出来的。实际上是在多核上建立图像和增强技术间的桥梁。