zoukankan      html  css  js  c++  java
  • Dubbo源码解析之registry注册中心

    阅读须知

    • dubbo版本:2.6.0
    • spring版本:4.3.8
    • 文章中使用/* */注释的方法会做深入分析

    正文
    注册中心是Dubbo的重要组成部分,主要用于服务的注册与发现,我们可以选择Redis、数据库、Zookeeper作为Dubbo的注册中心,Dubbo推荐用户使用Zookeeper作为注册中心

    在provider和consumer的初始化过程中,我们看到了dubbo通过调用RegistryFactory的getRegistry方法来获取注册中心实例,我们就以这个方法作为入口来分析注册中心的相关流程
    AbstractRegistryFactory:

        @Override
        public Registry getRegistry(URL url) {
            url = URLBuilder.from(url)
                    .setPath(RegistryService.class.getName())
                    .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                    .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY)
                    .build();
            String key = url.toServiceStringWithoutResolving();
            // Lock the registry access process to ensure a single instance of the registry
            //锁定注册中心的访问过程以确保注册中心的单个实例
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);//缓存
                if (registry != null) {
                    return registry;
                }
                //create registry by spi/ioc
                //创建注册中心实例
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry);
                return registry;
            } finally {
                // Release the lock
                LOCK.unlock();
            }
        }

    ZookeeperRegistryFactory:

        @Override
        public Registry createRegistry(URL url) {
            //创建zookeeperRegistery实例
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }

    ZookeeperRegistry

        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            super(url); //构造父类构造方法
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
            if (!group.startsWith(Constants.PATH_SEPARATOR)) {
                group = Constants.PATH_SEPARATOR + group;
            }
            this.root = group;
            //连接zookeeper
            zkClient = zookeeperTransporter.connect(url);
            //添加监听器
            zkClient.addStateListener(state -> {
                if (state == StateListener.RECONNECTED) {
                    try {
                        //如果监听到的状态是重连,做恢复操作。
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            });
        }

    FailbackRegistry

        public FailbackRegistry(URL url) {
            //调用父类构造方法
            super(url);
            //重试间隔,默认5000ms
            this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    
            // since the retry task will not be very much. 128 ticks is enough.
            retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
        }

    AbstractRegistry

        public AbstractRegistry(URL url) {
            setUrl(url);
            // Start file save timer
            //是否同步保存文件,默认为false
            syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
           //配置缓存文件,默认在用户目录 /.dubbo文件夹下
            String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
            File file = null;
            if (ConfigUtils.isNotEmpty(filename)) {
                file = new File(filename);
                if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                    if (!file.getParentFile().mkdirs()) {
                        throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                    }
                }
            }
            this.file = file;
            // When starting the subscription center,
            // we need to read the local cache file for future Registry fault tolerance processing.
            loadProperties();//将缓存文件加载为properties
            notify(url.getBackupUrls());//通知更新配置
        }

    这里提到了缓存文件,

    简单来说它就是用来做容灾的,consumer从注册中心订阅了provider等信息后会缓存到本地文件中,

    这样当注册中心不可用时,consumer启动时就可以加载这个缓存文件里面的内容与provider进行交互,

    这样就可以不依赖注册中心,但是无法获取到新的provider变更通知,

    所以如果provider信息在注册中心不可用这段时间发生了很大变化,那就很可能会出现服务无法调用的情况,在2.5.7(记得是这个版本)版本之前,dubbo有一个bug,如果启动时注册中心连接不上,启动程序会hang住,无法启动,所以在2.5.7版本之前这个文件是没用的,在2.5.7版本进行了修复。下面我们来看FailbackRegistry用HashedWheelTimer重试什么东西:

        public HashedWheelTimer(
                ThreadFactory threadFactory,
                long tickDuration, TimeUnit unit, int ticksPerWheel,
                long maxPendingTimeouts) {
    
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            if (unit == null) {
                throw new NullPointerException("unit");
            }
            if (tickDuration <= 0) {
                throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
            }
            if (ticksPerWheel <= 0) {
                throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
            }
    
            // Normalize ticksPerWheel to power of two and initialize the wheel.
            wheel = createWheel(ticksPerWheel);
            mask = wheel.length - 1;
    
            // Convert tickDuration to nanos.
            this.tickDuration = unit.toNanos(tickDuration);
    
            // Prevent overflow.
            if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
                throw new IllegalArgumentException(String.format(
                        "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                        tickDuration, Long.MAX_VALUE / wheel.length));
            }
            workerThread = threadFactory.newThread(worker);
    
            this.maxPendingTimeouts = maxPendingTimeouts;
    
            if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                    WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
                reportTooManyInstances();
            }
        }

    我们看到,重试就是对注册、订阅等各个失败的操作进行重试,dubbo会在这些动作失败时将失败的记录存入集合或map中,这里会取出这些记录进行重试。

    下面我们来看zookeeper的链接,可以选择使用ZkClient或curator来连接zookeeper,默认为ZkClient:

    public ZookeeperClient connect(URL url) { 

    /* 构建ZkclientZookeeperClient */
    return new ZkclientZookeeperClient(url);
    }

    ZkclientZookeeperClient

    public ZkclientZookeeperClient(URL url) {
        super(url);
        /* 构建ZkClientWrapper,可以在连接超时后自动监控连接的状态 */
        client = new ZkClientWrapper(url.getBackupAddress(), 30000);
        // 添加监听器,用于连接状态变更通知监听器
        client.addListener(new IZkStateListener() {
            public void handleStateChanged(KeeperState state) throws Exception {
                ZkclientZookeeperClient.this.state = state;
                if (state == KeeperState.Disconnected) {
                    stateChanged(StateListener.DISCONNECTED);
                } else if (state == KeeperState.SyncConnected) {
                    stateChanged(StateListener.CONNECTED);
                }
            }
            public void handleNewSession() throws Exception {
                stateChanged(StateListener.RECONNECTED);
            }
        });
        client.start(); /* 开启线程,连接zookeeper */
    }
    

     ZkClientWrapper

     

    public ZkClientWrapper(final String serverAddr, long timeout) {
        this.timeout = timeout;
        // 创建任务创建ZkClient
        listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
            @Override
            public ZkClient call() throws Exception {
                return new ZkClient(serverAddr, Integer.MAX_VALUE);
            }
        });
    }
    

      ZkClientWrapper

    public void start() {
        if (!started) {
            Thread connectThread = new Thread(listenableFutureTask);
            connectThread.setName("DubboZkclientConnector");
            connectThread.setDaemon(true);
            connectThread.start(); // 创建线程执行创建ZkClient任务
            try {
                client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
            } catch (Throwable t) {
                logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
            }
            started = true;
        } else {
            logger.warn("Zkclient has already been started!");
        }
    }

    创建ZkClient之后接下来添加添加了一个连接状态变更监听器,目的是在重连时做恢复操作
    FailbackRegistry:

    protected void recover() throws Exception {
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                failedRegistered.add(url); // 将注册相关信息添加到失败注册集合中
            }
        }
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    // 将订阅相关信息添加到失败订阅map中
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }
    

    我们看到,恢复操作其实就是将注册和订阅的信息保存起来,我们之前看到的重试流程会拉取这些信息进行重试。接下来就是将服务信息注册到注册中心:

    FailbackRegistry

    public void register(URL url) {
        if (destroyed.get()){
            return;
        }
        super.register(url); // 添加到已注册服务集合中
        failedRegistered.remove(url); // 从失败的注册集合中移除
        failedUnregistered.remove(url); // 从失败的注销集合中移除
        try {
            doRegister(url); /* 发起注册请求 */
        } catch (Exception e) {
            Throwable t = e;
            // 如果启动检测被打开,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
            // 将失败的注册请求记录到失败的列表中,定期重试
            failedRegistered.add(url);
        }
    }

    ZookeeperRegistry

    protected void doRegister(URL url) {
        try {
            /* 创建zookeeper节点 默认为临时节点*/
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

     AbstractZookeeperClient

     

    public void create(String path, boolean ephemeral) {
        int i = path.lastIndexOf('/');
        if (i > 0) {
            String parentPath = path.substring(0, i);
            if (!checkExists(parentPath)) {
                // 递归创建父节点
                create(parentPath, false);
            }
        }
        if (ephemeral) {
            createEphemeral(path); /* 创建临时节点 */
        } else {
            createPersistent(path); /* 创建持久节点 */
        }
    }
    

     

    ZkclientZookeeperClient:

    public void createEphemeral(String path) {
    try {
    /* 创建临时节点 */
    client.createEphemeral(path);
    } catch (ZkNodeExistsException e) {
    }
    }

    ZkClientWrapper:

    public void createEphemeral(String path) {
    Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
    // 调用ZkClient的createEphemeral方法创建临时节点
    client.createEphemeral(path);
    }
    

      

    ZkclientZookeeperClient:

    public void createPersistent(String path) {
    try {
    /* 创建持久节点 */
    client.createPersistent(path);
    } catch (ZkNodeExistsException e) {
    }
    }


    ZkClientWrapper:

    public void createPersistent(String path) {
    Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
    // 调用ZkClient的createPersistent方法创建临时节点
    client.createPersistent(path, true);
    }
    

      

    在注册中心创建完成服务信息节点之后是订阅操作:

    FailbackRegistry  

  • 相关阅读:
    RabbitMQ安装
    基于Linux的校园网破解思路和方法
    网络-0001-常见传输介质
    友情链接
    linux简史
    计算机的发展简史
    ArrayList&LinkedList&Vector区别
    Adobe Flash Player PPAPI 32.0.0.330
    ntoskrnl.exe导致蓝屏解决方法
    Git常用命令
  • 原文地址:https://www.cnblogs.com/shoshana-kong/p/10759548.html
Copyright © 2011-2022 走看看