zoukankan      html  css  js  c++  java
  • python实现事件驱动模型

    转:https://www.jianshu.com/p/a605fab0ab11

    # encoding: UTF-8
    # 系统模块
    from Queue import Queue, Empty
    from threading import *
    ########################################################################
    class EventManager:
        #----------------------------------------------------------------------
        def __init__(self):
            """初始化事件管理器"""
            # 事件对象列表
            self.__eventQueue = Queue()
            # 事件管理器开关
            self.__active = False
            # 事件处理线程
            self.__thread = Thread(target = self.__Run)
    
            # 这里的__handlers是一个字典,用来保存对应的事件的响应函数
            # 其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多
            self.__handlers = {}
    
        #----------------------------------------------------------------------
        def __Run(self):
            """引擎运行"""
            while self.__active == True:
                try:
                    # 获取事件的阻塞时间设为1秒
                    event = self.__eventQueue.get(block = True, timeout = 1)  
                    self.__EventProcess(event)
                except Empty:
                    pass
    
        #----------------------------------------------------------------------
        def __EventProcess(self, event):
            """处理事件"""
            # 检查是否存在对该事件进行监听的处理函数
            if event.type_ in self.__handlers:
                # 若存在,则按顺序将事件传递给处理函数执行
                for handler in self.__handlers[event.type_]:
                    handler(event)
    
        #----------------------------------------------------------------------
        def Start(self):
            """启动"""
            # 将事件管理器设为启动
            self.__active = True
            # 启动事件处理线程
            self.__thread.start()
    
        #----------------------------------------------------------------------
        def Stop(self):
            """停止"""
            # 将事件管理器设为停止
            self.__active = False
            # 等待事件处理线程退出
            self.__thread.join()
    
        #----------------------------------------------------------------------
        def AddEventListener(self, type_, handler):
            """绑定事件和监听器处理函数"""
            # 尝试获取该事件类型对应的处理函数列表,若无则创建
            try:
                handlerList = self.__handlers[type_]
            except KeyError:
                handlerList = []
                
            self.__handlers[type_] = handlerList
            # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
            if handler not in handlerList:
                handlerList.append(handler)
                
        #----------------------------------------------------------------------
        def RemoveEventListener(self, type_, handler):
            """移除监听器的处理函数"""
            #读者自己试着实现
            
        #----------------------------------------------------------------------
        def SendEvent(self, event):
            """发送事件,向事件队列中存入事件"""
            self.__eventQueue.put(event)
    
    ########################################################################
    """事件对象"""
    class Event:
        def __init__(self, type_=None):
            self.type_ = type_      # 事件类型
            self.dict = {}          # 字典用于保存具体的事件数据

    测试代码

    #-------------------------------------------------------------------
    # encoding: UTF-8
    import sys
    from datetime import datetime
    from threading import *
    from EventManager import *
    
    #事件名称  新文章
    EVENT_ARTICAL = "Event_Artical"
    
    #事件源 公众号
    class PublicAccounts:
        def __init__(self,eventManager):
            self.__eventManager = eventManager
    
        def WriteNewArtical(self):
            #事件对象,写了新文章
            event = Event(type_=EVENT_ARTICAL)
            event.dict["artical"] = u'如何写出更优雅的代码
    '
            #发送事件
            self.__eventManager.SendEvent(event)
            print u'公众号发送新文章
    '
    
    #监听器 订阅者
    class Listener:
        def __init__(self,username):
            self.__username = username
    
        #监听器的处理函数 读文章
        def ReadArtical(self,event):
            print(u'%s 收到新文章' % self.__username)
            print(u'正在阅读新文章内容:%s'  % event.dict["artical"])
    
    """测试函数"""
    #--------------------------------------------------------------------
    def test():
        listner1 = Listener("thinkroom") #订阅者1
        listner2 = Listener("steve")#订阅者2
    
        eventManager = EventManager()
        
        #绑定事件和监听器响应函数(新文章)
        eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
        eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
        eventManager.Start()
    
        publicAcc = PublicAccounts(eventManager)
        timer = Timer(2, publicAcc.WriteNewArtical)
        timer.start()
        
    if __name__ == '__main__':
        test()
  • 相关阅读:
    实体类中的date类型问题
    java.sql.SQLException: validateConnection false
    本地计算机的mysql服务启动后停止
    VUE遇到Windows 64-bit with Unsupported runtime (64) For more information on which environments are supported please see
    有关详细信息, 请使用 -Xlint:unchecked 重新编译。
    mysql出错ERROR 2003 (HY000): Can't connect to MySQL server on 'localhost' (10061)
    WIN7系统如何在文件列表中显示文件夹后缀
    shell 两类执行方法
    Git 报错 error setting certificate verify locations
    maven打包不同jdk版本的包
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/13335079.html
Copyright © 2011-2022 走看看