• Nacos Source Code Analysis (3): Service Registration Handling in the Registry

Overview

  The registry server's main client-facing responsibilities are accepting service registrations, serving discovery queries, and handling service deregistration. Beyond these interactions with clients, the server also does the heavier lifting we often hear about in distributed systems: AP versus CP. As a cluster, Nacos implements both AP and CP — AP via its home-grown Distro protocol, and CP via the Raft protocol. These involve heartbeats, leader election, and so on, and are genuinely complex.

  This article focuses on how the registry server handles client service registration; the other features are left for later articles.

The server-side endpoint that receives client registrations is shown below:

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
                Constants.DEFAULT_NAMESPACE_ID);

        final Instance instance = parseInstance(request);

        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

This method lives in the com.alibaba.nacos.naming.controllers.InstanceController class.

Service registration flow diagram

Following the flow diagram, I split the process into two parts. The first is updating the local cache: a registration is not sent to every node in the cluster; the client picks one node at random and registers there. The second is synchronizing the registered service information to the other nodes in the cluster (the @CanDistro annotation on the endpoint also plays a role in routing; see the sketch below). Now let's analyze the two parts in detail.
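A quick aside on @CanDistro, based on my reading of the 1.x source: requests to @CanDistro endpoints pass through com.alibaba.nacos.naming.web.DistroFilter, and if the node that received the request is not responsible for the service under Distro's partitioning, the filter forwards the request to the node that is. The partitioning boils down to a stable hash from service name to cluster node; below is a minimal, self-contained sketch of that idea (hypothetical names, not the actual implementation):

    import java.util.List;

    class DistroRoutingSketch {

        // Pick the node responsible for a service by hashing its name over the healthy server list.
        static String mapServer(String serviceName, List<String> healthyServers) {
            int index = (serviceName.hashCode() & Integer.MAX_VALUE) % healthyServers.size();
            return healthyServers.get(index);
        }

        public static void main(String[] args) {
            List<String> servers = List.of("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
            // If the node that received the request is not the owner, DistroFilter forwards it there.
            System.out.println("responsible node: " + mapServer("DEFAULT_GROUP@@demo-service", servers));
        }
    }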

Part One: Updating the Local Cache

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // Check whether the namespace exists in the local cache and create it if not;
        // then check whether the service exists under that namespace and, if not,
        // create an empty service. Note that this does NOT yet attach the instance.
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // Fetch the service from the local cache
        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // Register the instance; this is the step that binds the instance information to the service
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

Enter com.alibaba.nacos.naming.core.ServiceManager#createEmptyService, which simply delegates to createServiceIfAbsent:

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            // The service does not exist yet, so create an empty one
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            // Put the newly created empty service into the cache and initialize it
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

Enter com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit:

    private void putServiceAndInit(Service service) throws NacosException {
        // Put the service into the cache
        putService(service);
        // Start a scheduled health-check task for the service
        service.init();
        // Listen for changes to both the ephemeral and the persistent instance lists
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
    }

1. putService is straightforward; for completeness, a simplified paraphrase is sketched below.
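As I recall from the 1.x source (simplified), serviceMap is a two-level Map<String, Map<String, Service>> keyed namespaceId → serviceName → Service, and putService does a double-checked insert into it:

    public void putService(Service service) {
        // double-checked creation of the per-namespace map
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }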

2. Enter com.alibaba.nacos.naming.core.Service#init.

I won't go deep into init here; I plan to dedicate a separate article to health checking, because the Nacos authors implement the check over TCP connections using NIO, and it is fairly involved. For orientation, a simplified paraphrase of what init schedules follows.
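This sketch is from my reading of the 1.x source, with details trimmed; it is not the full method:

    public void init() {
        // schedule the client-beat check: instances that stop sending heartbeats are
        // marked unhealthy and, after a longer timeout, removed entirely
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init(); // starts the cluster-level health check (the TCP/NIO part)
        }
    }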

3. The two listen calls: this part also involves quite a lot, and it is implemented with the observer pattern; the listener contract is sketched below.
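For reference, the contract that Service implements is com.alibaba.nacos.naming.consistency.RecordListener, shown here as I recall it from the 1.x source:

    public interface RecordListener<T extends Record> {

        boolean interests(String key);         // does this listener care about the key?

        boolean matchUnlistenKey(String key);  // used when removing a listener

        void onChange(String key, T value) throws Exception; // the data under the key changed

        void onDelete(String key) throws Exception;          // the data under the key was removed
    }

Service implements RecordListener<Instances>: whenever Distro or Raft applies a new instance list, onChange fires and refreshes the service's in-memory instances.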

Summary: that completes the analysis of the first part, though there are still many details in there that deserve a closer look.

Part Two: Synchronizing the Registered Service Information to the Other Nodes
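Before stepping into the consistency layer, recall that Part One ended with addInstance. A simplified paraphrase of ServiceManager#addInstance (from my reading of the 1.x source) shows how it hands the merged instance list to the consistency service:

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        synchronized (service) {
            // merge the new instances into the current list, then publish the whole list
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            consistencyService.put(key, instances);
        }
    }

The consistencyService here is the delegate implementation, whose put we now enter.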

Enter the com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put() method:

    @Override
    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
    }
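mapConsistencyService chooses the protocol by key. As I recall from the 1.x source, it is essentially:

    private ConsistencyService mapConsistencyService(String key) {
        // ephemeral keys go to the Distro (AP) implementation,
        // persistent keys go to the Raft (CP) implementation
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }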

The service we are registering is ephemeral. To see why, look at the parameter-parsing code; the relevant method is:

com.alibaba.nacos.naming.controllers.InstanceController#parseInstance(), which internally calls getIPAddress().

Then step into com.alibaba.nacos.naming.controllers.InstanceController#getIPAddress(), where you will find this code:

    // read the ephemeral flag from the request, falling back to the server default
    boolean ephemeral = BooleanUtils.toBoolean(WebUtils.optional(request, "ephemeral",
            String.valueOf(switchDomain.isDefaultInstanceEphemeral())));

    Instance instance = new Instance();
    instance.setPort(Integer.parseInt(port));
    instance.setIp(ip);
    instance.setWeight(Double.parseDouble(weight));
    instance.setClusterName(cluster);
    instance.setHealthy(healthy);
    instance.setEnabled(enabled);
    instance.setEphemeral(ephemeral);

This parameter is set by the client itself when you register the service.
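For example, a client can opt out of the default like this (a hedged usage sketch; the server address and service name are my own assumptions):

    import com.alibaba.nacos.api.naming.NamingFactory;
    import com.alibaba.nacos.api.naming.NamingService;
    import com.alibaba.nacos.api.naming.pojo.Instance;

    public class RegisterDemo {
        public static void main(String[] args) throws Exception {
            NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
            Instance instance = new Instance();
            instance.setIp("192.168.0.10");
            instance.setPort(8080);
            instance.setEphemeral(false); // non-ephemeral: handled by the Raft (CP) path instead of Distro
            naming.registerInstance("demo-service", instance);
        }
    }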

In the client code, see com.alibaba.nacos.client.naming.net.NamingProxy#registerService:

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        // whether the instance is ephemeral; defaults to true
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));

        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }

So the put above resolves to com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put(). Let's step in:

    @Override
    public void put(String key, Record value) throws NacosException {
        // store the service data in the local in-memory cache
        onPut(key, value);
        // add a task to synchronize the data to the other nodes
        taskDispatcher.addTask(key);
    }

1. The onPut method is fairly simple; a simplified paraphrase is included below for reference.
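From my reading of the 1.x source (details may vary by version): onPut wraps the record in a Datum, stores it in the in-memory DataStore, and queues a CHANGE notification for the listeners.

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum); // in-memory only; ephemeral data never touches disk
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        notifier.addTask(key, ApplyAction.CHANGE); // the Notifier later calls Service#onChange
    }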

2. Enter taskDispatcher.addTask(key). The code is long, so the two classes involved (TaskDispatcher and DataSyncer) are quoted in full below.

    /*
     * Copyright 1999-2018 Alibaba Group Holding Ltd.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.alibaba.nacos.naming.consistency.ephemeral.distro;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.nacos.core.cluster.Member;
    import com.alibaba.nacos.naming.misc.*;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Data sync task dispatcher
     *
     * @author nkorange
     * @since 1.0.0
     */
    @Component
    public class TaskDispatcher {
    
        @Autowired
        private GlobalConfig partitionConfig;
    
        @Autowired
        private DataSyncer dataSyncer;
    
        private List<TaskScheduler> taskSchedulerList = new ArrayList<>();
    
        private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();
        // executed once this bean has been constructed
        @PostConstruct
        public void init() {
            // create one TaskScheduler per CPU core and submit each one for execution
            for (int i = 0; i < cpuCoreCount; i++) {
                TaskScheduler taskScheduler = new TaskScheduler(i);
                taskSchedulerList.add(taskScheduler);
                GlobalExecutor.submitTaskDispatch(taskScheduler);
            }
        }
    
        public void addTask(String key) {
            // hash the key to a fixed scheduler, so the same key is always handled by the same queue
            taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
        }
        // worker that batches keys and dispatches sync tasks to the other nodes
        public class TaskScheduler implements Runnable {
    
            private int index;
    
            private int dataSize = 0;
    
            private long lastDispatchTime = 0L;
    
            private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
    
            public TaskScheduler(int index) {
                this.index = index;
            }
    
            public void addTask(String key) {
                queue.offer(key);
            }
    
            public int getIndex() {
                return index;
            }
    
            @Override
            public void run() {
    
                List<String> keys = new ArrayList<>();
                while (true) {
    
                    try {
    
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                        if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                            Loggers.DISTRO.debug("got key: {}", key);
                        }
    
                        if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                            continue;
                        }
    
                        if (StringUtils.isBlank(key)) {
                            continue;
                        }
    
                        if (dataSize == 0) {
                            keys = new ArrayList<>();
                        }
    
                        keys.add(key);
                        dataSize++;
    
                        if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                            (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
    
                            for (Member member : dataSyncer.getServers()) {
                                if (NetUtils.localServer().equals(member.getAddress())) {
                                    continue;
                                }
                            // build a sync task targeting this server
                                SyncTask syncTask = new SyncTask();
                                syncTask.setKeys(keys);
                                syncTask.setTargetServer(member.getAddress());
    
                                if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                    Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                                }
                            // submit the sync task for execution
                                dataSyncer.submit(syncTask, 0);
                            }
                            lastDispatchTime = System.currentTimeMillis();
                            dataSize = 0;
                        }
    
                    } catch (Exception e) {
                        Loggers.DISTRO.error("dispatch sync task failed.", e);
                    }
                }
            }
        }
    }
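A note before the second class: addTask above routes each key through UtilsAndCommons.shakeUp. I have not reproduced that method; the sketch below only illustrates the idea (a stable hash from key to scheduler index), and is not the actual implementation:

    // Illustration only — NOT the real UtilsAndCommons.shakeUp implementation.
    // The point is determinism: the same key always maps to the same TaskScheduler,
    // so updates to one service are serialized through one queue.
    static int shakeUpEquivalent(String key, int dimension) {
        return (key.hashCode() & Integer.MAX_VALUE) % dimension;
    }

With that, here is the second class, DataSyncer: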
    /*
     * Copyright 1999-2018 Alibaba Group Holding Ltd.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.alibaba.nacos.naming.consistency.ephemeral.distro;
    
    import com.alibaba.nacos.core.cluster.Member;
    import com.alibaba.nacos.core.cluster.ServerMemberManager;
    import com.alibaba.nacos.naming.cluster.transport.Serializer;
    import com.alibaba.nacos.naming.consistency.Datum;
    import com.alibaba.nacos.naming.consistency.KeyBuilder;
    import com.alibaba.nacos.naming.core.DistroMapper;
    import com.alibaba.nacos.naming.misc.*;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * Data replicator
     *
     * @author nkorange
     * @since 1.0.0
     */
    @Component
    @DependsOn("ProtocolManager")
    public class DataSyncer {
    
        @Autowired
        private DataStore dataStore;
    
        @Autowired
        private GlobalConfig partitionConfig;
    
        @Autowired
        private Serializer serializer;
    
        @Autowired
        private DistroMapper distroMapper;
    
        @Autowired
        private ServerMemberManager memberManager;
    
        private Map<String, String> taskMap = new ConcurrentHashMap<>();
    
        @PostConstruct
        public void init() {
            startTimedSync();
        }
    
        public void submit(SyncTask task, long delay) {
    
            // If it's a new task:
            if (task.getRetryCount() == 0) {
                Iterator<String> iterator = task.getKeys().iterator();
                while (iterator.hasNext()) {
                    String key = iterator.next();
                    if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                        // associated key already exist:
                        if (Loggers.DISTRO.isDebugEnabled()) {
                            Loggers.DISTRO.debug("sync already in process, key: {}", key);
                        }
                        iterator.remove();
                    }
                }
            }
    
            if (task.getKeys().isEmpty()) {
                // all keys are removed:
                return;
            }
    
            GlobalExecutor.submitDataSync(() -> {
                // 1. check the server
                if (getServers() == null || getServers().isEmpty()) {
                    Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                    return;
                }
    
                List<String> keys = task.getKeys();
    
                if (Loggers.SRV_LOG.isDebugEnabled()) {
                    Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
                }
                // 2. get the datums by keys and check the datum is empty or not
                Map<String, Datum> datumMap = dataStore.batchGet(keys);
                if (datumMap == null || datumMap.isEmpty()) {
                    // clear all flags of this task:
                    for (String key : keys) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                    return;
                }
    
                byte[] data = serializer.serialize(datumMap);
    
                long timestamp = System.currentTimeMillis();
                // push the serialized data to the target server
                boolean success = NamingProxy.syncData(data, task.getTargetServer());
                if (!success) {
                    SyncTask syncTask = new SyncTask();
                    syncTask.setKeys(task.getKeys());
                    syncTask.setRetryCount(task.getRetryCount() + 1);
                    syncTask.setLastExecuteTime(timestamp);
                    syncTask.setTargetServer(task.getTargetServer());
                    retrySync(syncTask);
                } else {
                    // clear all flags of this task:
                    for (String key : task.getKeys()) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                }
            }, delay);
        }
    
        public void retrySync(SyncTask syncTask) {
            Member member = new Member();
            member.setIp(syncTask.getTargetServer().split(":")[0]);
            member.setPort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
            if (!getServers().contains(member)) {
                // if server is no longer in healthy server list, ignore this task:
                //fix #1665 remove existing tasks
                if (syncTask.getKeys() != null) {
                    for (String key : syncTask.getKeys()) {
                        taskMap.remove(buildKey(key, syncTask.getTargetServer()));
                    }
                }
                return;
            }
    
            // TODO may choose other retry policy.
            submit(syncTask, partitionConfig.getSyncRetryDelay());
        }
    
        public void startTimedSync() {
            GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
        }
    
        public class TimedSync implements Runnable {
    
            @Override
            public void run() {
    
                try {
    
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("server list is: {}", getServers());
                    }
    
                    // send local timestamps to other servers:
                    Map<String, String> keyChecksums = new HashMap<>(64);
                    for (String key : dataStore.keys()) {
                        if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                            continue;
                        }
    
                        Datum datum = dataStore.get(key);
                        if (datum == null) {
                            continue;
                        }
                        keyChecksums.put(key, datum.value.getChecksum());
                    }
    
                    if (keyChecksums.isEmpty()) {
                        return;
                    }
    
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                    }
    
                    for (Member member : getServers()) {
                        if (NetUtils.localServer().equals(member.getAddress())) {
                            continue;
                        }
                        NamingProxy.syncCheckSums(keyChecksums, member.getAddress());
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("timed sync task failed.", e);
                }
            }
    
        }
    
        public Collection<Member> getServers() {
            return memberManager.allMembers();
        }
    
        public String buildKey(String key, String targetServer) {
            return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
        }
    }

The two classes above handle the synchronization. Besides the write-triggered sync, the com.alibaba.nacos.naming.consistency.ephemeral.distro.DataSyncer class also runs a timed sync (the TimedSync task above), which acts as a backstop for the first mechanism.

  This second, timed sync packs each service key the node is responsible for, together with that key's checksum, into a map and sends it asynchronously to the other nodes; a sketch of what a receiver does with the checksums follows.
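What does a receiver do with those checksums? Roughly, it diffs them against its local data store, pulls the keys that are missing or stale, and drops keys the sender no longer reports. The real logic lives in DistroConsistencyServiceImpl#onReceiveChecksums; the self-contained sketch below only illustrates the diffing idea:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ChecksumDiffSketch {

        // Keys whose data should be pulled from the reporting server:
        // either we don't have them, or our checksum differs.
        static List<String> keysToUpdate(Map<String, String> reported, Map<String, String> local) {
            List<String> toUpdate = new ArrayList<>();
            for (Map.Entry<String, String> e : reported.entrySet()) {
                String localChecksum = local.get(e.getKey());
                if (localChecksum == null || !localChecksum.equals(e.getValue())) {
                    toUpdate.add(e.getKey());
                }
            }
            return toUpdate;
        }

        public static void main(String[] args) {
            Map<String, String> reported = new HashMap<>();
            reported.put("com.alibaba.nacos.naming.iplist.ephemeral.public##demo", "c1");
            Map<String, String> local = new HashMap<>();
            local.put("com.alibaba.nacos.naming.iplist.ephemeral.public##demo", "c0");
            System.out.println(keysToUpdate(reported, local)); // the stale key gets pulled again
        }
    }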

Summary

  There are still quite a few things I have not figured out; I have only worked out the overall flow, and many details remain to be studied carefully. If you spot any mistakes, please point them out.

• Original post: https://www.cnblogs.com/gunduzi/p/13219894.html