  • GroupBasedPolicy Code Analysis and Summary

    Neutron-server configuration

    [DEFAULT]
    service_plugins=$original_service_plugins,qos,group_policy
    [group_policy]
    extension_drivers = proxy_group
    policy_drivers = implicit_policy,resource_mapping

    ml2_conf.ini
    [ml2]
    extension_drivers = xxxx,qos

    entry_points
    gbpservice.neutron.group_policy.extension_drivers =
    proxy_group = gbpservice.neutron.services.grouppolicy.drivers.extensions.proxy_group_driver:ProxyGroupDriver

    gbpservice.neutron.group_policy.policy_drivers =
    resource_mapping = gbpservice.neutron.services.grouppolicy.drivers.resource_mapping:ResourceMappingDriver
    implicit_policy = gbpservice.neutron.services.grouppolicy.drivers.implicit_policy:ImplicitPolicyDriver
    chain_mapping = gbpservice.neutron.services.grouppolicy.drivers.chain_mapping:ChainMappingDriver

    neutron.service_plugins =
    group_policy = gbpservice.neutron.services.grouppolicy.plugin:GroupPolicyPlugin
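
    These entry points are resolved at service startup. As a minimal sketch of that mechanism (assuming stevedore, the loader Neutron-style services use for drivers; the names list mirrors the policy_drivers option above):

    from stevedore import named

    # Load the configured policy drivers from their entry-point namespace.
    drivers = named.NamedExtensionManager(
        namespace='gbpservice.neutron.group_policy.policy_drivers',
        names=['implicit_policy', 'resource_mapping'],
        invoke_on_load=True,   # instantiate each driver class on load
        name_order=True)       # keep the configured driver ordering
    for ext in drivers:
        print(ext.name, ext.obj)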

    Notes:

    • The code is quite repetitive, so to keep this post short only the key flows are explained; not every flow is described in detail.
    • Only the Neutron resources port, subnet, network, router, and security group are analyzed here.
    • Plugin resource handler structure (a minimal driver sketch follows the code):
    def action_resource(self, context, resource_info):
        session = context.session
        with session.begin(subtransactions=True):
            result = super(GroupPolicyPlugin,
                           self).action_resource(context, resource_info)
            self.extension_manager.process_action_resource(
                session, resource_info, result)
            self._validate_shared_create(self, context, result, 'resource')
            policy_context = p_context.ResourceContext(self, context, result)
            # Checks made before the resource is committed to the database,
            # typically tenant-ownership and resource-parameter validation
            self.policy_driver_manager.action_resource_precommit(
                policy_context)
    
        try:
            # After the object is committed to the database, the policy
            # drivers manage the resource:
            # ImplicitPolicyDriver: creates implicit resources such as the
            #     l2 policy and l3 policy
            # ResourceMappingDriver: creates the corresponding Neutron-side
            #     resources such as networks and routers
            self.policy_driver_manager.action_resource_postcommit(
                policy_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("action_resource_postcommit "
                                  "failed, deleting resource %s"),
                              result['id'])
                self.delete_resource(context, result['id'])
    
        return self.get_resource(context, result['id'])
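
    Every resource handler follows this precommit/postcommit pattern: precommit runs inside the DB transaction, so raising there rolls the whole create back, while postcommit runs after commit and a failure there triggers the compensating delete shown above. A minimal sketch of the driver interface this implies (the class name is illustrative; the method names follow the calls above):

    class MinimalPolicyDriver(object):
        """Sketch of a GBP policy driver for one resource type."""

        def create_l2_policy_precommit(self, context):
            # Runs inside the plugin's DB transaction: validate only;
            # raising here rolls back the GBP resource.
            pass

        def create_l2_policy_postcommit(self, context):
            # Runs after commit: create backing (Neutron) resources;
            # raising here makes the plugin delete the GBP resource again.
            pass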

    Code analysis

    create l2 policy

    # GroupPolicyPlugin
    def create_l2_policy(self, context, l2_policy):
        self._ensure_tenant(context, l2_policy['l2_policy'])
        session = context.session
        with session.begin(subtransactions=True):
            result = super(GroupPolicyPlugin,
                           self).create_l2_policy(context, l2_policy)
            self.extension_manager.process_create_l2_policy(
                session, l2_policy, result)
            self._validate_shared_create(self, context, result, 'l2_policy')
            policy_context = p_context.L2PolicyContext(self, context, result)
            # tenant checks against the network and the l3 policy
            self.policy_driver_manager.create_l2_policy_precommit(
                policy_context)
    
        try:
            self.policy_driver_manager.create_l2_policy_postcommit(
                policy_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("create_l2_policy_postcommit "
                                  "failed, deleting l2_policy %s"),
                              result['id'])
                self.delete_l2_policy(context, result['id'])
    
        return self.get_l2_policy(context, result['id'])
    
    # ImplicitPolicyDriver
    # if no l3 policy was specified, create one
    def create_l2_policy_postcommit(self, context):
        if not context.current['l3_policy_id']:
            # create an implicit l3 policy and associate it with the l2 policy
            self._use_implicit_l3_policy(context)
    
    def _create_implicit_l3_policy(self, context):
        tenant_id = context.current['tenant_id']
        filter = {'tenant_id': [tenant_id],
                  'name': [self._default_l3p_name]}
        # look up the tenant's default l3 policy
        l3ps = self._get_l3_policies(context._plugin_context, filter)
        l3p = l3ps and l3ps[0]
        if not l3p:
            attrs = {'tenant_id': tenant_id,
                     'name': self._default_l3p_name,
                     'description': _("Implicitly created L3 policy"),
                     'ip_version': self._default_ip_version,
                     'ip_pool': self._default_ip_pool,
                     'shared': context.current.get('shared', False),
                     'subnet_prefix_length':
                     self._default_subnet_prefix_length}
            if self._proxy_group_enabled:
                attrs['proxy_ip_pool'] = (
                    self._default_proxy_ip_pool)
                attrs['proxy_subnet_prefix_length'] = (
                    self._default_proxy_subnet_prefix_length)
            try:
                l3p = self._create_l3_policy(context._plugin_context, attrs)
                # Mark the l3 policy as "owned" to distinguish it from
                # explicitly supplied ones, so that deleting the l2 policy
                # only removes implicitly created l3 policies
                self._mark_l3_policy_owned(context._plugin_context.session,
                                           l3p['id'])
            except exc.DefaultL3PolicyAlreadyExists:
                with excutils.save_and_reraise_exception(
                        reraise=False) as ctxt:
                    LOG.debug("Possible concurrent creation of default L3 "
                              "policy for tenant %s", tenant_id)
                    l3ps = self._get_l3_policies(context._plugin_context,
                                                 filter)
                    l3p = l3ps and l3ps[0]
                    if not l3p:
                        LOG.warning(_LW(
                            "Caught DefaultL3PolicyAlreadyExists, "
                            "but default L3 policy not concurrently "
                            "created for tenant %s"), tenant_id)
                        ctxt.reraise = True
            except exc.OverlappingIPPoolsInSameTenantNotAllowed:
                with excutils.save_and_reraise_exception():
                    LOG.info(_LI("Caught "
                                 "OverlappingIPPoolsinSameTenantNotAllowed "
                                 "during creation of default L3 policy for "
                                 "tenant %s"), tenant_id)
        context.current['l3_policy_id'] = l3p['id']
    
    # ResourceMappingDriver
    def create_l2_policy_postcommit(self, context):
        if not context.current['network_id']:
            # create the network associated with the l2 policy
            self._use_implicit_network(context)
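
    The defaults used by _create_implicit_l3_policy (_default_l3p_name, _default_ip_pool, and so on) come from the implicit policy driver's configuration. A hedged example of those options follows; the section and option names should be verified against the installed gbpservice release:

    [group_policy_implicit_policy]
    default_l3_policy_name = default
    default_ip_version = 4
    default_ip_pool = 10.0.0.0/8
    default_subnet_prefix_length = 24
    # only used when the proxy_group extension driver is enabled:
    default_proxy_ip_pool = 192.168.0.0/16
    default_proxy_subnet_prefix_length = 28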
    
    

    create l3 policy

    # GroupPolicyPlugin
    def create_l3_policy(self, context, l3_policy):
        self._ensure_tenant(context, l3_policy['l3_policy'])
        session = context.session
        with session.begin(subtransactions=True):
            result = super(GroupPolicyPlugin,
                           self).create_l3_policy(context, l3_policy)
            self.extension_manager.process_create_l3_policy(
                session, l3_policy, result)
            self._validate_shared_create(self, context, result, 'l3_policy')
            self._validate_l3p_es(context, result)
            policy_context = p_context.L3PolicyContext(self, context,
                                                       result)
            self.policy_driver_manager.create_l3_policy_precommit(
                policy_context)
    
        try:
            self.policy_driver_manager.create_l3_policy_postcommit(
                policy_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("create_l3_policy_postcommit "
                                  "failed, deleting l3_policy %s"),
                              result['id'])
                self.delete_l3_policy(context, result['id'])
    
        return self.get_l3_policy(context, result['id'])
    
    # ResourceMappingDriver
    # Constraints:
    # 1. an l3 policy maps one-to-one to a router
    # 2. the ip pools of different l3 policies in the same tenant must not
    #    overlap
    def create_l3_policy_precommit(self, context):
        curr = context.current
        if len(curr['routers']) > 1:
            raise exc.L3PolicyMultipleRoutersNotSupported()
        # Validate non overlapping IPs in the same tenant
        l3ps = context._plugin.get_l3_policies(
            context._plugin_context, {'tenant_id': [curr['tenant_id']]})
        subnets = []
        for l3p in l3ps:
            if l3p['id'] != curr['id']:
                for prefix in gbp_utils.convert_ip_pool_string_to_list(
                        l3p['ip_pool']):
                    if prefix:
                        subnets.append(prefix)
                if 'proxy_ip_pool' in l3p:
                    subnets.extend(gbp_utils.convert_ip_pool_string_to_list(
                        l3p['proxy_ip_pool']))
        l3p_subnets = gbp_utils.convert_ip_pool_string_to_list(curr['ip_pool'])
        if 'proxy_ip_pool' in curr:
            l3p_subnets.extend(gbp_utils.convert_ip_pool_string_to_list(
                curr['proxy_ip_pool']))
    
        current_set = netaddr.IPSet(subnets)
        l3p_set = netaddr.IPSet(l3p_subnets)
    
        if l3p_set & current_set:
            raise exc.OverlappingIPPoolsInSameTenantNotAllowed(
                ip_pool=l3p_subnets, overlapping_pools=subnets)
        # In Neutron, one external gateway per router is allowed. Therefore
        # we have to limit the number of ES per L3P to 1
        if len(context.current['external_segments']) > 1:
            raise exc.MultipleESPerL3PolicyNotSupported()
        self._reject_invalid_router_access(context)
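
    The overlap test above is a plain netaddr.IPSet intersection. A tiny self-contained illustration of the same check (the pools are made up):

    import netaddr

    existing = netaddr.IPSet(['10.0.0.0/8'])   # pools of the tenant's other l3 policies
    candidate = netaddr.IPSet(['10.1.0.0/16', '172.16.0.0/24'])  # new l3p pools

    if candidate & existing:
        # non-empty intersection: this is the condition that raises
        # OverlappingIPPoolsInSameTenantNotAllowed above
        print('overlap:', candidate & existing)  # overlap: IPSet(['10.1.0.0/16'])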
    
    # ImplicitPolicyDriver
    # use an implicitly created external segment
    def create_l3_policy_postcommit(self, context):
        if not context.current['external_segments']:
            self._use_implicit_external_segment(context)
    
    # ResourceMappingDriver
    def create_l3_policy_postcommit(self, context):
        # create the subnet pool backing the l3 policy
        if MAPPING_CFG.use_subnetpools:
            self._create_l3p_subnetpools(context)
    
        l3p = context.current
        # create the implicit router
        if not l3p['routers']:
            self._use_implicit_router(context)
        if l3p['external_segments']:
            self._plug_router_to_external_segment(
                context, l3p['external_segments'])
            self._set_l3p_external_routes(context)
        if not MAPPING_CFG.use_subnetpools:
            self._process_new_l3p_ip_pool(context, context.current['ip_pool'])
    
    
    

    create policy target group

    # GroupPolicyPlugin
    def create_policy_target_group(self, context, policy_target_group):
        self._ensure_tenant(context,
                            policy_target_group['policy_target_group'])
        session = context.session
        with session.begin(subtransactions=True):
            # persist the group to the DB and associate its subnets and
            # provided/consumed policy rule sets
            result = super(GroupPolicyPlugin,
                           self).create_policy_target_group(
                               context, policy_target_group)
            # ProxyGroupDriver; not used for now
            self.extension_manager.process_create_policy_target_group(
                session, policy_target_group, result)
            self._validate_shared_create(self, context, result,
                                         'policy_target_group')
            policy_context = p_context.PolicyTargetGroupContext(
                self, context, result)
            self.policy_driver_manager.create_policy_target_group_precommit(
                policy_context)
    
        try:
            self.policy_driver_manager.create_policy_target_group_postcommit(
                policy_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("create_policy_target_group_postcommit "
                                  "failed, deleting policy_target_group %s"),
                              result['id'])
                self.delete_policy_target_group(context, result['id'])
    
        return self.get_policy_target_group(context, result['id'])
    
    # ResourceMappingDriver
    # Validation:
    # 1. the policy target group and its l2 policy belong to the same tenant
    # 2. subnets specified for the group must belong to the l2 policy's network
    # note: with use_subnetpools enabled, the subnets of the l2 policy's
    # network are allocated from the subnet_pool that corresponds to the
    # l3 policy's ip pool; every later subnet on that network must then be
    # allocated from that same subnet_pool
    def create_policy_target_group_precommit(self, context):
        self._reject_cross_tenant_ptg_l2p(context)
        self._validate_ptg_subnets(context)
        self._validate_nat_pool_for_nsp(context)
        self._validate_proxy_ptg(context)
    
    # ImplicitPolicyDriver
    # if no l2 policy was specified, one is created implicitly
    def create_policy_target_group_postcommit(self, context):
        if not context.current['l2_policy_id']:
            self._use_implicit_l2_policy(context)
    
    
    def _use_implicit_l2_policy(self, context):
        # create the l2 policy and associate (or create) its l3 policy
        self._create_implicit_l2_policy(context)
        # record the l2 policy id in the corresponding target group DB entry
        context.set_l2_policy_id(context.current['l2_policy_id'])
    
    def _create_implicit_l2_policy(self, context):
        attrs = {'tenant_id': context.current['tenant_id'],
                 'name': context.current['name'],
                 'description': _("Implicitly created L2 policy"),
                 'l3_policy_id': None,
                 'shared': context.current.get('shared', False),
                 'network_id': None}
        if context.current.get('proxied_group_id'):
            # The L3P has to be the same as the proxied group
            group = context._plugin.get_policy_target_group(
                context._plugin_context, context.current['proxied_group_id'])
            l2p = context._plugin.get_l2_policy(
                context._plugin_context, group['l2_policy_id'])
            attrs['l3_policy_id'] = l2p['l3_policy_id']
    
        l2p = self._create_l2_policy(context._plugin_context, attrs)
        context.current['l2_policy_id'] = l2p['id']
        self._mark_l2_policy_owned(context._plugin_context.session, l2p['id'])
    
    # Calls the plugin to create the l2 policy; see "create l2 policy" above
    # for the detailed flow.
    # If no l3 policy is specified, the implicit driver is enabled, and the
    # tenant has no default l3 policy (the name is configurable),
    # neutron-server creates a default l3 policy for that tenant.
    def _create_l2_policy(self, plugin_context, attrs):
        return self._create_resource(self._group_policy_plugin, plugin_context,
                                     'l2_policy', attrs, False)
    
    
    
    # ResourceMappingDriver
    def create_policy_target_group_postcommit(self, context):
        # REVISIT(ivar) this validates the PTG L2P after the IPD creates it
        # (which happens in the postcommit phase)
        self._validate_proxy_ptg(context)
    
        # connect router to subnets of the PTG
        l2p_id = context.current['l2_policy_id']
        l2p = context._plugin.get_l2_policy(context._plugin_context,
                                            l2p_id)
        l3p_id = l2p['l3_policy_id']
        l3p = context._plugin.get_l3_policy(context._plugin_context,
                                            l3p_id)
    
        if not context.current['subnets']:
            is_proxy = bool(context.current.get('proxied_group_id'))
            # use_subnetpools defaults to True, in which case subnets are
            # allocated automatically from the subnetpool
            # what "proxy" means here still needs to be confirmed
            if (not MAPPING_CFG.use_subnetpools or
               (is_proxy and
                context.current.get('proxy_type') == proxy_ext.PROXY_TYPE_L2)):
                self._use_implicit_subnet(context, is_proxy=is_proxy)
            else:
                try:
                    subnet_specifics = {}
                    if context.current.get('proxied_group_id'):
                        # Since this is proxy group, we need to allocate
                        # subnet with proxy-specific prefix len
                        subnet_specifics = {
                            'prefixlen': l3p['proxy_subnet_prefix_length']}
                    # allocate a subnet from the subnetpool and associate
                    # it with the target group
                    subnets = self._use_implicit_subnet_from_subnetpool(
                        context, subnet_specifics)
                    context.add_subnets([sub['id'] for sub in subnets])
                except neutron_exc.SubnetAllocationError:
                    # Translate to GBP exception
                    raise exc.NoSubnetAvailable()
        # attach all of the target group's subnets to the router of the
        # l3 policy
        self._stitch_ptg_to_l3p(context, context.current, l3p,
                                context.current['subnets'])
        self._handle_network_service_policy(context)
        # create security group rules according to the policy rules
        self._handle_policy_rule_sets(context)
        self._update_default_security_group(context._plugin_context,
                                            context.current['id'],
                                            context.current['tenant_id'],
                                            context.current['subnets'])
    
    def _stitch_ptg_to_l3p(self, context, ptg, l3p, subnet_ids):
        if l3p['routers']:
            router_id = l3p['routers'][0]
            if ptg.get('proxied_group_id'):
                self._stitch_proxy_ptg_to_l3p(context, ptg, l3p, subnet_ids)
            else:
                try:
                    for subnet_id in subnet_ids:
                        self._plug_router_to_subnet(
                            context._plugin_context, subnet_id, router_id)
                except n_exc.InvalidInput:
                    # This exception is not expected.
                    LOG.exception(_LE("adding subnet to router failed"))
                    for subnet_id in subnet_ids:
                        self._delete_subnet(context._plugin_context, subnet_id)
                    raise exc.GroupPolicyInternalError()
    
    def _handle_policy_rule_sets(self, context):
        # This method handles policy_rule_set => SG mapping
        # context is PTG context
    
        # for all consumed policy_rule_sets, simply associate
        # each EP's port from the PTG
        # rules are expected to be filled out already
        consumed_policy_rule_sets = context.current[
            'consumed_policy_rule_sets']
        provided_policy_rule_sets = context.current[
            'provided_policy_rule_sets']
        subnets = context.current['subnets']
        ptg_id = context.current['id']
        self._set_sg_rules_for_subnets(context, subnets,
                                       provided_policy_rule_sets,
                                       consumed_policy_rule_sets)
        self._update_sgs_on_ptg(context, ptg_id, provided_policy_rule_sets,
                                consumed_policy_rule_sets, "ASSOCIATE")
    
    
    def _update_default_security_group(self, plugin_context, ptg_id,
                                       tenant_id, subnets=None):
        # look up the target group's default security group for this tenant
        # (sg_name = "gbp_%s" % ptg_id)
        sg_id = self._get_default_security_group(plugin_context, ptg_id,
                                                 tenant_id)
        ip_v = {4: n_const.IPv4, 6: n_const.IPv6}
        if not sg_id:
            sg_name = DEFAULT_SG_PREFIX % ptg_id
            sg = self._create_gbp_sg(plugin_context, tenant_id, sg_name,
                                     description='default GBP security group')
            sg_id = sg['id']
        # initialize the sg: traffic is allowed between all subnets of the
        # target group
        for subnet in self._get_subnets(
                plugin_context, filters={'id': subnets or []}):
            self._sg_rule(plugin_context, tenant_id, sg_id,
                          'ingress', cidr=subnet['cidr'],
                          ethertype=ip_v[subnet['ip_version']])
            self._sg_rule(plugin_context, tenant_id, sg_id,
                          'egress', cidr=subnet['cidr'],
                          ethertype=ip_v[subnet['ip_version']])
        # allow egress to the link-local range for cloud-init/metadata,
        # DHCP, DNS, etc.
        self._sg_rule(plugin_context, tenant_id, sg_id, 'egress',
                      cidr='169.254.0.0/16', ethertype=ip_v[4])
        # allow the DNS ports
        for ether_type in ip_v:
            for proto in [n_const.PROTO_NAME_TCP, n_const.PROTO_NAME_UDP]:
                self._sg_rule(plugin_context, tenant_id, sg_id, 'egress',
                              protocol=proto, port_range='53',
                              ethertype=ip_v[ether_type])
    
        return sg_id
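
    Putting the rules together: for a PTG with a single IPv4 subnet 10.10.1.0/24 (an illustrative value), the default group gbp_<ptg_id> built above ends up with roughly:

    direction  protocol  port  remote CIDR      ethertype
    ingress    any       any   10.10.1.0/24     IPv4
    egress     any       any   10.10.1.0/24     IPv4
    egress     any       any   169.254.0.0/16   IPv4
    egress     tcp/udp   53    any              IPv4 and IPv6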
    

    create policy target

    # GroupPolicyPlugin
    def create_policy_target(self, context, policy_target):
        self._ensure_tenant(context, policy_target['policy_target'])
        # If fixed-ips (ip and subnet) are specified when creating the
        # target, save them in the port attributes so they can be applied
        # when the port is created
        self._add_fixed_ips_to_port_attributes(policy_target)
        session = context.session
        with session.begin(subtransactions=True):
            result = super(GroupPolicyPlugin,
                           self).create_policy_target(context, policy_target)
            self.extension_manager.process_create_policy_target(
                session, policy_target, result)
            self._validate_shared_create(
                self, context, result, 'policy_target')
            policy_context = p_context.PolicyTargetContext(self, context,
                                                           result)
            self.policy_driver_manager.create_policy_target_precommit(
                policy_context)
    
        try:
            self.policy_driver_manager.create_policy_target_postcommit(
                policy_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("create_policy_target_postcommit "
                                  "failed, deleting policy_target %s"),
                              result['id'])
                self.delete_policy_target(context, result['id'])
    
        return self.get_policy_target(context, result['id'])
    
    # ResourceMappingDriver
    def create_policy_target_precommit(self, context):
        self._check_create_policy_target(context)
    
    def _check_create_policy_target(self, context, verify_port_subnet=True):
        self._validate_cluster_id(context)
        if not context.current['policy_target_group_id']:
            raise exc.PolicyTargetRequiresPolicyTargetGroup()
        # check that the port's subnet lies within the target group's subnets
        if context.current['port_id'] and verify_port_subnet:
            # Validate if explicit port's subnet
            # is same as the subnet of PTG.
            self._validate_pt_port_subnets(context)
        group_id = context.current['policy_target_group_id']
        if context.current.get('proxy_gateway'):
            pts = context._plugin.get_policy_targets(
                context._plugin_context, {'policy_target_group_id': group_id,
                                          'proxy_gateway': True})
            pts = [x['id'] for x in pts if x['id'] != context.current['id']]
            if pts:
                raise exc.OnlyOneProxyGatewayAllowed(group_id=group_id)
        if context.current.get('group_default_gateway'):
            pts = context._plugin.get_policy_targets(
                context._plugin_context, {'policy_target_group_id': group_id,
                                          'group_default_gateway': True})
            pts = [x['id'] for x in pts if x['id'] != context.current['id']]
            if pts:
                raise exc.OnlyOneGroupDefaultGatewayAllowed(group_id=group_id)
    
    # ResourceMappingDriver
    def create_policy_target_postcommit(self, context):
        # if no port was specified when creating the policy target, create one
        if not context.current['port_id']:
            self._use_implicit_port(context)
    
        self._update_cluster_membership(
            context, new_cluster_id=context.current['cluster_id'])
        # bind the port to the SGs associated with the target group
        # (via the consumed and provided rule sets)
        self._assoc_ptg_sg_to_pt(context, context.current['id'],
                                 context.current['policy_target_group_id'])
        # QoS and fip functionality could be handled here as well; is
        # unification necessary? No: this is done purely through GBP,
        # advanced features do not need to be taken over by GBP, but
        # their mutual influence has to be considered
        self._associate_fip_to_pt(context)
        self._associate_qosp_to_pt(context)
        if context.current.get('proxy_gateway'):
            self._set_proxy_gateway_routes(context, context.current)
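
    A hedged sketch of what associating the group's security groups with the target's port amounts to (the helper below is illustrative, not the driver's actual internals; only the core plugin's get_port/update_port calls are standard):

    def assoc_ptg_sgs_to_port(core_plugin, ctx, port_id, ptg_sg_ids):
        # merge the PTG's mapped security groups into the port's SG list
        port = core_plugin.get_port(ctx, port_id)
        sg_ids = set(port['security_groups']) | set(ptg_sg_ids)
        core_plugin.update_port(
            ctx, port_id, {'port': {'security_groups': list(sg_ids)}})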
    

    create policy rule set

    # creating a policy rule set also creates security groups
    # GroupPolicyPlugin
    def create_policy_rule_set(self, context, policy_rule_set):
        self._ensure_tenant(context, policy_rule_set['policy_rule_set'])
        session = context.session
        with session.begin(subtransactions=True):
            result = super(GroupPolicyPlugin,
                           self).create_policy_rule_set(
                               context, policy_rule_set)
            self.extension_manager.process_create_policy_rule_set(
                session, policy_rule_set, result)
            self._validate_shared_create(
                self, context, result, 'policy_rule_set')
            policy_context = p_context.PolicyRuleSetContext(
                self, context, result)
            self.policy_driver_manager.create_policy_rule_set_precommit(
                policy_context)
    
        try:
            self.policy_driver_manager.create_policy_rule_set_postcommit(
                policy_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE(
                    "policy_driver_manager.create_policy_rule_set_postcommit "
                    "failed, deleting policy_rule_set %s"), result['id'])
                self.delete_policy_rule_set(context, result['id'])
    
        return self.get_policy_rule_set(context, result['id'])
    
    # ResourceMappingDriver
    def create_policy_rule_set_postcommit(self, context):
        # creating SGs
        policy_rule_set_id = context.current['id']
        # per rule set, create one security group for the consumed side
        # and one for the provided side
        consumed_sg = self._create_policy_rule_set_sg(context, 'consumed')
        provided_sg = self._create_policy_rule_set_sg(context, 'provided')
        consumed_sg_id = consumed_sg['id']
        provided_sg_id = provided_sg['id']
        # persist the mapping between the rule set and its consumed and
        # provided security groups to the database
        self._set_policy_rule_set_sg_mapping(
            context._plugin_context.session, policy_rule_set_id,
            consumed_sg_id, provided_sg_id)
        rules = context._plugin.get_policy_rules(
            context._plugin_context,
            {'id': context.current['policy_rules']})
        self._apply_policy_rule_set_rules(context, context.current, rules)
        if context.current['child_policy_rule_sets']:
            self._recompute_policy_rule_sets(
                context, context.current['child_policy_rule_sets'])
    
    def _apply_policy_rule_set_rules(self, context, policy_rule_set,
                                     policy_rules):
        policy_rules = self._get_enforced_prs_rules(
            context, policy_rule_set, subset=[x['id'] for x in policy_rules])
        # Don't add rules unallowed by the parent
        self._manage_policy_rule_set_rules(
            context, policy_rule_set, policy_rules)
    
    def _manage_policy_rule_set_rules(self, context, policy_rule_set,
                                      policy_rules, unset=False,
                                      unset_egress=False):
        policy_rule_set_sg_mappings = self._get_policy_rule_set_sg_mapping(
            context._plugin_context.session, policy_rule_set['id'])
        policy_rule_set = context._plugin.get_policy_rule_set(
            context._plugin_context, policy_rule_set['id'])
        # get the CIDRs of the subnets of the consuming and providing
        # target groups associated with the rule set
        cidr_mapping = self._get_cidrs_mapping(context, policy_rule_set)
        for policy_rule in policy_rules:
            self._add_or_remove_policy_rule_set_rule(
                context, policy_rule, policy_rule_set_sg_mappings,
                cidr_mapping, unset=unset, unset_egress=unset_egress)
    
    # create the security group rules
    def _add_or_remove_policy_rule_set_rule(self, context, policy_rule,
                                            policy_rule_set_sg_mappings,
                                            cidr_mapping, unset=False,
                                            unset_egress=False,
                                            classifier=None):
        in_out = [gconst.GP_DIRECTION_IN, gconst.GP_DIRECTION_OUT]
        prov_cons = [policy_rule_set_sg_mappings['provided_sg_id'],
                     policy_rule_set_sg_mappings['consumed_sg_id']]
        cidr_prov_cons = [cidr_mapping['providing_cidrs'],
                          cidr_mapping['consuming_cidrs']]
    
        if not classifier:
            classifier_id = policy_rule['policy_classifier_id']
            classifier = context._plugin.get_policy_classifier(
                context._plugin_context, classifier_id)
    
        protocol = classifier['protocol']
        port_range = classifier['port_range']
        admin_context = n_context.get_admin_context()
        prs = context._plugin.get_policy_rule_set(
            admin_context, policy_rule_set_sg_mappings.policy_rule_set_id)
        tenant_id = prs['tenant_id']

        # A classifier has a single direction (in/out/bi), so classifier
        # direction and the two security groups combine as in the table
        # after this function.

        # Since a rule set applies to both the provided and the consumed
        # group, each sg rule is restricted to all subnet CIDRs of the
        # peer group.

        for pos, sg in enumerate(prov_cons):
            if classifier['direction'] in [gconst.GP_DIRECTION_BI,
                                           in_out[pos]]:
                for cidr in cidr_prov_cons[pos - 1]:
                    self._sg_ingress_rule(context, sg, protocol, port_range,
                                          cidr, tenant_id, unset=unset)
            if classifier['direction'] in [gconst.GP_DIRECTION_BI,
                                           in_out[pos - 1]]:
                for cidr in cidr_prov_cons[pos - 1]:
                    self._sg_egress_rule(context, sg, protocol, port_range,
                                         cidr, tenant_id,
                                         unset=unset or unset_egress)
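
    Unrolling the loop above gives the following classifier-direction to security-group-rule combinations:

    direction  provided SG                   consumed SG
    in         ingress from consumer CIDRs   egress to provider CIDRs
    out        egress to consumer CIDRs      ingress from provider CIDRs
    bi         both of the above             both of the above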
    

     

    delete l2 policy

    # GroupPolicyPlugin
    def delete_l2_policy(self, context, l2_policy_id):
        session = context.session
        with session.begin(subtransactions=True):
            l2_policy = self.get_l2_policy(context, l2_policy_id)
            policy_context = p_context.L2PolicyContext(self, context,
                                                       l2_policy)
            self.policy_driver_manager.delete_l2_policy_precommit(
                policy_context)
            super(GroupPolicyPlugin, self).delete_l2_policy(context,
                                                            l2_policy_id)
    
        try:
            self.policy_driver_manager.delete_l2_policy_postcommit(
                policy_context)
        except Exception:
            LOG.exception(_LE("delete_l2_policy_postcommit failed "
                              "for l2_policy %s"), l2_policy_id)
    
    # ImplicitPolicyDriver
    # if the l3 policy was created implicitly, delete it
    def delete_l2_policy_postcommit(self, context):
        l3p_id = context.current['l3_policy_id']
        self._cleanup_l3_policy(context, l3p_id)
    def _cleanup_l3_policy(self, context, l3p_id):
        if self._l3_policy_is_owned(context._plugin_context.session, l3p_id):
            # call the plugin to delete the l3 policy
            context._plugin.delete_l3_policy(context._plugin_context, l3p_id,
                                             check_unused=True)
    
    # ResourceMappingDriver
    # if the network was created implicitly, delete it
    def delete_l2_policy_postcommit(self, context):
        network_id = context.current['network_id']
        self._cleanup_network(context._plugin_context, network_id)
    def _cleanup_network(self, plugin_context, network_id):
        if self._network_is_owned(plugin_context.session, network_id):
            # call the core_plugin to delete the network
            self._delete_network(plugin_context, network_id) 
    

    create network service policy

    def create_network_service_policy(self, context, network_service_policy):
        self._ensure_tenant(
            context, network_service_policy['network_service_policy'])
        session = context.session
        with session.begin(subtransactions=True):
            result = super(GroupPolicyPlugin,
                           self).create_network_service_policy(
                               context, network_service_policy)
            self.extension_manager.process_create_network_service_policy(
                session, network_service_policy, result)
            self._validate_shared_create(self, context, result,
                                         'network_service_policy')
            policy_context = p_context.NetworkServicePolicyContext(
                self, context, result)
            pdm = self.policy_driver_manager
            pdm.create_network_service_policy_precommit(
                policy_context)
    
        try:
            pdm.create_network_service_policy_postcommit(
                policy_context)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE(
                    "create_network_service_policy_postcommit "
                    "failed, deleting network_service_policy %s"),
                    result['id'])
                self.delete_network_service_policy(context, result['id'])
    
        return self.get_network_service_policy(context, result['id'])
    
    
    # ResourceMappingDriver
    def create_network_service_policy_precommit(self, context):
        # validate the supported types and value formats of the network
        # service policy parameters
        self._validate_nsp_parameters(context)
    # Example command:
    # gbp network-service-policy-create --network-service-params \
    #     type=qos_maxrate,name=qos_maxrate,value=10 test
    # The supported network service policy types and value combinations
    # are listed below:
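
    Summarized from _validate_nsp_parameters (shown next); qos_maxrate matches the CLI example above, while the exact string behind GP_NETWORK_SVC_PARAM_TYPE_QOS_BURST (shown here as qos_burstrate) should be verified against the installed release:

    type            allowed value(s)
    ip_single       self_subnet, nat_pool
    ip_pool         nat_pool
    qos_maxrate     any non-negative integer (kbps)
    qos_burstrate   any non-negative integer (kbps)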
    

      

    def _validate_nsp_parameters(self, context):
        nsp = context.current
        nsp_params = nsp.get("network_service_params")
    
        supported_static_nsp_pars = {
            gconst.GP_NETWORK_SVC_PARAM_TYPE_IP_SINGLE: [
                gconst.GP_NETWORK_SVC_PARAM_VALUE_SELF_SUBNET,
                gconst.GP_NETWORK_SVC_PARAM_VALUE_NAT_POOL],
            gconst.GP_NETWORK_SVC_PARAM_TYPE_IP_POOL: [
                gconst.GP_NETWORK_SVC_PARAM_VALUE_NAT_POOL]}
    
        # for params without a static value - later evaluation needed:
        supported_flexible_nsp_params = (
            gconst.GP_NETWORK_SVC_PARAM_TYPE_QOS_BURST,
            gconst.GP_NETWORK_SVC_PARAM_TYPE_QOS_MAX)
    
        # each parameter type may appear only once per service policy
        # validate unique param types:
        types_inside = set((d['type'] for d in nsp_params))
        if len(types_inside) != len(nsp_params):
            raise exc.InvalidNetworkServiceParameters()
    
        for params in nsp_params:
            type_ = params.get("type")
            value_ = params.get("value")
            if (type_ not in supported_flexible_nsp_params):
                if (type_ not in supported_static_nsp_pars or
                    value_ not in supported_static_nsp_pars[type_]):
                    raise exc.InvalidNetworkServiceParameters()
            else:
                try:
                    if int(value_) < 0:
                        raise exc.InvalidNetworkServiceParameters()
                except ValueError:
                    raise exc.InvalidNetworkServiceParameters()
    
    
    
    def create_network_service_policy_postcommit(self, context):
        p = context.current['network_service_params']
        max = burst = 0
        setting_qos = False
        # assumes single value per parameter type, as the API currently states
        params = {p[n]['type']: p[n]['value'] for n in range(len(p))}
        # check for QoS param types..
        if gconst.GP_NETWORK_SVC_PARAM_TYPE_QOS_MAX in params:
            max = params[gconst.GP_NETWORK_SVC_PARAM_TYPE_QOS_MAX]
            setting_qos = True
        if gconst.GP_NETWORK_SVC_PARAM_TYPE_QOS_BURST in params:
            burst = params[gconst.GP_NETWORK_SVC_PARAM_TYPE_QOS_BURST]
            setting_qos = True
        # ..and create needed Neutron resources
        if setting_qos:
            # create a qos policy via the qos service_plugin
            # qos policy naming scheme: gbp_$(network_service_policy_name)
            qos_policy_id = self._create_implicit_qos_policy(context)
            nsp_id = context.current['id']
            # create the qos policy rule
            # qos rule naming scheme: gbp_$(network_service_policy_name)
            # attrs = {'max_kbps': max, 'max_burst_kbps': burst}
            self._create_implicit_qos_rule(context, qos_policy_id, max, burst)
            # record the mapping between the network service policy and
            # the qos policy in the database
            self._set_nsp_qos_mapping(context._plugin_context.session,
                                            nsp_id,
                                            qos_policy_id)
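
    A hedged condensation of what the two _create_implicit_qos_* helpers do against the Neutron QoS service plugin (create_policy and create_policy_bandwidth_limit_rule are the standard QoS plugin API; the function and payload details here are illustrative):

    from neutron_lib.plugins import directory

    def create_gbp_qos_policy(ctx, nsp, max_kbps, burst_kbps):
        # look up the loaded QoS service plugin (alias may differ per release)
        qos_plugin = directory.get_plugin('QOS')
        policy = qos_plugin.create_policy(ctx, {'policy': {
            'name': 'gbp_%s' % nsp['name'],
            'tenant_id': nsp['tenant_id'],
            'description': 'Implicitly created QoS policy'}})
        qos_plugin.create_policy_bandwidth_limit_rule(
            ctx, policy['id'], {'bandwidth_limit_rule': {
                'max_kbps': max_kbps, 'max_burst_kbps': burst_kbps}})
        return policy['id']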
    

      

     

    delete l3 policy

    # GroupPolicyPlugin
    def delete_l3_policy(self, context, l3_policy_id, check_unused=False):
        session = context.session
        with session.begin(subtransactions=True):
            # if any l2 policy is still associated, the delete fails
            if (check_unused and
                (session.query(group_policy_mapping_db.L2PolicyMapping).
                 filter_by(l3_policy_id=l3_policy_id).count())):
                return False
            l3_policy = self.get_l3_policy(context, l3_policy_id)
            policy_context = p_context.L3PolicyContext(self, context,
                                                       l3_policy)
            self.policy_driver_manager.delete_l3_policy_precommit(
                policy_context)
            super(GroupPolicyPlugin, self).delete_l3_policy(context,
                                                            l3_policy_id)
    
        try:
            self.policy_driver_manager.delete_l3_policy_postcommit(
                policy_context)
        except Exception:
            LOG.exception(_LE("delete_l3_policy_postcommit failed "
                              "for l3_policy %s"), l3_policy_id)
        return True
    
    # ResourceMappingDriver
    def delete_l3_policy_postcommit(self, context):
        for router_id in context.current['routers']:
            # if the router was created implicitly, delete it; routers
            # and l3 policies map one-to-one
            self._cleanup_router(context._plugin_context, router_id)
        # delete the subnet pool created by the l3 policy
        if MAPPING_CFG.use_subnetpools:
            self._delete_l3p_subnetpools(context)
        else:
            self._process_remove_l3p_ip_pool(context,
                                             context.current['ip_pool'])
    

    1. GBP sits as an application layer on top of Neutron: Neutron network resources are managed through GBP's abstract resources, so users can focus on building application-level resources
    2. GBP implements the resource mapping summarized below (via the resource_mapping group policy driver)
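
    As established in the driver analysis above:

    GBP resource              Neutron resource(s) created by resource_mapping
    l3 policy                 router (one-to-one) + subnet pool
    l2 policy                 network
    policy target group       subnet(s) + default security group (gbp_<ptg_id>)
    policy target             port
    policy rule set           one provided + one consumed security group
    network service policy    QoS policy + bandwidth-limit rule (for qos params)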

    3. GBP can create policy rules and bind them to policy groups to give a set of resources a network whitelist, implemented underneath with Neutron security groups
    4. Besides the basic Neutron resources mentioned earlier, GBP can also drive Neutron advanced services such as floating IPs, firewall, and QoS, further shielding users from operating on Neutron resources directly
    Shortcomings:
    1. GBP does not provide detailed API documentation on the Neutron side, so developers lack a sufficient direct reference

    Appendix: for the mapping between classifier direction and Neutron security group rules, see the combination table after _add_or_remove_policy_rule_set_rule above.

    References
    Overview of GroupBasedPolicy. http://gbp.readthedocs.io/en/latest/usage.html
    IP protocol number. https://en.wikipedia.org/wiki/List_of_IP_protocol_number
