zoukankan      html  css  js  c++  java
  • 一个消息调度框架构建

     

    一个消息调度框架构建

     
    • 基本框架

    1. MDU(消息分发单元):包含一个消息处理任务,包含自身的消息队列,是一个消息调度的基本单位。
    2. PID (功能子模块) :框架中用PID作为模块的划分,每个模块具有自己的PID编号,根据功能和调度需求可以安排多个PID到一个MDU中,PID是消息通信的一个基本单位,每个PID提供一个消息处理入口。
    3. MQ (消息队列) :使用消息队列作为任务通信的数据结构。
    • 消息处理流程

    1. 构建一个MDU模块,注册入框架中,初始MDU没有注册PID,未构建消息处理任务。
    2. 构建PID,注册入对应的MDU中,如果是MDU中第一个PID,构建消息处理任务。消息处理任务从该MDU对应的消息队列中取消息处理。
    3. 消息处理任务获取消息后根据消息中携带的接收PID的信息分发到对应的PID模块处理。
    • 完整的消息交互流程

    1. 任务A申请消息,消息内容必须包括发送模块PID编号、接收模块PID编号、消息内容。
    2. 通过消息框架提供的消息发送接口直接发送消息,消息框架根据接收PID信息,将消息填入对应MDU的消息队列中。
    3. MDU的消息处理任务B从消息队列中获取消息处理。
    4. MDU消息队列会被多个任务并发写入消息,被消息处理任务读取消息处理,需要对消息队列进行互斥和同步。详见http://www.cnblogs.com/chencheng/p/2893421.html
    • MUD、PID规划
    1. MDU作为一个调度基本单元,如果一个MDU中只有一个PID会导致系统中任务多,任务切换的开销大。
    2. 如果MDU中包含太多PID,由于所有PID在一个消息队列中串行运行,会影响PID的响应,影响系统性能。
    3. 功能紧耦合的PID放入一个MDU中。
    4. 耗时PID和实时要求高的PID不放入一个MDU中。
    • 实现

    MDU:

    复制代码
     View Code

    import myQueue
    from myThread import myThread

    class mdu:
    def __init__(self, mduID):
    self.mduId = mduID
    self.msgQue = myQueue.myQueue(10)
    self.map = {}

    def getMduID(self):
    return self.mduId

    def registPid(self, pidID, pid):
    self.map[pidID] = pid
    if 1==len(self.map):
    self.run()

    def msgEnQueue(self,msg):
    self.msgQue.enQueue(msg)

    def msgProcess(self):
    while True:
    msg = self.msgQue.deQueue()
    recvPid = msg.getRecvPid()
    self.map[recvPid].msgProcess(msg);

    def run(self):
    t = myThread(self.msgProcess)
    t.start()

    复制代码

    PID:

    复制代码
     View Code

    import message
    import support
    import mdu
    import pdb

    class pid:
    def __init__(self, pidID):
    self.pidID = pidID
    self.registMe()

    def registMe(self):
    support.registPid(self)

    def getPidID(self):
    return self.pidID

    复制代码

    SUPPORT:

    复制代码
     View Code

    import mdu
    import pdb

    mduMap = {}

    def registMdu(mdu):
    mduMap[mdu.getMduID()] = mdu

    def getMdu(revPid):
    return mduMap[revPid&0xFFFF0000>>16]

    def registPid(pid):
    mdu = getMdu(pid.getPidID())
    #pdb.set_trace()
    mdu.registPid(pid.getPidID(), pid)

    def sendMsg(msg):
    mdu = getMdu(msg.getRecvPid())
    mdu.msgEnQueue(msg)

    复制代码

    MQ:

    复制代码
     View Code

    from threading import Lock
    from threading import Condition
    import threading

    class myQueue:
    def __init__(self, size):
    self.size = size
    self.list = list()
    self.lock = Lock()
    self.notFullCond = Condition(self.lock)
    self.notEmptyCond = Condition(self.lock)

    def isFull(self):
    if self.size == len(self.list):
    return True
    return False

    def isEmpty(self):
    if 0 == len(self.list):
    return True
    return False

    def enQueue(self, elem):
    self.lock.acquire()
    while self.isFull():
    print('queue is full, waiting...')
    self.notFullCond.wait()
    print(threading.current_thread().getName() + ' product ' + str(elem))
    self.list.append(elem)
    self.notEmptyCond.notify()
    self.lock.release()

    def deQueue(self):
    self.lock.acquire()
    while self.isEmpty():
    print('queue is empty, waiting...')
    self.notEmptyCond.wait()

    elem = self.list[0]
    del(self.list[0])
    print(threading.current_thread().getName() + ' consume ' + str(elem))

    self.notFullCond.notify_all()
    self.lock.release()

    return elem

    复制代码

     转载请注明原始出处:http://www.cnblogs.com/chencheng/p/3236158.html

     
     
    分类: 架构设计
  • 相关阅读:
    Longest Palindromic Substring
    Median of Two Sorted Arrays
    Longest Substring Without Repeating Characters
    Add Two Numbers
    Two Sum
    如果要面试
    nodejs zip 安装配置
    如何从官网下载 Google Chrome 离线安装包
    eval和new Function的区别
    WebStorm开发React项目,修代码之后运行的项目不更新
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3267532.html
Copyright © 2011-2022 走看看