在我们的异常检测应用中,需要对每组IoT设备分别训练一个模型,每个模型对一组设备的指标数据进行实时异常检测。方案采用master-worker+消息队列的方式实现模型对外服务,但是每个worker的日志需要集中收集到同一个日志文件,而不是每个worker产生一个日志文件。此时我们采用基于logging模块的QueueHandler+QueueListener方案。master在创建worker之前,首先创建一个日志队列;创建worker时,将日志队列传入。worker端使用QueueHandler作为日志处理例程,将日志写入队列;master端启动QueueListener对日志队列中的消息进行监听,对接收到的消息调用相应的FileHandler处理例程进行处理,写入文件。以此实现对多worker日志的集中管理。相关代码如下:
master侧的日志相关代码:
def _init_logger(self):
    """Create the shared log queue and the listener that drains it.

    The listener forwards every record taken from ``self.log_queue`` to the
    handlers attached to the ``job.manager`` logger at the moment this
    method runs (the handler list is unpacked once, here).

    NOTE(review): the listener is only constructed here, not started; it
    presumably gets started elsewhere (not visible in this excerpt) --
    confirm.
    """
    self.logger = logging.getLogger("job.manager")
    self.log_queue = Queue()
    self.log_queue_listener = QueueListener(
        self.log_queue, *self.logger.handlers
    )


def _register_signals(self):
    """Route SIGINT and SIGTERM to ``self.stop``."""
    signal.signal(signal.SIGINT, self.stop)
    signal.signal(signal.SIGTERM, self.stop)


def _start_single_worker(self, worker_id, worker_group, worker_config):
    """Build one worker from its configuration and start it.

    Args:
        worker_id: Identifier handed to the worker.
        worker_group: Group name; also used to derive the subscription
            name (``worker_group + '_subscription'``).
        worker_config: Mapping read for ``'group_members'`` and for a
            nested ``'model'`` mapping with ``'binary_path'`` and
            ``'type'``.

    Raises:
        Exception: ``"worker start error"`` when ``worker.start()`` fails.
            The original failure is chained as ``__cause__`` so the real
            reason stays visible in the traceback.
    """
    model_config = worker_config.get('model')
    worker = Worker(
        worker_id=worker_id,
        worker_group=worker_group,
        group_members=worker_config.get('group_members'),
        consume_topic=mq_config.get('sample_topic'),
        consume_subscription=worker_group + '_subscription',
        produce_topic=mq_config.get('result_topic'),
        model_path=model_config.get('binary_path'),
        model_type=model_config.get('type'),
        # Every worker writes its records into the master's single queue.
        log_queue=self.log_queue)
    try:
        worker.start()
    except Exception as e:
        # Chain the cause instead of discarding it; the raised type is
        # unchanged, so existing callers keep working.
        raise Exception("worker start error") from e
worker侧日志相关代码:
# Re-entrant lock guarding reads and writes of ``status``. It is defined at
# class level, so a single lock object is shared by all instances.
_mutex = RLock()


def __init__(self, worker_id, worker_group, group_members, consume_topic,
             consume_subscription, produce_topic, model_path, model_type,
             log_queue):
    """Set up logging, the MQ client, the model and signal handlers.

    Args:
        worker_id: Stored as ``self.id`` and used in log messages.
        worker_group: Stored as ``self.group``.
        group_members: Stored as ``self.group_members``.
        consume_topic: Topic passed to the MQ consumer.
        consume_subscription: Subscription passed to the MQ consumer.
        produce_topic: Topic passed to the MQ producer.
        model_path: Location handed to the model loader.
        model_type: ``"sklearn"`` or ``"keras"``.
        log_queue: Queue that receives this worker's log records.

    Raises:
        Exception: If ``model_type`` is neither ``"sklearn"`` nor
            ``"keras"``.

    NOTE(review): the class header is not visible in this excerpt; if the
    class derives from a Process/Thread type, the base initialiser also
    has to be called -- confirm.
    """
    self.id = worker_id
    # A stray trailing comma previously stored this as a 1-tuple.
    self.group = worker_group
    self.group_members = group_members
    self.running = True
    self.status = 'idle'
    self.logger = None
    self.mq_client = None
    self.model = None
    # The logger goes first: the following steps log through it.
    self._init_logger(log_queue)
    self._init_mq_client(consume_topic, consume_subscription, produce_topic)
    self._load_model(model_path, model_type)
    self._register_signals()
    self.logger.info('Worker {} started.'.format(self.id))


def __del__(self):
    """Close the MQ client, if one was created."""
    # getattr keeps finalisation quiet when __init__ failed before the
    # attribute was assigned.
    mq_client = getattr(self, 'mq_client', None)
    if mq_client:
        mq_client.close()


@property
def status(self):
    """Current worker status, read under the shared lock."""
    with self._mutex:
        return self._status


@status.setter
def status(self, status):
    with self._mutex:
        self._status = status


def _init_logger(self, log_queue):
    """Attach a ``QueueHandler`` for ``log_queue`` to the worker logger.

    ``logging.getLogger('job.worker')`` returns the same logger object on
    every call within a process, so the handler is added only when no
    handler for this queue is attached yet. Otherwise each additional
    worker constructed in the same process would enqueue every record
    once more.
    """
    self.logger = logging.getLogger('job.worker')
    already_attached = any(
        isinstance(handler, QueueHandler) and handler.queue is log_queue
        for handler in self.logger.handlers
    )
    if not already_attached:
        self.logger.addHandler(QueueHandler(log_queue))


def _register_signals(self):
    """Route SIGINT and SIGTERM to ``self.stop``."""
    signal.signal(signal.SIGINT, self.stop)
    signal.signal(signal.SIGTERM, self.stop)


def _init_mq_client(self, consume_topic, consume_subscription,
                    produce_topic):
    """Create the MQ client, then its consumer and producer, logging each step."""
    self.mq_client = mq_cls()
    self.logger.info('worker {}: mq client initialised.'.format(self.id))
    self.mq_client.init_consumer(consume_topic, consume_subscription)
    self.logger.info('worker {}: mq consumer initialised.'.format(self.id))
    self.mq_client.init_producer(produce_topic)
    self.logger.info('worker {}: mq producer initialised.'.format(self.id))


def _load_model(self, model_path, model_type):
    """Load the model with the loader matching ``model_type``.

    Raises:
        Exception: If ``model_type`` is neither ``"sklearn"`` nor
            ``"keras"``.
    """
    if model_type == "sklearn":
        self.model = load_sklearn_model(model_path)
    elif model_type == "keras":
        self.model = load_keras_model(model_path)
    else:
        raise Exception('worker {}: model type error.'.format(self.id))
    self.logger.info('worker {}: model loaded.'.format(self.id))