zoukankan      html  css  js  c++  java
  • Eureka 系列(07)服务注册与主动下线

    Eureka 系列(07)服务注册与主动下线

    Spring Cloud 系列目录 - Eureka 篇

    在上一篇 Eureka 系列(05)消息广播 中对 Eureka 消息广播的源码进行了分析,之后的几篇文章会具体分析本地服务注册、主动下线、心跳续约、自动过期等的实现机制。

    • PeerAwareInstanceRegistryImpl 负责集群内部消息通信。
    • AbstractInstanceRegistry 负责本地服务信息管理,这也是之后几篇文章关注的重点。
    表1:Eureka OPEN API
    资源 功能 url
    ApplicationsResource 获取全部或增量服务实例信息 GET /apps
    GET /apps/delta
    ApplicationResource 1. 获取单个应用的信息
    2. 注册实例信息
    GET /apps/{appName}
    POST /apps/{appName}
    InstanceResource 服务实例的CURD:
    1. 获取实例的信息
    2.修改服务实例元信息
    3. 删除实例信息,服务下线
    4. 发送心跳
    GET /apps/{appName}/{id}
    PUT /apps/{appName}/{id}/metadata
    DELETE /apps/{appName}/{id}
    PUT /apps/{appName}/{id}
    InstancesResource 直接根据实例id获取实例信息 GET /instances/{id}
    PeerReplicationResource 集群内部批量数据同步 POST /peerreplication/batch
    ServerInfoResource ??? POST /serverinfo/statusoverrides
    StatusResource ??? GET /statusoverrides

    注: {appName} 表示应用名称或服务id,{id} 表示实例id。eg: http://localhost:8080/eureka/apps

    1. 服务注册

    1.1 服务实例注册流程

    图1:Eureka 服务实例注册时序图
    sequenceDiagram participant ApplicationResource participant PeerAwareInstanceRegistryImpl participant AbstractInstanceRegistry participant PeerEurekaNode note over ApplicationResource: POST:/euraka/apps/{appName}<br/>addInstance(instanceInfo,isReplication) ApplicationResource ->> PeerAwareInstanceRegistryImpl: 注册请求:register(instanceInfo,isReplication) PeerAwareInstanceRegistryImpl ->> AbstractInstanceRegistry: 1. 本地数据更新: register(instanceInfo,leaseDuration,isReplication) loop 同步到其它 Eureka Server 节点 PeerAwareInstanceRegistryImpl ->> PeerAwareInstanceRegistryImpl: 2.1 数据同步:replicateInstanceActionsToPeers PeerAwareInstanceRegistryImpl ->> PeerEurekaNode: 2.2 register(instanceInfo) -> POST:/euraka/apps/{appName} end

    总结: Eureka Web 使用的是 Jersey 容器,服务注册的请求入口是 ApplicationResource 的 register 方法,请求的路径是 POST:/euraka/apps/{appName}

    1.2 ApplicationResource

    // ApplicationResource HTTP请求入口
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
    	@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        ... // 数据检验
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
    

    总结: ApplicationResource 入口主要是进行参数检验,主要的逻辑都委托给了 PeerAwareInstanceRegistryImpl 完成。

    注意:isReplication 参数,如果是客户端的请求则为 false,表示需要将这个消息广播给其它服务器。如果是集群内部消息广播则为true,表示不再需要继续广播,否则会造成循环广播的问题。

    // PeerAwareInstanceRegistryImpl 默认注册器实现
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // 租约的过期时间,默认90秒
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            // 如果客户端自定义了,那么以客户端为准
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // 本地注册
        super.register(info, leaseDuration, isReplication);
        // 消息广播
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    

    总结: PeerAwareInstanceRegistryImpl 这个类应该都很熟悉了,主要是负责进行集群间内部通信的,其父类 AbstractInstanceRegistry 则负责本地服务信息管理,也是本文的研究重点。

    1.3 AbstractInstanceRegistry

    在 Eureka 中,服务注册信息存储在内存中,数据结构为 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry,Map 嵌套了两层,外层是的 Key 是 appName,内存的 Key 是 InstanceId。

    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
                = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            // 1. 获取该服务对应的所有服务实例,如果不存在就创建一个新的Map
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            
            // 2. 两种情况:一是实例已经注册,二是实例没有注册
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // 2.1 实例已经注册,就需要PK,PK原则:谁最后一次更新就是谁赢
            //     也就是说如果已经注册的实例最近更新了,就不用重新更新了
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                // 已经注册的实例PK赢了
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            // 2.2 没有注册,很好处理。更新注册的实例个数
            } else {
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
            }
            
            // 3. 更新注册信息(核心步骤)
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // (核心步骤)
            gMap.put(registrant.getId(), lease);
            // 添加到最近的注册队列里面去,以时间戳作为Key,名称作为value,主要是为了运维界面的统计数据
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            
            // 4. 更新实例状态 InstanceStatus
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
    
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            
            // 5. 清理缓存等善后工作
            registrant.setActionType(ActionType.ADDED);
            // 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            // 清理缓存 ,传入的参数为key
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        } finally {
            read.unlock();
        }
    }
    

    总结: 前三步的逻辑都很清楚,目的就是更新内存中的实例信息,只是需要注意实例已经存在的情况下需要 PK 一下,原则就是谁最后一次更新就是谁赢。

    第四步更新服务实例的状态,OverriddenStatus 参考 InstanceInfo 中 OverriddenStatus 的作用

    每五步清理缓存等善后工作。目前来说,到 gMap.put(registrant.getId(), lease)这一步就够了。

    2. 主动下线

    服务下线对应的 OPEN API 为 DELETE /apps/{appName}/{id}

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            CANCEL.increment(isReplication);
            // 1. 清空registry中注册的实例信息(核心步骤)
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
            // 2. 添加到 recentCanceledQueue 队列中
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                return false;
            } else {
                // 3. 和注册时一样,也要做一下清除缓存等善后工作
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                return true;
            }
        } finally {
            read.unlock();
        }
    }
    

    总结: 服务的注册和主动下线的逻辑还是很清楚的。目前来说,到 gMap.remove(id)这一步就够了。至于细节以后真正用到 Eureka 再继续深入研究。


    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    八月第二周学习心得
    七月第二周学习心得
    八月第一周学习
    八月第三周学习心得
    7月第一周学习心得
    php mysql_error()函数用法详解
    php mysql_select_db
    php中的释放“语句”unset和释放“函数”mysql_free_result()
    JavaScript]Cookie详解(转)
    Javascript类型转换的规则
  • 原文地址:https://www.cnblogs.com/binarylei/p/11618566.html
Copyright © 2011-2022 走看看