zoukankan      html  css  js  c++  java
  • 【一起学源码-微服务】Nexflix Eureka 源码九:服务续约源码分析

    前言

    前情回顾

    上一讲 我们讲解了服务发现的相关逻辑,所谓服务发现 其实就是注册表抓取,服务实例默认每隔30s去注册中心抓取一下注册表增量数据,然后合并本地注册表数据,最后有个hash对比的操作。

    本讲目录

    今天主要是看下服务续约的逻辑,服务续约就是client端给server端发送心跳检测,告诉对方我还活着。现在很多分布式系统都会有心跳检查的机制,这里一起来学习下Eureka是怎么做心跳检查的。

    目录如下:

    1. client端心跳检查调度任务
    2. server端接收心跳检查,设置最后renew时间

    这一讲内容不太多,因为上一篇文章写全量和增量注册表信息内容有点多,所以这里将博客尽量一篇保持一个知识点,后面还会讲服务实例下线、摘除、注册中心自我保护等机制的实现原理。

    说明

    原创不易,如若转载 请标明来源:一枝花算不算浪漫

    源码分析

    client端心跳检查调度任务

    服务实例续约代码比较简单,这里还是从DiscovertClient.java 开始,很多源码的入口都是在这里,因为client端初始化、注册 都是走的这里,因为前几篇文章对这个类已经分析很多了,这里只截取部分重要代码:

    DiscovertClient.java 初始化后 会继续初始化一些调度任务:

    private void initScheduledTasks() {
        if (clientConfig.shouldRegisterWithEureka()) {
        	//  默认也是30s
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
    
            // Heartbeat timer
    		// 执行heartbeatExecutor心跳检查,默认是30s
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
    
            // 执行线程
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    
    private class HeartbeatThread implements Runnable {
    
        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }
    
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        String urlPath = "apps/" + appName + '/' + id;
        Response response = null;
        try {
            WebTarget webResource = jerseyClient.target(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.request();
            addExtraProperties(requestBuilder);
            addExtraHeaders(requestBuilder);
            requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE);
            response = requestBuilder.put(Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE)); // Jersey2 refuses to handle PUT with no body
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity()) {
                eurekaResponseBuilder.entity(response.readEntity(InstanceInfo.class));
            }
            return eurekaResponseBuilder.build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey2 HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    

    这里的流程很简单,初始化DiscoveryClient 后会新建一个调度任务,然后执行HeartbeatThread中的run方法,默认是renewalIntervalInSecs 30s执行一次。
    具体就是给Server端发送一个http请求,类似于:http://localhost:8080/v2/apps/ServiceA/i-000000-1, 走的是put请求。
    最后拿到响应结果,续约成功后会更新lastSuccessfulHeartbeatTimestamp 最近成功心跳检测的时间戳。

    server端接收心跳检查请求

    前几篇文章已经说过,Server端接收http请求的入口在eureka-core模块下的 resource包里面,这里直接找到ApplicationResource.java中的getInstanceInfo 方法,这里直接请求的InstanceResource 类的构造方法,找到这个方法中的@PUT请求。可以直接看下代码:

    InstanceResource.renewLease +AbstractInstanceRegistry.renew 方法:

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    
        // 省略部分代码
    
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }
    
    
    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }
    

    这里主要看renew方法, 这里看到registry 是一个注册表,通过appName获取对应的服务注册表信息。

    这里主要还是看leaseToRenew.renew() 其实很简单,就是设置当前示例注册表的renew属性的lastUpdateTimestamp 为最新时间+duration。

    至于这里的duration 我们下一讲会详细讲解,duration 和服务实例摘除有关。

    总结

    (1)DiscoveryClient初始化的时候,会去调度一堆定时任务,其中有一个就是HeartbeatThread,心跳线程

    (2)在这里可以看到,默认是每隔30秒去发送一次心跳,每隔30秒执行一次HeartbeatTHread线程的逻辑,发送心跳

    (3)这边的话就是去发送这个心跳,走的是EurekaHttpClient的sendHeartbeat()方法,http://localhost:8080/v2/apps/ServiceA/i-000000-1,走的是put请求

    (4)负责承接服务实例的心跳相关的这些操作的,是ApplicationsResource,服务相关的controller。找到ApplicationResource,再次找到InstanceResource,通过PUT请求,可以找到renewLease方法。

    (5)通过注册表的renew()方法,进去完成服务续约,实际进入AbstractInstanceRegistry的renew()方法

    (6)从注册表的map中,根据服务名和实例id,获取一个Lease,实际的服务续约的逻辑,其实就是在Lease对象中,更新一下lastUpdateTimestamp这个时间戳,每次续约,就更新一下这个时间戳就ok了。

    申明

    本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

    22.jpg

  • 相关阅读:
    轻量级数据库sqlite的使用
    Integer引发的思考
    css限制显示行数
    数据库 chapter 17 数据仓库与联机分析处理技术
    数据库 chapter 15 对象关系数据库系统
    数据库 chapter 16 XML数据库
    数据库 chapter 14 分布式数据库系统
    数据库 chapter 11 并发控制
    数据库 chapter 12 数据库管理系统
    数据库 chapter 13 数据库技术新发展
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12122968.html
Copyright © 2011-2022 走看看