zoukankan      html  css  js  c++  java
  • vnpy源码阅读学习(6):事件引擎

    看完了MainEngine的代码,大概有一个了解以后。我们在初始化MainEngine的时候,要传入或者实例化一个事件引擎对象。

    代码基本结构

    按照惯例,我把所有的方法体折叠,只保留类和方法,先大概对这个类要实现的功能有一个基本的了解。然后再逐个深入。

    class EventEngine:
        def __init__(self, interval: int = 1):
            pass
    
        def _run(self):
            pass
    
        def _process(self, event: Event):
            pass
    
        def _run_timer(self):
            pass
    
        def start(self):
            pass
    
        def stop(self):
            pass
    
        def put(self, event: Event):
            pass
    
        def register(self, type: str, handler: HandlerType):
            pass
    
        def unregister(self, type: str, handler: HandlerType):
            pass
    
        def register_general(self, handler: HandlerType):
            pass
    
        def unregister_general(self, handler: HandlerType):
            pass
    

    这个EventEngine看起来要比MainEngin有点货,至少从方法名字上很难一目了然的猜测到方法的作用。脑海中浮过VC的消息循环,或者C#的事件委托,又或者PyQt5的事件和插槽,不会是和那个机制类似的东东吧。

    __init__(self, interval: int = 1)

        def __init__(self, interval: int = 1):
            """
            Timer event is generated every 1 second by default, if
            interval not specified.
            """
            self._interval = interval
            self._queue = Queue()
            self._active = False
            self._thread = Thread(target=self._run)
            self._timer = Thread(target=self._run_timer)
            self._handlers = defaultdict(list)
            self._general_handlers = []
    

    从初始化的代码中可以看到,EventEngine用到了两个线程Thread同时初始化了一个队列__queue

    register

        def register(self, type: str, handler: HandlerType):
            """
            Register a new handler function for a specific event type. Every
            function can only be registered once for each event type.
            """
            handler_list = self._handlers[type]
            if handler not in handler_list:
                handler_list.append(handler)
    

    我们找到HandlerType的定义

    HandlerType = Callable[[Event], None]
    

    基本上可以理解到注册一个可以被回调的方法,就是把这个方法的type和方法传给EventEngine然后,EventEngine会把这个回调方法保存到_handlers[type]中的数组中。

    unregister

        def unregister(self, type: str, handler: HandlerType):
            """
            Unregister an existing handler function from event engine.
            """
            handler_list = self._handlers[type]
    
            if handler in handler_list:
                handler_list.remove(handler)
    
            if not handler_list:
                self._handlers.pop(type)
    

    正好和register相反的方法,把一个事件从列表中移除。

    register_general

        def register_general(self, handler: HandlerType):
            """
            Register a new handler function for all event types. Every
            function can only be registered once for each event type.
            """
            if handler not in self._general_handlers:
                self._general_handlers.append(handler)
    

    这个方法相当于把一个不指定type的回调方法放入_general_handlers,跟他相反的方法:unregister_general

    put

        def put(self, event: Event):
            """
            Put an event object into event queue.
            """
            self._queue.put(event)
    

    这个是传入一个Event,然后把这个Event放入到一个队列中去
    我们先来看看Event这个类。

    class Event:
        """
        Event object consists of a type string which is used
        by event engine for distributing event, and a data
        object which contains the real data.
        """
    
        def __init__(self, type: str, data: Any = None):
            """"""
            self.type = type
            self.data = data
    

    从以上的代码,我们可以基本猜测在vnpy的事件引擎中,过来一个类型的数据,然后把这个数据存放到队列。然后再通过某种机制,定时的去调用那些通过regisiter进来的EventHandler。而什么类型的数据进来,给什么样的Handler回调。应该是通过Type去对应的。接下来,我们继续深入。

    start和stop

        def start(self):
            """
            Start event engine to process events and generate timer events.
            """
            self._active = True
            self._thread.start()
            self._timer.start()
    
        def stop(self):
            """
            Stop event engine.
            """
            self._active = False
            self._timer.join()
            self._thread.join()
    

    设置_active状态,启动或者阻塞线程。

    核心部分的代码

        def _run(self):
            while self._active:
                try:
                    event = self._queue.get(block=True, timeout=1)
                    self._process(event)
                except Empty:
                    pass
    
        def _process(self, event: Event):
            
            if event.type in self._handlers:
                [handler(event) for handler in self._handlers[event.type]]
    
            if self._general_handlers:
                [handler(event) for handler in self._general_handlers]
    
        def _run_timer(self):
            
            while self._active:
                sleep(self._interval)
                event = Event(EVENT_TIMER)
                self.put(event)
    

    通过核心代码代码,我们豁然开朗。原来就是启动了两个线程,一个线程不断的循环把所有接受到的Event取出来发送给需要它的Handler进行回调,另外一个线程不断定时成Event,然后这个Event可以定时回调某些Handler

    总结

    我总结成一个图片

  • 相关阅读:
    网上搜索整理的前端开发工程师面试题附答案
    Python-RabbitMQ-topic(细致消息过滤的广播模式)
    Python-RabbitMQ-direct(广播模式)
    Python-RabbitMQ-fanout(广播模式)
    rabbitmq中关于exchange模式type报错
    Python-RabbitMQ(持久化)
    Python-RabbitMQ(简单发送模型)
    python-gevent模块实现socket大并发
    python-gevent模块(自动切换io的协程)
    python-greenlet模块(协程)
  • 原文地址:https://www.cnblogs.com/bbird/p/12460898.html
Copyright © 2011-2022 走看看