zoukankan      html  css  js  c++  java
  • python多线程与多进程异步事件框架

    多线程简单实现

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    import logging
    import queue
    import threading
    from concurrent.futures import ThreadPoolExecutor
    
    
    # 任务:事件
    def func_a(a, b):
        return a + b
    def func_b(a, b):
        return a * b
    def func_c(a, b, c):
        return a * b - c
    # 回调函数
    def handle_result1(result):
        print(type(result), result)
    def handle_result2(result):
        print(type(result), result)
    def handle_result3(result):
        print(type(result), result)
    
    
    class EventEngine(object):
        # 初始化事件事件驱动引擎
        def __init__(self):
            # 保存事件列表:异步任务队列
            self.__eventQueue = queue.Queue()
            # 引擎开关
            self.__active = False
            # 事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
            self.__handlers = {}
            # 事件引擎主进程
            self.__Thread = threading.Thread(target=self.task_queue_consumer)
            # 事件处理线程池
            self.__thread_pool = ThreadPoolExecutor(max_workers=5)
            # 线程处理存储
            self.__thread_Pool = []
    
        #注册事件
        def register(self,event, callback, *args, **kwargs):
            Event = {
                'function': event,
                'callback': callback,
                'args': args,
                'kwargs': kwargs
            }
            self.__handlers[event] = Event
    
        #注销事件
        def unregister(self,event):
            if(self.__handlers[event]):
                del self.__handlers[event]
    
    
        #提交事件
        def sendevent(self,event):
            if ( event in self.__handlers.keys()):
                self.__eventQueue.put(self.__handlers[event])
    
    
        # 开启事件引擎
        def start(self):
            self.__active = True
            self.__Thread.start()
    
        # 暂停事件引擎
        def stop(self):
            self.__active = False
    
        # 暂停后开始
        def restart(self):
            self.__active = True
    
        # 关闭事件引擎
        def close(self):
            pass
    
        # 开启事件循环
        def task_queue_consumer(self):
            """
            异步任务队列
            """
            while(1):
                while self.__active:
                    if (self.__eventQueue.empty() == False):
                        try:
                            task = self.__eventQueue.get()
                            function = task.get('function')
                            callback = task.get('callback')
                            args = task.get('args')
                            kwargs = task.get('kwargs')
                            try:
                                if callback:
                                    thread = self.__thread_pool.submit(callback,function(*args, **kwargs))
                                    self.__thread_Pool.append(thread)
                                    # callback(function(*args, **kwargs))
                            except Exception as ex:
                                if callback:
                                    callback(ex)
                            finally:
                                self.__eventQueue.task_done()
                        except Exception as ex:
                            logging.warning(ex)
    
    
    
    if __name__ == '__main__':
        import time
        #初始化多线程异步框架
        Engine = EventEngine()
        #启动
        Engine.start()
        #注册回调函数
        Engine.register(func_a, handle_result1, 1, 2)
        Engine.register(func_b, handle_result2, 1, 2)
        Engine.register(func_c, handle_result3, 1, 2, 3)
        #提交事件
        Engine.sendevent(func_a)
        Engine.sendevent(func_b)
        Engine.sendevent(func_c)
        time.sleep(2)
        Engine.stop()
        Engine.restart()
        Engine.sendevent(func_b)
        Engine.sendevent(func_c)
        # for i in range(100):
        #     Engine.sendevent(func_a)

    多进程实现

    from multiprocessing import Process, Queue
     
     
    class EventEngine(object):
      # 初始化事件事件驱动引擎
      def __init__(self):
        #保存事件列表
        self.__eventQueue = Queue()
        #引擎开关
        self.__active = False
        #事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
        self.__handlers = {}
        #保存事件处理进程池
        self.__processPool = []
        #事件引擎主进程
        self.__mainProcess = Process(target=self.__run)
     
     
      #执行事件循环
      def __run(self):
        while self.__active:
          #事件队列非空
          if not self.__eventQueue.empty():
            #获取队列中的事件 超时1秒
            event = self.__eventQueue.get(block=True ,timeout=1)
            #执行事件
            self.__process(event)
          else:
            # print('无任何事件')
            pass
     
     
      #执行事件
      def __process(self, event):
        if event.type in self.__handlers:
          for handler in self.__handlers[event.type]:
            #开一个进程去异步处理
            p = Process(target=handler, args=(event, ))
            #保存到进程池
            self.__processPool.append(p)
            p.start()
     
     
      #开启事件引擎
      def start(self):
        self.__active = True
        self.__mainProcess.start()
     
     
      #暂停事件引擎
      def stop(self):
        """停止"""
        # 将事件管理器设为停止
        self.__active = False
        # 等待事件处理进程退出
        for p in self.__processPool:
          p.join()
        self.__mainProcess.join()
     
     
      #终止事件引擎
      def terminate(self):
        self.__active = False
        #终止所有事件处理进程
        for p in self.__processPool:
          p.terminate()
        self.__mainProcess.join()
     
     
      #注册事件
      def register(self, type, handler):
        """注册事件处理函数监听"""
        # 尝试获取该事件类型对应的处理函数列表,若无则创建
        try:
          handlerList = self.__handlers[type]
        except KeyError:
          handlerList = []
          self.__handlers[type] = handlerList
     
        # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
        if handler not in handlerList:
          handlerList.append(handler)
     
     
      def unregister(self, type, handler):
        """注销事件处理函数监听"""
        # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
        try:
          handlerList = self.__handlers[type]
     
          # 如果该函数存在于列表中,则移除
          if handler in handlerList:
            handlerList.remove(handler)
     
          # 如果函数列表为空,则从引擎中移除该事件类型
          if not handlerList:
            del self.__handlers[type]
        except KeyError:
          pass
     
     
      def sendEvent(self, event):
        #发送事件 像队列里存入事件
        self.__eventQueue.put(event)
     
     
    class Event(object):
      #事件对象
      def __init__(self, type =None):
        self.type = type
        self.dict = {}
     
     
     
    #测试
    if __name__ == '__main__':
      import time
      EVENT_ARTICAL = "Event_Artical"
     
      # 事件源 公众号
      class PublicAccounts:
        def __init__(self, eventManager):
          self.__eventManager = eventManager
     
        def writeNewArtical(self):
          # 事件对象,写了新文章
          event = Event(EVENT_ARTICAL)
          event.dict["artical"] = u'如何写出更优雅的代码
    '
          # 发送事件
          self.__eventManager.sendEvent(event)
          print(u'公众号发送新文章
    ')
     
     
      # 监听器 订阅者
      class ListenerTypeOne:
        def __init__(self, username):
          self.__username = username
     
        # 监听器的处理函数 读文章
        def ReadArtical(self, event):
          print(u'%s 收到新文章' % self.__username)
          print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
     
     
      class ListenerTypeTwo:
        def __init__(self, username):
          self.__username = username
     
        # 监听器的处理函数 读文章
        def ReadArtical(self, event):
          print(u'%s 收到新文章 睡3秒再看' % self.__username)
          time.sleep(3)
          print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
     
     
      def test():
        listner1 = ListenerTypeOne("thinkroom") # 订阅者1
        listner2 = ListenerTypeTwo("steve") # 订阅者2
     
        ee = EventEngine()
     
        # 绑定事件和监听器响应函数(新文章)
        ee.register(EVENT_ARTICAL, listner1.ReadArtical)
        ee.register(EVENT_ARTICAL, listner2.ReadArtical)
        for i in range(0, 20):
          listner3 = ListenerTypeOne("Jimmy") # 订阅者X
          ee.register(EVENT_ARTICAL, listner3.ReadArtical)
     
        ee.start()
     
        #发送事件
        publicAcc = PublicAccounts(ee)
        publicAcc.writeNewArtical()
     
      test()

    多进程程序来源:http://blog.sina.com.cn/s/blog_13bb711fd0102x5nd.html

  • 相关阅读:
    我是这样搞懂一个神奇的BUG
    如何在Promise链中共享变量?
    .NET Core中基类可以反射子类的成员
    .NET Core中NETSDK1061错误解决(转载)
    为何.NET Core控制台项目发布后是一个dll文件,而不是exe文件?
    SQL Server 子查询错误:No column name was specified for column 2 of 'a' error (转载)
    如何处理Entity Framework / Entity Framework Core中的DbUpdateConcurrencyException异常(转载)
    EF Core 中多次从数据库查询实体数据,DbContext跟踪实体的情况
    ASP.NET Core MVC中的IActionFilter.OnActionExecuted方法执行时,Controller中Action返回的对象是否已经输出到Http Response中
    ASP.NET Core MVC中Controller的Action如何直接使用Response.Body的Stream流输出数据
  • 原文地址:https://www.cnblogs.com/-wenli/p/11774747.html
Copyright © 2011-2022 走看看