zoukankan      html  css  js  c++  java
  • l2_multi.py

    # Copyright 2012-2013 James McCauley
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at:
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """
    A shortest-path forwarding application.
    
    This is a standalone L2 switch that learns ethernet addresses
    across the entire network and picks short paths between them.
    
    You shouldn't really write an application this way -- you should
    keep more state in the controller (that is, your flow tables),
    and/or you should make your topology more static.  However, this
    does (mostly) work. :)
    
    Depends on openflow.discovery
    Works with openflow.spanning_tree
    """
    
    from pox.core import core
    import pox.openflow.libopenflow_01 as of
    from pox.lib.revent import *
    from pox.lib.recoco import Timer
    from collections import defaultdict
    from pox.openflow.discovery import Discovery
    from pox.lib.util import dpid_to_str
    import time
    
    log = core.getLogger()
    
    # Adjacency map.  [sw1][sw2] -> port from sw1 to sw2
    adjacency = defaultdict(lambda:defaultdict(lambda:None))
    
    # Switches we know of.  [dpid] -> Switch
    switches = {}
    
    # ethaddr -> (switch, port)
    mac_map = {}
    
    # [sw1][sw2] -> (distance, intermediate)
    path_map = defaultdict(lambda:defaultdict(lambda:(None,None)))
    
    # Waiting path.  (dpid,xid)->WaitingPath
    waiting_paths = {}
    
    # Time to not flood in seconds
    FLOOD_HOLDDOWN = 5
    
    # Flow timeouts
    FLOW_IDLE_TIMEOUT = 10
    FLOW_HARD_TIMEOUT = 30
    
    # How long is allowable to set up a path?
    PATH_SETUP_TIME = 4
    
    
    def _calc_paths ():
      """
      Essentially Floyd-Warshall algorithm
      """
    
      def dump ():
        for i in sws:
          for j in sws:
            a = path_map[i][j][0]
            #a = adjacency[i][j]
            if a is None: a = "*"
            print a,
          print
    
      sws = switches.values()
      path_map.clear()
      for k in sws:
        for j,port in adjacency[k].iteritems():
          if port is None: continue
          path_map[k][j] = (1,None)
        path_map[k][k] = (0,None) # distance, intermediate
    
      #dump()
    
      for k in sws:
        for i in sws:
          for j in sws:
            if path_map[i][k][0] is not None:
              if path_map[k][j][0] is not None:
                # i -> k -> j exists
                ikj_dist = path_map[i][k][0]+path_map[k][j][0]
                if path_map[i][j][0] is None or ikj_dist < path_map[i][j][0]:
                  # i -> k -> j is better than existing
                  path_map[i][j] = (ikj_dist, k)
    
      #print "--------------------"
      #dump()
    
    
    def _get_raw_path (src, dst):
      """
      Get a raw path (just a list of nodes to traverse)
      """
      if len(path_map) == 0: _calc_paths()
      if src is dst:
        # We're here!
        return []
      if path_map[src][dst][0] is None:
        return None
      intermediate = path_map[src][dst][1]
      if intermediate is None:
        # Directly connected
        return []
      return _get_raw_path(src, intermediate) + [intermediate] + 
             _get_raw_path(intermediate, dst)
    
    
    def _check_path (p):
      """
      Make sure that a path is actually a string of nodes with connected ports
    
      returns True if path is valid
      """
      for a,b in zip(p[:-1],p[1:]):
        if adjacency[a[0]][b[0]] != a[2]:
          return False
        if adjacency[b[0]][a[0]] != b[1]:
          return False
      return True
    
    
    def _get_path (src, dst, first_port, final_port):
      """
      Gets a cooked path -- a list of (node,in_port,out_port)
      """
      # Start with a raw path...
      if src == dst:
        path = [src]
      else:
        path = _get_raw_path(src, dst)
        if path is None: return None
        path = [src] + path + [dst]
    
      # Now add the ports
      r = []
      in_port = first_port
      for s1,s2 in zip(path[:-1],path[1:]):
        out_port = adjacency[s1][s2]
        r.append((s1,in_port,out_port))
        in_port = adjacency[s2][s1]
      r.append((dst,in_port,final_port))
    
      assert _check_path(r), "Illegal path!"
    
      return r
    
    
    class WaitingPath (object):
      """
      A path which is waiting for its path to be established
      """
      def __init__ (self, path, packet):
        """
        xids is a sequence of (dpid,xid)
        first_switch is the DPID where the packet came from
        packet is something that can be sent in a packet_out
        """
        self.expires_at = time.time() + PATH_SETUP_TIME
        self.path = path
        self.first_switch = path[0][0].dpid
        self.xids = set()
        self.packet = packet
    
        if len(waiting_paths) > 1000:
          WaitingPath.expire_waiting_paths()
    
      def add_xid (self, dpid, xid):
        self.xids.add((dpid,xid))
        waiting_paths[(dpid,xid)] = self
    
      @property
      def is_expired (self):
        return time.time() >= self.expires_at
    
      def notify (self, event):
        """
        Called when a barrier has been received
        """
        self.xids.discard((event.dpid,event.xid))
        if len(self.xids) == 0:
          # Done!
          if self.packet:
            log.debug("Sending delayed packet out %s"
                      % (dpid_to_str(self.first_switch),))
            msg = of.ofp_packet_out(data=self.packet,
                action=of.ofp_action_output(port=of.OFPP_TABLE))
            core.openflow.sendToDPID(self.first_switch, msg)
    
          core.l2_multi.raiseEvent(PathInstalled(self.path))
    
    
      @staticmethod
      def expire_waiting_paths ():
        packets = set(waiting_paths.values())
        killed = 0
        for p in packets:
          if p.is_expired:
            killed += 1
            for entry in p.xids:
              waiting_paths.pop(entry, None)
        if killed:
          log.error("%i paths failed to install" % (killed,))
    
    
    class PathInstalled (Event):
      """
      Fired when a path is installed
      """
      def __init__ (self, path):
        Event.__init__(self)
        self.path = path
    
    
    class Switch (EventMixin):
      def __init__ (self):
        self.connection = None
        self.ports = None
        self.dpid = None
        self._listeners = None
        self._connected_at = None
    
      def __repr__ (self):
        return dpid_to_str(self.dpid)
    
      def _install (self, switch, in_port, out_port, match, buf = None):
        msg = of.ofp_flow_mod()
        msg.match = match
        msg.match.in_port = in_port
        msg.idle_timeout = FLOW_IDLE_TIMEOUT
        msg.hard_timeout = FLOW_HARD_TIMEOUT
        msg.actions.append(of.ofp_action_output(port = out_port))
        msg.buffer_id = buf
        switch.connection.send(msg)
    
      def _install_path (self, p, match, packet_in=None):
        wp = WaitingPath(p, packet_in)
        for sw,in_port,out_port in p:
          self._install(sw, in_port, out_port, match)
          msg = of.ofp_barrier_request()
          sw.connection.send(msg)
          wp.add_xid(sw.dpid,msg.xid)
    
      def install_path (self, dst_sw, last_port, match, event):
        """
        Attempts to install a path between this switch and some destination
        """
        p = _get_path(self, dst_sw, event.port, last_port)
        if p is None:
          log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)
    
          import pox.lib.packet as pkt
    
          if (match.dl_type == pkt.ethernet.IP_TYPE and
              event.parsed.find('ipv4')):
            # It's IP -- let's send a destination unreachable
            log.debug("Dest unreachable (%s -> %s)",
                      match.dl_src, match.dl_dst)
    
            from pox.lib.addresses import EthAddr
            e = pkt.ethernet()
            e.src = EthAddr(dpid_to_str(self.dpid)) #FIXME: Hmm...
            e.dst = match.dl_src
            e.type = e.IP_TYPE
            ipp = pkt.ipv4()
            ipp.protocol = ipp.ICMP_PROTOCOL
            ipp.srcip = match.nw_dst #FIXME: Ridiculous
            ipp.dstip = match.nw_src
            icmp = pkt.icmp()
            icmp.type = pkt.ICMP.TYPE_DEST_UNREACH
            icmp.code = pkt.ICMP.CODE_UNREACH_HOST
            orig_ip = event.parsed.find('ipv4')
    
            d = orig_ip.pack()
            d = d[:orig_ip.hl * 4 + 8]
            import struct
            d = struct.pack("!HH", 0,0) + d #FIXME: MTU
            icmp.payload = d
            ipp.payload = icmp
            e.payload = ipp
            msg = of.ofp_packet_out()
            msg.actions.append(of.ofp_action_output(port = event.port))
            msg.data = e.pack()
            self.connection.send(msg)
    
          return
    
        log.debug("Installing path for %s -> %s %04x (%i hops)",
            match.dl_src, match.dl_dst, match.dl_type, len(p))
    
        # We have a path -- install it
        self._install_path(p, match, event.ofp)
    
        # Now reverse it and install it backwards
        # (we'll just assume that will work)
        p = [(sw,out_port,in_port) for sw,in_port,out_port in p]
        self._install_path(p, match.flip())
    
    
      def _handle_PacketIn (self, event):
        def flood ():
          """ Floods the packet """
          if self.is_holding_down:
            log.warning("Not flooding -- holddown active")
          msg = of.ofp_packet_out()
          # OFPP_FLOOD is optional; some switches may need OFPP_ALL
          msg.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))
          msg.buffer_id = event.ofp.buffer_id
          msg.in_port = event.port
          self.connection.send(msg)
    
        def drop ():
          # Kill the buffer
          if event.ofp.buffer_id is not None:
            msg = of.ofp_packet_out()
            msg.buffer_id = event.ofp.buffer_id
            event.ofp.buffer_id = None # Mark is dead
            msg.in_port = event.port
            self.connection.send(msg)
    
        packet = event.parsed
    
        loc = (self, event.port) # Place we saw this ethaddr
        oldloc = mac_map.get(packet.src) # Place we last saw this ethaddr
    
        if packet.effective_ethertype == packet.LLDP_TYPE:
          drop()
          return
    
        if oldloc is None:
          if packet.src.is_multicast == False:
            mac_map[packet.src] = loc # Learn position for ethaddr
            log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])
        elif oldloc != loc:
          # ethaddr seen at different place!
          if core.openflow_discovery.is_edge_port(loc[0].dpid, loc[1]):
            # New place is another "plain" port (probably)
            log.debug("%s moved from %s.%i to %s.%i?", packet.src,
                      dpid_to_str(oldloc[0].dpid), oldloc[1],
                      dpid_to_str(   loc[0].dpid),    loc[1])
            if packet.src.is_multicast == False:
              mac_map[packet.src] = loc # Learn position for ethaddr
              log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])
          elif packet.dst.is_multicast == False:
            # New place is a switch-to-switch port!
            # Hopefully, this is a packet we're flooding because we didn't
            # know the destination, and not because it's somehow not on a
            # path that we expect it to be on.
            # If spanning_tree is running, we might check that this port is
            # on the spanning tree (it should be).
            if packet.dst in mac_map:
              # Unfortunately, we know the destination.  It's possible that
              # we learned it while it was in flight, but it's also possible
              # that something has gone wrong.
              log.warning("Packet from %s to known destination %s arrived "
                          "at %s.%i without flow", packet.src, packet.dst,
                          dpid_to_str(self.dpid), event.port)
    
    
        if packet.dst.is_multicast:
          log.debug("Flood multicast from %s", packet.src)
          flood()
        else:
          if packet.dst not in mac_map:
            log.debug("%s unknown -- flooding" % (packet.dst,))
            flood()
          else:
            dest = mac_map[packet.dst]
            match = of.ofp_match.from_packet(packet)
            self.install_path(dest[0], dest[1], match, event)
    
      def disconnect (self):
        if self.connection is not None:
          log.debug("Disconnect %s" % (self.connection,))
          self.connection.removeListeners(self._listeners)
          self.connection = None
          self._listeners = None
    
      def connect (self, connection):
        if self.dpid is None:
          self.dpid = connection.dpid
        assert self.dpid == connection.dpid
        if self.ports is None:
          self.ports = connection.features.ports
        self.disconnect()
        log.debug("Connect %s" % (connection,))
        self.connection = connection
        self._listeners = self.listenTo(connection)
        self._connected_at = time.time()
    
      @property
      def is_holding_down (self):
        if self._connected_at is None: return True
        if time.time() - self._connected_at > FLOOD_HOLDDOWN:
          return False
        return True
    
      def _handle_ConnectionDown (self, event):
        self.disconnect()
    
    
    class l2_multi (EventMixin):
    
      _eventMixin_events = set([
        PathInstalled,
      ])
    
      def __init__ (self):
        # Listen to dependencies
        def startup ():
          core.openflow.addListeners(self, priority=0)
          core.openflow_discovery.addListeners(self)
        core.call_when_ready(startup, ('openflow','openflow_discovery'))
    
      def _handle_LinkEvent (self, event):
        def flip (link):
          return Discovery.Link(link[2],link[3], link[0],link[1])
    
        l = event.link
        sw1 = switches[l.dpid1]
        sw2 = switches[l.dpid2]
    
        # Invalidate all flows and path info.
        # For link adds, this makes sure that if a new link leads to an
        # improved path, we use it.
        # For link removals, this makes sure that we don't use a
        # path that may have been broken.
        #NOTE: This could be radically improved! (e.g., not *ALL* paths break)
        clear = of.ofp_flow_mod(command=of.OFPFC_DELETE)
        for sw in switches.itervalues():
          if sw.connection is None: continue
          sw.connection.send(clear)
        path_map.clear()
    
        if event.removed:
          # This link no longer okay
          if sw2 in adjacency[sw1]: del adjacency[sw1][sw2]
          if sw1 in adjacency[sw2]: del adjacency[sw2][sw1]
    
          # But maybe there's another way to connect these...
          for ll in core.openflow_discovery.adjacency:
            if ll.dpid1 == l.dpid1 and ll.dpid2 == l.dpid2:
              if flip(ll) in core.openflow_discovery.adjacency:
                # Yup, link goes both ways
                adjacency[sw1][sw2] = ll.port1
                adjacency[sw2][sw1] = ll.port2
                # Fixed -- new link chosen to connect these
                break
        else:
          # If we already consider these nodes connected, we can
          # ignore this link up.
          # Otherwise, we might be interested...
          if adjacency[sw1][sw2] is None:
            # These previously weren't connected.  If the link
            # exists in both directions, we consider them connected now.
            if flip(l) in core.openflow_discovery.adjacency:
              # Yup, link goes both ways -- connected!
              adjacency[sw1][sw2] = l.port1
              adjacency[sw2][sw1] = l.port2
    
          # If we have learned a MAC on this port which we now know to
          # be connected to a switch, unlearn it.
          bad_macs = set()
          for mac,(sw,port) in mac_map.iteritems():
            if sw is sw1 and port == l.port1: bad_macs.add(mac)
            if sw is sw2 and port == l.port2: bad_macs.add(mac)
          for mac in bad_macs:
            log.debug("Unlearned %s", mac)
            del mac_map[mac]
    
      def _handle_ConnectionUp (self, event):
        sw = switches.get(event.dpid)
        if sw is None:
          # New switch
          sw = Switch()
          switches[event.dpid] = sw
          sw.connect(event.connection)
        else:
          sw.connect(event.connection)
    
      def _handle_BarrierIn (self, event):
        wp = waiting_paths.pop((event.dpid,event.xid), None)
        if not wp:
          #log.info("No waiting packet %s,%s", event.dpid, event.xid)
          return
        #log.debug("Notify waiting packet %s,%s", event.dpid, event.xid)
        wp.notify(event)
    
    
    def launch ():
      core.registerNew(l2_multi)
    
      timeout = min(max(PATH_SETUP_TIME, 5) * 2, 15)
      Timer(timeout, WaitingPath.expire_waiting_paths, recurring=True)
    

  • 相关阅读:
    PNG文件格式具体解释
    opencv2对读书笔记——使用均值漂移算法查找物体
    Jackson的Json转换
    Java实现 蓝桥杯VIP 算法训练 装箱问题
    Java实现 蓝桥杯VIP 算法训练 装箱问题
    Java实现 蓝桥杯VIP 算法训练 单词接龙
    Java实现 蓝桥杯VIP 算法训练 单词接龙
    Java实现 蓝桥杯VIP 算法训练 方格取数
    Java实现 蓝桥杯VIP 算法训练 方格取数
    Java实现 蓝桥杯VIP 算法训练 单词接龙
  • 原文地址:https://www.cnblogs.com/gremount/p/5768015.html
Copyright © 2011-2022 走看看