zoukankan      html  css  js  c++  java
  • 基于queue的python多进程日志管理

    在我们的异常检测应用中,需要对每组IoT设备分别训练一个模型,每个模型对一组设备的指标数据进行实时异常检测。方案采用master-worker+消息队列的方式实现模型对外服务,但是每个worker的日志需要集中收集到同一个日志文件,而不是每个worker产生一个日志文件。此时我们采用基于logging模块的QueueHandler+QueueListener方案。master在创建worker之前,首先创建一个日志队列,创建worker时,将日志队列传入,同时worker端使用QueueHandler作为日志处理例程,将日志写入队列,master端启动Queuelistener对日志队列中的消息进行监听,对接收到的消息调用相应的FileHandler处理例程进行处理,写入文件。以次实现对多worker日志的集中管理。相关代码如下:

    master侧的日志相关代码:

      def _init_logger(self):
            self.logger = logging.getLogger("job.manager")
            self.log_queue = Queue()
            self.log_queue_listener = QueueListener(self.log_queue, *self.logger.handlers)
    
        def _register_signals(self):
            signal.signal(signal.SIGINT, self.stop)
            signal.signal(signal.SIGTERM, self.stop)
    
        def _start_single_worker(self, worker_id, worker_group, worker_config):
            worker = Worker(
                worker_id=worker_id,
                worker_group=worker_group,
                group_members=worker_config.get('group_members'),
                consume_topic=mq_config.get('sample_topic'),
                consume_subscription=worker_group + '_subscription',
                produce_topic=mq_config.get('result_topic'),
                model_path=worker_config.get('model').get('binary_path'),
                model_type=worker_config.get('model').get('type'),
                log_queue=self.log_queue)
            try:
                worker.start()
            except Exception as e:
                raise Exception("worker start error")

    worker侧日志相关代码:

    _mutex = RLock()
    
        def __init__(self, worker_id, worker_group, group_members, consume_topic, consume_subscription,
                     produce_topic, model_path, model_type, log_queue):
            self.id = worker_id
            self.group = worker_group,
            self.group_members = group_members
            self.running = True
            self.status = 'idle'
            self.logger = None
            self.mq_client = None
            self.model = None
    
            self._init_logger(log_queue)
            self._init_mq_client(consume_topic, consume_subscription, produce_topic)
            self._load_model(model_path, model_type)
            self._register_signals()
    
            self.logger.info('Worker {} started.'.format(self.id))
    
        def __del__(self):
            if self.mq_client:
                self.mq_client.close()
    
        @property
        def status(self):
            with self._mutex:
                return self._status
    
        @status.setter
        def status(self, status):
            with self._mutex:
                self._status = status
    
        def _init_logger(self, log_queue):
            queue_handler = QueueHandler(log_queue)
            self.logger = logging.getLogger('job.worker')
            self.logger.addHandler(queue_handler)
    
        def _register_signals(self):
            signal.signal(signal.SIGINT, self.stop)
            signal.signal(signal.SIGTERM, self.stop)
    
        def _init_mq_client(self, consume_topic, consume_subscription, produce_topic):
            self.mq_client = mq_cls()
            self.logger.info('worker {}: mq client initialised.'.format(self.id))
    
            self.mq_client.init_consumer(consume_topic, consume_subscription)
            self.logger.info('worker {}: mq consumer initialised.'.format(self.id))
    
            self.mq_client.init_producer(produce_topic)
            self.logger.info('worker {}: mq producer initialised.'.format(self.id))
    
        def _load_model(self, model_path, model_type):
            if model_type == "sklearn":
                self.model = load_sklearn_model(model_path)
            elif model_type == "keras":
                self.model = load_keras_model(model_path)
            else:
                raise Exception('worker {}: model type error.'.format(self.id))
            self.logger.info('worker {}: model loaded.'.format(self.id))
  • 相关阅读:
    XML文件详解以及解析
    Delphi 泛型详解
    Delphi 修改本地日期和时间
    java -> this关键字
    java ->super关键字
    Java -> 构造器(构造方法)
    java -> 方法的重载
    java面向对象->多态
    Java面向对象->接口
    Java面向对象->抽象类
  • 原文地址:https://www.cnblogs.com/zcsh/p/14338526.html
Copyright © 2011-2022 走看看