zoukankan      html  css  js  c++  java
  • Ryu源码之拓扑发现原理分析

    参考:Ryu拓扑发现原理分析(ryu/topology/switches.py),通过对该文件的分析,可以更好的了解数据平面中设备信息

    一:拓扑成员类分析

    (一)Port类

    class Port(object):
        # This is data class passed by EventPortXXX
        def __init__(self, dpid, ofproto, ofpport):
            super(Port, self).__init__()
    
            self.dpid = dpid
            self._ofproto = ofproto
            self._config = ofpport.config
            self._state = ofpport.state
    
            self.port_no = ofpport.port_no
            self.hw_addr = ofpport.hw_addr
            self.name = ofpport.name
    
        def is_reserved(self):
            return self.port_no > self._ofproto.OFPP_MAX
    
        def is_down(self):
            return (self._state & self._ofproto.OFPPS_LINK_DOWN) > 0 
                or (self._config & self._ofproto.OFPPC_PORT_DOWN) > 0
    
        def is_live(self):
            # NOTE: OF1.2 has OFPPS_LIVE state
            #       return (self._state & self._ofproto.OFPPS_LIVE) > 0
            return not self.is_down()
    
        def to_dict(self):
            return {'dpid': dpid_to_str(self.dpid),
                    'port_no': port_no_to_str(self.port_no),
                    'hw_addr': self.hw_addr,
                    'name': self.name.decode('utf-8')}
    
        # for Switch.del_port()
        def __eq__(self, other):
            return self.dpid == other.dpid and self.port_no == other.port_no
    
        def __ne__(self, other):
            return not self.__eq__(other)
    
        def __hash__(self):
            return hash((self.dpid, self.port_no))
    
        def __str__(self):
            LIVE_MSG = {False: 'DOWN', True: 'LIVE'}
            return 'Port<dpid=%s, port_no=%s, %s>' % 
                (self.dpid, self.port_no, LIVE_MSG[self.is_live()])
    class Port(object):

    存储端口相关信息,数据成员有:

    self.dpid = dpid
    self._ofproto = ofproto
    self._config = ofpport.config
    self._state = ofpport.state
    self.port_no = ofpport.port_no
    self.hw_addr = ofpport.hw_addr
    self.name = ofpport.name

    其中要特别注意的是dpid和port_no,即交换机ID和端口号,这两个信息在下发流表项时很重要。

    (二)Switch类

    class Switch(object):
        # This is data class passed by EventSwitchXXX
        def __init__(self, dp):
            super(Switch, self).__init__()
    
            self.dp = dp
            self.ports = []
    
        def add_port(self, ofpport):
            port = Port(self.dp.id, self.dp.ofproto, ofpport)
            if not port.is_reserved():
                self.ports.append(port)
    
        def del_port(self, ofpport):
            self.ports.remove(Port(ofpport))
    
        def to_dict(self):
            d = {'dpid': dpid_to_str(self.dp.id),
                 'ports': [port.to_dict() for port in self.ports]}
            return d
    
        def __str__(self):
            msg = 'Switch<dpid=%s, ' % self.dp.id
            for port in self.ports:
                msg += str(port) + ' '
    
            msg += '>'
            return msg
    class Switch(object):

    存储交换机相关信息,数据成员有:

    self.dp = dp
    self.ports = []

    其中dp是Datapath类的实例,该类定义在在ryu/controller/controller.py,主要属性有:

    self.socket = socket
    self.address = address
    self.is_active = True
    self.id = None  # datapath_id is unknown yet
    self.ports = None

    ports是一个由Port类实例组成的列表,存储该交换机的端口。

    (三)Link类:

    class Link(object):
        # This is data class passed by EventLinkXXX
        def __init__(self, src, dst):
            super(Link, self).__init__()
            self.src = src
            self.dst = dst
    
        def to_dict(self):
            d = {'src': self.src.to_dict(),
                 'dst': self.dst.to_dict()}
            return d
    
        # this type is used for key value of LinkState
        def __eq__(self, other):
            return self.src == other.src and self.dst == other.dst
    
        def __ne__(self, other):
            return not self.__eq__(other)
    
        def __hash__(self):
            return hash((self.src, self.dst))
    
        def __str__(self):
            return 'Link: %s to %s' % (self.src, self.dst)
    class Link(object):

    保存的是源端口和目的端口(都是Port类实例),数据成员有:

    self.src = src
    self.dst = dst

    (四)PortState类

    class PortState(dict):
        # dict: int port_no -> OFPPort port
        # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser
        def __init__(self):
            super(PortState, self).__init__()
    
        def add(self, port_no, port):
            self[port_no] = port
    
        def remove(self, port_no):
            del self[port_no]
    
        def modify(self, port_no, port):
            self[port_no] = port
    class PortState(dict):

    该类继承自dict,保存了从port_no(int型)到port(OFPPort类实例)的映射。

    该类主要用作self.port_state字典的值(键是dpid),用于存储dpid对应的交换机的所有端口情况。
    OFPPort类定义在ryu/ofproto目录下对应的ofproto_v1_X_parser.py中(X代表版本号),继承自一个namedtuple,保存有port_no等信息。

    (五)PortData类

    class PortData(object):
        def __init__(self, is_down, lldp_data):
            super(PortData, self).__init__()
            self.is_down = is_down
            self.lldp_data = lldp_data
            self.timestamp = None
            self.sent = 0
    
        def lldp_sent(self):
            self.timestamp = time.time()
            self.sent += 1
    
        def lldp_received(self):
            self.sent = 0
    
        def lldp_dropped(self):
            return self.sent
    
        def clear_timestamp(self):
            self.timestamp = None
    
        def set_down(self, is_down):
            self.is_down = is_down
    
        def __str__(self):
            return 'PortData<live=%s, timestamp=%s, sent=%d>' 
                % (not self.is_down, self.timestamp, self.sent)
    class PortData(object):

    保存每个端口与对应的LLDP报文数据,数据成员有:

    self.is_down = is_down
    self.lldp_data = lldp_data(这是LLDP报文的数据)
    self.timestamp = None
    self.sent = 0

    每调用一次lldp_sent函数,便会把self.timestamp置为当前的时间(time.time()),并将self.sent加1;每调用一次lldp_received函数,便会把self.sent置为0。

    (六)PortDataState类

    class PortDataState(dict):
        # dict: Port class -> PortData class
        # slimed down version of OrderedDict as python 2.6 doesn't support it.
        _PREV = 0
        _NEXT = 1
        _KEY = 2
    
        def __init__(self):
            super(PortDataState, self).__init__()
            self._root = root = []  # sentinel node
            root[:] = [root, root, None]  # [_PREV, _NEXT, _KEY] doubly linked list
            self._map = {}
    
        def _remove_key(self, key):
            link_prev, link_next, key = self._map.pop(key)
            link_prev[self._NEXT] = link_next
            link_next[self._PREV] = link_prev
    
        def _append_key(self, key):
            root = self._root
            last = root[self._PREV]
            last[self._NEXT] = root[self._PREV] = self._map[key] = [last, root,
                                                                    key]
    
        def _prepend_key(self, key):
            root = self._root
            first = root[self._NEXT]
            first[self._PREV] = root[self._NEXT] = self._map[key] = [root, first,
                                                                     key]
    
        def _move_last_key(self, key):
            self._remove_key(key)
            self._append_key(key)
    
        def _move_front_key(self, key):
            self._remove_key(key)
            self._prepend_key(key)
    
        def add_port(self, port, lldp_data):
            if port not in self:
                self._prepend_key(port)
                self[port] = PortData(port.is_down(), lldp_data) #为端口添加LLDP报文
            else:
                self[port].is_down = port.is_down()
    
        def lldp_sent(self, port):
            port_data = self[port] #获取了PortData类实例
            port_data.lldp_sent() #设置了timestamp和sent标识+1
            self._move_last_key(port) #循环列表,前面移动到后面
            return port_data #返回PortData类实例
    
        def lldp_received(self, port):
            self[port].lldp_received()
    
        def move_front(self, port):
            port_data = self.get(port, None)
            if port_data is not None:
                port_data.clear_timestamp()
                self._move_front_key(port)
    
        def set_down(self, port):
            is_down = port.is_down()
            port_data = self[port]
            port_data.set_down(is_down)
            port_data.clear_timestamp()
            if not is_down:
                self._move_front_key(port)
            return is_down
    
        def get_port(self, port):
            return self[port]
    
        def del_port(self, port):
            del self[port]
            self._remove_key(port)
    
        def __iter__(self):
            root = self._root
            curr = root[self._NEXT]
            while curr is not root:
                yield curr[self._KEY]
                curr = curr[self._NEXT]
    
        def clear(self):
            for node in self._map.values():
                del node[:]
            root = self._root
            root[:] = [root, root, None]
            self._map.clear()
            dict.clear(self)
    
        def items(self):
            'od.items() -> list of (key, value) pairs in od'
            return [(key, self[key]) for key in self]
    
        def iteritems(self):
            'od.iteritems -> an iterator over the (key, value) pairs in od'
            for k in self:
                yield (k, self[k])
    class PortDataState(dict):

      继承自dict类,保存从Port类到PortData类的映射。该类维护了一个类似双向循环链表的数据结构,并重写了__iter__(),使得遍历该类的实例(self.ports)时,会按照该双向循环链表从哨兵节点(self._root)后一个节点开始遍历。

      包含一个add_port函数,传入port和lldp_data,port作键,构建的PortData类实例作为值。
      包含一个lldp_sent(self,port)函数,根据传入的port(Port类实例)获得对应的PortData类实例port_data,然后调用port_data.lldp_sent()(该函数会设置时间戳),再调用self._move_last_key(port),把该port移到类似双向循环链表的数据结构中哨兵节点的前面(相当于下次遍历的末尾);最后返回port_data。

    (七)LinkState类

    class LinkState(dict):
        # dict: Link class -> timestamp
        def __init__(self):
            super(LinkState, self).__init__()
            self._map = {}
    
        def get_peer(self, src):
            return self._map.get(src, None)
    
        def update_link(self, src, dst):
            link = Link(src, dst)
    
            self[link] = time.time()
            self._map[src] = dst
    
            # return if the reverse link is also up or not
            rev_link = Link(dst, src)
            return rev_link in self
    
        def link_down(self, link):
            del self[link]
            del self._map[link.src]
    
        def rev_link_set_timestamp(self, rev_link, timestamp):
            # rev_link may or may not in LinkSet
            if rev_link in self:
                self[rev_link] = timestamp
    
        def port_deleted(self, src):
            dst = self.get_peer(src)
            if dst is None:
                raise KeyError()
    
            link = Link(src, dst)
            rev_link = Link(dst, src)
            del self[link]
            del self._map[src]
            # reverse link might not exist
            self.pop(rev_link, None)
            rev_link_dst = self._map.pop(dst, None)
    
            return dst, rev_link_dst
    class LinkState(dict):

    继承自dict,保存从Link类到时间戳的映射。数据成员self._map字典用于存储Link两端互相映射的关系。

    (八)LLDPPacket类

    class LLDPPacket(object):
        # make a LLDP packet for link discovery.
    
        CHASSIS_ID_PREFIX = 'dpid:'
        CHASSIS_ID_PREFIX_LEN = len(CHASSIS_ID_PREFIX)
        CHASSIS_ID_FMT = CHASSIS_ID_PREFIX + '%s'
    
        PORT_ID_STR = '!I'      # uint32_t
        PORT_ID_SIZE = 4
    
        class LLDPUnknownFormat(RyuException):
            message = '%(msg)s'
    
        @staticmethod
        def lldp_packet(dpid, port_no, dl_addr, ttl):
            pkt = packet.Packet() #生成数据包
    
            dst = lldp.LLDP_MAC_NEAREST_BRIDGE
            src = dl_addr
            ethertype = ETH_TYPE_LLDP
            eth_pkt = ethernet.ethernet(dst, src, ethertype)
            pkt.add_protocol(eth_pkt) #构造数据
    
            tlv_chassis_id = lldp.ChassisID(
                subtype=lldp.ChassisID.SUB_LOCALLY_ASSIGNED,
                chassis_id=(LLDPPacket.CHASSIS_ID_FMT %
                            dpid_to_str(dpid)).encode('ascii'))
    
            tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_PORT_COMPONENT,
                                      port_id=struct.pack(
                                          LLDPPacket.PORT_ID_STR,
                                          port_no))
    
            tlv_ttl = lldp.TTL(ttl=ttl)
            tlv_end = lldp.End()
    
            tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end)
            lldp_pkt = lldp.lldp(tlvs)
            pkt.add_protocol(lldp_pkt)
    
            pkt.serialize()
            return pkt.data
    
        @staticmethod
        def lldp_parse(data):
            pkt = packet.Packet(data)
            i = iter(pkt)
            eth_pkt = six.next(i)
            assert type(eth_pkt) == ethernet.ethernet
    
            lldp_pkt = six.next(i)
            if type(lldp_pkt) != lldp.lldp:
                raise LLDPPacket.LLDPUnknownFormat()
    
            tlv_chassis_id = lldp_pkt.tlvs[0]
            if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED:
                raise LLDPPacket.LLDPUnknownFormat(
                    msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype)
            chassis_id = tlv_chassis_id.chassis_id.decode('utf-8')
            if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX):
                raise LLDPPacket.LLDPUnknownFormat(
                    msg='unknown chassis id format %s' % chassis_id)
            src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:])
    
            tlv_port_id = lldp_pkt.tlvs[1]
            if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT:
                raise LLDPPacket.LLDPUnknownFormat(
                    msg='unknown port id subtype %d' % tlv_port_id.subtype)
            port_id = tlv_port_id.port_id
            if len(port_id) != LLDPPacket.PORT_ID_SIZE:
                raise LLDPPacket.LLDPUnknownFormat(
                    msg='unknown port id %d' % port_id)
            (src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id)
    
            return src_dpid, src_port_no
    class LLDPPacket(object):

    静态方法lldp_packet(dpid,port_no,dl_addr,ttl)用于构造LLDP报文,静态方法lldp_parse(data)用于解析LLDP包,并返回源DPID和源端口号。

    二:分析Switches类拓扑发现

    (一)类成员分析

    该类是Ryu拓扑发现的核心所在。Switches类是app_manager.RyuApp类的子类,当运行switches应用时会被实例化,其__init__函数主要包括:

    class Switches(app_manager.RyuApp):
        OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION,
                        ofproto_v1_3.OFP_VERSION, ofproto_v1_4.OFP_VERSION]
        _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
                   event.EventSwitchReconnected,
                   event.EventPortAdd, event.EventPortDelete,
                   event.EventPortModify,
                   event.EventLinkAdd, event.EventLinkDelete,
                   event.EventHostAdd]
    
        DEFAULT_TTL = 120  # unused. ignored.
        LLDP_PACKET_LEN = len(LLDPPacket.lldp_packet(0, 0, DONTCARE_STR, 0))
    
        LLDP_SEND_GUARD = .05
        LLDP_SEND_PERIOD_PER_PORT = .9
        TIMEOUT_CHECK_PERIOD = 5.
        LINK_TIMEOUT = TIMEOUT_CHECK_PERIOD * 2
        LINK_LLDP_DROP = 5
    
        def __init__(self, *args, **kwargs):
            super(Switches, self).__init__(*args, **kwargs)
    
            self.name = 'switches'
            self.dps = {}                 #self.dps字典用于保存dpid到Datapath类实例的映射,会在_register函数中添加新成员,_unregister函数中删除成员。遍历该字典可以得到连接的所有交换机。
            self.port_state = {}          #字典中键为dpid,值为PortState类型。遍历该字典可以得到所有交换机对应的端口情况。当交换机连接时,会检查交换机的id是否在self.port_state中,不在则创建PortState类实例,把交换机的所有端口号和端口存储到该实例中;交换机断开时,会从self.port_state中删除。
            self.ports = PortDataState()  #self.ports是PortDataState类的实例,保存每个端口(Port类型)对应的LLDP报文数据(保存在PortData类实例中),遍历self.ports用于发送LLDP报文。
            self.links = LinkState()      #self.links是LinkState类的实例,保存所有连接(Link类型)到时间戳的映射。遍历self.links的键即可得到所有交换机之间的连接情况。
            self.hosts = HostState()      # mac address -> Host class list
            self.is_active = True
    
            self.link_discovery = self.CONF.observe_links
            if self.link_discovery: #如果ryu-manager启动时加了--observe-links参数,则下面的self.link_discovery将为真,从而执行if下面的语句:
                self.install_flow = self.CONF.install_lldp_flow
                self.explicit_drop = self.CONF.explicit_drop
                self.lldp_event = hub.Event()
                self.link_event = hub.Event()
                self.threads.append(hub.spawn(self.lldp_loop))
                self.threads.append(hub.spawn(self.link_loop))

    综上所述,该初始化函数__init__()主要是创建用于存储相关信息的数据结构,创建两个事件,然后调用hub.spawn创建两个新线程执行self.lldp_loop和self.link_loop两个函数。

    (二)其他成员方法

        def close(self):
            self.is_active = False
            if self.link_discovery:
                self.lldp_event.set()
                self.link_event.set()
                hub.joinall(self.threads)
    
        def _register(self, dp):
            assert dp.id is not None
    
            self.dps[dp.id] = dp
            if dp.id not in self.port_state:
                self.port_state[dp.id] = PortState()
                for port in dp.ports.values():
                    self.port_state[dp.id].add(port.port_no, port) #遍历dp.ports.values,将所有port(OFPPort类型)添加到该PortState实例中。
    
        def _unregister(self, dp):
            if dp.id in self.dps:
                if (self.dps[dp.id] == dp):
                    del self.dps[dp.id]
                    del self.port_state[dp.id]
    
        def _get_switch(self, dpid):
            if dpid in self.dps:
                switch = Switch(self.dps[dpid])
                for ofpport in self.port_state[dpid].values():
                    switch.add_port(ofpport)
                return switch
    
        def _get_port(self, dpid, port_no):
            switch = self._get_switch(dpid)
            if switch:
                for p in switch.ports:
                    if p.port_no == port_no:
                        return p
    
        def _port_added(self, port):
            lldp_data = LLDPPacket.lldp_packet( #调用静态方法,构建LLDP数据报文
                port.dpid, port.port_no, port.hw_addr, self.DEFAULT_TTL)
            self.ports.add_port(port, lldp_data) #将数据添加到端口中
            # LOG.debug('_port_added dpid=%s, port_no=%s, live=%s',
            #           port.dpid, port.port_no, port.is_live())
    
        def _link_down(self, port):
            try:
                dst, rev_link_dst = self.links.port_deleted(port)
            except KeyError:
                # LOG.debug('key error. src=%s, dst=%s',
                #           port, self.links.get_peer(port))
                return
            link = Link(port, dst)
            self.send_event_to_observers(event.EventLinkDelete(link))
            if rev_link_dst:
                rev_link = Link(dst, rev_link_dst)
                self.send_event_to_observers(event.EventLinkDelete(rev_link))
            self.ports.move_front(dst)
    
        def _is_edge_port(self, port):
            for link in self.links:
                if port == link.src or port == link.dst:
                    return False
    
            return True

    (三)交换机之间链路连接信息探测:发送(1)lldp_loop方法---由初始化函数中线程方法进行管理调用

        def lldp_loop(self):
            while self.is_active:
                self.lldp_event.clear()
    
                now = time.time() #当前时刻
                timeout = None
                ports_now = []
                ports = []
                for (key, data) in self.ports.items(): #遍历self.ports(PortDataState类的实例),获得key(Port类实例)和data(PortData类实例)
                    if data.timestamp is None: #如果data.timestamp为None(该端口还没发送过LLDP报文)
                        ports_now.append(key) #则将key(端口)加入ports_now列表;
                        continue #------------
    
                    expire = data.timestamp + self.LLDP_SEND_PERIOD_PER_PORT #否则,计算下次应该发送LLDP报文的时间expire
                    if expire <= now: #如果已经超时,则放到ports列表-----表示本来就应该发送下一次LLDP了
                        ports.append(key)
                        continue #------------
    
                    timeout = expire - now #获取timeout时间,表示还差多久到达期望的时间expire,进行LLDP数据包发送
                    break #否则就是还没到发送时间,停止遍历
                    #(发送LLDP报文时是按序发的,找到第一个未超时的端口,后面的端口肯定更没有超时,
                    #因为后面端口上次发送LLDP是在前一端口之后,前一个都没超时后面的自然也没超时)
    
                #注意:列表中都是按序的,先发ports_now中还没有发送过LLDP数据的Port实例(data.timestamp为None),
                #然后查看已经发送过,但是已经超时(及本来应该进行下一次发送的)ports中的Port实例for port in ports_now: #遍历ports_now列表,对每个端口调用self.send_lldp_packet(port),发送LLDP报文
                    self.send_lldp_packet(port) #--------发送LLDP报文
                for port in ports: #遍历ports列表,对每个端口调用self.send_lldp_packet(port),发送LLDP报文
                    self.send_lldp_packet(port) #--------发送LLDP报文
                    hub.sleep(self.LLDP_SEND_GUARD)      # don't burst 设置睡眠时间,防止回送数据太多,导致出现等待时延
    
                if timeout is not None and ports: #timeout!=None表示:还有端口Port实例没有到达下一次发送LLDP报文的时机 Ports表示:本来应该发生LLDP报文的Port实例(超过本来周期)--上面发送过
                    timeout = 0     # We have already slept ---- ??设置为0---主要为了避免None在下面wait中出错
                # LOG.debug('lldp sleep %s', timeout)
                self.lldp_event.wait(timeout=timeout) #事件等待timeout时刻

    (四)交换机之间链路连接信息探测:发送(2)send_lldp_packet方法---由lldp_loop函数调用

        def send_lldp_packet(self, port):
            try:
                #!!!--------!!!重点:在发送时我们会将当前时刻保存在self.timestamp = time.time()中,后面我们接收到LLDP数据包时,
                #!!!--------!!!重点:到对应端口中获取timestamp,然后现在时刻-timestamp获得LLDP测量的C-S-S-C时间!!!
    
                port_data = self.ports.lldp_sent(port) #调用PortDataState类的lldp_sent函数,该函数会设置时间戳,
                #移动相应端口在双向循环链表中的位置,最后返回PortData类实例port_data。
            except KeyError:
                # ports can be modified during our sleep in self.lldp_loop()
                # LOG.debug('send_lld error', exc_info=True)
                return
            if port_data.is_down: #如果该端口已经down掉,直接返回,否则执行下一步
                return
    
            dp = self.dps.get(port.dpid, None) #获取了datapath实例
            if dp is None: #根据port.dpid得到对应的Datapath类实例dp,如果不存在,则直接返回,否则执行下一步
                # datapath was already deleted
                return
    
            # LOG.debug('lldp sent dpid=%s, port_no=%d', dp.id, port.port_no)
            # TODO:XXX
            #发送LLDP报文。具体地:
            #(1)生成actions:从port.port_no端口发出消息;
            #(2)生成PacketOut消息:datapath指定为上一步得到的dp,actions为前面的,data为步骤a中返回的port_data的lldp_data
            if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
                actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)]
                dp.send_packet_out(actions=actions, data=port_data.lldp_data)
            elif dp.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
                actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)] #设置actions
                out = dp.ofproto_parser.OFPPacketOut( #下发LLDP数据
                    datapath=dp, in_port=dp.ofproto.OFPP_CONTROLLER,
                    buffer_id=dp.ofproto.OFP_NO_BUFFER, actions=actions,
                    data=port_data.lldp_data) #PortData类实例的lldp_data数据
                dp.send_msg(out)
            else:
                LOG.error('cannot send lldp packet. unsupported version. %x',
                          dp.ofproto.OFP_VERSION)

    (五)交换机之间链路连接信息探测:接收(1)state_change_handler方法---由事件触发函数调用(下发流表!!!)

        @set_ev_cls(ofp_event.EventOFPStateChange,
                    [MAIN_DISPATCHER, DEAD_DISPATCHER]) #该函数用于处理EventOFPStateChange事件,当交换机连接或者断开时会触发该事件。
        def state_change_handler(self, ev):
            dp = ev.datapath
            assert dp is not None
            LOG.debug(dp)
    
            if ev.state == MAIN_DISPATCHER: #如果状态是MAIN_DISPATCHER:
                dp_multiple_conns = False
                if dp.id in self.dps: #(1)从ev.datapath获得Datapath类实例dp,如果该dp的dpid已经在self.dps里有,则报出重复链接的警告。
                    LOG.warning('Multiple connections from %s', dpid_to_str(dp.id))
                    dp_multiple_conns = True
                    (self.dps[dp.id]).close()
    
                #(2)调用_register(),将dp.id和dp添加到self.dps中;
                #如果该dp.id不在self.port_state中,则创建该dp.id对应的PortState实例,
                #并遍历dp.ports.values,将所有port(OFPPort类型)添加到该PortState实例中。
                self._register(dp)
    
                #(3)调用_get_switch(),如果dp.id在self.dps中,则创建一个Switch类实例,
                #并把self.port_state中对应的端口都添加到该实例中,最终返回该实例。
                switch = self._get_switch(dp.id)
                LOG.debug('register %s', switch)
    
                if not dp_multiple_conns: #(4)如果交换机没有重复连接,触发EventSwitchEnter事件。
                    self.send_event_to_observers(event.EventSwitchEnter(switch))
                else:
                    evt = event.EventSwitchReconnected(switch)
                    self.send_event_to_observers(evt)
    
                if not self.link_discovery: #(5)如果没设置self.link_discovery,返回;否则执行下一步。
                    return
    
                #(6)如果设置了self.install_flow,则根据OpenFlow版本生成相应流表项,
                #使得收到的LLDP报文(根据目的MAC地址匹配)上报给控制器。
                if self.install_flow:
                    ofproto = dp.ofproto
                    ofproto_parser = dp.ofproto_parser
    
                    # TODO:XXX need other versions
                    if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
                        rule = nx_match.ClsRule()
                        rule.set_dl_dst(addrconv.mac.text_to_bin(
                                        lldp.LLDP_MAC_NEAREST_BRIDGE))
                        rule.set_dl_type(ETH_TYPE_LLDP)
                        actions = [ofproto_parser.OFPActionOutput(
                            ofproto.OFPP_CONTROLLER, self.LLDP_PACKET_LEN)]
                        dp.send_flow_mod(
                            rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
                            idle_timeout=0, hard_timeout=0, actions=actions,
                            priority=0xFFFF)
                    elif ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
                        match = ofproto_parser.OFPMatch( 
                            eth_type=ETH_TYPE_LLDP,
                            eth_dst=lldp.LLDP_MAC_NEAREST_BRIDGE) #重点:设置match---交换机获取得到以太网LLDP数据包(由另一个交换机转发而来)!!!!!
                        # OFPCML_NO_BUFFER is set so that the LLDP is not
                        # buffered on switch
                        parser = ofproto_parser
                        actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
                                                          ofproto.OFPCML_NO_BUFFER
                                                          )] #设置actions,当第二个交换机获取到LLDP数据后,不需要缓存LLDP数据,全部上交给控制器
                        inst = [parser.OFPInstructionActions(
                                ofproto.OFPIT_APPLY_ACTIONS, actions)]
                        mod = parser.OFPFlowMod(datapath=dp, match=match,
                                                idle_timeout=0, hard_timeout=0,
                                                instructions=inst,
                                                priority=0xFFFF) # 标识流表的优先级,范围为0-65535,值越大,优先级越高
                        dp.send_msg(mod) #发送给datapath
                    else:
                        LOG.error('cannot install flow. unsupported version. %x',
                                  dp.ofproto.OFP_VERSION)
    
                # Do not add ports while dp has multiple connections to controller.
                if not dp_multiple_conns:
                    for port in switch.ports:
                        if not port.is_reserved():
                            self._port_added(port)
    
                self.lldp_event.set()
    
            elif ev.state == DEAD_DISPATCHER:
                # dp.id is None when datapath dies before handshake
                if dp.id is None:
                    return
    
                switch = self._get_switch(dp.id)
                if switch:
                    if switch.dp is dp:
                        self._unregister(dp)
                        LOG.debug('unregister %s', switch)
                        evt = event.EventSwitchLeave(switch)
                        self.send_event_to_observers(evt)
    
                        if not self.link_discovery:
                            return
    
                        for port in switch.ports:
                            if not port.is_reserved():
                                self.ports.del_port(port)
                                self._link_down(port)
                        self.lldp_event.set()

    (六)交换机之间链路连接信息探测:接收(2)lldp_packet_in_handler方法---由事件触发函数调用

        @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
        def lldp_packet_in_handler(self, ev):
            if not self.link_discovery:
                return
    
            msg = ev.msg
            try:
                src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data) #解析lldp数据包,获取src_dpid和src_port_no
            except LLDPPacket.LLDPUnknownFormat:
                # This handler can receive all the packets which can be
                # not-LLDP packet. Ignore it silently
                return
    
            dst_dpid = msg.datapath.id #获取目的datapath的dst_dpid
            if msg.datapath.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION:
                dst_port_no = msg.in_port #获取目的datapath的dst_port_no
            elif msg.datapath.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION:
                dst_port_no = msg.match['in_port']
            else:
                LOG.error('cannot accept LLDP. unsupported version. %x',
                          msg.datapath.ofproto.OFP_VERSION)
    
            src = self._get_port(src_dpid, src_port_no)
            if not src or src.dpid == dst_dpid:
                return
            try:
                self.ports.lldp_received(src)
            except KeyError:
                # There are races between EventOFPPacketIn and
                # EventDPPortAdd. So packet-in event can happend before
                # port add event. In that case key error can happend.
                # LOG.debug('lldp_received error', exc_info=True)
                pass
    
            dst = self._get_port(dst_dpid, dst_port_no)
            if not dst:
                return
    
            old_peer = self.links.get_peer(src)
            # LOG.debug("Packet-In")
            # LOG.debug("  src=%s", src)
            # LOG.debug("  dst=%s", dst)
            # LOG.debug("  old_peer=%s", old_peer)
            if old_peer and old_peer != dst:
                old_link = Link(src, old_peer)
                del self.links[old_link]
                self.send_event_to_observers(event.EventLinkDelete(old_link))
    
            link = Link(src, dst)
            if link not in self.links:
                self.send_event_to_observers(event.EventLinkAdd(link))
    
                # remove hosts if it's not attached to edge port
                host_to_del = []
                for host in self.hosts.values():
                    if not self._is_edge_port(host.port):
                        host_to_del.append(host.mac)
    
                for host_mac in host_to_del:
                    del self.hosts[host_mac]
    
            if not self.links.update_link(src, dst):
                # reverse link is not detected yet.
                # So schedule the check early because it's very likely it's up
                self.ports.move_front(dst)
                self.lldp_event.set()
            if self.explicit_drop:
                self._drop_packet(msg)

    重点:LLDP报文发送时,会将时间戳timestamp存放在端口实例中;如果我们想要获取得到LLDP报文在网络中的传输时间,则在控制器再次获取得到LLDP报文时,会获取得到当前时刻now_time,然后获取对应端口在发送LLDP时保存的原始时间戳timestamp信息,通过now_time-timestamp获取得到LLDP在网络中的传输时间!!!

    只分析到这一步!!!,我们可以获取得到LLDP报文传输时间!!!,想要了解其他的,请参考:Ryu拓扑发现原理分析

  • 相关阅读:
    Java内存模型
    Thread.sleep(0)的作用
    Java中用到的线程调度算法是什么
    怎么检测一个线程是否持有对象监视器
    为什么要使用线程池
    ThreadLocal
    生产者消费者模型的作用
    线程间通信
    线程安全
    c#常日期转换(转)
  • 原文地址:https://www.cnblogs.com/ssyfj/p/14193150.html
Copyright © 2011-2022 走看看