  • Eureka Server multi-level cache and expiration mechanism

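    Multi-level cache

    When I previously wrote about the eureka-client full-registry fetch, the request ended up in the server-side cache. This post looks at how the server's multi-level cache actually works.

    When an eureka client initializes, it automatically sends one request to the eureka server to fetch the full registry. Here we look at how the server handles that request. The client sends GET http://localhost:8080/v2/apps/, which is handled by ApplicationsResource#getContainers(), the method that returns the full registry.

    com.netflix.eureka.registry.ResponseCache is the response cache interface. Its code is: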

    public interface ResponseCache {
    
        String get(Key key);
        
        byte[] getGZIP(Key key);
        
        void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
    
        AtomicLong getVersionDelta();
        
        AtomicLong getVersionDeltaWithRegions();
    
    }
    

    com.netflix.eureka.registry.Key is the cache key. Its implementation is:

    public class Key {
    
        public enum KeyType {
            JSON, XML
        }
    
        /**
         * An enum to define the entity that is stored in this cache for this key.
         */
        public enum EntityType {
            Application, VIP, SVIP
        }
    
        /**
         * Entity name
         */
        private final String entityName;
        /**
         * Regions (remote region registries) this key is scoped to; may be null
         */
        private final String[] regions;
        /**
         * Request payload type
         */
        private final KeyType requestType;
        /**
         * Requested API version
         */
        private final Version requestVersion;
        /**
         * hashKey
         */
        private final String hashKey;
        /**
         * Entity type
         *
         * {@link EntityType}
         */
        private final EntityType entityType;
        /**
         * {@link EurekaAccept}
         */
        private final EurekaAccept eurekaAccept;
        
        // Convenience constructor without regions (this is the one invalidate() calls); delegates to the full constructor.
        public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept) {
            this(entityType, entityName, type, v, eurekaAccept, null);
        }
        
        public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
            this.regions = regions;
            this.entityType = entityType;
            this.entityName = entityName;
            this.requestType = type;
            this.requestVersion = v;
            this.eurekaAccept = eurekaAccept;
            hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
                    + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
        }
        
        @Override
        public int hashCode() {
            String hashKey = getHashKey();
            return hashKey.hashCode();
        }
    
        @Override
        public boolean equals(Object other) {
            if (other instanceof Key) {
                return getHashKey().equals(((Key) other).getHashKey());
            } else {
                return false;
            }
        }
        
    }
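
    To see why Key overrides hashCode() and equals(), here is a minimal sketch. It is not Eureka's class: SimpleKey and SimpleKeyDemo are hypothetical stand-ins that build a concatenated hash key in the same spirit, so that two separately constructed keys describing the same request resolve to the same cache entry.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for com.netflix.eureka.registry.Key.
final class SimpleKey {
    private final String hashKey;

    SimpleKey(String entityType, String entityName, String requestType, String[] regions) {
        // Same idea as Key: concatenate every field that distinguishes a response.
        this.hashKey = entityType + entityName
                + (regions != null ? Arrays.toString(regions) : "")
                + requestType;
    }

    @Override
    public int hashCode() {
        return hashKey.hashCode();
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof SimpleKey && hashKey.equals(((SimpleKey) other).hashKey);
    }
}

final class SimpleKeyDemo {
    // Stores a payload under one key instance and reads it back through another instance.
    static String lookup(String storedType, String requestedType) {
        Map<SimpleKey, String> cache = new HashMap<>();
        cache.put(new SimpleKey("Application", "ALL_APPS", storedType, null), "payload");
        return cache.get(new SimpleKey("Application", "ALL_APPS", requestedType, null));
    }

    public static void main(String[] args) {
        System.out.println(lookup("JSON", "JSON")); // same fields: cache hit
        System.out.println(lookup("JSON", "XML"));  // different request type: cache miss
    }
}
```

    Without the overrides, each new key object would be a distinct map key and every request would miss the cache.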
    

    The actual caching lives in the implementation class ResponseCacheImpl. Its constructor creates readWriteCacheMap and configures its policies.

        ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
            this.serverConfig = serverConfig;
            this.serverCodecs = serverCodecs;
            this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
            this.registry = registry;
    
            long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
            this.readWriteCacheMap =
                    CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                            .removalListener(new RemovalListener<Key, Value>() {
                                @Override
                                public void onRemoval(RemovalNotification<Key, Value> notification) {
                                    Key removedKey = notification.getKey();
                                    if (removedKey.hasRegions()) {
                                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                    }
                                }
                            })
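                            // On a cache miss, the loader below calls generatePayload(key) to build the value, e.g. the full registry.
                            // A standalone demo at the end of this post exercises this load-on-miss behavior.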
                            .build(new CacheLoader<Key, Value>() {
                                @Override
                                public Value load(Key key) throws Exception {
                                    if (key.hasRegions()) {
                                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                                        regionSpecificKeys.put(cloneWithNoRegions, key);
                                    }
                                    Value value = generatePayload(key);
                                    return value;
                                }
                            });
    
            if (shouldUseReadOnlyResponseCache) {
                timer.schedule(getCacheUpdateTask(),
                        new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                        responseCacheUpdateIntervalMs);
            }
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
            }
        }
    

    The read path below checks the read-only cache first and falls back to the read-write cache on a miss:

        public String get(final Key key) {
            return get(key, shouldUseReadOnlyResponseCache);
        }
    
        @VisibleForTesting
        String get(final Key key, boolean useReadOnlyCache) {
            Value payload = getValue(key, useReadOnlyCache);
            if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
                return null;
            } else {
                return payload.getPayload();
            }
        }
    
        @VisibleForTesting
        Value getValue(final Key key, boolean useReadOnlyCache) {
            Value payload = null;
            try {
                if (useReadOnlyCache) {
                    final Value currentPayload = readOnlyCacheMap.get(key);
                    if (currentPayload != null) {
                        payload = currentPayload;
                    } else {
                        payload = readWriteCacheMap.get(key);
                        readOnlyCacheMap.put(key, payload);
                    }
                } else {
                    payload = readWriteCacheMap.get(key);
                }
            } catch (Throwable t) {
                logger.error("Cannot get value for key : {}", key, t);
            }
            return payload;
        }
    

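    The #get(key, useReadOnlyCache) method reads from the cache. shouldUseReadOnlyResponseCache comes from the setting eureka.shouldUseReadOnlyResponseCache (default: true), which enables the read-only cache. If you need stronger data consistency you can turn this switch off, at the cost of some performance because readOnlyCacheMap no longer sits in front of the read-write cache.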
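
    The consistency trade-off can be sketched with two plain maps. This is an illustrative model, not ResponseCacheImpl: TwoLevelCacheSketch, its loader function (standing in for generatePayload) and the "ALL_APPS@vN" payloads are all assumptions made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical two-level cache; field names mirror ResponseCacheImpl, the code does not.
final class TwoLevelCacheSketch {
    private final ConcurrentMap<String, String> readOnlyCacheMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, String> readWriteCacheMap = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // plays the role of generatePayload
    private final boolean useReadOnlyCache;

    TwoLevelCacheSketch(Function<String, String> loader, boolean useReadOnlyCache) {
        this.loader = loader;
        this.useReadOnlyCache = useReadOnlyCache;
    }

    String get(String key) {
        if (!useReadOnlyCache) {
            // Single level: load on miss, always reflects invalidations.
            return readWriteCacheMap.computeIfAbsent(key, loader);
        }
        String value = readOnlyCacheMap.get(key);
        if (value == null) {
            // Read-only miss: fall back to the read-write level and copy the value up.
            value = readWriteCacheMap.computeIfAbsent(key, loader);
            readOnlyCacheMap.put(key, value);
        }
        return value;
    }

    // Invalidation only touches the read-write level.
    void invalidate(String key) {
        readWriteCacheMap.remove(key);
    }
}

final class TwoLevelCacheDemo {
    // Returns the payload read before and after a registry change plus invalidation.
    static String[] run(boolean useReadOnlyCache) {
        AtomicInteger registryVersion = new AtomicInteger(1);
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch(
                key -> key + "@v" + registryVersion.get(), useReadOnlyCache);
        String before = cache.get("ALL_APPS");
        registryVersion.incrementAndGet(); // e.g. a new instance registers
        cache.invalidate("ALL_APPS");
        String after = cache.get("ALL_APPS");
        return new String[] {before, after};
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", run(true)));  // read-only level keeps serving the old payload
        System.out.println(String.join(" -> ", run(false))); // read-write level alone sees the change
    }
}
```

    With the read-only level enabled, the second read still returns the old payload until something copies the new value up, which is the staleness the switch lets you avoid.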

    Expiration strategies

    Active expiration

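    Searching for its callers shows that the cache invalidation method is invoked whenever an instance registers, goes offline, or fails, so let's look at what it actually does.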

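    When an application instance registers, goes offline, or is evicted, ResponseCacheImpl#invalidate() is called to actively expire the affected entries in the read-write cache (readWriteCacheMap). The implementation is: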

        @Override
        public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
            for (Key.KeyType type : Key.KeyType.values()) {
                for (Version v : Version.values()) {
                    invalidate(
                            new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                            new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                            new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
                    );
                    if (null != vipAddress) {
                        invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
                    }
                    if (null != secureVipAddress) {
                        invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
                    }
                }
            }
        }
    
    
        public void invalidate(Key... keys) {
            for (Key key : keys) {
                logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                        key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    
                readWriteCacheMap.invalidate(key);
                Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
                if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
                    for (Key keysWithRegion : keysWithRegions) {
                        logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                        readWriteCacheMap.invalidate(keysWithRegion);
                    }
                }
            }
        }
    

    Among other keys, this expires the readWriteCacheMap entry stored under the ALL_APPS key.
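
    To get a feel for how many entries a single registry change touches, the nested loops in invalidate() can be mirrored with plain strings. This is a sketch under assumptions: the enums below are local stand-ins (in particular, Version having exactly the two values V1 and V2 is assumed here), and keys are rendered as strings rather than Key objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the key fan-out performed by invalidate(appName, vip, svip).
final class InvalidationFanOutSketch {
    enum KeyType { JSON, XML }
    enum Version { V1, V2 }        // assumed values, for illustration only
    enum Accept { full, compact }

    static List<String> keysToInvalidate(String appName, String vipAddress, String secureVipAddress) {
        List<String> keys = new ArrayList<>();
        for (KeyType type : KeyType.values()) {
            for (Version v : Version.values()) {
                // The app itself, the full registry and the delta, in both accept modes.
                for (String entity : new String[] {appName, "ALL_APPS", "ALL_APPS_DELTA"}) {
                    for (Accept accept : Accept.values()) {
                        keys.add("Application/" + entity + "/" + type + "/" + v + "/" + accept);
                    }
                }
                if (vipAddress != null) {
                    keys.add("VIP/" + vipAddress + "/" + type + "/" + v + "/" + Accept.full);
                }
                if (secureVipAddress != null) {
                    keys.add("SVIP/" + secureVipAddress + "/" + type + "/" + v + "/" + Accept.full);
                }
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = keysToInvalidate("ORDER-SERVICE", "order.vip", null);
        System.out.println(keys.size() + " keys invalidated");
        keys.forEach(System.out::println);
    }
}
```

    The point is that every representation of the changed application, the full registry and the delta is dropped together, so the next read of any of them is rebuilt from the registry.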

    Passive expiration

    • Configure eureka.responseCacheAutoExpirationInSeconds to set the expire-after-write duration. Default: 180 seconds.
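
    Expire-after-write means an entry's lifetime is counted from the moment it was written, and reads do not extend it. The hand-rolled sketch below illustrates that rule with an injectable clock so no real waiting is needed. It is an assumption-laden model (ExpireAfterWriteSketch is hypothetical, single-threaded, and not how Guava implements its cache).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical, non-thread-safe illustration of expire-after-write semantics.
final class ExpireAfterWriteSketch {
    private static final class Entry {
        final String value;
        final long writtenAtMs;

        Entry(String value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMs;
    private final LongSupplier clockMs;
    private final Function<String, String> loader;
    private int loads;

    ExpireAfterWriteSketch(long ttlMs, LongSupplier clockMs, Function<String, String> loader) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
        this.loader = loader;
    }

    String get(String key) {
        long now = clockMs.getAsLong();
        Entry entry = entries.get(key);
        // Reload when missing or when the write is at least ttlMs old; reads never refresh the timestamp.
        if (entry == null || now - entry.writtenAtMs >= ttlMs) {
            loads++;
            entry = new Entry(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value;
    }

    int loads() {
        return loads;
    }

    // Reads the same key at the given (fake) times and reports how often the loader ran.
    static int countLoads(long ttlMs, long... readTimesMs) {
        long[] now = {0L};
        ExpireAfterWriteSketch cache =
                new ExpireAfterWriteSketch(ttlMs, () -> now[0], key -> "payload for " + key);
        for (long t : readTimesMs) {
            now[0] = t;
            cache.get("ALL_APPS");
        }
        return cache.loads();
    }

    public static void main(String[] args) {
        // 180 s TTL: reads at 0 s and just under 180 s share one load, the read at 180 s reloads.
        System.out.println(countLoads(180_000L, 0L, 179_999L, 180_000L));
    }
}
```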

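    Scheduled refresh

    The constructor sets up a timer task. eureka.responseCacheUpdateIntervalMs sets how often it runs (default: 30 * 1000 ms). The task compares the values in readWriteCacheMap and readOnlyCacheMap for each key and, where they differ, the former wins. This is how stale entries in readOnlyCacheMap are replaced on a schedule.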

            if (shouldUseReadOnlyResponseCache) {
                timer.schedule(getCacheUpdateTask(),
                        new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                        responseCacheUpdateIntervalMs);
            }
    
        private TimerTask getCacheUpdateTask() {
            return new TimerTask() {
                @Override
                public void run() {
                    logger.debug("Updating the client cache from response cache");
                    for (Key key : readOnlyCacheMap.keySet()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                    key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                        }
                        try {
                            CurrentRequestVersion.set(key.getVersion());
                            Value cacheValue = readWriteCacheMap.get(key);
                            Value currentCacheValue = readOnlyCacheMap.get(key);
                            if (cacheValue != currentCacheValue) { // if they differ, overwrite the read-only entry with the value from the read-write cache
                                readOnlyCacheMap.put(key, cacheValue);
                            }
                        } catch (Throwable th) {
                            logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                        } finally {
                            CurrentRequestVersion.remove();
                        }
                    }
                }
            };
        }
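
    Two details of the snippets above are easy to miss: the first run is aligned to the next multiple of the interval, and only keys already present in the read-only map are refreshed. The sketch below isolates both. ReadOnlyRefreshSketch is a hypothetical helper, the loader stands in for the read-write cache's load-on-miss, and values are compared with equals() here whereas the Eureka task compares references.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper isolating the schedule arithmetic and one refresh pass.
final class ReadOnlyRefreshSketch {
    // Same arithmetic as the timer.schedule(...) call: next multiple of intervalMs after nowMs.
    static long firstRunAt(long nowMs, long intervalMs) {
        return ((nowMs / intervalMs) * intervalMs) + intervalMs;
    }

    // One pass of the refresh task; returns how many read-only entries were replaced.
    static int refresh(Map<String, String> readOnly,
                       Map<String, String> readWrite,
                       Function<String, String> loader) {
        int updated = 0;
        for (String key : readOnly.keySet()) {
            // A missing (invalidated or expired) read-write entry is rebuilt on access.
            String fresh = readWrite.computeIfAbsent(key, loader);
            if (!Objects.equals(fresh, readOnly.get(key))) {
                readOnly.put(key, fresh);
                updated++;
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        // 95 s after the epoch with a 30 s interval: first run lands on the 120 s boundary.
        System.out.println(firstRunAt(95_000L, 30_000L));

        Map<String, String> readOnly = new ConcurrentHashMap<>();
        Map<String, String> readWrite = new ConcurrentHashMap<>(); // empty: entry was invalidated
        readOnly.put("ALL_APPS", "v1");
        System.out.println(refresh(readOnly, readWrite, key -> "v2")); // stale entry replaced
        System.out.println(refresh(readOnly, readWrite, key -> "v2")); // already in sync
    }
}
```

    With the default 30-second interval, a client reading through the read-only cache can therefore see a change up to roughly one interval late.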
    

    Test

        // Requires Guava (com.google.common.cache.*) on the classpath.
        public static void main(String[] args) throws Exception {
            LoadingCache<String, String> cache = CacheBuilder
                    .newBuilder()
                    .expireAfterWrite(2L, TimeUnit.SECONDS)
                    .build(new CacheLoader<String, String>() {
                        @Override
                        public String load(String key) throws Exception {
                            // Runs only on a miss or after the entry has expired.
                            System.out.println("Not in cache, loading...");
                            return "hello " + key + "!";
                        }
                    });
    
            System.out.println("jerry value:" + cache.get("jerry")); // miss: load() runs
            System.out.println("jerry value:" + cache.get("jerry")); // hit: served from the cache
            Thread.sleep(2000);                                       // wait out the 2-second write expiry
            System.out.println("jerry value:" + cache.get("jerry")); // expired: load() runs again
        }
    
    

  • Original article: https://www.cnblogs.com/dalianpai/p/15394139.html