zoukankan      html  css  js  c++  java
  • 记一次python 协程给合多线程死锁问题

    前言

    死锁和具体的开发语言无关,工业界使用的主流开发语言者都提供并行/并发,线程/进程,及各种锁的元语

    多线程导致的死锁在现在的代码开发中已经很少见了,现在日常谈到的死锁主要是sql这类db的事务导致的sql死锁

    因为大部分开发工作已经很少直接和锁打交道,都是各种封装好的组件,如java的juc等。

    以java为例,早年jdk1.6,jdk1.7的是时候,还有大量直接和锁打交道的场景,但现在基本都是使用juc实现的工具类

    另外一些无锁架构,例如 lbev/lebuv/akka/回调/async/await/协程,本身设计上就回避了可能出现死锁的场景

    这类机制个人都搞过

    无锁的 lua,nodejs,golang,akka,及python里的async/await,gevent机制

    有锁的 c#,java,scala,juc


    问题

    程序在某次变更后出现假死,执行一段时间后,不再有日志输出,消费和生产停滞

    其实一开始并没有想到死锁,也是对自己的代码比较自信,代码里用到了资源池,但读写分治,没想到会有死锁的部分,最后确实打脸了

    因为有大量的io 操作,最初以为是哪个io环节出问题了,且没有释放连接

    在所有连接处加了超时控制,依然出现假死

    因为方法是高度抽象的,该程序的执行组件与其他组件,区别只是某个类的实现不同

    别的程序组件完全正常,就这个组件假死,判断是执行部分异常


    简述下该项目的并发模型

    项目为机器学习/深度学习的算法应用,由于要加载算法模型做计算(预测/分类),而算法模型这一片基本是python的天下

    因此项目算法应用部分以python语言开发,同时在算法应用外的io部分,python语言本身,也不构成性能瓶颈,因此全部使用python,而未使用多语言

    传统的服务主要分为cpu密集,或io密集,web类的读写各种db/sql/nosql/mq业务系统是io密集型,占业力开发总量的90%以上

    算法相关的则为cpu密集,部分mem密集,由于深度学习的工业界应用,深度学习重度的依赖显卡资源

    因此除cpu密集(cpu通用计算)外,又出现大量gpu密集的场景

    机器学习阶段,cpu算法应用的模型普遍较小,内存也较为廉价,因此mem并不是个值得注意的点,gpu算法则因为模型占用大量的显存空间,显存昂贵,gpu mem密集也成为必须注意的点


    项目功能简单,但要同时考虑cpu/gpu/mem和io

    算法应用部分既有cpu/gpu/mem/显存密集

    大量小文件的读写又是io密集

    最终项目的选型和优化方案是

    计算部分 多线程 cpu/gpu密集,并行计算

    io 部分协程 async/await,减少线程调度,同时提高并行度

    mem/显存部分(加载模型) 资源池,假设加载一个模型2G,有限显存,只能加载4个模型,模型复用,提供4个并发度。

    排查最终定位到执行部分

    class BoundedThreadPoolExecutor(ThreadPoolExecutor):
        def __init__(self, max_workers, max_waiting_tasks, *args, **kwargs):
            super().__init__(max_workers=max_workers, *args, **kwargs)
            self.model_queue = Queue(maxsize=max_workers)
            for i in range(0, max_workers):
                self.model_queue.put(TextSystem(ocr_config.DEFAULT_TS_ARGS))
                
    class DocStageOcrHandle(DocStageHandle):
        def __init__(self, job_name="test_job_name", config_name="test_config.ini", media_type="media_type", use_gpu=False):
            DocStageHandle.__init__(self, job_name, "alg", config_name, media_type)
            ocr_config.DEFAULT_TS_ARGS.use_gpu = use_gpu
            self.exector = BoundedThreadPoolExecutor(4, 8)    
        
        async def _ocr_by_thread_submit(self, media_type: str, rsq_data: bytes, file_name: str, url_info: dict):
            ocr_list_set = []
            # 拿出一个模型
            text_sys = self.exector.model_queue.get(block=True)
            f = self.exector.submit(ocrImageToSetByBuffer, rsq_data, text_sys)
            return f, text_sys
    
        async def _ocr_by_thread_load_result(self, media_type: None, f: None, text_sys: None):
            result = f.result()
            # 再把模型放回
            self.exector.model_queue.put(text_sys)
            ocr_list_set = []
            return [result]
          
        async def _ocr_media_without_lock_async_by_pool(self, media_type: str, rsq_data: bytes, url_info: dict):
            try:
                    f, text_sys = await self._ocr_by_thread_submit(media_type, rsq_data, "", url_info)
                    ocr_list_set = await self._ocr_by_thread_load_result(media_type, f, text_sys)          
            except RuntimeError as exception:
                    pass
            return ocr_list_set      
    

    发生死锁的位置

                    f, text_sys = await self._ocr_by_thread_submit(media_type, rsq_data, "", url_info)
                    ocr_list_set = await self._ocr_by_thread_load_result(media_type, f, text_sys)          
    

    协程和线程并用,出现死锁

    多线程死锁,理解简单

    协程和线程死锁,不好描述,但换个角度,协程实际也是在线程中执行的,本质上依然是线程和线程间的死锁

    该执行部分的死锁部分是self.exector.model_queue.get(block=True) ,model_queue的行为类似阻塞队列

    text_sys = self.exector.model_queue.get(block=True)self.exector.model_queue.put(text_sys)

    有经验的应该已经看出问题了

            result = f.result()
            # 再把模型放回
            self.exector.model_queue.put(text_sys)
    

    这里,由于模型计算时会出现异常,导致直接跳出方法外,无法执行self.exector.model_queue.put(text_sys),将算法模型归还

    导致其他线程阻塞在 text_sys = self.exector.model_queue.get(block=True) 一直在等待一个无法取到的资源

            try:
                result = f.result()
            except Exception as e:
                raise e
            finally:
                # 再把模型放回
                self.exector.model_queue.put(text_sys)
    

    如此,问题解决,这个问题是疏忽了

    在每个方法调用时创建加载模型,开销较大,用了一个资源池,放固定加载好的几个模型
    流程没问题,但因为异常没处理好,导致资源泄露,服务假死

  • 相关阅读:
    [python subprocess学习篇] 调用系统命令
    linux dd命令创建一定大小的文件
    [linux time命令学习篇] time 统计命令执行的时间
    新建应用母版页的网页index.aspx,about.aspx,login.aspx
    MOSS母板页制作 学习笔记(一)
    SharePoint 2010顶部链接导航栏的详细操作
    使用SharePoint 2010 母版页
    SharePoint 2010 母版页制作的简单介绍
    在 SharePoint 2010 中访问数据
    牛刀小试、用SharePoint 实现请假管理功能
  • 原文地址:https://www.cnblogs.com/zihunqingxin/p/14916329.html
Copyright © 2011-2022 走看看