  • Eureka server cluster information synchronization

    Overview

    [Figure: server1 and server2 replicating registry operations to each other]

    As shown in the figure above, server1 and server2 pull each other's registry and register with each other. When a client registers with the cluster and the request lands on server1, server1 replicates that request to server2; cancel and heartbeat requests are handled the same way. Replication between cluster nodes is performed asynchronously, as batched tasks flowing through a three-level queue.
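
    To make that rule concrete, here is a toy model of it (a plain illustration with made-up class names, not Eureka code): every node always applies an operation locally, but only forwards it to its peers when the operation came directly from a client, i.e. when isReplication is false.

        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;

        class ToyEurekaNode {
            final String name;
            final List<ToyEurekaNode> peers = new ArrayList<>();
            final Map<String, String> registry = new HashMap<>();

            ToyEurekaNode(String name) { this.name = name; }

            void register(String instanceId, String status, boolean isReplication) {
                registry.put(instanceId, status);            // always apply locally
                if (isReplication) {
                    return;                                  // replicated writes are never forwarded again
                }
                for (ToyEurekaNode peer : peers) {
                    peer.register(instanceId, status, true); // forward once, flagged as replication
                }
            }
        }

        public class ToyClusterDemo {
            public static void main(String[] args) {
                ToyEurekaNode server1 = new ToyEurekaNode("server1");
                ToyEurekaNode server2 = new ToyEurekaNode("server2");
                server1.peers.add(server2);
                server2.peers.add(server1);

                // the client request lands on server1; server2 receives it as a replication
                server1.register("order-service/instance-1", "UP", false);
                System.out.println(server2.registry);        // {order-service/instance-1=UP}
            }
        }

    The real implementation does the same thing over HTTP and asynchronously, which is what the rest of this article walks through.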

    Cluster initialization

    Cluster startup

    During the initialization of EurekaBootStrap, the first step initializes the eureka environment, and then the eureka server context. Inside initEurekaServerContext there is the following piece of code:

            PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                    registry,
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    applicationInfoManager
            );
    
        protected PeerEurekaNodes getPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager) {
            PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes(
                    registry,
                    eurekaServerConfig,
                    eurekaClientConfig,
                    serverCodecs,
                    applicationInfoManager
            );
            
            return peerEurekaNodes;
        }
    

    This produces a PeerEurekaNodes instance, whose constructor is shown below:

    public class PeerEurekaNodes {
    
        private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);
    
        /**
         * Application instance registry
         */
        protected final PeerAwareInstanceRegistry registry;
        /**
         * Eureka-Server configuration
         */
        protected final EurekaServerConfig serverConfig;
        /**
         * Eureka-Client configuration
         */
        protected final EurekaClientConfig clientConfig;
        /**
         * Eureka-Server codecs
         */
        protected final ServerCodecs serverCodecs;
        /**
         * Application instance info manager
         */
        private final ApplicationInfoManager applicationInfoManager;

        /**
         * List of Eureka-Server cluster peer nodes
         */
        private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
        /**
         * Set of Eureka-Server peer service URLs
         */
        private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

        /**
         * Scheduled task executor
         */
        private ScheduledExecutorService taskExecutor;
    
        @Inject
        public PeerEurekaNodes(
                PeerAwareInstanceRegistry registry,
                EurekaServerConfig serverConfig,
                EurekaClientConfig clientConfig,
                ServerCodecs serverCodecs,
                ApplicationInfoManager applicationInfoManager) {
            this.registry = registry;
            this.serverConfig = serverConfig;
            this.clientConfig = clientConfig;
            this.serverCodecs = serverCodecs;
            this.applicationInfoManager = applicationInfoManager;
        }
    }
    

    What is actually happening here is the initialization of the eureka server cluster information, which ends up executing PeerEurekaNodes.start():

    //build the eureka-server context and complete its initialization
            serverContext = new DefaultEurekaServerContext(
                    eurekaServerConfig,
                    serverCodecs,
                    registry,
                    peerEurekaNodes,
                    applicationInfoManager
            );
    
    //the initialization happens in the line below
        @PostConstruct
        @Override
        public void initialize() {
            logger.info("Initializing ...");
            peerEurekaNodes.start();
            try {
                registry.init(peerEurekaNodes);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            logger.info("Initialized");
        }
    

    @PostConstruct methods run right after the DefaultEurekaServerContext constructor has finished. Combined with the flow above, this means the PeerEurekaNodes object is constructed first, then passed into the DefaultEurekaServerContext constructor, and only then initialized.
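
    As a minimal illustration of that ordering (assuming a DI container such as Guice or Spring that honors @PostConstruct; the class below is made up for the example):

        import javax.annotation.PostConstruct;

        public class LifecycleOrderDemo {

            private final String peerNodes;

            public LifecycleOrderDemo(String peerNodes) {
                this.peerNodes = peerNodes;          // 1. the constructor (with its injected dependencies) runs first
                System.out.println("constructed");
            }

            @PostConstruct
            public void initialize() {               // 2. the container then invokes the @PostConstruct method
                System.out.println("initializing with " + peerNodes);
            }
        }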

    Calling PeerEurekaNodes#start() starts the cluster nodes. It does two things:

    • Initializes the cluster peer node information
    • Schedules a fixed-interval task (default: 10 minutes, configurable) that refreshes the cluster peer node information
        public void start() {
            //first create a single-threaded scheduled executor
            taskExecutor = Executors.newSingleThreadScheduledExecutor(
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                            thread.setDaemon(true);
                            return thread;
                        }
                    }
            );
            try {
                //run the resolve-and-update once right away at startup
                updatePeerEurekaNodes(resolvePeerUrls());
    
                Runnable peersUpdateTask = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            updatePeerEurekaNodes(resolvePeerUrls());
                        } catch (Throwable e) {
                            logger.error("Cannot update the replica Nodes", e);
                        }
    
                    }
                };
                //then schedule the same update to run every 10 minutes (by default), so that dropped server nodes are removed and new ones are added
                taskExecutor.scheduleWithFixedDelay(
                        peersUpdateTask,
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        TimeUnit.MILLISECONDS
                );
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            for (PeerEurekaNode node : peerEurekaNodes) {
                logger.info("Replica node URL:  {}", node.getServiceUrl());
            }
        }
    

    This resolves the URLs of the other eureka servers from the configuration and constructs one PeerEurekaNode per URL; a PeerEurekaNode represents one peer eureka server. A background thread is also started which, every 10 minutes by default (EurekaServerConfig#getPeerEurekaNodesUpdateIntervalMs), re-reads the URLs from the configuration and refreshes the eureka server list.

    Updating cluster information

    The #resolvePeerUrls() method returns the Eureka-Server cluster service URLs, with the server's own URL filtered out:

        /**
         * Resolve peer URLs.
         *
         * @return peer URLs with node's own URL filtered out
         */
        protected List<String> resolvePeerUrls() {
            InstanceInfo myInfo = applicationInfoManager.getInfo();
            String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
            List<String> replicaUrls = EndpointUtils
                    .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    
            int idx = 0;
            while (idx < replicaUrls.size()) {
                //if this is our own url, remove it from the list
                if (isThisMyUrl(replicaUrls.get(idx))) {
                    replicaUrls.remove(idx);
                } else {
                    idx++;
                }
            }
            return replicaUrls;
        }
    
        public boolean isThisMyUrl(String url) {
            final String myUrlConfigured = serverConfig.getMyUrl();
            if (myUrlConfigured != null) {
                return myUrlConfigured.equals(url);
            }
            return isInstanceURL(url, applicationInfoManager.getInfo());
        }
    

    The #updatePeerEurekaNodes() method updates the cluster node information and performs two pieces of logic:

    • Adds newly appeared cluster nodes
    • Shuts down cluster nodes that have been removed
        protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
            if (newPeerUrls.isEmpty()) {
                logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
                return;
            }
            // compute the peer node urls to be shut down
            Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
            toShutdown.removeAll(newPeerUrls);
            
            // compute the newly added peer node urls
            Set<String> toAdd = new HashSet<>(newPeerUrls);
            toAdd.removeAll(peerEurekaNodeUrls);
    
            if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
                return;
            }
    
            // Remove peers no long available
            List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    
            //shut down the removed cluster nodes
            if (!toShutdown.isEmpty()) {
                logger.info("Removing no longer available peer nodes {}", toShutdown);
                int i = 0;
                while (i < newNodeList.size()) {
                    PeerEurekaNode eurekaNode = newNodeList.get(i);
                    if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                        newNodeList.remove(i);
                        eurekaNode.shutDown();
                    } else {
                        i++;
                    }
                }
            }
    
            // add the new nodes
            if (!toAdd.isEmpty()) {
                logger.info("Adding new peer nodes {}", toAdd);
                for (String peerUrl : toAdd) {
                    newNodeList.add(createPeerEurekaNode(peerUrl));
                }
            }
    
            this.peerEurekaNodes = newNodeList;
            this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
        }
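
    To make the diff computation above concrete, here is a tiny self-contained example (the peer URLs are hypothetical):

        import java.util.Arrays;
        import java.util.HashSet;
        import java.util.List;
        import java.util.Set;

        public class PeerDiffDemo {
            public static void main(String[] args) {
                // URLs of the PeerEurekaNode instances we currently hold
                Set<String> currentPeerUrls = new HashSet<>(Arrays.asList(
                        "http://peer1:8761/eureka/", "http://peer2:8761/eureka/"));
                // URLs just resolved from the configuration
                List<String> newPeerUrls = Arrays.asList(
                        "http://peer2:8761/eureka/", "http://peer3:8761/eureka/");

                Set<String> toShutdown = new HashSet<>(currentPeerUrls);
                toShutdown.removeAll(newPeerUrls);   // peer1 is gone -> its node gets shut down
                Set<String> toAdd = new HashSet<>(newPeerUrls);
                toAdd.removeAll(currentPeerUrls);    // peer3 is new -> a PeerEurekaNode gets created

                System.out.println("toShutdown = " + toShutdown);   // [http://peer1:8761/eureka/]
                System.out.println("toAdd      = " + toAdd);        // [http://peer3:8761/eureka/]
            }
        }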
    

    Fetching registry information

    Once initialization is done, the Bootstrap method continues and reaches int registryCount = registry.syncUp();

        @Override
        public int syncUp() {
            // Copy entire entry from neighboring DS node
            int count = 0;
    
            //by default, fetching the registry is retried up to 5 times
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    try {
                        // if the first attempt found nothing in the local eureka client,
                        // the local client has not yet fetched a registry from any other eureka server,
                        // so wait 30 seconds and then retry
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        break;
                    }
                }
                //the eureka server is itself an eureka client; during initialization it fetches the registry
                // from some other eureka server and installs those instances locally as this server's own registry
                Applications apps = eurekaClient.getApplications();
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            if (isRegisterable(instance)) {
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    

    When a fetch attempt finds nothing, the method waits 30 seconds (registrySyncRetryWaitMs) and then tries again, up to registrySyncRetries times (5 by default).
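
    For context, the count returned by syncUp() is then used back in EurekaBootStrap to derive the expected number of renews before the server starts serving traffic; roughly (an abridged shape only, the surrounding code may differ slightly between versions):

        // in EurekaBootStrap#initEurekaServerContext (abridged):
        int registryCount = registry.syncUp();
        // derives the expected heartbeat/renewal counts from the synced instances,
        // then marks this server's own InstanceInfo as UP so that clients can discover it
        registry.openForTraffic(applicationInfoManager, registryCount);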

    Cluster registry information synchronization


    • When a Eureka-Server receives Register, Heartbeat, Cancel, StatusUpdate or DeleteStatusOverride operations from a Eureka-Client, it replicates them to the other nodes of the Eureka-Server cluster at a fixed interval (default: 500 ms, configurable), i.e. near real time rather than real time.

    ApplicationResource's addInstance() method handles registration: the registration is first applied locally, and then replicateToPeers() is invoked, which forwards this registration request to all the other eureka servers.

    When an eureka client registers with this eureka server, isReplication is false. In that case the server forwards the registration request to every other eureka server you have configured, calling their RESTful endpoints via Jersey to register the service instance there as well.

    In the eureka-core-jersey2 module, the replication client (HttpReplicationClient) always sets isReplication to true when forwarding the registration to the other eureka servers. A server that receives such a replicated request only applies it locally and does not replicate it to the other eureka servers again.
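
    The isReplication flag arrives as an HTTP header. A simplified sketch of ApplicationResource#addInstance to show the idea (abridged; the real method also validates the InstanceInfo fields before registering):

        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            // the header is "true" only when another eureka server is replicating the action to us
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build(); // 204 to be backwards compatible
        }

    That registry.register(...) call lands in PeerAwareInstanceRegistryImpl#register, shown below: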

        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            // replicate the action to the Eureka-Server peers
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
    
        private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
                // do nothing if the peer list is empty or this request is itself a replication
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
    
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // If the url represents this host, do not replicate to yourself.
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
    
    
    

    PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers(...) looks like this:

    
        private void replicateInstanceActionsToPeers(Action action, String appName,
                                                     String id, InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            } finally {
                CurrentRequestVersion.remove();
            }
        }
    
    • Cancel: calls PeerEurekaNode#cancel(...)
    • Heartbeat: calls PeerEurekaNode#heartbeat(...)
    • Register: calls PeerEurekaNode#register(...)
    • StatusUpdate: calls PeerEurekaNode#statusUpdate(...)
    • DeleteStatusOverride: calls PeerEurekaNode#deleteStatusOverride(...)

    Opening any one of these methods, for example cancel:

        public void cancel(final String appName, final String id) throws Exception {
            long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
            batchingDispatcher.process(
                    taskId("cancel", appName, id),
                    new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                        @Override
                        public EurekaHttpResponse<Void> execute() {
                            return replicationClient.cancel(appName, id);
                        }
    
                        @Override
                        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                            super.handleFailure(statusCode, responseEntity);
                            if (statusCode == 404) {
                                logger.warn("{}: missing entry.", getTaskName());
                            }
                        }
                    },
                    expiryTime
            );
        }
      
        //the same replication operation for the same application instance uses the same task id
        private static String taskId(String requestType, String appName, String id) {
            return requestType + '#' + appName + '/' + id;
        }
    

    Here the operation is wrapped into an InstanceReplicationTask and handed to the batchingDispatcher for processing. Because pending tasks are keyed by this task id, a newer task for the same instance and operation replaces an older one that has not been dispatched yet.

        /* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                         HttpReplicationClient replicationClient, EurekaServerConfig config,
                                         int batchSize, long maxBatchingDelayMs,
                                         long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
            this.registry = registry;
            this.targetHost = targetHost;
            this.replicationClient = replicationClient;
    
            this.serviceUrl = serviceUrl;
            this.config = config;
            this.maxProcessingDelayMs = config.getMaxTimeForReplication();
    
            String batcherName = getBatcherName();
            ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
            this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                    batcherName,
                    config.getMaxElementsInPeerReplicationPool(),
                    batchSize,
                    config.getMaxThreadsForPeerReplication(),
                    maxBatchingDelayMs,
                    serverUnavailableSleepTimeMs,
                    retrySleepTimeMs,
                    taskProcessor
            );
            this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                    targetHost,
                    config.getMaxElementsInStatusReplicationPool(),
                    config.getMaxThreadsForStatusReplication(),
                    maxBatchingDelayMs,
                    serverUnavailableSleepTimeMs,
                    retrySleepTimeMs,
                    taskProcessor
            );
        }
    
    
        public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                                 int maxBufferSize,
                                                                                 int workloadSize,
                                                                                 int workerCount,
                                                                                 long maxBatchingDelay,
                                                                                 long congestionRetryDelayMs,
                                                                                 long networkFailureRetryMs,
                                                                                 TaskProcessor<T> taskProcessor) {
            final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                    id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
            );
            final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
            return new TaskDispatcher<ID, T>() {
                @Override
                public void process(ID id, T task, long expiryTime) {
                    acceptorExecutor.process(id, task, expiryTime);
                }
    
                @Override
                public void shutdown() {
                    acceptorExecutor.shutdown();
                    taskExecutor.shutdown();
                }
            };
        }
    

    When createBatchingTaskDispatcher builds the dispatcher, it overrides process() so that tasks are ultimately handed to the acceptorExecutor:

        void process(ID id, T task, long expiryTime) {
            acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
            acceptedTasks++;
        }
    

    It puts the previously wrapped task into the acceptorQueue. The AcceptorExecutor constructor starts a background acceptorThread:

        AcceptorExecutor(String id,
                         int maxBufferSize,
                         int maxBatchingSize,
                         long maxBatchingDelay,
                         long congestionRetryDelayMs,
                         long networkFailureRetryMs) {
            this.id = id;
            this.maxBufferSize = maxBufferSize;
            this.maxBatchingSize = maxBatchingSize;
            this.maxBatchingDelay = maxBatchingDelay;
            this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
    
            ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
            this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
            this.acceptorThread.setDaemon(true);
            this.acceptorThread.start();
    
            final double[] percentiles = {50.0, 95.0, 99.0, 99.5};
            final StatsConfig statsConfig = new StatsConfig.Builder()
                    .withSampleSize(1000)
                    .withPercentiles(percentiles)
                    .withPublishStdDev(true)
                    .build();
            final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
            this.batchSizeMetric = new StatsTimer(config, statsConfig);
            try {
                Monitors.registerObject(id, this);
            } catch (Throwable e) {
                logger.warn("Cannot register servo monitor for this object", e);
            }
        }
    

    Once started, the thread executes AcceptorRunner's run() method:

        class AcceptorRunner implements Runnable {
            @Override
            public void run() {
                long scheduleTime = 0;
                while (!isShutdown.get()) {
                    try {
                        drainInputQueues();
    
                        int totalItems = processingOrder.size();
    
                        long now = System.currentTimeMillis();
                        if (scheduleTime < now) {
                            scheduleTime = now + trafficShaper.transmissionDelay();
                        }
                        if (scheduleTime <= now) {
                            assignBatchWork();
                            assignSingleItemWork();
                        }
    
                        // If no worker is requesting data or there is a delay injected by the traffic shaper,
                        // sleep for some time to avoid tight loop.
                        if (totalItems == processingOrder.size()) {
                            Thread.sleep(10);
                        }
                    } catch (InterruptedException ex) {
                        // Ignore
                    } catch (Throwable e) {
                        // Safe-guard, so we never exit this loop in an uncontrolled way.
                        logger.warn("Discovery AcceptorThread error", e);
                    }
                }
            }
            
               private void drainInputQueues() throws InterruptedException {
                do {
                    drainReprocessQueue();
                    drainAcceptorQueue();
    
                    if (isShutdown.get()) {
                        break;
                    }
                    // If all queues are empty, block for a while on the acceptor queue
                    if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                        TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (taskHolder != null) {
                            appendTaskHolder(taskHolder);
                        }
                    }
                } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
            }
    
             private void drainAcceptorQueue() {
                while (!acceptorQueue.isEmpty()) {
                    appendTaskHolder(acceptorQueue.poll());
                }
            }
    

    This background runner thread moves tasks from the acceptorQueue into processingOrder, and then packages the tasks in processingOrder into batches that are placed on the batchWorkQueue.
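
    The move into processingOrder happens in appendTaskHolder, which is also where the task-id based merging takes effect; roughly (abridged from AcceptorExecutor, details may vary between versions):

        private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
            if (isFull()) {
                // buffer overflow: drop the oldest pending task to make room
                pendingTasks.remove(processingOrder.poll());
                queueOverflows++;
            }
            // pending tasks are keyed by task id, so a newer task replaces an older pending one
            TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
            if (previousTask == null) {
                processingOrder.add(taskHolder.getId());
            } else {
                overriddenTasks++;
            }
        }

    assignBatchWork then cuts batches out of processingOrder: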

            void assignBatchWork() {
                if (hasEnoughTasksForNextBatch()) {
                    if (batchWorkRequests.tryAcquire(1)) {
                        long now = System.currentTimeMillis();
                        int len = Math.min(maxBatchingSize, processingOrder.size());
                        List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                        while (holders.size() < len && !processingOrder.isEmpty()) {
                            ID id = processingOrder.poll();
                            TaskHolder<ID, T> holder = pendingTasks.remove(id);
                            if (holder.getExpiryTime() > now) {
                                holders.add(holder);
                            } else {
                                expiredTasks++;
                            }
                        }
                        if (holders.isEmpty()) {
                            batchWorkRequests.release();
                        } else {
                            batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                            batchWorkQueue.add(holders);
                        }
                    }
                }
            }
    
            private boolean hasEnoughTasksForNextBatch() {
                if (processingOrder.isEmpty()) {
                    return false;
                }
                if (pendingTasks.size() >= maxBufferSize) {
                    return true;
                }
    
                TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
                long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
                return delay >= maxBatchingDelay;
            }
        }
    

    Finally, ReplicationTaskProcessor executes the batch by calling Jersey2ReplicationClient#submitBatchUpdates:

        @Override
        public ProcessingResult process(List<ReplicationTask> tasks) {
            ReplicationList list = createReplicationListOf(tasks);
            try {
                EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
                int statusCode = response.getStatusCode();
                if (!isSuccess(statusCode)) {
                    if (statusCode == 503) {
                        logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                        return ProcessingResult.Congestion;
                    } else {
                        // Unexpected error returned from the server. This should ideally never happen.
                        logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                        return ProcessingResult.PermanentError;
                    }
                } else {
                    handleBatchResponse(tasks, response.getEntity().getResponseList());
                }
            } catch (Throwable e) {
                if (maybeReadTimeOut(e)) {
                    logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
                	//read timeout exception is more Congestion then TransientError, return Congestion for longer delay 
                    return ProcessingResult.Congestion;
                } else if (isNetworkConnectException(e)) {
                    logNetworkErrorSample(null, e);
                    return ProcessingResult.TransientError;
                } else {
                    logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
                    return ProcessingResult.PermanentError;
                }
            }
            return ProcessingResult.Success;
        }
    
        @Override
        public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
            Response response = null;
            try {
                response = jerseyClient.target(serviceUrl)
                        .path(PeerEurekaNode.BATCH_URL_PATH)
                        .request(MediaType.APPLICATION_JSON_TYPE)
                        .post(Entity.json(replicationList));
                if (!isSuccess(response.getStatus())) {
                    return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
                }
                ReplicationListResponse batchResponse = response.readEntity(ReplicationListResponse.class);
                return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
            } finally {
                if (response != null) {
                    response.close();
                }
            }
        }
    


    This sends the batch to the peerreplication/batch/ endpoint, which maps to PeerReplicationResource#batchReplication(...):

        @Path("batch")
        @POST
        public Response batchReplication(ReplicationList replicationList) {
            try {
                ReplicationListResponse batchResponse = new ReplicationListResponse();
                for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
                    try {
                        // handle each replication task one by one, merging each result (ReplicationInstanceResponse) into the ReplicationListResponse
                        batchResponse.addResponse(dispatch(instanceInfo));
                    } catch (Exception e) {
                        batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                        logger.error("{} request processing failed for batch item {}/{}",
                                instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
                    }
                }
                return Response.ok(batchResponse).build();
            } catch (Throwable e) {
                logger.error("Cannot execute batch Request", e);
                return Response.status(Status.INTERNAL_SERVER_ERROR).build();
            }
        }
    
    
        private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
            ApplicationResource applicationResource = createApplicationResource(instanceInfo);
            InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    
            String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
            String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
            String instanceStatus = toString(instanceInfo.getStatus());
    
            Builder singleResponseBuilder = new Builder();
            switch (instanceInfo.getAction()) {
                case Register:
                    singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                    break;
                case Heartbeat:
                    singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                    break;
                case Cancel:
                    singleResponseBuilder = handleCancel(resource);
                    break;
                case StatusUpdate:
                    singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                    break;
                case DeleteStatusOverride:
                    singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                    break;
            }
            return singleResponseBuilder.build();
        }
    

    1. The cluster synchronization mechanism: the highlight is that a client can send its request to any one server; that server then replicates the request to all the other servers, but those servers only apply it locally and never replicate it again.

    2. The asynchronous batch processing of replicated data: the highlight is the three queues. The first queue is purely for writing tasks in; the second is used to order tasks and split them up by time and size; the third holds the batched tasks ==> an asynchronous batch-processing mechanism (see the sketch below).
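
    A minimal, self-contained sketch of that three-queue idea (a deliberate simplification: no traffic shaping, no task expiry, no retry/reprocess queue and no id-based merging):

        import java.util.ArrayDeque;
        import java.util.ArrayList;
        import java.util.Deque;
        import java.util.List;
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        public class MiniBatchingPipeline<T> {

            private final BlockingQueue<T> acceptorQueue = new LinkedBlockingQueue<>();        // 1st queue: pure writes
            private final Deque<T> processingOrder = new ArrayDeque<>();                       // 2nd queue: ordering / splitting
            private final BlockingQueue<List<T>> batchWorkQueue = new LinkedBlockingQueue<>(); // 3rd queue: ready batches
            private final int maxBatchSize;

            public MiniBatchingPipeline(int maxBatchSize) {
                this.maxBatchSize = maxBatchSize;
            }

            // caller side: just append; the request thread is never blocked by replication
            public void process(T task) {
                acceptorQueue.add(task);
            }

            // what the acceptor thread does in its loop: drain the input, then cut batches
            public void drainAndBatch() {
                acceptorQueue.drainTo(processingOrder);
                while (!processingOrder.isEmpty()) {
                    List<T> batch = new ArrayList<>(maxBatchSize);
                    while (batch.size() < maxBatchSize && !processingOrder.isEmpty()) {
                        batch.add(processingOrder.poll());
                    }
                    batchWorkQueue.add(batch);
                }
            }

            // worker side: take one batch and send it to the peer in a single HTTP call
            public List<T> takeBatch() throws InterruptedException {
                return batchWorkQueue.take();
            }

            public static void main(String[] args) throws InterruptedException {
                MiniBatchingPipeline<String> pipeline = new MiniBatchingPipeline<>(3);
                for (int i = 0; i < 7; i++) {
                    pipeline.process("task-" + i);
                }
                pipeline.drainAndBatch();
                System.out.println(pipeline.takeBatch());   // [task-0, task-1, task-2]
                System.out.println(pipeline.takeBatch());   // [task-3, task-4, task-5]
                System.out.println(pipeline.takeBatch());   // [task-6]
            }
        }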
