zoukankan      html  css  js  c++  java
  • Eureka功能和可用性解读

    元数据

    除了普通的基础设置之外,eureka支持自定义元数据。配置方式如下

    eureka:
    	instance:
    			 metadata-map:
    							cluster: cl1
    							name: zhaozhen
    

    获取元数据代码

            List<ServiceInstance> list = discoveryClient.getInstances("zhao-service-resume");
            ServiceInstance serviceInstance = list.get(0);
            list.stream().forEach(s->{
                System.out.println(s.getMetadata());
            });
    

    在调用时通过断点可以知道具体的元数据。在实际使用过程中,我们可以针对配置的不同元数据采取不同的执行
    file

    可用性

    从技术网站上搜到的一个面试题就有这样的问题:eureka怎么保证可用性.
    众所周知,eureka采用的是AP模式,实现高可用最好的方式就是利用最少三台eureke server实例,实现两两之间的服务注册。从而达到同步数据的目的
    那么这就涉及到如下的方面

    • eureka client和eureka server之间如何进行通信
    • eureka注册在客户端和服务端分别怎么操作实现可用性的
    • eureka续约/心跳在客户端和服务端分别怎么操作实现可用性的
    • eureka下线是怎么操作的

    eureka client和eureka server之间如何进行通信

    通过查询各种资料并追踪自动配置类发现,eureka和eureka之间的通信是采用类似springmvc的Jersey框架暴露接口进行通信的。通信的形式基本类似于我们使用http进行请求的方式。在EurekaServerAutoConfiguration中通过注入FilterRegistrationBean实现了在filter中加入包含了指定包名下的所有的Jersey的外部接口

    /**
    	 * Register the Jersey filter
    	 */
    	@Bean
    	public FilterRegistrationBean jerseyFilterRegistration(
    			javax.ws.rs.core.Application eurekaJerseyApp) {
    		FilterRegistrationBean bean = new FilterRegistrationBean();
    		bean.setFilter(new ServletContainer(eurekaJerseyApp));
    		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
    		bean.setUrlPatterns(
    				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    
    		return bean;
    	}
    
    	/**
    	 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
    	 * required by the Eureka server.
    	 */
    	@Bean
    	public javax.ws.rs.core.Application jerseyApplication(Environment environment,
    			ResourceLoader resourceLoader) {
    
    		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
    				false, environment);
    
    		// Filter to include only classes that have a particular annotation.
    		//
    		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
    		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
    		// Find classes in Eureka packages (or subpackages)
    		//
    		Set<Class<?>> classes = new HashSet<>();
    		for (String basePackage : EUREKA_PACKAGES) {
    			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
    			for (BeanDefinition bd : beans) {
    				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
    						resourceLoader.getClassLoader());
    				classes.add(cls);
    			}
    		}
    
    		// Construct the Jersey ResourceConfig
    		//
    		Map<String, Object> propsAndFeatures = new HashMap<>();
    		propsAndFeatures.put(
    				// Skip static content used by the webapp
    				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
    				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
    
    		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
    		rc.setPropertiesAndFeatures(propsAndFeatures);
    
    		return rc;
    	}
    

    代码中扫描的EUREKA_PACKAGES(private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
    "com.netflix.eureka" };)即是Jersey框架的具体的接口类
    file

    另外可以提一点的就是,eureka对外暴露的dashboard依然采用的是springmvc的controller形式。具体的可以看到在EurekaServerAutoConfiguration中注入的EurekaController

    	@Bean
    	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    	public EurekaController eurekaController() {
    		return new EurekaController(this.applicationInfoManager);
    	}
    

    感兴趣的可以再研究下后续EurekaController的内部实现

    eureka注册在客户端和服务端分别怎么操作实现可用性的

    服务每隔30秒会向注册中⼼续约(⼼跳)⼀次(也称为报活),如果没有续约,租约在90秒后到期,然后服务会被失效。每隔30秒的续约操作我们称之为⼼跳检测
    首先在服务端,通过上述的Jersey框架暴露的接口进行注册,在ApplicationResource中通过addInstance进行注册,在这个过程中另一个eureka server也相当于是一个eureka client,同样会进行注册
    file
    通过addInstance中的register方法,一直向下调试到PeerAwareInstanceRegistryImpl的replicateInstanceActionsToPeers相互注册方法

       /**
         * Replicates all instance changes to peer eureka nodes except for
         * replication traffic to this node.
         *
         */
        private void replicateInstanceActionsToPeers(Action action, String appName,
                                                     String id, InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry = null;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            }
        }
    

    此时,注册时,进入的是Register

        public void register(final InstanceInfo info) throws Exception {
            long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
            batchingDispatcher.process(
                    taskId("register", info),
                    new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                        public EurekaHttpResponse<Void> execute() {
                            return replicationClient.register(info);
                        }
                    },
                    expiryTime
            );
        }
    

    查阅源码可知此处的getLeaseRenewalOf(info)的默认值为90秒,这就印证了90秒到期的说法

        private static int getLeaseRenewalOf(InstanceInfo info) {
            return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;
        }
    

    发起请求

        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
    

    发起请求的地址可以追踪到的是ApplicationsResource中的

        @Path("{appId}")
        public ApplicationResource getApplicationResource(
                @PathParam("version") String version,
                @PathParam("appId") String appId) {
            CurrentRequestVersion.set(Version.toEnum(version));
            return new ApplicationResource(appId, serverConfig, registry);
        }
    

    此处重新构建了一个ApplicationResource对象。并将服务的信息配置等传递到application中,等待后续使用
    分析完这一段之后,我对addInstance如何接收请求的还是有疑问,经过断点调试发现,这个过程实际上是通过EurekaServerAutoConfiguration引入的 EurekaServerInitializerConfiguration来完成的,

    @Configuration
    public class EurekaServerInitializerConfiguration
    		implements ServletContextAware, SmartLifecycle, Ordered {
    }
    

    EurekaServerInitializerConfiguration实现了SmartLifecycle方法,start方法会再容器初始化时执行。而start方法的内容

    @Override
    	public void start() {
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					//TODO: is this class even needed now?
    					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
    					log.info("Started Eureka Server");
    
    					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
    					EurekaServerInitializerConfiguration.this.running = true;
    					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
    				}
    				catch (Exception ex) {
    					// Help!
    					log.error("Could not initialize Eureka servlet context", ex);
    				}
    			}
    		}).start();
    	}
    

    具体的业务内容在

    	public void contextInitialized(ServletContext context) {
    		try {
    			initEurekaEnvironment();
    			initEurekaServerContext();
    
    			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    		}
    		catch (Throwable e) {
    			log.error("Cannot bootstrap eureka server :", e);
    			throw new RuntimeException("Cannot bootstrap eureka server :", e);
    		}
    	}
    

    第一步initEurekaEnvironment为初始化环境,第二步initEurekaServerContext为业务操作
    而随后的操作中最主要的是

    		int registryCount = this.registry.syncUp();
    		this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
    		// Register all monitoring statistics.
    		EurekaMonitors.registerAllStats();
    

    openForTraffic中主要是为开启服务通信做准备

      @Override
        public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
            // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
            this.expectedNumberOfClientsSendingRenews = count;
            updateRenewsPerMinThreshold();
            logger.info("Got {} instances from neighboring DS node", count);
            logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
            this.startupTime = System.currentTimeMillis();
            if (count > 0) {
                this.peerInstancesTransferEmptyOnStartup = false;
            }
            DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
            boolean isAws = Name.Amazon == selfName;
            if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
                logger.info("Priming AWS connections for all replicas..");
                primeAwsReplicas(applicationInfoManager);
            }
            logger.info("Changing status to UP");
            applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
            super.postInit();
        }
    

    引发向addIntsance发起请求的就是 applicationInfoManager.setInstanceStatus(InstanceStatus.UP);这个方法内部执行一串事件
    其中就有向addInstance发起请求的

    public synchronized void setInstanceStatus(InstanceStatus status) {
            InstanceStatus next = instanceStatusMapper.map(status);
            if (next == null) {
                return;
            }
    
            InstanceStatus prev = instanceInfo.setStatus(next);
            if (prev != null) {
                for (StatusChangeListener listener : listeners.values()) {
                    try {
                        listener.notify(new StatusChangeEvent(prev, next));
                    } catch (Exception e) {
                        logger.warn("failed to notify listener: {}", listener.getId(), e);
                    }
                }
            }
        }
    

    DiscoveryClient类内部

             statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    

    指向

        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    最后即是向addInstance发起请求的地方

        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }
    

    发起请求即是向ApplicationResource的Instance方法发起。

    eureka续约在客户端和服务端分别怎么操作实现可用性的

    从上面注册中可推测出续约/心跳接口可能也是在DiscoveryClient中完成的。搜索HeatBeat之后发现注入 DiscoveryClient方法中有一个初始化定时任务的方法

    private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    

    其中就有心跳的定时任务。默认的心跳间隔时间renewalIntervalInSecs为30秒

        /**
         * The heartbeat task that renews the lease in the given intervals.
         */
        private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
    

    renew方法中即是向服务端发起调用的过程,与上述注册基本相同

    Eureka下线服务

    Eureka下线是在EurekaClientAutoConfiguration中注入EurekaClient时定义的shutDown方法。
    我们可以看到

       @PreDestroy
        @Override
        public synchronized void shutdown() {
            if (isShutdown.compareAndSet(false, true)) {
                logger.info("Shutting down DiscoveryClient ...");
    
                if (statusChangeListener != null && applicationInfoManager != null) {
                    applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
                }
    
                cancelScheduledTasks();
    
                // If APPINFO was registered
                if (applicationInfoManager != null
                        && clientConfig.shouldRegisterWithEureka()
                        && clientConfig.shouldUnregisterOnShutdown()) {
                    applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                    unregister();
                }
    
                if (eurekaTransport != null) {
                    eurekaTransport.shutdown();
                }
    
                heartbeatStalenessMonitor.shutdown();
                registryStalenessMonitor.shutdown();
    
                logger.info("Completed shut down of DiscoveryClient");
            }
        }
    

    同样的。执行了一个取消定时任务的状态。。另外利用上面说的applicationInfoManager.setInstanceStatus()方法进行了事件通知,另外unregister();进行了取消注册操作。eurekaTransport.shutdown();关闭传输。

    Eureka的功能特性总体上来说就是这样。有些地方可能还是不够清楚。欢迎大家一起沟通探讨

    欢迎搜索关注本人与朋友共同开发的微信面经小程序【大厂面试助手】和公众号【微瞰技术】,以及总结的分类面试题https://github.com/zhendiao/JavaInterview

    file
    file

  • 相关阅读:
    break-continue
    函数定义
    函数类型
    为何要继承SpringBootServletInitializer,为何要实现configure这方法
    查询一个表中的两个字段值相同的数据
    数据库中查出来的时间多8小时&查询数据正常展示少8小时
    @JsonFormat与@DateTimeFormat注解的使用
    用js获取当前月份的天数
    js获取当前年,月,日,时,分,秒
    maven配置和安装
  • 原文地址:https://www.cnblogs.com/zhendiao/p/14988340.html
Copyright © 2011-2022 走看看