zoukankan      html  css  js  c++  java
  • 如何使用modelarts训练海量数据

    在modelarts上使用notebook上使用evs空间默认大小是5G,能满足大部分文本和图片训练模型的需求。如果训练数据稍微超过这个限额,可以适当的扩增下空间。但如果训练对象是视频,或是实际生成过程中的海量数据,这个空间就显得小了,这时候扩增evs空间就显得很不经济了。

    最近老山便碰到这样的案例,客户的训练数据大约在1T的量级,在obs上存储的数据结构大概如下图所示。

    your-obs-name
    └── ...
       └── video
           ├── folder1
           │   ├── text.txt
           │   └── video.mp4
           ├── folder2
           │   ├── text.txt
           │   └── video.mp4
           ├── folder3
           │   ├── text.txt
           │   └── video.mp4
           ├── folder4
           │   ├── text.txt
           │   └── video.mp4
           ├── folder5
           │   ├── text.txt
           │   └── video.mp4
           ├── ...

    虽然使用华为云自带的moxing模块可以直接读取obs的数据,但由于实质是通过http实时读取数据,这个速度比从evs的ssd硬盘上读取数据要慢得多。而解决方案也比较直接,在evs上开辟一个固定大小的空间作为缓存区,一方面不断把obs数据读入缓存区,如果缓存区满了,就等待其腾出空间,另一方面训练任务消费evs数据,当消费完后便删除数据。

    程序上也自然选用生产者-消费者模型。程序定义了管道类Pipeline,有生产者线程producer用于将obs数据保存到evs;同时输出evs数据用于外部模型的消费。由于每个视频文件都单独放在一个文件夹下,所以程序的输出对象也是这个文件夹在evs上保存的地址,如folder1,folder2等。至于读取文件夹内部文件信息等消费工作,由用户自行定义。

    不多说,直接上代码。

    import moxing as mox
    mox.file.shift('os', 'mox')
    import os, shutil
    from queue import Queue
    from time import sleep
    import threading
    import logging
    logging.basicConfig(level=logging.INFO,
                       format="%(asctime)s %(name)s %(levelname)s %(message)s",)
    
    class ObsClient:
       def __init__(self, root):
           '''获取obs路径上需要读取的文件夹的相关信息'''
           self.root = root
           self.directory = self.list_directory()
           self.maxSize = self.getMaxSize()
    
       def getMaxSize(self):
           '''最大的文件夹的大小'''
           return max([size for *_, size in self.directory])
    
       def list_directory(self):
           '''输出用于训练的文件夹的路径,输出directory:
           [(文件夹相对路径,文件夹绝对路径,文件夹大小), ...]
           '''
           directory = []
           folders = mox.file.list_directory(self.root)
           for folder in folders:
               folderPath = os.path.join(self.root, folder)
               if mox.file.is_directory(folderPath):
                   size = self.get_size(folderPath)
                   directory.append((folder, folderPath, size))
           return directory
    
       def get_size(self, path):
           '''获取文件(夹)的大小'''
           if mox.file.is_directory(path):
               return self.get_size_folder(path)
           return self.get_size_file(path)
    
       def get_size_file(self, path):
           '''获取文件的大小'''
           return mox.file.get_size(path)
    
       def get_size_folder(self, path):
           '''获取文件夹的大小'''
           size = 0
           for filename in mox.file.list_directory(path, recursive=True):
               filepath = os.path.join(path, filename)
               if not mox.file.is_directory(filepath):
                   size+= self.get_size_file(filepath)
           return size
       
    class EvsClient:
       def __init__(self, root, memory, queue, directory, interval = 0.1):
           self.root = root # evs缓存区根目录
           self.directory = directory # obs文件夹信息
           self.size = 0 # evs缓存区已使用的空间
           self.memory = memory # evs上用于缓存的空间大小
           self.queue = queue # 队列,存储了evs缓存区文件夹的信息
           self.interval = interval # 如果缓存区满后,查询缓存大小的间隔时间
    
       def remove(self, folder, size):
           '''删除evs文件夹,在文件夹被消费后调用'''
           logging.info(f"consumer: start removing folder {folder} with size {size}|{self.size}")
           shutil.rmtree(folder, True)
           self.size -= size
           logging.info(f"consumer: end removing folder {folder} with size -{size}|{self.size}")
       
       def work(self):
           '''生成者主程序,用于从obs中copy文件夹到evs'''
           for relObsFolder, absObsFolder, size in self.directory:
               while True:
                   # 缓存区没满,就copy文件
                   if not self.waitOrDo(size):
                       self.copy(relObsFolder, absObsFolder, size)
                       break
                   # 如果缓存区满了,就等待
                   sleep(self.interval)
           # 当所有文件都拷贝后,置入结束符(None, None)
           self.queue.put((None, None))
                   
       def waitOrDo(self, size):
           '''返回True时等待,返回False时工作'''
           return self.size + size > self.memory
    
       def copy(self, relObsFolder, absObsFolder, size):
           '''从obs中copy文件夹到evs'''
           evsFolder = os.path.join(self.root, relObsFolder)
           logging.info(f"producer: start copying folder {relObsFolder} with size {size}|{self.size}")
           mox.file.copy_parallel(absObsFolder, evsFolder)
           self.queue.put((evsFolder, size))
           self.size += size
           logging.info(f"producer: end copying folder {relObsFolder} with size +{size}|{self.size}")
    
    class Pipeline:
       def __init__(self, evsRoot, obsRoot, memory = '1g', timeout = 300, interval = 0.1):
           self.memory = self.rescript(memory) # evs上用于缓存的空间大小
           self.timeout = timeout # 消费者获取evs缓存区文件夹的最长等待时间
           self.queue = Queue() # 队列,存储了evs缓存区文件夹的信息
           self.obsClient = ObsClient(obsRoot) # 存储obs上的文件夹信息
           # evs上的操作
           self.evsClient = EvsClient(evsRoot, self.memory, self.queue, self.obsClient.directory, interval)
           self.checkMemory() # 验证evs上用于缓存的空间大小是否足够大       
    
       def checkMemory(self):
           '''evs上用于缓存的空间大小不能小于obs上最大文件夹大小'''
           if self.memory<self.obsClient.maxSize:
               raise Exception("memory should bigger than maxFolderSize!")
    
       def rescript(self, memory):
           '''将文本或数值类型的memory转写成数值'''
           try:
               if isinstance(memory, str):
                   if memory[-1].lower()=='g':
                       return int(float(memory[:-1])*1024*1024*1024)
                   elif memory[-1].lower()=='m':
                       return int(float(memory[:-1])*1024*1024)
                   elif memory[-1].lower()=='k':
                       return int(float(memory[:-1])*1024)
                   else:
                       return int(float(memory))
               else:
                   return int(float(memory))
           except:
               raise Exception("Error when rescripting memory!")
    
       def __iter__(self):
           '''生成器,yield输出evs文件夹路径和大小'''
           # 生产者线程
           producer = threading.Thread(target = self.evsClient.work)
           producer.start()
           # 主程序提供生成器用于消费,输出evs文件夹路径和大小
           while True:
               logging.info(f"consumer: start to get the queue")
               path, size = self.queue.get(timeout=self.timeout)
               logging.info(f"consumer: get the queue {path}, {size} ")
               if path is None and size is None:
                   break
               yield path, size
               self.evsClient.remove(path, size)
           # 主程序等待
           producer.join()
    
    if __name__ == '__main__':
       # 使用示例
       for path, size in Pipeline('./video', 's3://your-obs-name/.../video'):
           do_job(path, size)

    如果你觉得老山的文章不错,不妨点击下关注。

    作者::山找海味

  • 相关阅读:
    6-ESP8266 SDK开发基础入门篇--操作系统入门使用
    5-ESP8266 SDK开发基础入门篇--了解一下操作系统
    【java规则引擎】基本语法和相关属性介绍
    【eclipse】 怎么解决java.lang.NoClassDefFoundError错误
    【java规则引擎】java规则引擎搭建开发环境
    【4】JDK和CGLIB生成动态代理类的区别
    【java规则引擎】一个基于drools规则引擎实现的数学计算例子
    【3】SpringMVC的Controller
    设计模式之禅之代理模式
    【java规则引擎】规则引擎RuleBase中利用观察者模式
  • 原文地址:https://www.cnblogs.com/huaweicloud/p/11861455.html
Copyright © 2011-2022 走看看