本文基于 OpenStack Ocata 版本的 cinder 源码,梳理 cinder 服务状态(up/down)的判定与上报流程。
1、用户在命令行输入 cinder service-list 查看 cinder 各服务的状态时,cinder 侧的入口是 cinder/api/contrib/services.py 中 ServiceController 类的 index 方法:
class ServiceController(wsgi.Controller):
    """API controller that serves the ``cinder service-list`` request."""

    def __init__(self, ext_mgr=None):
        self.ext_mgr = ext_mgr
        super(ServiceController, self).__init__()
        self.volume_api = volume.API()

    def index(self, req):
        """List the known cinder services together with their up/down state.

        The result can be narrowed with the ``host`` and ``binary`` query
        parameters (``service`` is still accepted as a deprecated alias of
        ``binary``).

        :param req: incoming request; the cinder context is read from
                    ``req.environ['cinder.context']``
        :returns: ``{'services': [...]}`` with one dict per service
        """
        ctxt = req.environ['cinder.context']
        authorize(ctxt, action='index')
        detailed = self.ext_mgr.is_loaded('os-extended-services')
        # Current time on the node handling this API request; every
        # heartbeat timestamp below is compared against it.
        now = timeutils.utcnow(with_timezone=True)

        params = req.GET
        filters = {}
        if 'host' in params:
            filters['host'] = params['host']
        if 'binary' in params:
            filters['binary'] = params['binary']
        elif 'service' in params:
            filters['binary'] = params['service']
            versionutils.report_deprecated_feature(LOG, _(
                "Query by service parameter is deprecated. "
                "Please use binary parameter instead."))

        # Fetch every matching cinder service row from the database.
        services = objects.ServiceList.get_all(ctxt, filters)

        results = []
        for svc in services:
            last_seen = svc.updated_at
            # Age of the last heartbeat: prefer updated_at and fall back to
            # created_at when the row has never been updated.
            age = (now - (svc.updated_at or svc.created_at)).total_seconds()
            if svc.modified_at:
                mod_age = (now - svc.modified_at).total_seconds()
                if abs(age) >= abs(mod_age):
                    last_seen = svc.modified_at

            # The service counts as "up" while the absolute age stays within
            # CONF.service_down_time (60 seconds by default according to
            # these notes); otherwise it is reported as "down".
            state = "up" if abs(age) <= CONF.service_down_time else "down"
            status = 'disabled' if svc.disabled else 'enabled'

            if last_seen:
                last_seen = timeutils.normalize_time(last_seen)

            entry = {
                'binary': svc.binary,
                'host': svc.host,
                'zone': svc.availability_zone,
                'status': status,
                'state': state,
                'updated_at': last_seen,
            }

            # On V3.7 we added cluster support
            if req.api_version_request.matches('3.7'):
                entry['cluster'] = svc.cluster_name

            if detailed:
                entry['disabled_reason'] = svc.disabled_reason
                # NOTE(review): the quoted source is flattened onto one line;
                # this branch is assumed to be nested under ``if detailed``
                # -- confirm against the upstream file.
                if svc.binary == "cinder-volume":
                    entry['replication_status'] = svc.replication_status
                    entry['active_backend_id'] = svc.active_backend_id
                    entry['frozen'] = svc.frozen

            results.append(entry)

        return {'services': results}
因此,service 的 up/down 状态取决于数据库 services 表中该 service 对应行的 updated_at 列的值与当前 controller 节点时间的差值是否在配置的范围之内:如果差值在设置的范围之内,就认为服务是 up 的;如果差值不在设置的范围之内,就认为服务是 down 的。那么,每个服务的 updated_at 值又是如何更新的呢?
2、cinder 各个服务在数据库中 updated_at 值的更新:服务运行在哪个物理节点上,就取该物理节点的当前时间写入数据库的这个字段,同时把上报计数器(report_count)加 1。
cinder 的各种 service,比如 cinder-volume、cinder-scheduler、cinder-backup 等,都是 cinder/service.py 文件中 class Service(service.Service) 的一个实例(注:cinder-api 是以 WSGIService 方式启动的,不属于这一类,请对照源码确认)
(该实例持有一个 manager,并通过监听基于 topic 的消息队列来启用 RPC;同时它还会定期在 manager 上运行周期性任务,并把自身的状态上报到数据库的 services 表)
该类的start方法如下:
def start(self):
    """Bring the service up: init the manager, start RPC servers and timers.

    After the manager is initialised and the RPC server(s) are running, two
    optional looping calls are scheduled: the heartbeat (``report_state``)
    and the manager's periodic tasks. Both timers are kept in
    ``self.timers``.
    """
    ver = version.version_string()
    LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'),
             {'topic': self.topic, 'version_string': ver})
    self.model_disconnected = False

    if self.coordination:
        coordination.COORDINATOR.start()

    # Delegates to the manager's init_host. According to these notes, that
    # implementation calls driver.do_setup, driver.check_for_setup_error and
    # driver.init_capabilities in turn, and init_capabilities calls the
    # driver's get_volume_stats to read the storage backend's status.
    self.manager.init_host(added_to_cluster=self.added_to_cluster,
                           service_id=Service.service_id)

    LOG.debug("Creating RPC server for service %s", self.topic)

    ctxt = context.get_admin_context()
    endpoints = [self.manager] + list(self.manager.additional_endpoints)
    obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
    LOG.debug("Pinning object versions for RPC server serializer to %s",
              obj_version_cap)
    serializer = objects_base.CinderObjectSerializer(obj_version_cap)

    self.rpcserver = rpc.get_server(
        messaging.Target(topic=self.topic, server=self.host),
        endpoints, serializer)
    self.rpcserver.start()

    # NOTE(dulek): Kids, don't do that at home. We're relying here on
    # oslo.messaging implementation details to keep backward compatibility
    # with pre-Ocata services. This will not matter once we drop
    # compatibility with them.
    if self.topic == constants.VOLUME_TOPIC:
        backend_topic = '%(topic)s.%(host)s' % {'topic': self.topic,
                                                'host': self.host}
        backend_target = messaging.Target(
            topic=backend_topic,
            server=vol_utils.extract_host(self.host, 'host'))
        self.backend_rpcserver = rpc.get_server(backend_target, endpoints,
                                                serializer)
        self.backend_rpcserver.start()

    # TODO(geguileo): In O - Remove the is_svc_upgrading_to_n part
    if self.cluster and not self.is_svc_upgrading_to_n(self.binary):
        LOG.info(_LI('Starting %(topic)s cluster %(cluster)s (version '
                     '%(version)s)'),
                 {'topic': self.topic, 'version': ver,
                  'cluster': self.cluster})
        cluster_target = messaging.Target(
            topic='%s.%s' % (self.topic, self.cluster),
            server=vol_utils.extract_host(self.cluster, 'host'))
        serializer = objects_base.CinderObjectSerializer(obj_version_cap)
        self.cluster_rpcserver = rpc.get_server(cluster_target, endpoints,
                                                serializer)
        self.cluster_rpcserver.start()

    self.manager.init_host_with_rpc()

    if self.report_interval:
        # Heartbeat (labelled "s1" in these notes; report_state is walked
        # through right after this method): report_state runs in an endless
        # fixed-interval loop, every report_interval seconds (10 by default
        # according to these notes), with the first run delayed by the same
        # amount.
        heartbeat = loopingcall.FixedIntervalLoopingCall(self.report_state)
        heartbeat.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
        self.timers.append(heartbeat)

    if self.periodic_interval:
        # Optionally delay the first run by a random number of seconds in
        # [0, periodic_fuzzy_delay].
        first_delay = (random.randint(0, self.periodic_fuzzy_delay)
                       if self.periodic_fuzzy_delay else None)
        tasks = loopingcall.FixedIntervalLoopingCall(self.periodic_tasks)
        tasks.start(interval=self.periodic_interval,
                    initial_delay=first_delay)
        self.timers.append(tasks)
s1方法的实现 def report_state(self):----更新服务的状态到数据库中 """Update the state of this service in the datastore.""" if not self.manager.is_working(): # NOTE(dulek): If manager reports a problem we're not sending # heartbeats - to indicate that service is actually down. LOG.error(_LE('Manager for service %(binary)s %(host)s is ' 'reporting problems, not sending heartbeat. ' 'Service will appear "down".'), {'binary': self.binary, 'host': self.host}) return ctxt = context.get_admin_context() zone = CONF.storage_availability_zone try: try: service_ref = objects.Service.get_by_id(ctxt,Service.service_id)-----根据service_id从数据库中获取service信息 except exception.NotFound: LOG.debug('The service database object disappeared, ' 'recreating it.') self._create_service_ref(ctxt) service_ref = objects.Service.get_by_id(ctxt,Service.service_id) service_ref.report_count += 1--------------更新报告计数器,加1 if zone != service_ref.availability_zone: service_ref.availability_zone = zone service_ref.save() # TODO(termie): make this pattern be more elegant. if getattr(self, 'model_disconnected', False): self.model_disconnected = False LOG.error(_LE('Recovered model server connection!'))
3、services表字段的内容
mysql> desc services; +------------------------+--------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +------------------------+--------------+------+-----+---------+----------------+ | created_at | datetime | YES | | NULL | | | updated_at | datetime | YES | | NULL | | | deleted_at | datetime | YES | | NULL | | | deleted | tinyint(1) | YES | | NULL | | | id | int(11) | NO | PRI | NULL | auto_increment | | host | varchar(255) | YES | | NULL | | | binary | varchar(255) | YES | | NULL | | | topic | varchar(255) | YES | | NULL | | | report_count | int(11) | NO | | NULL | | | disabled | tinyint(1) | YES | | NULL | | | availability_zone | varchar(255) | YES | | NULL | | | disabled_reason | varchar(255) | YES | | NULL | | | modified_at | datetime | YES | | NULL | | | rpc_current_version | varchar(36) | YES | | NULL | | | object_current_version | varchar(36) | YES | | NULL | | | replication_status | varchar(36) | YES | | NULL | | | frozen | tinyint(1) | YES | | NULL | | | active_backend_id | varchar(255) | YES | | NULL | | | cluster_name | varchar(255) | YES | | NULL | | +------------------------+--------------+------+-----+---------+----------------+ 19 rows in set (0.01 sec)
样例(语句以 \G 结尾,表示将结果按列纵向显示) mysql> select * from services limit 2\G *************************** 1. row *************************** created_at: 2018-08-16 07:29:20 updated_at: 2019-06-14 09:22:23 deleted_at: NULL deleted: 0 id: 1 host: 10.24.1.9 binary: cinder-scheduler topic: cinder-scheduler report_count: 838433 disabled: 0 availability_zone: nova disabled_reason: NULL modified_at: NULL rpc_current_version: 3.5 object_current_version: 1.21 replication_status: not-capable frozen: 0 active_backend_id: NULL cluster_name: NULL *************************** 2. row ***************************