zoukankan      html  css  js  c++  java
  • Neutron Callback System

     用于core and service components之间的通信,传递resource的lifecycle events (e.g. before creation, before deletion, etc.)消息

    ,比如当一个Neutron resource与多个service (VPN, Firewall and Load Balancer) 关联,

    需要Service的管理者来确定resource正确状态。

    这样可以避免各个Service需要感知其他Service,在它们之间解耦和。

     它是一种进程内通信机制,如果是进程间则需要Messaging callbacks

    如果是远程进程则需要 RPC API 。

    比如service A, B, and C 都需要知道router creation,如果没有一个中介来采用消息的方式关联它们,

    那么,A在router creation的时候就需要直接call B/C。

    如果有了中介I,则流程如下:

    # B and C ask I to be notified when A is done creating the resource

    # ... # A is done creating the resource # A gets hold of the reference to the intermediary I # A calls I I->notify()

    只要B/C订阅了router creation消息,A and ‘I’ 不需要知道B/C的状态和变化。

    同时callback也可以抛出异常告诉A该操作是否可行,这就需要A先发一个events.BEFORE_CREATE来测试一下问问B/C的意见了。

    实例代码如下:

    from neutron.callbacks import events
    from neutron.callbacks import exceptions
    from neutron.callbacks import resources
    from neutron.callbacks import registry
    
    
    def callback1(resource, event, trigger, **kwargs):
        raise Exception('I am failing!')
    
    def callback2(resource, event, trigger, **kwargs):
        print('Callback2 called by %s on event  %s' % (trigger, event))
    
    
    registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE)
    registry.subscribe(callback2, resources.ROUTER, events.BEFORE_CREATE)
    registry.subscribe(callback2, resources.ROUTER, events.ABORT_CREATE)
    print('Subscribed')
    
    
    def do_notify():
        kwargs = {'foo': 'bar'}
        registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs)
    
    
    print('Notifying...')
    try:
        do_notify()
    except exceptions.CallbackFailure as e:
        print('Error: ', e)

    Unsubscribing to events

    • clear(): it unsubscribes all subscribed callbacks: this can be useful especially when winding down the system, and notifications shall no longer be triggered.
    • unsubscribe(): it selectively unsubscribes a callback for a specific resource’s event. Say callback C has subscribed to event A for resource R, any notification of event A for resource R will no longer be handed over to C, after the unsubscribe() invocation.
    • unsubscribe_by_resource(): say that callback C has subscribed to event A, B, and C for resource R, any notification of events related to resource R will no longer be handed over to C, after the unsubscribe_by_resource() invocation.
    • unsubscribe_all(): say that callback C has subscribed to events A, B for resource R1, and events C, D for resource R2, any notification of events pertaining resources R1 and R2 will no longer be handed over to C, after the unsubscribe_all() invocation.

     git grep subscribe:

    neutron/db/db_base_plugin_v2.py:            registry.subscribe(self.validate_network_rbac_policy_change,
    neutron/db/external_net_db.py:        registry.subscribe(self._process_ext_policy_create,
    neutron/db/external_net_db.py:            registry.subscribe(self._validate_ext_not_in_use_by_tenant,
    neutron/db/l3_db.py:def subscribe():
    neutron/db/l3_db.py:    registry.subscribe(
    neutron/db/l3_db.py:    registry.subscribe(
    neutron/db/l3_db.py:    registry.subscribe(
    neutron/db/l3_db.py:    registry.subscribe(
    neutron/db/l3_db.py:subscribe()
    neutron/db/l3_dvrscheduler_db.py:def subscribe():
    neutron/db/l3_dvrscheduler_db.py:    registry.subscribe(
    neutron/db/l3_dvrscheduler_db.py:    registry.subscribe(
    neutron/db/l3_dvrscheduler_db.py:    registry.subscribe(
    neutron/objects/rbac_db.py:    def subscribe_to_rbac_events(class_instance):
    neutron/objects/rbac_db.py:            registry.subscribe(class_instance.validate_rbac_policy_change,
    neutron/objects/rbac_db.py:        mcs.subscribe_to_rbac_events(cls)
    neutron/plugins/ml2/extensions/dns_integration.py:def subscribe():
    neutron/plugins/ml2/extensions/dns_integration.py:    registry.subscribe(
    neutron/plugins/ml2/extensions/dns_integration.py:    registry.subscribe(
    neutron/plugins/ml2/extensions/dns_integration.py:    registry.subscribe(
    neutron/plugins/ml2/extensions/dns_integration.py:subscribe()
    neutron/services/auto_allocate/db.py:    registry.subscribe(_ensure_external_network_default_value_callback,
    neutron/services/auto_allocate/db.py:    registry.subscribe(_ensure_external_network_default_value_callback,
    neutron/services/auto_allocate/db.py:    registry.subscribe(_ensure_external_network_default_value_callback,
    neutron/services/bgp/bgp_plugin.py:        registry.subscribe(self.floatingip_update_callback,
    neutron/services/bgp/bgp_plugin.py:        registry.subscribe(self.router_interface_callback,
    neutron/services/bgp/bgp_plugin.py:        registry.subscribe(self.router_interface_callback,
    neutron/services/bgp/bgp_plugin.py:        registry.subscribe(self.router_interface_callback,
    neutron/services/bgp/bgp_plugin.py:        registry.subscribe(self.router_gateway_callback,
    neutron/services/bgp/bgp_plugin.py:        registry.subscribe(self.router_gateway_callback,
    neutron/services/l3_router/l3_router_plugin.py:            l3_dvrscheduler_db.subscribe()
    neutron/services/l3_router/l3_router_plugin.py:        l3_db.subscribe()

     比如neutron/db/l3_dvrscheduler_db.py: registry.subscribe(

    在port delete之前要问问l3是否可以:

            _prevent_l3_port_delete_callback, resources.PORT, events.BEFORE_DELETE)

     l3的判断可以得条件是:

    core plugin中没有它的记录

    port的owner不属于router

    没有fixed ip

    owner router已经不存在

        def prevent_l3_port_deletion(self, context, port_id):
            """Checks to make sure a port is allowed to be deleted.
    
            Raises an exception if this is not the case.  This should be called by
            any plugin when the API requests the deletion of a port, since some
            ports for L3 are not intended to be deleted directly via a DELETE
            to /ports, but rather via other API calls that perform the proper
            deletion checks.
            """
            try:
                port = self._core_plugin.get_port(context, port_id)
            except n_exc.PortNotFound:
                # non-existent ports don't need to be protected from deletion
                return
            if port['device_owner'] not in self.router_device_owners:
                return
            # Raise port in use only if the port has IP addresses
            # Otherwise it's a stale port that can be removed
            fixed_ips = port['fixed_ips']
            if not fixed_ips:
                LOG.debug("Port %(port_id)s has owner %(port_owner)s, but "
                          "no IP address, so it can be deleted",
                          {'port_id': port['id'],
                           'port_owner': port['device_owner']})
                return
            # NOTE(kevinbenton): we also check to make sure that the
            # router still exists. It's possible for HA router interfaces
            # to remain after the router is deleted if they encounter an
            # error during deletion.
            # Elevated context in case router is owned by another tenant
            if port['device_owner'] == DEVICE_OWNER_FLOATINGIP:
                if not self._floating_ip_exists(context, port['device_id']):
                    LOG.debug("Floating IP %(f_id)s corresponding to port "
                              "%(port_id)s no longer exists, allowing deletion.",
                              {'f_id': port['device_id'], 'port_id': port['id']})
                    return
            elif not self._router_exists(context, port['device_id']):
                LOG.debug("Router %(router_id)s corresponding to port "
                          "%(port_id)s  no longer exists, allowing deletion.",
                          {'router_id': port['device_id'],
                           'port_id': port['id']})
                return
    
            reason = _('has device owner %s') % port['device_owner']
            raise n_exc.ServicePortInUse(port_id=port['id'],
                                         reason=reason)

     同时,在port delete之后,router需要发出 routers_updated notification:

        registry.subscribe(
            _notify_routers_callback, resources.PORT, events.AFTER_DELETE)
        def routers_updated(self, context, router_ids, operation=None, data=None,
                            shuffle_agents=False, schedule_routers=True):
            if router_ids:
                self._notification(context, 'routers_updated', router_ids,
                                   operation, shuffle_agents, schedule_routers)

    http://docs.openstack.org/developer/neutron/devref/callbacks.html

  • 相关阅读:
    Saltstack module gem 详解
    Saltstack module freezer 详解
    Saltstack module firewalld 详解
    Saltstack module file 详解
    Saltstack module event 详解
    Saltstack module etcd 详解
    Saltstack module environ 详解
    Saltstack module drbd 详解
    Saltstack module dnsutil 详解
    获取主页_剥离百度
  • 原文地址:https://www.cnblogs.com/allcloud/p/5484672.html
Copyright © 2011-2022 走看看