前言
死锁和具体的开发语言无关,工业界使用的主流开发语言者都提供并行/并发,线程/进程,及各种锁的元语
多线程导致的死锁在现在的代码开发中已经很少见了,现在日常谈到的死锁主要是sql这类db的事务导致的sql死锁
因为大部分开发工作已经很少直接和锁打交道,都是各种封装好的组件,如java的juc等。
以java为例,早年jdk1.6,jdk1.7的是时候,还有大量直接和锁打交道的场景,但现在基本都是使用juc实现的工具类
另外一些无锁架构,例如 lbev/lebuv/akka/回调/async/await/协程,本身设计上就回避了可能出现死锁的场景
这类机制个人都搞过
无锁的 lua,nodejs,golang,akka,及python里的async/await,gevent机制
有锁的 c#,java,scala,juc
问题
程序在某次变更后出现假死,执行一段时间后,不再有日志输出,消费和生产停滞
其实一开始并没有想到死锁,也是对自己的代码比较自信,代码里用到了资源池,但读写分治,没想到会有死锁的部分,最后确实打脸了
因为有大量的io 操作,最初以为是哪个io环节出问题了,且没有释放连接
在所有连接处加了超时控制,依然出现假死
因为方法是高度抽象的,该程序的执行组件与其他组件,区别只是某个类的实现不同
别的程序组件完全正常,就这个组件假死,判断是执行部分异常
简述下该项目的并发模型
项目为机器学习/深度学习的算法应用,由于要加载算法模型做计算(预测/分类),而算法模型这一片基本是python的天下
因此项目算法应用部分以python语言开发,同时在算法应用外的io部分,python语言本身,也不构成性能瓶颈,因此全部使用python,而未使用多语言
传统的服务主要分为cpu密集,或io密集,web类的读写各种db/sql/nosql/mq业务系统是io密集型,占业力开发总量的90%以上
算法相关的则为cpu密集,部分mem密集,由于深度学习的工业界应用,深度学习重度的依赖显卡资源
因此除cpu密集(cpu通用计算)外,又出现大量gpu密集的场景
机器学习阶段,cpu算法应用的模型普遍较小,内存也较为廉价,因此mem并不是个值得注意的点,gpu算法则因为模型占用大量的显存空间,显存昂贵,gpu mem密集也成为必须注意的点
项目功能简单,但要同时考虑cpu/gpu/mem和io
算法应用部分既有cpu/gpu/mem/显存密集
大量小文件的读写又是io密集
最终项目的选型和优化方案是
计算部分 多线程 cpu/gpu密集,并行计算
io 部分协程 async/await,减少线程调度,同时提高并行度
mem/显存部分(加载模型) 资源池,假设加载一个模型2G,有限显存,只能加载4个模型,模型复用,提供4个并发度。
排查最终定位到执行部分
class BoundedThreadPoolExecutor(ThreadPoolExecutor):
def __init__(self, max_workers, max_waiting_tasks, *args, **kwargs):
super().__init__(max_workers=max_workers, *args, **kwargs)
self.model_queue = Queue(maxsize=max_workers)
for i in range(0, max_workers):
self.model_queue.put(TextSystem(ocr_config.DEFAULT_TS_ARGS))
class DocStageOcrHandle(DocStageHandle):
def __init__(self, job_name="test_job_name", config_name="test_config.ini", media_type="media_type", use_gpu=False):
DocStageHandle.__init__(self, job_name, "alg", config_name, media_type)
ocr_config.DEFAULT_TS_ARGS.use_gpu = use_gpu
self.exector = BoundedThreadPoolExecutor(4, 8)
async def _ocr_by_thread_submit(self, media_type: str, rsq_data: bytes, file_name: str, url_info: dict):
ocr_list_set = []
# 拿出一个模型
text_sys = self.exector.model_queue.get(block=True)
f = self.exector.submit(ocrImageToSetByBuffer, rsq_data, text_sys)
return f, text_sys
async def _ocr_by_thread_load_result(self, media_type: None, f: None, text_sys: None):
result = f.result()
# 再把模型放回
self.exector.model_queue.put(text_sys)
ocr_list_set = []
return [result]
async def _ocr_media_without_lock_async_by_pool(self, media_type: str, rsq_data: bytes, url_info: dict):
try:
f, text_sys = await self._ocr_by_thread_submit(media_type, rsq_data, "", url_info)
ocr_list_set = await self._ocr_by_thread_load_result(media_type, f, text_sys)
except RuntimeError as exception:
pass
return ocr_list_set
发生死锁的位置
f, text_sys = await self._ocr_by_thread_submit(media_type, rsq_data, "", url_info)
ocr_list_set = await self._ocr_by_thread_load_result(media_type, f, text_sys)
协程和线程并用,出现死锁
多线程死锁,理解简单
协程和线程死锁,不好描述,但换个角度,协程实际也是在线程中执行的,本质上依然是线程和线程间的死锁
该执行部分的死锁部分是self.exector.model_queue.get(block=True) ,model_queue的行为类似阻塞队列
text_sys = self.exector.model_queue.get(block=True)
和 self.exector.model_queue.put(text_sys)
有经验的应该已经看出问题了
result = f.result()
# 再把模型放回
self.exector.model_queue.put(text_sys)
这里,由于模型计算时会出现异常,导致直接跳出方法外,无法执行self.exector.model_queue.put(text_sys),将算法模型归还
导致其他线程阻塞在 text_sys = self.exector.model_queue.get(block=True) 一直在等待一个无法取到的资源
try:
result = f.result()
except Exception as e:
raise e
finally:
# 再把模型放回
self.exector.model_queue.put(text_sys)
如此,问题解决,这个问题是疏忽了
在每个方法调用时创建加载模型,开销较大,用了一个资源池,放固定加载好的几个模型
流程没问题,但因为异常没处理好,导致资源泄露,服务假死