zoukankan      html  css  js  c++  java
  • Eureka源码分析(四)

    多级缓存设计

    Eureka Server存在三个变量:(registry、readWriteCacheMap、readOnlyCacheMap)保存服务注册信息,默认情况下定时任务每30s将readWriteCacheMap同步至readOnlyCacheMap,每60s清理超过90s未续约的节点,Eureka Client每30s从readOnlyCacheMap更新服务注册信息,而客户端服务的注册则从registry更新服务注册信息。

    多级缓存的意义

    这里为什么要设计多级缓存呢?原因很简单,就是当存在大规模的服务注册和更新时,如果只是修改一个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。而Eureka又是AP模型,只需要满足最终可用就行。所以它在这里用到多级缓存来实现读写分离。注册方法写的时候直接写内存注册表,写完表之后主动失效读写缓存。获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较复杂)。并且,读写缓存会更新回写只读缓存

    • responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器时间间隔,默认为30秒
    • responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期时间,默认为 180 秒。

    服务注册的缓存失效

    在AbstractInstanceRegistry.register方法的最后,会调用invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),registrant.getSecureVipAddress()); 方法,使得读写缓存失效。

    public void invalidate(Key... keys) {
        for (Key key : keys) {
            logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                         key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    
            readWriteCacheMap.invalidate(key);
            Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
            if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
                for (Key keysWithRegion : keysWithRegions) {
                    logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                    readWriteCacheMap.invalidate(keysWithRegion);
                }
            }
        }
    }
    

    定时同步缓存

    ResponseCacheImpl的构造方法中,会启动一个定时任务,这个任务会定时检查写缓存中的数据变化,进行更新和同步。

    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                     key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    } finally {
                        CurrentRequestVersion.remove();
                    }
                }
            }
        };
    }
    

    服务续约

    客户端会在 initScheduledTasks 中,创建一个心跳检测的定时任务

    heartbeatTask = new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread()
    );
    scheduler.schedule(
        heartbeatTask,
        renewalIntervalInSecs, TimeUnit.SECONDS);
    

    HeartbeatThread

    然后这个定时任务中,会执行一个 HearbeatThread 的线程,这个线程会定时调用renew()来做续约。

    private class HeartbeatThread implements Runnable {
    
        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    

    服务端收到心跳请求的处理

    在ApplicationResource.getInstanceInfo这个接口中,会返回一个InstanceResource的实例,在该实例下,定义了一个statusUpdate的接口来更新状态

    @Path("{id}")
    public InstanceResource getInstanceInfo(@PathParam("id") String id) {
        return new InstanceResource(this, id, serverConfig, registry);
    }
    

    InstanceResource.statusUpdate()

    在该方法中,我们重点关注 registry.statusUpdate 这个方法,它会调用AbstractInstanceRegistry.statusUpdate来更新指定服务提供者在服务端存储的信息中的变化。

    @PUT
    @Path("status")
    public Response statusUpdate(
        @QueryParam("value") String newStatus,
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        try {
            if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
                logger.warn("Instance not found: {}/{}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
            boolean isSuccess = registry.statusUpdate(app.getName(), id,
                                                      InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                                                      "true".equals(isReplication));
    
            if (isSuccess) {
                logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus);
                return Response.ok().build();
            } else {
                logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus);
                return Response.serverError().build();
            }
        } catch (Throwable e) {
            logger.error("Error updating instance {} for status {}", id,
                         newStatus);
            return Response.serverError().build();
        }
    }
    

    AbstractInstanceRegistry.statusUpdate

    在这个方法中,会拿到应用对应的实例列表,然后调用Lease.renew()去进行心跳续约。

    public boolean statusUpdate(String appName, String id,
                                InstanceStatus newStatus, String lastDirtyTimestamp,
                                boolean isReplication) {
        try {
            read.lock();
            // 更新状态的次数 状态统计
            STATUS_UPDATE.increment(isReplication);
            // 从本地数据里面获取实例信息,
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> lease = null;
            if (gMap != null) {
                lease = gMap.get(id);
            }
            // 实例不存在,则直接返回,表示失败
            if (lease == null) {
                return false;
            } else {
                // 执行一下lease的renew方法,里面主要是更新了这个instance的最后更新时间
                lease.renew();
                // 获取instance实例信息
                InstanceInfo info = lease.getHolder();
                // Lease is always created with its instance info object.
                // This log statement is provided as a safeguard, in case this invariant is violated.
                if (info == null) {
                    logger.error("Found Lease without a holder for instance id {}", id);
                }
                // 当instance信息不为空时,并且实例状态发生了变化
                if ((info != null) && !(info.getStatus().equals(newStatus))) {
                    // 如果新状态是UP的状态,那么启动一下serviceUp() , 主要是更新服务的注册时间
                    if (InstanceStatus.UP.equals(newStatus)) {
                        lease.serviceUp();
                    }
                    // 将instance Id 和这个状态的映射信息放入覆盖缓存MAP里面去
                    overriddenInstanceStatusMap.put(id, newStatus);
                    //设置覆盖状态到实例信息里面去
                    info.setOverriddenStatus(newStatus);
                    long replicaDirtyTimestamp = 0;
                    info.setStatusWithoutDirty(newStatus);
                    if (lastDirtyTimestamp != null) {
                        replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                    }
                    // If the replication's dirty timestamp is more than the existing one, just update
                    // it to the replica's.
                    //如果replicaDirtyTimestamp 的时间大于instance的getLastDirtyTimestamp() ,则更新
                    if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                        info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                    }
                    info.setActionType(ActionType.MODIFIED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                    info.setLastUpdatedTimestamp();
                    //更新写缓存
                    invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
                }
                return true;
            }
        } finally {
            read.unlock();
        }
    }
    
  • 相关阅读:
    JavaScript数字和字符串转换示例
    Angular CLI 使用教程指南参考
    angular2 post以“application/x-www-form-urlencoded”形式传参的解决办法
    Arison [JS]window.location获取url各项参数详解
    $.ajaxComplete()
    angular2 编写公用组件
    获取本周、本季度、本月、上月的开端日期、停止日期
    Angular2
    轮播图插件 SuperSlide2.1滑动门jQuery插件
    书写Css文件要点
  • 原文地址:https://www.cnblogs.com/snail-gao/p/14132474.html
Copyright © 2011-2022 走看看