Octavia health-manager and amphora failure recovery: implementation and analysis


    Health Manager

    Health Manager - This subcomponent monitors individual amphorae to ensure they are up and running, and otherwise healthy. It also handles failover events if amphorae fail unexpectedly.

    Simply put, the Health Manager monitors the health status of every amphora and, when an amphora fails, starts the failover process. This is what keeps the LB highly available.

    So, to understand the Health Manager Service, we need to work out how it monitors amphora health, and then go through the details of the failover process.

    Monitoring amphora health

    As usual, we start from the service's entry point (/opt/rocky/octavia/octavia/cmd/health_manager.py). Starting the octavia-health-manager service loads two methods, UDPStatusGetter.check() and HealthManager.health_check(). Let's look at the former first:

    # file: /opt/rocky/octavia/octavia/amphorae/drivers/health/heartbeat_udp.py
    
    class UDPStatusGetter(object):
        """This class defines methods that will gather heatbeats
    
        The heartbeats are transmitted via UDP and this class will bind to a port
        and absorb them
        """
        def __init__(self):
            self.key = cfg.CONF.health_manager.heartbeat_key
            self.ip = cfg.CONF.health_manager.bind_ip
            self.port = cfg.CONF.health_manager.bind_port
            self.sockaddr = None
            LOG.info('attempting to listen on %(ip)s port %(port)s',
                     {'ip': self.ip, 'port': self.port})
            self.sock = None
            self.update(self.key, self.ip, self.port)
    
            self.executor = futures.ProcessPoolExecutor(
                max_workers=cfg.CONF.health_manager.status_update_threads)
            self.repo = repositories.Repositories().amphorahealth
    
        def update(self, key, ip, port):
            """Update the running config for the udp socket server
    
            :param key: The hmac key used to verify the UDP packets. String
            :param ip: The ip address the UDP server will read from
            :param port: The port the UDP server will read from
            :return: None
            """
            self.key = key
            for addrinfo in socket.getaddrinfo(ip, port, 0, socket.SOCK_DGRAM):
                ai_family = addrinfo[0]
                self.sockaddr = addrinfo[4]
                if self.sock is not None:
                    self.sock.close()
                self.sock = socket.socket(ai_family, socket.SOCK_DGRAM)
                self.sock.settimeout(1)
                self.sock.bind(self.sockaddr)
                if cfg.CONF.health_manager.sock_rlimit > 0:
                    rlimit = cfg.CONF.health_manager.sock_rlimit
                    LOG.info("setting sock rlimit to %s", rlimit)
                    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
                                         rlimit)
                break  # just used the first addr getaddrinfo finds
            if self.sock is None:
                raise exceptions.NetworkConfig("unable to find suitable socket")
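    The bind logic in update() is easy to reproduce outside Octavia. The helper below is a minimal sketch, not Octavia code: the name bind_first_udp and its rcvbuf parameter (standing in for CONF.health_manager.sock_rlimit) are hypothetical. Like update(), it binds to the first address getaddrinfo() returns and sets a 1-second timeout so that the receive loop wakes up regularly instead of blocking forever.

```python
import socket


def bind_first_udp(ip, port, timeout=1.0, rcvbuf=0):
    """Bind a UDP socket to the first address getaddrinfo() resolves.

    Hypothetical helper mirroring UDPStatusGetter.update(); rcvbuf plays
    the role of CONF.health_manager.sock_rlimit (0 keeps the OS default).
    """
    for family, _type, _proto, _canon, sockaddr in socket.getaddrinfo(
            ip, port, 0, socket.SOCK_DGRAM):
        sock = socket.socket(family, socket.SOCK_DGRAM)
        # recvfrom() raises socket.timeout after this many seconds, which
        # lets the caller's loop cycle instead of blocking indefinitely.
        sock.settimeout(timeout)
        sock.bind(sockaddr)
        if rcvbuf > 0:
            # Enlarge the kernel receive buffer to absorb heartbeat bursts.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, rcvbuf)
        return sock  # only the first resolved address is used
    raise OSError("unable to find suitable socket")
```

    Binding to port 0 lets the OS choose a free port, which is handy for local experiments; a real deployment uses bind_ip/bind_port from octavia.conf.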
    

    In the octavia-health-manager service, class UDPStatusGetter receives the heartbeats sent by the amphorae, processes the data they carry, and persists it to the database. From __init__() we can see that the amphorae communicate with the octavia-health-manager service over a UDP socket bound to (CONF.health_manager.bind_ip, CONF.health_manager.bind_port).
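    The docstring of update() says the heartbeat_key is an HMAC key used to verify the UDP packets. The sketch below shows the general idea of such an authenticated envelope. The exact wire format (zlib-compressed JSON followed by a 64-byte hex HMAC-SHA256 digest), the function names and the sample message fields are assumptions for illustration, loosely modeled on Octavia's health_daemon status_message module rather than copied from it.

```python
import hashlib
import hmac
import json
import zlib

DIGEST_LEN = 64  # length of a hex-encoded SHA-256 digest


class InvalidHMAC(Exception):
    """Raised when a packet's HMAC does not match (hypothetical name)."""


def wrap_envelope(obj, key):
    """Serialize obj to compressed JSON and append an HMAC-SHA256 hex digest."""
    payload = zlib.compress(json.dumps(obj, sort_keys=True).encode("utf-8"))
    digest = hmac.new(key.encode("utf-8"), payload, hashlib.sha256).hexdigest()
    return payload + digest.encode("ascii")


def unwrap_envelope(packet, key):
    """Verify the trailing HMAC and return the decoded object."""
    payload, received = packet[:-DIGEST_LEN], packet[-DIGEST_LEN:]
    expected = hmac.new(key.encode("utf-8"), payload,
                        hashlib.sha256).hexdigest().encode("ascii")
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, received):
        raise InvalidHMAC("dropping packet with invalid HMAC")
    return json.loads(zlib.decompress(payload).decode("utf-8"))
```

    A packet signed with a different key is rejected, which is why heartbeat_key must be identical on the amphorae and on every health manager.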

    NOTE: It is worth stressing the network topology between the amphorae and the octavia-health-manager service.

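    • If Octavia is deployed with ext-net used directly as Octavia's "lb-mgmt-net", CONF.health_manager.bind_ip should be the physical host's IP address, and the amphorae talk to the octavia-health-manager service directly over the OpenStack Management Network. In this setup, however, every amphora consumes a fixed IP from ext-net, so it is not recommended for production.
    • If Octavia is deployed with a separately created tenant network as lb-mgmt-net, CONF.health_manager.bind_ip should be an address from the lb-mgmt-net IP pool, and you then have to make lb-mgmt-net and the OpenStack Management Network reachable from each other. devstack does this as follows: it plugs a port of lb-mgmt-net into br-int, so the amphorae on lb-mgmt-net can reach the octavia-health-manager service running on the physical host through that port. In production, this has to be configured by the network administrators to suit the site's network environment.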
    neutron port-create --name octavia-health-manager-standalone-listen-port \
      --security-group <lb-health-mgr-sec-grp> \
      --device-owner Octavia:health-mgr \
      --binding:host_id=<hostname> lb-mgmt-net \
      --tenant-id <octavia service>
    
    ovs-vsctl --may-exist add-port br-int o-hm0 \
      -- set Interface o-hm0 type=internal \
      -- set Interface o-hm0 external-ids:iface-status=active \
      -- set Interface o-hm0 external-ids:attached-mac=<Health Manager Listen Port MAC> \
      -- set Interface o-hm0 external-ids:iface-id=<Health Manager Listen Port ID>
      
    # /etc/octavia/dhcp/dhclient.conf
    request subnet-mask,broadcast-address,interface-mtu;
    do-forward-updates false;
    
    ip link set dev o-hm0 address <Health Manager Listen Port MAC>
    dhclient -v o-hm0 -cf /etc/octavia/dhcp/dhclient.conf
    
    
    o-hm0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1450
            inet 192.168.0.4  netmask 255.255.255.0  broadcast 192.168.0.255
            inet6 fe80::f816:3eff:fef0:b9ee  prefixlen 64  scopeid 0x20<link>
            ether fa:16:3e:f0:b9:ee  txqueuelen 1000  (Ethernet)
            RX packets 1240893  bytes 278415460 (265.5 MiB)
            RX errors 0  dropped 45  overruns 0  frame 0
            TX packets 417078  bytes 75842972 (72.3 MiB)
            TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
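    With this topology, bind_ip must be an address that o-hm0 actually holds, and the amphorae' lb-mgmt-net addresses must sit in the same subnet. A quick sanity check using Python's standard ipaddress module, with the values from the ifconfig output above (inet 192.168.0.4, netmask 255.255.255.0); the helper name same_subnet is hypothetical.

```python
import ipaddress


def same_subnet(addr, iface_ip, netmask):
    """Return True if addr lies in the subnet configured on the interface."""
    # ip_interface() accepts "address/dotted-netmask" and keeps the host bits.
    network = ipaddress.ip_interface(f"{iface_ip}/{netmask}").network
    return ipaddress.ip_address(addr) in network
```

    For example, same_subnet("192.168.0.9", "192.168.0.4", "255.255.255.0") is True, so an amphora at 192.168.0.9 on lb-mgmt-net can reach a health manager bound to 192.168.0.4 without routing.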
    

    Back to the main topic: UDPStatusGetter.check() is implemented as follows:

        def check(self):
            try:
                obj, srcaddr = self.dorecv()
            except socket.timeout:
                # Pass here as this is an expected cycling of the listen socket
                pass
            except exceptions.InvalidHMACException:
                # Pass here as the packet was dropped and logged already
                pass
            except Exception as e:
                LOG.warning('Health Manager experienced an exception processing a '
                            'heartbeat packet. Ignoring this packet. '
                            'Exception: %s', e)
            else:
                self.executor.submit(update_health, obj, srcaddr)
                self.executor.submit(update_stats, obj, srcaddr)
    
    • Call self.dorecv() to receive (and HMAC-verify) a heartbeat
    • Call self.executor.submit(update_health, obj, srcaddr) to persist the health data to table amphora_health
    • Call self.executor.submit(update_stats, obj, srcaddr) to persist the statistics to table listener_statistics
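    The control flow of check() (ignore timeouts and bad packets, otherwise fan the same heartbeat out to two independent writers) can be sketched without Octavia. Everything here is a hypothetical stand-in: recv plays the role of dorecv(), two plain dicts replace tables amphora_health and listener_statistics, and the executor is a ThreadPoolExecutor rather than Octavia's ProcessPoolExecutor so that the workers can see the shared dicts.

```python
import socket
import time


class InvalidHMAC(Exception):
    """Stand-in for octavia's InvalidHMACException."""


AMPHORA_HEALTH = {}        # stand-in for table amphora_health
LISTENER_STATISTICS = {}   # stand-in for table listener_statistics


def update_health(obj, srcaddr):
    # Record when this amphora was last heard from.
    AMPHORA_HEALTH[obj["id"]] = time.time()


def update_stats(obj, srcaddr):
    # Store the per-listener counters carried in the heartbeat.
    for listener_id, stats in obj.get("listeners", {}).items():
        LISTENER_STATISTICS[listener_id] = stats


def check(recv, executor):
    """One iteration of the receive loop; returns the submitted futures.

    executor: any concurrent.futures executor.
    """
    try:
        obj, srcaddr = recv()
    except (socket.timeout, InvalidHMAC):
        return []          # expected: idle socket or an already-dropped packet
    except Exception as exc:
        print("ignoring heartbeat packet:", exc)
        return []
    return [executor.submit(update_health, obj, srcaddr),
            executor.submit(update_stats, obj, srcaddr)]
```

    Submitting the two writers separately means a slow statistics update never delays the health timestamp, which is the value the failover logic depends on.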

    Next, let's see how an amphora sends its heartbeats.

    # file: /opt/rocky/octavia/octavia/cmd/agent.py
    
    def main():
        # comment out to improve logging
        service.prepare_service(sys.argv)
    
        gmr.TextGuruMeditation.setup_autorun(version)
    
        health_sender_proc = multiproc.Process(name='HM_sender',
                                               target=health_daemon.run_sender,
                                               args=(HM_SENDER_CMD_QUEUE,))
        health_sender_proc.daemon = True
        health_sender_proc.start()
    
        # Initiate server class
        server_instance = server.Server()
    
        bind_ip_port = utils.ip_port_str(CONF.haproxy_amphora.bind_host,
                                         CONF.haproxy_amphora.bind_port)
        options = {
            'bind': bind_ip_port,
            'workers': 1,
            'timeout': CONF.amphora_agent.agent_request_read_timeout,
            'certfile': CONF.amphora_agent.agent_server_cert,
            'ca_certs': CONF.amphora_agent.agent_server_ca,
            'cert_reqs': True,
            'preload_app': True,
            'accesslog': '/var/log/amphora-agent.log',
            'errorlog': '/var/log/amphora-agent.log',
            'loglevel': 'debug',
        }
        AmphoraAgent(server_instance.app, options).run()
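    main() starts run_sender as a daemon process and controls it through HM_SENDER_CMD_QUEUE, which can carry 'reload' or 'shutdown'. The sketch below shows that start-and-control pattern. It is hypothetical and swaps multiprocessing for threading to stay portable; the "send" step just appends a sequence number to a list instead of transmitting a packet.

```python
import queue
import threading
import time


def run_sender(cmd_queue, sent, interval=0.05):
    """Hypothetical sender loop: 'send' a heartbeat, then poll for commands."""
    seq = 0
    while True:
        sent.append(seq)            # stand-in for sender.dosend(message)
        seq += 1
        try:
            cmd = cmd_queue.get_nowait()   # never block the heartbeat loop
            if cmd == "reload":
                pass                # the real agent calls CONF.reload_config_files()
            elif cmd == "shutdown":
                break
        except queue.Empty:
            pass
        time.sleep(interval)        # heartbeat_interval in the real agent


def start_sender(interval=0.05):
    cmd_queue, sent = queue.Queue(), []
    worker = threading.Thread(name="HM_sender", target=run_sender,
                              args=(cmd_queue, sent, interval))
    worker.daemon = True            # do not keep the process alive on exit
    worker.start()
    return worker, cmd_queue, sent
```

    Polling with get_nowait() keeps the heartbeat cadence independent of how often commands arrive.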
    

    When the amphora-agent service process starts, it launches health_daemon.run_sender in a separate daemon process. This is how the amphora sends heartbeats to the octavia-health-manager service.

    # file: /opt/rocky/octavia/octavia/amphorae/backends/health_daemon/health_daemon.py
    
    def run_sender(cmd_queue):
        LOG.info('Health Manager Sender starting.')
        sender = health_sender.UDPStatusSender()
    
        keepalived_cfg_path = util.keepalived_cfg_path()
        keepalived_pid_path = util.keepalived_pid_path()
    
        while True:
    
            try:
                # If the keepalived config file is present check
                # that it is running, otherwise don't send the health
                # heartbeat
                if os.path.isfile(keepalived_cfg_path):
                    # Is there a pid file for keepalived?
                    with open(keepalived_pid_path, 'r') as pid_file:
                        pid = int(pid_file.readline())
                    os.kill(pid, 0)
    
                message = build_stats_message()
                sender.dosend(message)
    
            except IOError as e:
                # Missing PID file, skip health heartbeat
                if e.errno == errno.ENOENT:
                    LOG.error('Missing keepalived PID file %s, skipping health '
                              'heartbeat.', keepalived_pid_path)
                else:
                    LOG.error('Failed to check keepalived and haproxy status due '
                              'to exception %s, skipping health heartbeat.', e)
            except OSError as e:
                # Keepalived is not running, skip health heartbeat
                if e.errno == errno.ESRCH:
                    LOG.error('Keepalived is configured but not running, '
                              'skipping health heartbeat.')
                else:
                    LOG.error('Failed to check keepalived and haproxy status due '
                              'to exception %s, skipping health heartbeat.', e)
            except Exception as e:
                LOG.error('Failed to check keepalived and haproxy status due to '
                          'exception %s, skipping health heartbeat.', e)
    
            try:
                cmd = cmd_queue.get_nowait()
                if cmd == 'reload':
                    LOG.info('Reloading configuration')
                    CONF.reload_config_files()
                elif cmd == 'shutdown':
                    LOG.info('Health Manager Sender shutting down.')
                    break
            except queue.Empty:
                pass
            time.sleep(CONF.health_manager.heartbeat_interval)
    

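    The run_sender function calls build_stats_message() to build the heartbeat and then UDPStatusSender.dosend() to send it. Note that when a keepalived configuration file exists but the keepalived process is not running properly, no heartbeat is sent. In other words, an amphora with an unhealthy keepalived is treated as a failed amphora. The data is again sent over a UDP socket, and the destination addresses are set by CONF.health_manager.controller_ip_port_list.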
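    The keepalived gate at the top of the run_sender loop can be isolated into a single predicate. This is a minimal sketch with a hypothetical helper name; it follows the same checks as the code above: no keepalived config means there is nothing to verify, and otherwise the PID file must exist and os.kill(pid, 0) must succeed.

```python
import os


def keepalived_ok(cfg_path, pid_path):
    """Return True if a heartbeat should be sent, mirroring run_sender's checks."""
    if not os.path.isfile(cfg_path):
        return True                  # keepalived not configured: nothing to check
    try:
        with open(pid_path) as pid_file:
            pid = int(pid_file.readline())
        os.kill(pid, 0)              # signal 0: existence check only, nothing is sent
    except (OSError, ValueError):
        # Missing PID file (ENOENT), dead process (ESRCH), other OS errors or a
        # garbled PID file: skip this heartbeat, so the amphora eventually goes stale.
        return False
    return True
```

    Skipping the heartbeat instead of reporting an error status is what turns a broken keepalived into a stale amphora from the health manager's point of view.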

    # file: /etc/octavia/octavia.conf
    
    [health_manager]
    bind_port = 5555
    bind_ip = 192.168.0.4
    controller_ip_port_list = 192.168.0.4:5555
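    controller_ip_port_list is a comma-separated list, so several health managers can be listed. The sketch below parses the option and sends one datagram to every entry; sending each heartbeat to all listed controllers is an assumption about the sender's behavior, and both helper names are hypothetical.

```python
import socket


def parse_controllers(value):
    """Turn '192.168.0.4:5555, 192.168.0.5:5555' into [(host, port), ...].

    Splits on commas, then on the last ':' so that bracketed IPv6
    literals such as '[fd00::4]:5555' also work.
    """
    dests = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        host, _, port = item.rpartition(":")
        dests.append((host.strip("[]"), int(port)))
    return dests


def send_to_all(payload, dests):
    """Send one heartbeat datagram to every controller (fire and forget)."""
    for host, port in dests:
        family = socket.AF_INET6 if ":" in host else socket.AF_INET
        with socket.socket(family, socket.SOCK_DGRAM) as sock:
            sock.sendto(payload, (host, port))
```

    UDP gives no delivery guarantee, which is fine here: a single lost heartbeat only matters if enough of them are lost to exceed heartbeat_timeout.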
    

    In short, octavia-health-manager and amphora-agent implement a periodic heartbeat protocol to monitor the health of each amphora.

    Failover

    The failover mechanism is periodically monitored and triggered by health_manager.HealthManager.health_check().

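    The health_check method periodically fetches so-called stale amphora records from table amphora_health, that is, amphorae that have not reported a heartbeat within the timeout and are therefore considered failed: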

    # file: /opt/rocky/octavia/octavia/db/repositories.py
    
        def get_stale_amphora(self, session):
            """Retrieves a stale amphora from the health manager database.
    
            :param session: A Sql Alchemy database session.
            :returns: [octavia.common.data_model]
            """
    
            timeout = CONF.health_manager.heartbeat_timeout
            expired_time = datetime.datetime.utcnow() - datetime.timedelta(
                seconds=timeout)
    
            amp = session.query(self.model_class).with_for_update().filter_by(
                busy=False).filter(
                self.model_class.last_update < expired_time).first()
    
            if amp is None:
                return None
    
            amp.busy = True
    
            return amp.to_data_model()
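    The query selects one row with busy=False and last_update older than heartbeat_timeout, then marks it busy. An in-memory sketch of the same rule, with a hypothetical dataclass standing in for the amphora_health model and timezone-aware UTC timestamps:

```python
import datetime
from dataclasses import dataclass


@dataclass
class AmphoraHealth:
    """In-memory stand-in for a row of table amphora_health."""
    amphora_id: str
    last_update: datetime.datetime
    busy: bool = False


def get_stale_amphora(rows, heartbeat_timeout, now=None):
    """Return one non-busy row older than heartbeat_timeout, marking it busy.

    Same predicate as the query above: busy=False AND
    last_update < now - timeout, first match only. Setting busy prevents the
    same amphora from being failed over twice; the real code additionally
    locks the row with with_for_update() so concurrent workers cannot race.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    expired_time = now - datetime.timedelta(seconds=heartbeat_timeout)
    for row in rows:
        if not row.busy and row.last_update < expired_time:
            row.busy = True
            return row
    return None
```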
    

    If a stale amphora exists and its loadbalancer is not in PENDING_UPDATE status, the amphora failover process starts. The failover taskflow is self._amphora_flows.get_failover_flow; its UML diagram is shown below:

    [Figure: UML diagram of the amphora failover flow (get_failover_flow)]

    Clearly, the whole failover_flow consists of two major parts: delete the old amphora, and get a new amphora.

    • delete old amphora
      • MarkAmphoraPendingDeleteInDB
      • MarkAmphoraHealthBusy
      • ComputeDelete: delete the amphora's compute instance
      • WaitForPortDetach: detach the port(s) from the amphora and wait until detachment completes
      • MarkAmphoraDeletedInDB

    NOTE: If the failed amphora is a free (spare) amphora, it is simply deleted.

    • get a new amphora
      • get_amphora_for_lb_subflow: obtain a usable free amphora
      • UpdateAmpFailoverDetails: copy the old amphora's information (table amphora) to the new amphora
      • ReloadLoadBalancer & ReloadAmphora: load the loadbalancer and amphora records from the database and pass them into the flow as stores
      • GetAmphoraeNetworkConfigs & GetListenersFromLoadbalancer & GetAmphoraeFromLoadbalancer: fetch the listeners, the amphorae and their network information and pass them into the flow as stores, in preparation for rebuilding the amphora network model
      • PlugVIPPort: set up the keepalived VIP NIC on the amphora
      • AmphoraPostVIPPlug: move the amphora's VIP NIC into the network namespace
      • update_amps_subflow / AmpListenersUpdate: update the amphora's haproxy configuration file from the listener data. This subflow is unordered, so when there are several listeners the updates run concurrently.
      • CalculateAmphoraDelta: compute the difference between the NICs the amphora needs and the NICs it already has
      • HandleNetworkDelta: add or remove NICs according to that difference
      • AmphoraePostNetworkPlug: add a port connected to the subnet the members are on
      • ReloadLoadBalancer
      • MarkAmphoraMasterInDB
      • AmphoraUpdateVRRPInterface: based on the amphora's role, obtain and update the VRRP interface name in table amphora (column: vrrp_interface)
      • CreateVRRPGroupForLB: based on the amphora's role, update the group of the loadbalancer's master/backup amphorae
      • AmphoraVRRPUpdate: based on the amphora's role, update the VRRP configuration of the keepalived process
      • AmphoraVRRPStart: start the keepalived process
      • ListenersStart: start the haproxy process
      • DisableAmphoraHealthMonitoring: delete the corresponding amphora_health database record
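    failover_flow is built with TaskFlow, whose core contract is that each task has an execute() and an optional revert(), and that a failure later in a flow causes already-executed tasks to be reverted in reverse order. The toy runner below only illustrates that contract for a linear flow; it is not the TaskFlow API (TaskFlow, for instance, also reverts the failing task itself), and the task names in the test are placeholders.

```python
class Task:
    """Toy task: execute() does the work, revert() undoes it."""

    def __init__(self, name, log, fail=False):
        self.name, self.log, self.fail = name, log, fail

    def execute(self, store):
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        self.log.append(f"execute {self.name}")
        store[self.name] = "done"

    def revert(self, store):
        self.log.append(f"revert {self.name}")
        store.pop(self.name, None)


def run_linear_flow(tasks, store):
    """Run tasks in order; on failure revert the completed ones in reverse."""
    done = []
    try:
        for task in tasks:
            task.execute(store)
            done.append(task)
    except Exception:
        for task in reversed(done):
            task.revert(store)
        raise
```

    The shared store dict plays the role of TaskFlow's stores: tasks such as ReloadLoadBalancer put results there for later tasks to consume.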

    We have already covered most of these tasks in earlier articles, so here we only briefly describe what the key tasks do.

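    To sum up the amphora failover approach: first delete the failed old amphora, then obtain a usable new amphora, and transfer the old amphora's associated data (e.g. database records) and objects (e.g. the network model) to the new one.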

    NOTE:

    It seems intuitive to boot an amphora prior to deleting the old amphora, however this is a complicated issue. If the target host (due to anti-affinity) is resource constrained, this will fail where a post-delete will succeed. Since this is async with the API it would result in the LB ending in ERROR though the amps are still alive.

    Consider in the future making this a complicated try-on-failure-retry flow, or move upgrade failovers to be synchronous with the API.

    For now spares pool and act/stdby will mitigate most of this delay.

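    Failover boils down to "delete the old amphora, then get a new one", but the ordering hides real complexity. Booting the new amphora before deleting the old one may look more intuitive, yet if the target host is resource constrained (for example because of anti-affinity), booting first fails where booting after the delete would succeed. And because the flow runs asynchronously with the API, such a failure would leave the loadbalancer in ERROR even though its amphorae are still alive. Future options include a try-on-failure-retry flow or making upgrade failovers synchronous with the API; for now, the spare amphora pool and the active/standby topology mitigate most of the resulting delay.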

    Failover test

    Power off the MASTER amphora; the octavia-health-manager service then triggers an amphora failover.

    Nov 22 11:22:31 control01 octavia-health-manager[29147]: INFO octavia.controller.healthmanager.health_manager [-] Stale amphora's id is: cd444019-ce8f-4f89-be6b-0edf76f41b77
    Nov 22 11:22:31 control01 octavia-health-manager[29147]: INFO octavia.controller.healthmanager.health_manager [-] Waiting for 1 failovers to finish
    

    old (server list before failover; columns: ID | Name | Status | Networks | Image | Flavor):

    | 2ddc4ba5-b829-4962-93d8-562de91f1dab | amphora-4ff5d6fe-854c-4022-8194-0c6801a7478b | ACTIVE | lb-mgmt-net=192.168.0.23                                                    | amphora-x64-haproxy      | m1.amphora |
    | b237b2b8-afe4-407b-83f2-e2e60361fa07 | amphora-bcff6f9e-4114-4d43-a403-573f1d97d27e | ACTIVE | lb-mgmt-net=192.168.0.11                                                    | amphora-x64-haproxy      | m1.amphora |
    | 46eccf47-be10-47ec-89b2-0de44ea3caec | amphora-cd444019-ce8f-4f89-be6b-0edf76f41b77 | ACTIVE | lb-mgmt-net=192.168.0.9; web-server-net=192.168.1.3; lb-vip-net=172.16.1.3  | amphora-x64-haproxy      | m1.amphora |
    | bc043b23-d481-45c4-9410-f7b349987c98 | amphora-a1c1ba86-6f99-4f60-b469-a4a29d7384c5 | ACTIVE | lb-mgmt-net=192.168.0.3; web-server-net=192.168.1.12; lb-vip-net=172.16.1.7 | amphora-x64-haproxy      | m1.amphora |
    

    new (server list after failover; same columns):

    | 712ff785-c082-4b53-994c-591d1ec0bf7b | amphora-caa6ba0f-1a68-4f22-9be9-8521695ac4f4 | ACTIVE | lb-mgmt-net=192.168.0.13                                                    | amphora-x64-haproxy      | m1.amphora |
    | 2ddc4ba5-b829-4962-93d8-562de91f1dab | amphora-4ff5d6fe-854c-4022-8194-0c6801a7478b | ACTIVE | lb-mgmt-net=192.168.0.23; web-server-net=192.168.1.4; lb-vip-net=172.16.1.3 | amphora-x64-haproxy      | m1.amphora |
    | b237b2b8-afe4-407b-83f2-e2e60361fa07 | amphora-bcff6f9e-4114-4d43-a403-573f1d97d27e | ACTIVE | lb-mgmt-net=192.168.0.11                                                    | amphora-x64-haproxy      | m1.amphora |
    | bc043b23-d481-45c4-9410-f7b349987c98 | amphora-a1c1ba86-6f99-4f60-b469-a4a29d7384c5 | ACTIVE | lb-mgmt-net=192.168.0.3; web-server-net=192.168.1.12; lb-vip-net=172.16.1.7 | amphora-x64-haproxy      | m1.amphora |
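    Which amphora took over can be read off mechanically by mapping each amphora's own address on lb-vip-net (not the VIP 172.16.1.10 itself) to the amphora name, before and after. A small hypothetical parser for the table rows above:

```python
def vip_net_addresses(rows, vip_net="lb-vip-net"):
    """Map each address on vip_net to the name of the amphora holding it.

    rows are '| id | name | status | networks | image | flavor |' lines;
    the Networks column looks like 'net=ip; net=ip; ...'.
    """
    holders = {}
    for row in rows:
        cells = [c.strip() for c in row.strip().strip("|").split("|")]
        name, networks = cells[1], cells[3]
        for entry in networks.split(";"):
            net, _, ip = entry.strip().partition("=")
            if net == vip_net:
                holders[ip] = name
    return holders
```

    Applied to the two lists, 172.16.1.3 moves from amphora-cd444019-ce8f-4f89-be6b-0edf76f41b77 to amphora-4ff5d6fe-854c-4022-8194-0c6801a7478b, while 172.16.1.7 stays on amphora-a1c1ba86-6f99-4f60-b469-a4a29d7384c5.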
    

    new amphora haproxy config:

    # Configuration for loadbalancer 01197be7-98d5-440d-a846-cd70f52dc503
    global
        daemon
        user nobody
        log /dev/log local0
        log /dev/log local1 notice
        stats socket /var/lib/octavia/1385d3c4-615e-4a92-aea1-c4fa51a75557.sock mode 0666 level user
        maxconn 1000000
        external-check
    
    defaults
        log global
        retries 3
        option redispatch
    
    peers 1385d3c4615e4a92aea1c4fa51a75557_peers
        peer 3dVescsRZ-RdRBfYVLW6snVI9gI 172.16.1.3:1025
        peer l_Ustq0qE-h-_Q1dlXLXBAiWR8U 172.16.1.7:1025
    
    
    frontend 1385d3c4-615e-4a92-aea1-c4fa51a75557
        option httplog
        maxconn 1000000
        bind 172.16.1.10:8080
        mode http
            acl 8d9b8b1e-83d7-44ca-a5b4-0103d5f90cb9 req.hdr(host) -i -m beg server
        use_backend 8196f752-a367-4fb4-9194-37c7eab95714 if 8d9b8b1e-83d7-44ca-a5b4-0103d5f90cb9
            acl c76f36bc-92c0-4f48-8d57-a13e3b1f09e1 req.hdr(host) -i -m beg server
        use_backend 822f78c3-ea2c-4770-bef0-e97f1ac2eba8 if c76f36bc-92c0-4f48-8d57-a13e3b1f09e1
        default_backend 8196f752-a367-4fb4-9194-37c7eab95714
        timeout client 50000
    
    backend 8196f752-a367-4fb4-9194-37c7eab95714
        mode http
        balance roundrobin
        timeout check 10s
        option external-check
        external-check command /var/lib/octavia/ping-wrapper.sh
        fullconn 1000000
        option allbackups
        timeout connect 5000
        timeout server 50000
        server b6e464fd-dd1e-4775-90f2-4231444a0bbe 192.168.1.14:80 weight 1 check inter 5s fall 3 rise 3
    
    backend 822f78c3-ea2c-4770-bef0-e97f1ac2eba8
        mode http
        balance roundrobin
        timeout check 10s
        option external-check
        external-check command /var/lib/octavia/ping-wrapper.sh
        fullconn 1000000
        option allbackups
        timeout connect 5000
        timeout server 50000
        server 7da6f176-36c6-479a-9d86-c892ecca6ae5 192.168.1.6:80 weight 1 check inter 5s fall 3 rise 3
    

    new amphora keepalived config:

    vrrp_script check_script {
      script /var/lib/octavia/vrrp/check_script.sh
      interval 5
      fall 2
      rise 2
    }
    
    vrrp_instance 01197be798d5440da846cd70f52dc503 {
      state MASTER
      interface eth1
      virtual_router_id 1
      priority 100
      nopreempt
      garp_master_refresh 5
      garp_master_refresh_repeat 2
      advert_int 1
      authentication {
        auth_type PASS
        auth_pass b76d77e
      }
    
      unicast_src_ip 172.16.1.3
      unicast_peer {
        172.16.1.7
      }
    
      virtual_ipaddress {
        172.16.1.10
      }
      track_script {
        check_script
      }
    }
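    The new amphora's keepalived file matches because it is re-rendered from the same loadbalancer data, with the amphora's role selecting state and priority. Octavia renders these files from Jinja2 templates; the sketch below uses the standard library's string.Template instead, trims the file to the fields that vary per amphora, and its BACKUP priority of 90 is an assumed value (only MASTER/100 appears in the config above).

```python
from string import Template

VRRP_INSTANCE = Template("""\
vrrp_instance $lb_id {
  state $state
  interface $interface
  virtual_router_id $vrid
  priority $priority
  nopreempt
  advert_int 1

  unicast_src_ip $src_ip
  unicast_peer {
    $peer_ip
  }

  virtual_ipaddress {
    $vip
  }
}
""")

# Role -> (keepalived state, priority). MASTER/100 matches the config above;
# BACKUP/90 is an assumed value for illustration.
ROLE_SETTINGS = {"MASTER": ("MASTER", 100), "BACKUP": ("BACKUP", 90)}


def render_vrrp(role, lb_id, interface, vrid, src_ip, peer_ip, vip):
    """Render a trimmed vrrp_instance block for one amphora."""
    state, priority = ROLE_SETTINGS[role]
    return VRRP_INSTANCE.substitute(
        lb_id=lb_id, state=state, interface=interface, vrid=vrid,
        priority=priority, src_ip=src_ip, peer_ip=peer_ip, vip=vip)
```

    Swapping src_ip and peer_ip and passing role="BACKUP" yields the peer's block, which is why one set of loadbalancer data is enough to rebuild either side after a failover.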
    

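    Comparing the two server lists: the free amphora amphora-4ff5d6fe-854c-4022-8194-0c6801a7478b, which previously had only an lb-mgmt-net address, has taken over the failed amphora-cd444019-ce8f-4f89-be6b-0edf76f41b77's lb-vip-net address 172.16.1.3 and gained a web-server-net port (192.168.1.4), while a new amphora, amphora-caa6ba0f-1a68-4f22-9be9-8521695ac4f4, has appeared with only an lb-mgmt-net address. The haproxy and keepalived configuration files on the new amphora are consistent with the original ones, so the failover succeeded.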

    Original article: https://www.cnblogs.com/jmilkfan-fanguiju/p/10589750.html