zoukankan      html  css  js  c++  java
  • 超详细的Eureka源码解析

    Eureka简介

    Eureka是什么?

    Eureka是基于REST(Representational State Transfer)服务,主要以AWS云服务为支撑,提供服务发现并实现负载均衡和故障转移。我们称此服务为Eureka服务。Eureka提供了Java客户端组件,Eureka Client,方便与服务端的交互。客户端内置了基于round-robin实现的简单负载均衡。在Netflix,为Eureka提供更为复杂的负载均衡方案进行封装,以实现高可用,它包括基于流量、资源利用率以及请求返回状态的加权负载均衡。

    Eureka架构

    Eureka架构从CAP理论看,Eureka是一个AP系统,优先保证可用性(A)和分区容错性(P),Eureka里使用了大量的缓存。

    Eureka中的一些概念

    • Register :服务注册

    Eureka客户端向Eureka Server注册时,它提供自身的元数据,比如IP地址、端口等

    • Renew:服务续约

    Eureka客户端会每隔30秒发送一次心跳来续约。通过续约来告知Eureka Server该客户端仍然存在。

    • Fetch Registries:获取注册列表信息

    Eureka客户端从服务器获取注册表信息,将其缓存到本地。客户端会使用该信息查找其他服务,从而进行远程调用。该注册列表信息定期(每30秒)更新一次。

    • Cancel:服务下线

    Eureka客户端在程序关闭时向Eureka服务器发送取消请求。

    • Eviction:服务剔除

    在默认情况下,当Eureka客户端90秒没有向Eureka服务器发送续约,Eureka服务器就会将该服务实例从服务注册列表删除。

    除了以上的特性外,Eureka的缓存机制也非常经典,下面详细介绍一下。

    Eureka缓存

    Eureka Server里存在三个变量(registry、readWriteCacheMap、readOnlyCacheMap)保存服务注册信息。

    Eureka客户端向服务端注册之后,数据会立即同步到readWriteCacheMap和registry。

    Eureka客户端想查看注册信息,每隔30秒从readOnlyCacheMap拉取。

    readOnlyCacheMap会通过定时器每30秒从readWriteCacheMap拉取。

    还有一个线程每隔60会将90秒都没有续约的服务剔除出去。

    变量 类型 说明
    registry ConcurrentHashMap 实时更新,类AbstractInstanceRegistry成员变量,UI端请求的是这里的服务注册信息
    readWriteCacheMap Guava Cache 实时更新,类ResponseCacheImpl成员变量,缓存时间180秒
    readOnlyCacheMap ConcurrentHashMap 周期更新,类ResponseCacheImpl成员变量,默认每30s从readWriteCacheMap更新,Eureka client默认从这里更新服务注册信息,可配置直接从readWriteCacheMap更新

    Eureka Client

    本文使用的是2.0.2.RELEASE版本

    接下来开始分析Eureka Client的源码。引入spring-cloud-starter-netflix-eureka-client后,Eureka Client会自动启用。EurekaClientAutoConfiguration配置类生效,会注入Bean CloudEurekaClient,然后调用父类DiscoveryClient的构造方法。

    @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
       //省略部分代码
       //如果配置不用注册到Eureka && 配置不用从注册中心获取配置,则不用初始化相关组件
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
                // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
                // to work with DI'd DiscoveryClient
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
                initTimestampMs = System.currentTimeMillis();
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, this.getApplications().size());
    
                return;  // no need to setup up an network tasks and we are done
            }
    
            try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                //初始化定时调度器
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
                //发送心跳的线程池
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
                //注册信息缓存刷新的线程池
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
    
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }
    
            // call and execute the pre registration handler before all background tasks (inc registration) is started
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
    
            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            //初始化定时任务,服务心跳、服务注册、服务列表获取等功能在此处完成
            initScheduledTasks();
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
        }
    

    接下来看initScheduledTasks方法

    
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                //默认30s
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                //缓存刷新定时任务
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                //1. 缓存刷新具体逻辑
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                //2. 心跳具体逻辑
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
                
                //状态改变监听
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
               //3. clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()默认40s,服务注册就在这个方法里完成。
               instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
    

    再来细看上面3个主要方法的具体逻辑。

    1. 缓存刷新

     class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
    
     void refreshRegistry() {
           //省略部分代码
           //获取服务列表信息
            boolean success = fetchRegistry(remoteRegionsModified);
           //省略部分代码
    
    
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    //全量获取服务列表缓存在本地
                    getAndStoreFullRegistry();
                } else {
                   //更新服务列表 getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }
    

    然后调用EurekaHttpClient接口的方法去获取服务列表。请求发送通过jersey

    2. 服务心跳

    继续跟踪HeartbeatThread方法

      private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
    
    boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    //调用注册方法
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    

    3. 服务注册

    跟踪instanceInfoReplicator.start方法

     public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                //设置标识,为了启动时进行服务注册
                instanceInfo.setIsDirty();  // for initial register
                
                //延迟40s执行,执行的是this对象的run方法
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    
    public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
                //刚才start方法中,设置了标识,所以此处dirtyTimestamp不为空
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                   //服务注册
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                //递归性的延迟30s执行当前run方法
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    4. 服务关闭

    服务关闭之后,会回调到EurekaAutoServiceRegistration类的stop方法,回调的方法是:

    @EventListener(ContextClosedEvent.class)
    	public void onApplicationEvent(ContextClosedEvent event) {
    		if( event.getApplicationContext() == context ) {
    		    //服务关闭
    			stop();
    		}
    	}
    

    SmartLifecycle接口也有这个作用,不过我本地使用了一下,是通过ContextClosedEvent来回调的。

    public void stop() {
    		this.serviceRegistry.deregister(this.registration);
    		this.running.set(false);
    	}
    
    	@Override
    	public void deregister(EurekaRegistration reg) {
    		if (reg.getApplicationInfoManager().getInfo() != null) {
    
    			if (log.isInfoEnabled()) {
    				log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()
    						+ " with eureka with status DOWN");
    			}
                //状态改为DOWN
    			reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
    
    			//shutdown of eureka client should happen with EurekaRegistration.close()
    			//auto registration will create a bean which will be properly disposed
    			//manual registrations will need to call close()
    		}
    	}
    

    从上文分析得知Eureka Client调取服务端的接口都是通过EurekaHttpClient接口,而最终发送请求的httpClient是jersey里面的ApacheHttpClient4。

    public interface EurekaHttpClient {
    
        EurekaHttpResponse<Void> register(InstanceInfo info);
    
        EurekaHttpResponse<Void> cancel(String appName, String id);
    
        EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus);
    
        EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info);
    
        EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info);
    
        EurekaHttpResponse<Applications> getApplications(String... regions);
    
        EurekaHttpResponse<Applications> getDelta(String... regions);
    
        EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions);
    
        EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, String... regions);
    
        EurekaHttpResponse<Application> getApplication(String appName);
    
        EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id);
    
        EurekaHttpResponse<InstanceInfo> getInstance(String id);
    
        void shutdown();
    }
    

    Eureka Server

    Eureka Server需要做的事有:

    • 维护服务注册信息列表
    • 接收客户端的register、renew、cancel等请求
    • Eureka Server多节点之间的数据复制同步

    项目启动时,EurekaServerAutoConfiguration会被自动注入到容器中。

    @Configuration
    @Import(EurekaServerInitializerConfiguration.class)
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,
    		InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
        //省略部分代码
    
    	@Configuration
    	protected static class EurekaServerConfigBeanConfiguration {
    		@Bean
    		@ConditionalOnMissingBean
    		public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
    			EurekaServerConfigBean server = new EurekaServerConfigBean();
    			if (clientConfig.shouldRegisterWithEureka()) {
    				// Set a sensible default if we are supposed to replicate
    				server.setRegistrySyncRetries(5);
    			}
    			return server;
    		}
    	}
        
        //Eureka管理页面的Controller
    	@Bean
    	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    	public EurekaController eurekaController() {
    		return new EurekaController(this.applicationInfoManager);
    	}
    
    	static {
    		CodecWrappers.registerWrapper(JACKSON_JSON);
    		EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());
    	}
    
    	@Bean
    	public ServerCodecs serverCodecs() {
    		return new CloudServerCodecs(this.eurekaServerConfig);
    	}
    
    	private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
    		CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
    		return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
    	}
    
    	private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
    		CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
    		return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class)
    				: codec;
    	}
    
    	class CloudServerCodecs extends DefaultServerCodecs {
    
    		public CloudServerCodecs(EurekaServerConfig serverConfig) {
    			super(getFullJson(serverConfig),
    					CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class),
    					getFullXml(serverConfig),
    					CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class));
    		}
    	}
    
        //处理Eureka Client的register、renew、cancel等请求
    	@Bean
    	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
    			ServerCodecs serverCodecs) {
    		this.eurekaClient.getApplications(); // force initialization
    		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
    				serverCodecs, this.eurekaClient,
    				this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
    				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    	}
    
        //处理Eureka Server多节点同步
    	@Bean
    	@ConditionalOnMissingBean
    	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
    			ServerCodecs serverCodecs) {
    		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
    				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    	}
    	
    	//省略部分代码
    

    1. 请求接受处理

    InstanceResource类主要用于接受请求,收到请求后调用InstanceRegistry类的方法进行处理。以renew为例:

     @PUT
        public Response renewLease(
                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
                @QueryParam("overriddenstatus") String overriddenStatus,
                @QueryParam("status") String status,
                @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
            boolean isFromReplicaNode = "true".equals(isReplication);
            boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
            //...
    

    2. 服务剔除

    EurekaServerAutoConfiguration类引入了EurekaServerInitializerConfiguration类。容器初始化会触发start方法,start方法如下:

    @Override
    	public void start() {
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					//TODO: is this class even needed now?
    					//初始化方法
    					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
    					log.info("Started Eureka Server");
    
    					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
    					EurekaServerInitializerConfiguration.this.running = true;
    					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
    				}
    				catch (Exception ex) {
    					// Help!
    					log.error("Could not initialize Eureka servlet context", ex);
    				}
    			}
    		}).start();
    	}
    
    eurekaServerBootstrap.contextInitialized()
    -》this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        -》 super.postInit();
    

    postInit代码如下:

     protected void postInit() {
            renewsLastMin.start();
            if (evictionTaskRef.get() != null) {
                evictionTaskRef.get().cancel();
            }
            //服务剔除定时任务
            evictionTaskRef.set(new EvictionTask());
            evictionTimer.schedule(evictionTaskRef.get(),
            //延迟60s,每60执行一次
                    serverConfig.getEvictionIntervalTimerInMs(),
                    serverConfig.getEvictionIntervalTimerInMs());
        }
    

    3. readOnlyCacheMap缓存周期更新

    DefaultEurekaServerContext类的initialize方法上加了@PostConstruct注解,会在bean构造后被执行:

      @PostConstruct
        @Override
        public void initialize() {
            logger.info("Initializing ...");
            peerEurekaNodes.start();
            try {
                registry.init(peerEurekaNodes);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            logger.info("Initialized");
        }
    

    init()-》 initializedResponseCache()-》new ResponseCacheImpl

    ResponseCacheImpl方法如下:

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
            this.serverConfig = serverConfig;
            this.serverCodecs = serverCodecs;
            this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
            this.registry = registry;
    
            long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
            //readWriteCacheMap是guava缓存,缓存加载是用的load方法里的实现
            this.readWriteCacheMap =
                    CacheBuilder.newBuilder().initialCapacity(1000)
                            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                            .removalListener(new RemovalListener<Key, Value>() {
                                @Override
                                public void onRemoval(RemovalNotification<Key, Value> notification) {
                                    Key removedKey = notification.getKey();
                                    if (removedKey.hasRegions()) {
                                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                    }
                                }
                            })
                            .build(new CacheLoader<Key, Value>() {
                                @Override
                                public Value load(Key key) throws Exception {
                                    if (key.hasRegions()) {
                                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                                        regionSpecificKeys.put(cloneWithNoRegions, key);
                                    }
                                    Value value = generatePayload(key);
                                    return value;
                                }
                            });
            
            if (shouldUseReadOnlyResponseCache) {
            //定时30s刷新缓存,具体逻辑在getCacheUpdateTask
                timer.schedule(getCacheUpdateTask(),
                        new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                        responseCacheUpdateIntervalMs);
            }
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
            }
        }
    
    private TimerTask getCacheUpdateTask() {
            return new TimerTask() {
                @Override
                public void run() {
                    logger.debug("Updating the client cache from response cache");
                    for (Key key : readOnlyCacheMap.keySet()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                    key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                        }
                        try {
                            CurrentRequestVersion.set(key.getVersion());
                            Value cacheValue = readWriteCacheMap.get(key);
                            Value currentCacheValue = readOnlyCacheMap.get(key);
                            //对比值,不同的话readOnlyCacheMap取readWriteCacheMap里的值放入。
                            if (cacheValue != currentCacheValue) {
                                readOnlyCacheMap.put(key, cacheValue);
                            }
                        } catch (Throwable th) {
                            logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                        }
                    }
                }
            };
        }
    

    番外:源码里有一个这个东西,存最近的数据,如果有相同需求可以借鉴

     private class CircularQueue<E> extends ConcurrentLinkedQueue<E> {
            private int size = 0;
    
            public CircularQueue(int size) {
                this.size = size;
            }
    
            @Override
            public boolean add(E e) {
                this.makeSpaceIfNotAvailable();
                return super.add(e);
    
            }
    
            private void makeSpaceIfNotAvailable() {
                if (this.size() == size) {
                    this.remove();
                }
            }
    
            public boolean offer(E e) {
                this.makeSpaceIfNotAvailable();
                return super.offer(e);
            }
        }
    
    书山有路勤为径,学海无涯苦作舟
  • 相关阅读:
    表相关操作
    表的约束
    windows平台MySQL安装
    网络编程2
    Python元类
    并发编程这个只是占位使用而已
    并发编程2
    并发编程1
    Mac装机神器Homebrew
    基于Django框架开发BBS项目
  • 原文地址:https://www.cnblogs.com/javammc/p/15403279.html
Copyright © 2011-2022 走看看