zoukankan      html  css  js  c++  java
  • Eureka 系列(08)心跳续约与自动过期

    Eureka 系列(08)心跳续约与自动过期

    Spring Cloud 系列目录 - Eureka 篇

    在上一篇 Eureka 系列(07)服务注册与主动下线 中对服务的注册与下线进行了分析,本文继续分析 Eureka 是如何进行心跳续约的。

    1. 心跳续约

    心跳续约有两种情况:一是客户端发起的心跳续约(isReplication=false);二是服务器消息广播时发起的心跳续约(isReplication=true)。这两种心跳续约的处理稍有不同。

    1.1 心跳续约机制

    当服务器收到客户端的心跳续约后,首先在当着服务器上更新租约时间,如果成功,则将心跳广播给其它服务器。

    图1:Eureka 心跳续约机制
    sequenceDiagram participant InstanceResource participant PeerAwareInstanceRegistryImpl participant AbstractInstanceRegistry participant PeerEurekaNode note over InstanceResource: PUT:/euraka/apps/{appName}/{id}<br/>renewLease InstanceResource ->> PeerAwareInstanceRegistryImpl : 心跳请求:renew(appName,id,isReplication) PeerAwareInstanceRegistryImpl ->> AbstractInstanceRegistry : 1. 本地数据更新:renew(appName,id,isReplication) loop 同步到其它 Eureka Server 节点 PeerAwareInstanceRegistryImpl ->> PeerAwareInstanceRegistryImpl : 2.1 数据同步:replicateToPeers PeerAwareInstanceRegistryImpl ->> PeerEurekaNode : 2.2 heartbeat -> PUT:/euraka/apps/{appName}/{id} alt 3.1 failure: 404则更新对方节点 PeerEurekaNode -->> PeerEurekaNode : register(info) else 3.2 failure: 否则更新自己节点 PeerEurekaNode -->> PeerEurekaNode : syncInstancesIfTimestampDiffers PeerEurekaNode -->> PeerEurekaNode : register(infoFromPeer, true) end end

    总结:

    1. renewLease 心跳续约请求是 InstanceResource#renewLease 方法进行处理。isReplication=false 则是客户端请求,true 则是消息广播请求。
    2. renew 本地服务器心跳处理。处理成功则进行心跳消息广播。
    3. heartbeat 心跳消息广播给其它服务器。需要注意心跳广播失败的处理机制:
      • 如果对方服务器不存在该实例或 PK 失败,需要重新注册更新对方服务的实例信息。
      • 如果对方服务器 PK 成功,则需要反过来更新本地服务的注册信息。

    1.2 接收心跳续约 - renewLease

    InstanceResource#renewLease 处理心跳续约请求,路径是 PUT /apps/{appName}/{id}

    1. 如果本地服务端处理失败(包括实例不存在或实例的状态是UNKNOWN),就返回 NOT_FOUND,也是需要重新注册,更新实例信息。
    2. 服务端和客户端实例的 lastDirtyTimestamp 进行 PK。结果两种情况:一是服务端实例 PK 失败,返回 NOT_FOUND,客户端重新注册,从而更新服务端实例信息;二是服务端实例 PK 成功,返回实例信息给客户端,从而更新客户端实例信息。
    @PUT
    public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // 1. 心跳处理,本地心跳处理成功后进行消息广播。
        //    由于消息广播是异步的,实际返回的结果是本地心跳处理的结果。
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    
        // 2. 心跳处理失败分两种情况:一是本地服务器不存在该服务实例;
        //    二是本地服务实例和lastDirtyTimestamp进行PK失败,则说明本地服务实例信息不是最新的
        if (!isSuccess) {
            return Response.status(Status.NOT_FOUND).build();
        }
        
        // 3. 本地服务实例和请求的lastDirtyTimestamp进行PK失败,则说明本地服务实例信息不是最新的
        //    后面有时间专门介绍一下 OverriddenStatus
        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        return response;
    }
    

    总结: 下面分三部分说明心跳续约的整个流程:

    1. 本地服务器是如何处理续约的?主要是 AbstractInstanceRegistry#renew 方法。
    2. 本地服务器和客户端实例的 lastDirtyTimestamp 如何进行 PK ?主要是 InstanceResource#validateDirtyTimestamp 方法。
    3. Eureka Client 是如何发起心跳续约请求,并处理请求结果?主要是 DiscoveryClient。
    4. 心跳续约消息广播如何处理?主要是 PeerEurekaNode#heartbeat 方法。

    1.3 本地续约处理 - renew

    本地服务端续约,如果实例不存在或实例状态是 UNKNOWN 时返回 false,表示需要客户端重新注册,更新服务端实例信息。当然返回 true 时,也不意味着数据是最新的,需要在下一步继续校验脏数据。

    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        // 1. 获取服务端注册的实例
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // 2.1 服务实例不存在,返回404
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            return false;
        // 2.2 服务实例存在,
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // 实例的状态是 UNKNOWN 时返回 false,否则返回 true
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            // 3. 更新最后一次的心跳时间(核心)
            leaseToRenew.renew();
            return true;
        }
    }
    

    总结: 如果实例存在且状态不是 UNKNOWN 时就需要在下一步继续校验脏数据。其中最核心的一名代码就是 leaseToRenew.renew() 更新最后一次的心跳时间,Eureka 的租约管理都是在 Lease 完成的。

    1.4 脏数据校验 - validateDirtyTimestamp

    validateDirtyTimestamp 方法主要是将客户端实例和服务端本地实例进行 PK。PK 的原则就是:服务实例 lastDirtyTimestamp 大的代表是最新的注册信息。 其实原因也很简单,每次服务实例更新时都会更新时间戳,这样时间戳大的就代表最后更新的实例,其它服务节点的实例信息都要这个服务实例进行同步。

    private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                            boolean isReplication) {
        // 1. 获取本地注册的实例,和客户端的实例进行 PK
        InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
        if (appInfo != null) {
            // 2. 客户端和服务端的实例更新的时间戳发生了变化,说明实例信息不同步了,进行PK
            if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
    			// 3.1 客户端 PK 成功,客户端需要重新将实例注册一次,更新服务端的实例信息
                if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                    return Response.status(Status.NOT_FOUND).build();
    			// 3.2 服务端 PK 成功,将实例信息返回给客户端,更新客户端的实例信息
                } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                    // ture表示Eureka内部之间同步数据,需要更新实例信息
                    // 集群内部数据要一致,肯定要同步数据
                    if (isReplication) {
                        return Response.status(Status.CONFLICT).entity(appInfo).build();
                    // false表示EurekaClient的心跳,不需要同步实例信息给EurekaClient?
                    } else {
                        return Response.ok().build();
                    }
                }
            }
        }
        return Response.ok().build();
    }
    

    总结: 就一句话,lastDirtyTimestamp 大代表是最新的注册信息。

    注意: 集群内部消息广播和 EurekaClient 心跳续约的处理不一样(3.2):

    • 集群内部消息广播:如果数据不一致,肯定要进行数据同步处理,达到最终一致性。
    • EurekaClient 心跳续约,如果服务端是最新的数据,不需要同步给客户端。

    1.5 客户端处理 - renew

    EurekaClient 心跳续约时,如果客户端的实例信息是最新的,需要发起重新注册,更新服务端的实例信息,但服务端的实例信息是最新的,不会更新客户端的实例信息。

    // DiscoveryClient
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            // 404时重新发起注册,更新服务端的实例信息
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            return false;
        }
    }
    

    1.6 心跳广播 - heartbeat

    心跳广播重点需要关注失败时的处理逻辑:一是返回 404,也就是客户端的实例信息是最新的,重新发起注册,更新服务端的实例信息;二是其它异常,则需要根据服务端返回的实例更新客户端的注册信息。其中第二点是和 EurekaClient 心跳续约不同的地方。

    public void heartbeat(final String appName, final String id,
                          final InstanceInfo info, final InstanceStatus overriddenStatus,
                          boolean primeConnection) throws Throwable {
        // 1. primeConnection时不关心心跳续约的结果,发送请求后直接返回
        if (primeConnection) {
            replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            return;
        }
        // 2. 关注请求结果,A -> B 发送心跳,成功就不说了
        // 3. 心跳续约失败有两种情况:一是 B 节点不存在该实例或 PK 失败,A -> B 重新发起注册请求;
        //    二是 B 节点存在该实例且 PK 成功,则反过来需要更新 A 节点该实例的注册信息。
        ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
                return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            }
    
            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                // 一是 B 节点不存在该实例,A -> B 重新发起注册请求
                if (statusCode == 404) {
                    if (info != null) {
                        register(info);
                    }
        		// 二是 B 节点存在该实例且 PK 赢了,则反过来需要更新 A 节点该实例的注册信息
                } else if (config.shouldSyncWhenTimestampDiffers()) {
                    InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                    if (peerInstanceInfo != null) {
                        syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                    }
                }
            }
        };
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
    }
    

    总结: 心跳广播是保证 Eureka 数据最终一致性的重要一环,只要集群内部一直发送心跳广播,如果数据出现不一致的情况就会进行数据同步,从而保证数据的最终一致性。

    // 更新本地实例注册信息
    private void syncInstancesIfTimestampDiffers(
        String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
        try {
            if (infoFromPeer != null) {
                // 1. 更新overriddenStatus状态
                if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                    registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
                }
                // 2. 更新本地实例注册信息
                registry.register(infoFromPeer, true);
            }
        } catch (Throwable e) {
        }
    }
    

    2. 自动过期

    还记得在 Eureka 系列(03)Spring Cloud 自动装配原理 中分析EurekaServerBootstrap 启动时会调用 registry.openForTraffic() 方法启动自动过期的定时任务 EvictionTask 吗?本文就从 EvictionTask 开始分析起。

    2.1 启动 EvictionTask 定时任务

    图2:启动自动过期定时任务
    graph LR EurekaServerBootstrap -- openForTraffic --> PeerAwareInstanceRegistryImpl PeerAwareInstanceRegistryImpl -- postInit --> AbstractInstanceRegistry AbstractInstanceRegistry -- start --> EvictionTask
    // 启动自动过期定时任务 EvictionTask,默认每 60s 执行一次
    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                               serverConfig.getEvictionIntervalTimerInMs(),
                               serverConfig.getEvictionIntervalTimerInMs());
    }
    

    总结: 默认 EvictionTask 每 60s 执行一次,客户端每 30s 进行一次心跳续约,如果心跳续约超过 90s 则下线。

    2.2 EvictionTask执行原理

    图2:EvictionTask执行原理
    sequenceDiagram participant EvictionTask participant AbstractInstanceRegistry participant Lease note left of EvictionTask : 60s定时任务 EvictionTask ->> AbstractInstanceRegistry : evict loop 自动过期 AbstractInstanceRegistry ->> Lease : isExpired AbstractInstanceRegistry ->> AbstractInstanceRegistry : internalCancel end

    2.2.1 如何判断是否过期

    首先对 Lease 几个重要属性进行说明:

    private long evictionTimestamp;		// 服务下线时间
    private long registrationTimestamp;	// 服务注册时间
    private long serviceUpTimestamp;	// 服务UP时间
    private volatile long lastUpdateTimestamp;	// 最后一次心跳续约时间
    private long duration;				// 心跳过期时间,默认 90s
    

    Lease 每次心跳续约时都会更新最后一次续约时间 lastUpdateTimestamp。如果服务下线则会更新下线时间 evictionTimestamp,这样 evictionTimestamp > 0 就表示服务已经下线了。默认心跳续约时间超过 90s 服务就自动过期。

    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > 
                (lastUpdateTimestamp + duration + additionalLeaseMs));
    }
    

    总结: additionalLeaseMs 是一种补偿机制,可以当成默认值 0ms。

    2.2.2 服务下线

    服务下线时首先判断是否开启了自我保护机制,再计算出一次最多下线的实例个数,最后调用 internalCancel 将实例下线。

    public void evict(long additionalLeaseMs) {
    	// 1. 是否开启自我保护机制
        if (!isLeaseExpirationEnabled()) {
            return;
        }
    
        // 2. 调用 lease.isExpired 筛选出所有过期的实例
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
    
       	// 3. 计算一次最多下线的实例个数 toEvict
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
    
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                // 4. 和自动下线一样,调用internalCancel进行下线
                internalCancel(appName, id, false);
            }
        }
    }
    

    总结: 自动过期和主动下线的区别是自动过期会考虑服务的自我保护,计算一次最多下线的实例个数,其余的都一样。


    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    浅谈SQLite——查询处理及优化
    .NET 并行(多核)编程系列之七 共享数据问题和解决概述
    sql 存储过程学习一
    SQL中获得EXEC后面的sql语句或者存储过程的返回值的方法 【收藏】
    script刷新页面,刷新代码
    C#编程中关于数据缓存的经验总结
    SQL存储过程的概念,优点及语法
    SQLite数据库安装、试用及编程测试手记
    c# sqlite 数据库加密
    进销存管理系统的设计与实现
  • 原文地址:https://www.cnblogs.com/binarylei/p/11621403.html
Copyright © 2011-2022 走看看