zoukankan      html  css  js  c++  java
  • SDN实验---Ryu的应用开发(三)流量监控

    一:实现流量监控

     (一)流量监控原理

    其中控制器向交换机周期下发获取统计消息,请求交换机消息------是主动下发过程
    流速公式:是(t1时刻的流量-t0时刻的流量)/(t1-t0)
    剩余带宽公式:链路总带宽-流速--------是这一个这一个,例如s2-s3(不是一条,例如:h1->s1->s2->s3->h2)的剩余带宽
    路径有效带宽是只:这一整条路径中,按照最小的剩余带宽处理

    二:代码实现

    (一)代码框架

    from ryu.app import simple_switch_13
    from ryu.controller.handler import set_ev_cls
    from ryu.controller import ofp_event
    from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER
    
    class MyMonitor(simple_switch_13):    #simple_switch_13 is same as the last experiment which named self_learn_switch
        '''
        design a class to achvie managing the quantity of flow
        '''
    
        def __init__(self,*args,**kwargs):
            super(MyMonitor,self).__init__(*args,**kwargs)
    
        @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER,DEAD_DISPATCHER])
        def _state_change_handler(self,ev):
            '''
            design a handler to get switch state transition condition
            '''
            pass
    
        def _monitor(self):
            '''
            design a monitor on timing system to request switch infomations about port and flow
            '''
            pass
    
        def _request_stats(self,datapath):
            '''
            the function is to send requery to datapath
            '''
            pass
    
        @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER)
        def _port_stats_reply_handler(self,ev):
            '''
            monitor to require the port state, then this function is to get infomation for port`s info
            '''
            pass
    
        @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER)
        def _port_stats_reply_handler(self,ev):
            '''
            monitor to require the flow state, then this function is to get infomation for flow`s info
            '''
            pass

    (二)推文:协程https://www.cnblogs.com/ssyfj/p/9030165.html

    (三)全部代码实现

    from operator import attrgetter
    
    from ryu.app import simple_switch_13
    from ryu.controller.handler import set_ev_cls
    from ryu.controller import ofp_event
    from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER
    from ryu.lib import hub
    
    class MyMonitor(simple_switch_13.SimpleSwitch13):    #simple_switch_13 is same as the last experiment which named self_learn_switch
        '''
        design a class to achvie managing the quantity of flow
        '''
    
        def __init__(self,*args,**kwargs):
            super(MyMonitor,self).__init__(*args,**kwargs)
            self.datapaths = {}
            #use gevent to start monitor
            self.monitor_thread = hub.spawn(self._monitor)
    
        @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER,DEAD_DISPATCHER])
        def _state_change_handler(self,ev):
            '''
            design a handler to get switch state transition condition
            '''
            #first get ofprocotol info
            datapath = ev.datapath
            ofproto = datapath.ofproto
            ofp_parser = datapath.ofproto_parser
    
            #judge datapath`s status to decide how to operate
            if datapath.state == MAIN_DISPATCHER:    #should save info to dictation 
                if datapath.id not in self.datapaths:
                    self.datapaths[datapath.id] = datapath
                    self.logger.debug("Regist datapath: %16x",datapath.id)
            elif datapath.state == DEAD_DISPATCHER:    #should remove info from dictation
                if datapath.id in self.datapaths:
                    del self.datapaths[datapath.id]
                    self.logger.debug("Unregist datapath: %16x",datapath.id)
    
    
        def _monitor(self):
            '''
            design a monitor on timing system to request switch infomations about port and flow
            '''
            while True:    #initiatie to request port and flow info all the time
                for dp in self.datapaths.values():
                    self._request_stats(dp)
                hub.sleep(5)    #pause to sleep to wait reply, and gave time to other gevent to request
    
        def _request_stats(self,datapath):
            '''
            the function is to send requery to datapath
            '''
            self.logger.debug("send stats reques to datapath: %16x for port and flow info",datapath.id)
    
            ofproto = datapath.ofproto
            parser = datapath.ofproto_parser
    
            req = parser.OFPFlowStatsRequest(datapath)
            datapath.send_msg(req)
    
            req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
            datapath.send_msg(req)
    
    
        @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER)
        def _port_stats_reply_handler(self,ev):
            '''
            monitor to require the port state, then this function is to get infomation for port`s info
            print("6666666666port info:")
            print(ev.msg)
            print(dir(ev.msg))
            '''
            body = ev.msg.body
            self.logger.info('datapath             port     '
                            'rx_packets            tx_packets'
                            'rx_bytes            tx_bytes'
                            'rx_errors            tx_errors'
                            )
            self.logger.info('---------------    --------'
                            '--------    --------'
                            '--------    --------'
                            '--------    --------'
                            )
            for port_stat in sorted(body,key=attrgetter('port_no')):
                    self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                        ev.msg.datapath.id,port_stat.port_no,port_stat.rx_packets,port_stat.tx_packets,
                        port_stat.rx_bytes,port_stat.tx_bytes,port_stat.rx_errors,port_stat.tx_errors
                            )
    
    
        @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER)
        def _flow_stats_reply_handler(self,ev):
            '''
            monitor to require the flow state, then this function is to get infomation for flow`s info
            print("777777777flow info:")
            print(ev.msg)
            print(dir(ev.msg))
            '''
            body = ev.msg.body
    
            self.logger.info('datapath             '
                            'in_port            eth_src'
                            'out_port            eth_dst'
                            'packet_count        byte_count'
                            )
            self.logger.info('---------------    '
                            '----    -----------------'
                            '----    -----------------'
                            '---------    ---------'
                            )
            for flow_stat in sorted([flow for flow in body if flow.priority==1],
                            key=lambda flow:(flow.match['in_port'],flow.match['eth_src'])):
                    self.logger.info('%016x    %8x    %17s    %8x    %17s    %8d    %8d',
                        ev.msg.datapath.id,flow_stat.match['in_port'],flow_stat.match['eth_src'],
                        flow_stat.instructions[0].actions[0].port,flow_stat.match['eth_dst'],
                        flow_stat.packet_count,flow_stat.byte_count
                            )

    补充:注意---每个事件的属性可能不同,需要我们进行Debug,例如上面就出现了ev.msg.body(之前hub实现中没有)

    (四)代码讲解

    1.class MyMonitor(simple_switch_13.SimpleSwitch13):

    simple_switch_13.SimpleSwitch13是样例代码,其中实现了和我们上一次实验中,自学习交换机类似的功能
    (稍微多了个关于交换机是否上传全部packet还是只上传buffer_id),所以我们直接继承,可以减少写代码时间

    2.协程实现伪并发self.monitor_thread = hub.spawn(self._monitor)

        def __init__(self,*args,**kwargs):
            super(MyMonitor,self).__init__(*args,**kwargs)
            self.datapaths = {}
            #use gevent to start monitor
            self.monitor_thread = hub.spawn(self._monitor)

    3.在协程中实现周期请求交换机信息

        def _monitor(self):
            '''
            design a monitor on timing system to request switch infomations about port and flow
            '''
            while True:    #initiatie to request port and flow info all the time
                for dp in self.datapaths.values():
                    self._request_stats(dp)
                hub.sleep(5)    #pause to sleep to wait reply, and gave time to other gevent to request

    4.主动下发消息,请求交换机信息OFPFlowStatsRequest------注意:我们这里请求两个(端口和协议信息),所以我们要使用两个函数来分别处理port和flow响应

        def _request_stats(self,datapath):
            '''
            the function is to send requery to datapath
            '''
            self.logger.debug("send stats reques to datapath: %16x for port and flow info",datapath.id)
    
            ofproto = datapath.ofproto
            parser = datapath.ofproto_parser
    
            req = parser.OFPFlowStatsRequest(datapath)
            datapath.send_msg(req)
    
            req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)  #可以向上面一样省略默认参数
            datapath.send_msg(req)

    源码查看参数

    @_set_stats_type(ofproto.OFPMP_FLOW, OFPFlowStats)
    @_set_msg_type(ofproto.OFPT_MULTIPART_REQUEST)
    class OFPFlowStatsRequest(OFPFlowStatsRequestBase):
        """
        Individual flow statistics request message
    
        The controller uses this message to query individual flow statistics.
    
        ================ ======================================================
        Attribute        Description
        ================ ======================================================
        flags            Zero or ``OFPMPF_REQ_MORE``
        table_id         ID of table to read
        out_port         Require matching entries to include this as an output
                         port
        out_group        Require matching entries to include this as an output
                         group
        cookie           Require matching entries to contain this cookie value
        cookie_mask      Mask used to restrict the cookie bits that must match
        match            Instance of ``OFPMatch``
        ================ ======================================================
    
        Example::
    
            def send_flow_stats_request(self, datapath):
                ofp = datapath.ofproto
                ofp_parser = datapath.ofproto_parser
    
                cookie = cookie_mask = 0
                match = ofp_parser.OFPMatch(in_port=1)
                req = ofp_parser.OFPFlowStatsRequest(datapath, 0,
                                                     ofp.OFPTT_ALL,
                                                     ofp.OFPP_ANY, ofp.OFPG_ANY,
                                                     cookie, cookie_mask,
                                                     match)
                datapath.send_msg(req)
        """
    
        def __init__(self, datapath, flags=0, table_id=ofproto.OFPTT_ALL,
                     out_port=ofproto.OFPP_ANY,
                     out_group=ofproto.OFPG_ANY,
                     cookie=0, cookie_mask=0, match=None, type_=None):

    5.获取端口响应信息ofp_event.EventOFPPortStatsReply

        @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER)
        def _port_stats_reply_handler(self,ev):
            '''
            monitor to require the port state, then this function is to get infomation for port`s info
            print("6666666666port info:")
            print(ev.msg)
            print(dir(ev.msg))
            '''
            body = ev.msg.body
            self.logger.info('datapath             port     '
                            'rx_packets            tx_packets'
                            'rx_bytes            tx_bytes'
                            'rx_errors            tx_errors'
                            )
            self.logger.info('---------------    --------'
                            '--------    --------'
                            '--------    --------'
                            '--------    --------'
                            )
            for port_stat in sorted(body,key=attrgetter('port_no')):
                    self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                        ev.msg.datapath.id,port_stat.port_no,port_stat.rx_packets,port_stat.tx_packets,
                        port_stat.rx_bytes,port_stat.tx_bytes,port_stat.rx_errors,port_stat.tx_errors
                            )

    端口信息:《参考》

    6666666666port info:
    version=0x4,msg_type=0x13,msg_len=0x1d0,xid=0x8dcd9187,
    OFPPortStatsReply(
        body=[
    OFPPortStats(port_no=4294967294,rx_packets=0,tx_packets=0,rx_bytes=0,tx_bytes=0,rx_dropped=65,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=331000000), OFPPortStats(port_no=1,rx_packets=154,tx_packets=225,rx_bytes=11660,tx_bytes=19503,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000), OFPPortStats(port_no=2,rx_packets=186,tx_packets=257,rx_bytes=14516,tx_bytes=22343,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=334000000), OFPPortStats(port_no=3,rx_packets=220,tx_packets=232,rx_bytes=18439,tx_bytes=19311,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000)
    ]
    ,flags=0,type=4)
    
    
    OFPPortStats(
    port_no=4294967294,                ----------
    rx_packets=0,                    ----------
    tx_packets=0,                    ----------
    rx_bytes=0,                    ----------
    tx_bytes=0,                    ----------
    rx_dropped=65,
    tx_dropped=0,
    rx_errors=0,                    ----------
    tx_errors=0,                    ----------
    rx_frame_err=0,
    rx_over_err=0,
    rx_crc_err=0,
    collisions=0,
    duration_sec=1912,
    duration_nsec=331000000)
    
    ['_STATS_MSG_TYPES', '_TYPE', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_base_attributes', '_class_prefixes', '_class_suffixes', '_decode_value', '_encode_value', '_get_decoder', '_get_default_decoder', '_get_default_encoder', '_get_encoder', '_get_type', '_is_class', '_opt_attributes', '_restore_args', '_serialize_body', '_serialize_header', '_serialize_pre', 'body', 'buf', 'cls_body_single_struct', 'cls_from_jsondict_key', 'cls_msg_type', 'cls_stats_body_cls', 'cls_stats_type',
    'datapath'                    ----------
    , 'flags', 'from_jsondict', 'msg_len', 'msg_type', 'obj_from_jsondict', 'parser', 'parser_stats', 'parser_stats_body', 'register_stats_type', 'serialize', 'set_buf', 'set_classes', 'set_headers', 'set_xid', 'stringify_attrs', 'to_jsondict', 'type', 'version', 'xid']

    6.获取flow协议响应信息ofp_event.EventOFPFlowStatsReply

        @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER)
        def _flow_stats_reply_handler(self,ev):
            '''
            monitor to require the flow state, then this function is to get infomation for flow`s info
            print("777777777flow info:")
            print(ev.msg)
            print(dir(ev.msg))
            '''
            body = ev.msg.body
    
            self.logger.info('datapath             '
                            'in_port            eth_src'
                            'out_port            eth_dst'
                            'packet_count        byte_count'
                            )
            self.logger.info('---------------    '
                            '----    -----------------'
                            '----    -----------------'
                            '---------    ---------'
                            )
            for flow_stat in sorted([flow for flow in body if flow.priority==1],
                            key=lambda flow:(flow.match['in_port'],flow.match['eth_src'])):
                    self.logger.info('%016x    %8x    %17s    %8x    %17s    %8d    %8d',
                        ev.msg.datapath.id,flow_stat.match['in_port'],flow_stat.match['eth_src'],
                        flow_stat.instructions[0].actions[0].port,flow_stat.match['eth_dst'],
                        flow_stat.packet_count,flow_stat.byte_count
                            )
    
            

    协议信息《参考》

    777777777flow info:
    version=0x4,msg_type=0x13,msg_len=0x200,xid=0x9e448a1a,
    OFPFlowStatsReply(
        body=[
            OFPFlowStats(byte_count=5446,cookie=0,duration_nsec=552000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)],
            length=104,match=OFPMatch(oxm_fields={'in_port': 2, 'eth_src': '8a:06:6a:2c:10:fc', 'eth_dst': '26:20:2f:85:5a:9a'}),packet_count=71,priority=1,table_id=0),     OFPFlowStats(byte_count=5348,cookie=0,duration_nsec=549000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)],
            length=104,match=OFPMatch(oxm_fields={'in_port': 1, 'eth_src': '26:20:2f:85:5a:9a', 'eth_dst': '8a:06:6a:2c:10:fc'}),packet_count=70,priority=1,table_id=0), OFPFlowStats(byte_count=8302,cookie=0,duration_nsec=438000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)],
            length=104,match=OFPMatch(oxm_fields={'in_port': 2, 'eth_src': 'ca:9e:a1:af:b9:5f', 'eth_dst': '26:20:2f:85:5a:9a'}),packet_count=103,priority=1,table_id=0), OFPFlowStats(byte_count=8204,cookie=0,duration_nsec=436000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)]
            ,length=104,match=OFPMatch(oxm_fields={'in_port': 1, 'eth_src': '26:20:2f:85:5a:9a', 'eth_dst': 'ca:9e:a1:af:b9:5f'}),packet_count=102,priority=1,table_id=0), OFPFlowStats(byte_count=6739,cookie=0,duration_nsec=807000000,duration_sec=9,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65535,port=4294967293,type=0)],len=24,type=4)],
            length=80,match=OFPMatch(oxm_fields={}),packet_count=74,priority=0,table_id=0)
    ]
    ,flags=0,type=1)
    
    
    OFPFlowStats(
    byte_count=5446,                ----------
    cookie=0,
    duration_nsec=552000000,
    duration_sec=1893,
    flags=0,
    hard_timeout=0,
    idle_timeout=0,
    instructions=[
        OFPInstructionActions(
            actions=[
                OFPActionOutput(
                    len=16,
                    max_len=65509,
                    port=1,        ----------
                    type=0)
                ],
                len=24,
                type=4
        )
    ],
    length=104,
    match=OFPMatch(oxm_fields={
        'in_port': 2,                 ----------
        'eth_src': '8a:06:6a:2c:10:fc',     ----------
        'eth_dst': '26:20:2f:85:5a:9a'        ----------
    }),
    packet_count=71,                ----------
    priority=1,
    table_id=0
    )
    
    ['_STATS_MSG_TYPES', '_TYPE', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_base_attributes', '_class_prefixes', '_class_suffixes', '_decode_value', '_encode_value', '_get_decoder', '_get_default_decoder', '_get_default_encoder', '_get_encoder', '_get_type', '_is_class', '_opt_attributes', '_restore_args', '_serialize_body', '_serialize_header', '_serialize_pre', 'body', 'buf', 'cls_body_single_struct', 'cls_from_jsondict_key', 'cls_msg_type', 'cls_stats_body_cls', 'cls_stats_type', 
    'datapath'                    ----------
    , 'flags', 'from_jsondict', 'msg_len', 'msg_type', 'obj_from_jsondict', 'parser', 'parser_stats', 'parser_stats_body', 'register_stats_type', 'serialize', 'set_buf', 'set_classes', 'set_headers', 'set_xid', 'stringify_attrs', 'to_jsondict', 'type', 'version', 'xid']

    三:实验演示

    (一)开启Ryu

    ryu-manager my_monitor.py

     

    (二)开启Mininet

    sudo mn --topo=tree,2,2 --controller=remote --mac 

     

    (三)Ryu显示结果

     

    (四)还需要去了解返回的字段含义才可以---以后做,最近没时间了 

  • 相关阅读:
    HDU 4358 莫队算法+dfs序+离散化
    HDU 5692 线段树+dfs序
    Codeforces Round #377 (Div. 2) A B C D 水/贪心/贪心/二分
    LVS负载均衡的三种模式和八种算法总结
    hdfs 常用命令
    Linux 系统监控
    CentOS 7 时区设置
    kubernetes 留言版DEMO
    CentOS7 PostgreSQL 主从配置( 三)
    Postgres数据库在Linux中优化
  • 原文地址:https://www.cnblogs.com/ssyfj/p/11755773.html
Copyright © 2011-2022 走看看