zoukankan      html  css  js  c++  java
  • eureka server集群信息同步

    概述

    image-20210925125812618

    如上图,server1和server2之间会拉取对方的注册表,相互的注册,当client往集群中进行注册的时候,如果是请求到server1上,server1会将这个请求同步到server2,下线心跳也是如此,集群之间的同步是通过3层队列任务批处理的方式进行的。

    集群的初始化

    集群启动

    在EurekaBootStrap的初始化的过程中,第一步会先初始化eureka的环境,在初始化eureka的上下文环境。其中就有initEurekaServerContext下面的一段代码

            PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                    registry,
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    applicationInfoManager
            );
    
        protected PeerEurekaNodes getPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager) {
            PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes(
                    registry,
                    eurekaServerConfig,
                    eurekaClientConfig,
                    serverCodecs,
                    applicationInfoManager
            );
            
            return peerEurekaNodes;
        }
    

    会得到一个PeerEurekaNodes,它的构造方法如下:

    public class PeerEurekaNodes {
    
        private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);
    
        /**
         * 应用实例注册表
         */
        protected final PeerAwareInstanceRegistry registry;
        /**
         * Eureka-Server 配置
         */
        protected final EurekaServerConfig serverConfig;
        /**
         * Eureka-Client 配置
         */
        protected final EurekaClientConfig clientConfig;
        /**
         * Eureka-Server 编解码
         */
        protected final ServerCodecs serverCodecs;
        /**
         * 应用实例信息管理器
         */
        private final ApplicationInfoManager applicationInfoManager;
    
        /**
         * Eureka-Server 集群节点数组
         */
        private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
        /**
         * Eureka-Server 服务地址数组
         */
        private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
    
        /**
         * 定时任务服务
         */
        private ScheduledExecutorService taskExecutor;
    
        @Inject
        public PeerEurekaNodes(
                PeerAwareInstanceRegistry registry,
                EurekaServerConfig serverConfig,
                EurekaClientConfig clientConfig,
                ServerCodecs serverCodecs,
                ApplicationInfoManager applicationInfoManager) {
            this.registry = registry;
            this.serverConfig = serverConfig;
            this.clientConfig = clientConfig;
            this.serverCodecs = serverCodecs;
            this.applicationInfoManager = applicationInfoManager;
        }
    }
    

    其实是在处理eureka server集群信息的初始化,会执行PeerEurekaNodes.start()方法

    //完成eureka-server上下文的构建以及初始化过程
            serverContext = new DefaultEurekaServerContext(
                    eurekaServerConfig,
                    serverCodecs,
                    registry,
                    peerEurekaNodes,
                    applicationInfoManager
            );
    
    //初始化的代码就在下面一行
        @PostConstruct
        @Override
        public void initialize() {
            logger.info("Initializing ...");
            peerEurekaNodes.start();
            try {
                registry.init(peerEurekaNodes);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            logger.info("Initialized");
        }
    

    @PostConstruct的执行顺序就在DefaultEurekaServerContext的构造函数的后面开始执行,结合前面的流程,也就是说一开始先构造出peerEurekaNodes类,然后传进DefaultEurekaServerContext的有参构造中,在进行初始化。

    调用 PeerEurekaNodes#start() 方法,集群节点启动。

    • 初始化集群节点信息
    • 初始化固定周期( 默认:10 分钟,可配置 )更新集群节点信息的任务
        public void start() {
            //先创建一个定时调度
            taskExecutor = Executors.newSingleThreadScheduledExecutor(
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                            thread.setDaemon(true);
                            return thread;
                        }
                    }
            );
            try {
                //第一次初始化的时候,自己先初始化一遍
                updatePeerEurekaNodes(resolvePeerUrls());
    
                Runnable peersUpdateTask = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            updatePeerEurekaNodes(resolvePeerUrls());
                        } catch (Throwable e) {
                            logger.error("Cannot update the replica Nodes", e);
                        }
    
                    }
                };
               //在创建一个线程,10分钟去更新一下初始化的操作,用来移除添加新的server节点。
                taskExecutor.scheduleWithFixedDelay(
                        peersUpdateTask,
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        TimeUnit.MILLISECONDS
                );
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            for (PeerEurekaNode node : peerEurekaNodes) {
                logger.info("Replica node URL:  {}", node.getServiceUrl());
            }
        }
    

    解析配置文件中的其他eureka server的url地址,基于url地址构造一个一个的PeerEurekaNode,一个PeerEurekaNode就代表了一个eureka server。启动一个后台的线程,默认是每隔10分钟,会运行一个任务,就是基于配置文件中的url来刷新eureka server列表。

    更新集群信息

    调用 #resolvePeerUrls() 方法,获得 Eureka-Server 集群服务地址数组,不包含自己的

        /**
         * Resolve peer URLs.
         *
         * @return peer URLs with node's own URL filtered out
         */
        protected List<String> resolvePeerUrls() {
            InstanceInfo myInfo = applicationInfoManager.getInfo();
            String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
            List<String> replicaUrls = EndpointUtils
                    .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    
            int idx = 0;
            while (idx < replicaUrls.size()) {
                //判断是否是自己的url,是的话,进行移除
                if (isThisMyUrl(replicaUrls.get(idx))) {
                    replicaUrls.remove(idx);
                } else {
                    idx++;
                }
            }
            return replicaUrls;
        }
    
        public boolean isThisMyUrl(String url) {
            final String myUrlConfigured = serverConfig.getMyUrl();
            if (myUrlConfigured != null) {
                return myUrlConfigured.equals(url);
            }
            return isInstanceURL(url, applicationInfoManager.getInfo());
        }
    

    调用 #updatePeerEurekaNodes() 方法,更新集群节点信息,主要完成两部分逻辑:

    • 添加新增的集群节点
    • 关闭删除的集群节点
        protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
            if (newPeerUrls.isEmpty()) {
                logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
                return;
            }
         // 计算 删除的集群节点地址
            Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
            toShutdown.removeAll(newPeerUrls);
            
             // 计算 新增的集群节点地址
            Set<String> toAdd = new HashSet<>(newPeerUrls);
            toAdd.removeAll(peerEurekaNodeUrls);
    
            if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
                return;
            }
    
            // Remove peers no long available
            List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    
            //关闭删除的集群节点
            if (!toShutdown.isEmpty()) {
                logger.info("Removing no longer available peer nodes {}", toShutdown);
                int i = 0;
                while (i < newNodeList.size()) {
                    PeerEurekaNode eurekaNode = newNodeList.get(i);
                    if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                        newNodeList.remove(i);
                        eurekaNode.shutDown();
                    } else {
                        i++;
                    }
                }
            }
    
            // 添加新的节点
            if (!toAdd.isEmpty()) {
                logger.info("Adding new peer nodes {}", toAdd);
                for (String peerUrl : toAdd) {
                    newNodeList.add(createPeerEurekaNode(peerUrl));
                }
            }
    
            this.peerEurekaNodes = newNodeList;
            this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
        }
    

    获取注册信息

    初始化完成以后,就在Bootstrap的方法中,继续往下走,来到int registryCount = registry.syncUp();

        @Override
        public int syncUp() {
            // Copy entire entry from neighboring DS node
            int count = 0;
    
            //默认可以重试5次拉取注册表
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    try {
                        // 如果第一次没有在自己本地的eureka client中获取注册表
                        // 说明自己的本地eureka client还没有从任何其他的eureka server上获取注册表
                        // 所以此时重试,等待30秒
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        break;
                    }
                }
                //eureka server自己本身本来就是个eureka client,在初始化的时候,就会去找任意的一个eureka server
                // 拉取注册表到自己本地来,把这个注册表放到自己身上来,作为自己这个eureka server的注册表
                Applications apps = eurekaClient.getApplications();
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            if (isRegisterable(instance)) {
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    

    在拉取失败的时候,会等30s后,继续拉取。

    集群注册信息同步

    image-20210925171540039

    • Eureka-Server 接收到 Eureka-Client 的 Register、Heartbeat、Cancel、StatusUpdate、DeleteStatusOverride 操作,固定间隔( 默认值 :500 毫秒,可配 )向 Eureka-Server 集群内其他节点同步( 准实时,非实时 )。

    ApplicationResource的addInstance()方法,负责注册,现在自己本地完成一个注册,接着会replicateToPeers()方法,这个方法就会将这次注册请求,同步到其他所有的eureka server上去。

    如果是某台eureka client来找eureka server进行注册,isReplication是false,此时会给其他所有的你配置的eureka server都同步这个注册请求,此时一定会基于jersey,调用其他所有的eureka server的restful接口,去执行这个服务实例的注册的请求

    eureka-core-jersey2的工程,ReplicationHttpClient,此时同步注册请求给其他eureka server的时候,一定会将isReplication设置为true,其他eureka server接到这个同步的请求,仅仅在自己本地执行,不会再次向其他的eureka server去进行注册

        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            // Eureka-Server 复制
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
    
        private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
               // 集群为空 或者isReplication 为true
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
    
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // If the url represents this host, do not replicate to yourself.
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
    
    
    

    PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers(...) 方法,代码如下:

    
        private void replicateInstanceActionsToPeers(Action action, String appName,
                                                     String id, InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            } finally {
                CurrentRequestVersion.remove();
            }
        }
    
    • Cancel :调用 PeerEurekaNode#cancel(...) 方法,
    • Heartbeat :调用 PeerEurekaNode#heartbeat(...) 方法
    • Register :调用 PeerEurekaNode#register(...) 方法
    • StatusUpdate :调用 PeerEurekaNode#statusUpdate(...) 方法
    • DeleteStatusOverride :调用 PeerEurekaNode#deleteStatusOverride(...) 方法

    随便打开其中的一个方法查看:

        public void cancel(final String appName, final String id) throws Exception {
            long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
            batchingDispatcher.process(
                    taskId("cancel", appName, id),
                    new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                        @Override
                        public EurekaHttpResponse<Void> execute() {
                            return replicationClient.cancel(appName, id);
                        }
    
                        @Override
                        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                            super.handleFailure(statusCode, responseEntity);
                            if (statusCode == 404) {
                                logger.warn("{}: missing entry.", getTaskName());
                            }
                        }
                    },
                    expiryTime
            );
        }
      
      //相同应用实例的相同同步操作使用相同任务编号
        private static String taskId(String requestType, String appName, String id) {
            return requestType + '#' + appName + '/' + id;
        }
    

    这里会把一个任务封装成一个InstanceReplicationTask,交给batchingDispatcher,进行处理。

        /* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                         HttpReplicationClient replicationClient, EurekaServerConfig config,
                                         int batchSize, long maxBatchingDelayMs,
                                         long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
            this.registry = registry;
            this.targetHost = targetHost;
            this.replicationClient = replicationClient;
    
            this.serviceUrl = serviceUrl;
            this.config = config;
            this.maxProcessingDelayMs = config.getMaxTimeForReplication();
    
            String batcherName = getBatcherName();
            ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
            this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                    batcherName,
                    config.getMaxElementsInPeerReplicationPool(),
                    batchSize,
                    config.getMaxThreadsForPeerReplication(),
                    maxBatchingDelayMs,
                    serverUnavailableSleepTimeMs,
                    retrySleepTimeMs,
                    taskProcessor
            );
            this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                    targetHost,
                    config.getMaxElementsInStatusReplicationPool(),
                    config.getMaxThreadsForStatusReplication(),
                    maxBatchingDelayMs,
                    serverUnavailableSleepTimeMs,
                    retrySleepTimeMs,
                    taskProcessor
            );
        }
    
    
        public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                                 int maxBufferSize,
                                                                                 int workloadSize,
                                                                                 int workerCount,
                                                                                 long maxBatchingDelay,
                                                                                 long congestionRetryDelayMs,
                                                                                 long networkFailureRetryMs,
                                                                                 TaskProcessor<T> taskProcessor) {
            final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                    id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
            );
            final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
            return new TaskDispatcher<ID, T>() {
                @Override
                public void process(ID id, T task, long expiryTime) {
                    acceptorExecutor.process(id, task, expiryTime);
                }
    
                @Override
                public void shutdown() {
                    acceptorExecutor.shutdown();
                    taskExecutor.shutdown();
                }
            };
        }
    

    其中,createBatchingTaskDispatcher进行创建的时候,会把process进行重写,最终是由acceptorExecutor进行处理。

        void process(ID id, T task, long expiryTime) {
            acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
            acceptedTasks++;
        }
    

    他会把之前封装的任务放到acceptorQueue中,在AcceptorExecutor的构造器中,会启动一个acceptorThread回台进程。

        AcceptorExecutor(String id,
                         int maxBufferSize,
                         int maxBatchingSize,
                         long maxBatchingDelay,
                         long congestionRetryDelayMs,
                         long networkFailureRetryMs) {
            this.id = id;
            this.maxBufferSize = maxBufferSize;
            this.maxBatchingSize = maxBatchingSize;
            this.maxBatchingDelay = maxBatchingDelay;
            this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
    
            ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
            this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
            this.acceptorThread.setDaemon(true);
            this.acceptorThread.start();
    
            final double[] percentiles = {50.0, 95.0, 99.0, 99.5};
            final StatsConfig statsConfig = new StatsConfig.Builder()
                    .withSampleSize(1000)
                    .withPercentiles(percentiles)
                    .withPublishStdDev(true)
                    .build();
            final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
            this.batchSizeMetric = new StatsTimer(config, statsConfig);
            try {
                Monitors.registerObject(id, this);
            } catch (Throwable e) {
                logger.warn("Cannot register servo monitor for this object", e);
            }
        }
    

    启动后去执行AcceptorRunner的run方法。

        class AcceptorRunner implements Runnable {
            @Override
            public void run() {
                long scheduleTime = 0;
                while (!isShutdown.get()) {
                    try {
                        drainInputQueues();
    
                        int totalItems = processingOrder.size();
    
                        long now = System.currentTimeMillis();
                        if (scheduleTime < now) {
                            scheduleTime = now + trafficShaper.transmissionDelay();
                        }
                        if (scheduleTime <= now) {
                            assignBatchWork();
                            assignSingleItemWork();
                        }
    
                        // If no worker is requesting data or there is a delay injected by the traffic shaper,
                        // sleep for some time to avoid tight loop.
                        if (totalItems == processingOrder.size()) {
                            Thread.sleep(10);
                        }
                    } catch (InterruptedException ex) {
                        // Ignore
                    } catch (Throwable e) {
                        // Safe-guard, so we never exit this loop in an uncontrolled way.
                        logger.warn("Discovery AcceptorThread error", e);
                    }
                }
            }
            
               private void drainInputQueues() throws InterruptedException {
                do {
                    drainReprocessQueue();
                    drainAcceptorQueue();
    
                    if (isShutdown.get()) {
                        break;
                    }
                    // If all queues are empty, block for a while on the acceptor queue
                    if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                        TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (taskHolder != null) {
                            appendTaskHolder(taskHolder);
                        }
                    }
                } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
            }
    
             private void drainAcceptorQueue() {
                while (!acceptorQueue.isEmpty()) {
                    appendTaskHolder(acceptorQueue.poll());
                }
            }
    

    runner这个后台线程,会把acceptorQueue的task任务移到processingOrder。接着就会把processingOrder的任务进行打包批量的放到batchWorkQueue中。

            void assignBatchWork() {
                if (hasEnoughTasksForNextBatch()) {
                    if (batchWorkRequests.tryAcquire(1)) {
                        long now = System.currentTimeMillis();
                        int len = Math.min(maxBatchingSize, processingOrder.size());
                        List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                        while (holders.size() < len && !processingOrder.isEmpty()) {
                            ID id = processingOrder.poll();
                            TaskHolder<ID, T> holder = pendingTasks.remove(id);
                            if (holder.getExpiryTime() > now) {
                                holders.add(holder);
                            } else {
                                expiredTasks++;
                            }
                        }
                        if (holders.isEmpty()) {
                            batchWorkRequests.release();
                        } else {
                            batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                            batchWorkQueue.add(holders);
                        }
                    }
                }
            }
    
            private boolean hasEnoughTasksForNextBatch() {
                if (processingOrder.isEmpty()) {
                    return false;
                }
                if (pendingTasks.size() >= maxBufferSize) {
                    return true;
                }
    
                TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
                long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
                return delay >= maxBatchingDelay;
            }
        }
    

    最后是由ReplicationTaskProcessor去执行Jersey2ReplicationClient#submitBatchUpdates

        @Override
        public ProcessingResult process(List<ReplicationTask> tasks) {
            ReplicationList list = createReplicationListOf(tasks);
            try {
                EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
                int statusCode = response.getStatusCode();
                if (!isSuccess(statusCode)) {
                    if (statusCode == 503) {
                        logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                        return ProcessingResult.Congestion;
                    } else {
                        // Unexpected error returned from the server. This should ideally never happen.
                        logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                        return ProcessingResult.PermanentError;
                    }
                } else {
                    handleBatchResponse(tasks, response.getEntity().getResponseList());
                }
            } catch (Throwable e) {
                if (maybeReadTimeOut(e)) {
                    logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
                	//read timeout exception is more Congestion then TransientError, return Congestion for longer delay 
                    return ProcessingResult.Congestion;
                } else if (isNetworkConnectException(e)) {
                    logNetworkErrorSample(null, e);
                    return ProcessingResult.TransientError;
                } else {
                    logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
                    return ProcessingResult.PermanentError;
                }
            }
            return ProcessingResult.Success;
        }
    
        @Override
        public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
            Response response = null;
            try {
                response = jerseyClient.target(serviceUrl)
                        .path(PeerEurekaNode.BATCH_URL_PATH)
                        .request(MediaType.APPLICATION_JSON_TYPE)
                        .post(Entity.json(replicationList));
                if (!isSuccess(response.getStatus())) {
                    return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
                }
                ReplicationListResponse batchResponse = response.readEntity(ReplicationListResponse.class);
                return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
            } finally {
                if (response != null) {
                    response.close();
                }
            }
        }
    

    image-20210925180656781

    去发送一个peerreplication/batch/ 接口,映射 PeerReplicationResource#batchReplication(...) 方法,代码如下:

        @Path("batch")
        @POST
        public Response batchReplication(ReplicationList replicationList) {
            try {
                ReplicationListResponse batchResponse = new ReplicationListResponse();
                for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
                    try {
                        // 逐个同步操作任务处理,并将处理结果( ReplicationInstanceResponse ) 合并到 ReplicationListResponse 。
                        batchResponse.addResponse(dispatch(instanceInfo));
                    } catch (Exception e) {
                        batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                        logger.error("{} request processing failed for batch item {}/{}",
                                instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
                    }
                }
                return Response.ok(batchResponse).build();
            } catch (Throwable e) {
                logger.error("Cannot execute batch Request", e);
                return Response.status(Status.INTERNAL_SERVER_ERROR).build();
            }
        }
    
    
        private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
            ApplicationResource applicationResource = createApplicationResource(instanceInfo);
            InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    
            String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
            String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
            String instanceStatus = toString(instanceInfo.getStatus());
    
            Builder singleResponseBuilder = new Builder();
            switch (instanceInfo.getAction()) {
                case Register:
                    singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                    break;
                case Heartbeat:
                    singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                    break;
                case Cancel:
                    singleResponseBuilder = handleCancel(resource);
                    break;
                case StatusUpdate:
                    singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                    break;
                case DeleteStatusOverride:
                    singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                    break;
            }
            return singleResponseBuilder.build();
        }
    

    1、集群同步的机制:闪光点,client可以找任何一个server发送请求,然后这个server会将请求同步到其他所有的server上去,但是其他的server仅仅会在自己本地执行,不会再次同步了

    2、数据同步的异步批处理机制:闪光点,三个队列,第一个队列,就是纯写入;第二个队列,是用来根据时间和大小,来拆分队列;第三个队列,用来放批处理任务 ==》 异步批处理机制

  • 相关阅读:
    macbook 无声音解决方案
    webapck dev server代理请求 json截断问题
    百度卫星地图开启
    服务器 nginx配置 防止其他域名绑定自己的服务器
    记一次nginx php配置的心路历程
    遇到npm报错read ECONNRESET怎么办
    运行svn tortoiseSvn cleanup 命令失败的解决办法
    svn add 命令 递归目录下所有文件
    m4出现Please port gnulib freadahead.c to your platform! Look at the definition of fflush, fread, ungetc on your system, then report this to bug-gnulib."
    Ubuntu下安装GCC,mpc、mpfr、gmp
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15335045.html
Copyright © 2011-2022 走看看